commit fe29ff0c5767edb8e0b9348423a709cbdf04255d Author: Bishwajeet Kumar Rajak Date: Tue May 5 23:18:54 2026 +0530 product diff --git a/__pycache__/config.cpython-314.pyc b/__pycache__/config.cpython-314.pyc new file mode 100644 index 0000000..1525439 Binary files /dev/null and b/__pycache__/config.cpython-314.pyc differ diff --git a/__pycache__/db.cpython-314.pyc b/__pycache__/db.cpython-314.pyc new file mode 100644 index 0000000..3c2918c Binary files /dev/null and b/__pycache__/db.cpython-314.pyc differ diff --git a/__pycache__/logging_config.cpython-314.pyc b/__pycache__/logging_config.cpython-314.pyc new file mode 100644 index 0000000..c946e28 Binary files /dev/null and b/__pycache__/logging_config.cpython-314.pyc differ diff --git a/api_client.py b/api_client.py new file mode 100644 index 0000000..fc6f3d0 --- /dev/null +++ b/api_client.py @@ -0,0 +1,18 @@ +import requests +from logging_config import get_logger + +logger = get_logger(__name__) + +class FetchQueueClient: + def __init__(self, config): + self.url = config.FETCH_QUEUE_URL + + def fetch(self, payload): + response = requests.post( + self.url, + json=payload, + verify=False, + timeout=20 + ) + response.raise_for_status() + return response.json() diff --git a/config.py b/config.py new file mode 100644 index 0000000..2b3853b --- /dev/null +++ b/config.py @@ -0,0 +1,22 @@ +import os + +class Config: + BANK_CODE_MAPPING = { + "9": "0015", "12": "0014", "13": "0016", "14": "0009", + "15": "0005", "16": "0001", "7": "0013", "6": "0002", + "5": "0007", "17": "0020", "1": "0012", "10": "0017", + "11": "0018", "2": "0003", "4": "0004", + "8": "0006", "3": "0021" + } + + FETCH_QUEUE_URL = "https://43.225.3.224:443/IPKS_Queue_Generation/fetchAllQueue" + + def __init__(self): + self.db_user = os.getenv("DB_USER", "pacs_db") + self.db_password = os.getenv("DB_PASSWORD", "pacs_db") + self.db_dsn = os.getenv("DB_DSN", "testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB") + self.poll_interval_minutes = int(os.getenv("POLL_INTERVAL_MINUTES", "30")) + + def validate(self): + if not self.db_user or not self.db_password or not self.db_dsn: + raise RuntimeError("Database configuration missing") \ No newline at end of file diff --git a/db.py b/db.py new file mode 100644 index 0000000..3d3fa0d --- /dev/null +++ b/db.py @@ -0,0 +1,69 @@ +from logging_config import get_logger + +logger = get_logger(__name__) + +class OracleDB: + def __init__(self, connector): + self.connector = connector + + def fetch_all(self, sql): + conn = cur = None + try: + conn = self.connector.get_connection() + cur = conn.cursor() + cur.execute(sql) + cols = [c[0] for c in cur.description] + return [dict(zip(cols, r)) for r in cur.fetchall()] + except Exception: + logger.error("Error executing fetch_all", exc_info=True) + raise + finally: + if cur: + cur.close() + if conn: + conn.close() + + def execute(self, sql, params): + conn = cur = None + try: + conn = self.connector.get_connection() + cur = conn.cursor() + cur.execute(sql, params) + conn.commit() + except Exception: + if conn: + conn.rollback() + logger.error("Error executing SQL", exc_info=True) + raise + finally: + if cur: + cur.close() + if conn: + conn.close() + + def call_procedure(self, proc_name): + conn = cur = None + try: + conn = self.connector.get_connection() + cur = conn.cursor() + logger.info("Calling stored procedure: %s", proc_name) + try: + cur.callproc(proc_name) + except Exception: + cur.execute(f"BEGIN {proc_name}; END;") + conn.commit() + logger.info("Stored procedure %s executed successfully", proc_name) + return True + except Exception as e: + logger.error( + "Stored procedure %s failed: %s", + proc_name, + str(e), + exc_info=True + ) + return False + finally: + if cur: + cur.close() + if conn: + conn.close() diff --git a/logging_config.py b/logging_config.py new file mode 100644 index 0000000..9ac480f --- /dev/null +++ b/logging_config.py @@ -0,0 +1,10 @@ +import logging + +def setup_logging(): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s - %(message)s" + ) + +def get_logger(name): + return logging.getLogger(name) \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..3767cbe --- /dev/null +++ b/main.py @@ -0,0 +1,26 @@ +from logging_config import setup_logging +from config import Config +from oracle_connector import OracleConnector +from db import OracleDB +from repository import Repository +from api_client import FetchQueueClient +from processor import Processor +from scheduler import Scheduler + +def main(): + setup_logging() + + config = Config() + config.validate() + + connector = OracleConnector(config) + db = OracleDB(connector) + repo = Repository(db) + api = FetchQueueClient(config) + + processor = Processor(config, repo, api) + scheduler = Scheduler(processor, config) + scheduler.run() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/oracle_connector.py b/oracle_connector.py new file mode 100644 index 0000000..c2adb2f --- /dev/null +++ b/oracle_connector.py @@ -0,0 +1,37 @@ +import oracledb +from logging_config import get_logger + +logger = get_logger(__name__) + +class OracleConnector: + def __init__(self, config): + self.config = config + self.pool = None + + def initialize_pool(self): + try: + self.pool = oracledb.create_pool( + user=self.config.db_user, + password=self.config.db_password, + dsn=self.config.db_dsn, + min=1, + max=5, + increment=1 + ) + logger.info("Oracle connection pool initialized") + except Exception: + logger.error("Failed to initialize Oracle pool", exc_info=True) + raise + + def get_connection(self): + if not self.pool: + self.initialize_pool() + return self.pool.acquire() + + def close(self): + if self.pool: + try: + self.pool.close() + logger.info("Oracle pool closed") + except Exception: + logger.error("Error closing pool", exc_info=True) diff --git a/processor.py b/processor.py new file mode 100644 index 0000000..7394cdd --- /dev/null +++ b/processor.py @@ -0,0 +1,61 @@ +from datetime import datetime +from logging_config import get_logger + +logger = get_logger(__name__) + +class Processor: + def __init__(self, config, repo, api): + self.config = config + self.repo = repo + self.api = api + + def process(self): + rows = self.repo.get_pending_rrns_today() + logger.info("Pending transactions count: %s", len(rows)) + + insert_count = 0 + + for r in rows: + bank_code = self.config.BANK_CODE_MAPPING.get(r["DCCB_CODE"]) + if not bank_code: + continue + + payload = { + "bankCode": bank_code, + "rrn": r["RRN_TWO"], + "branchCode": r["BR_CODE"], + "itemType": 0, + "queueType": "02", + "reterieveTop": "20" + } + + response = self.api.fetch(payload) + items = response.get("response", {}).get("VwQueueItems", []) + + if not items or items[0].get("status") != "PROCESSED": + continue + + tran_date_str = r["TXN_DATE"].strftime("%d%m%Y") + item = items[0] + + log_data = { + "bankcode": bank_code, + "jrnl_id": item["journalId"], + "ref_no": item.get("remarks"), + "tran_date": tran_date_str, + "txn_amt": r["TXN_AMT"], + "recv_ac": r["DEST_AC_NO"], + "send_ac": r["SRC_AC_NO"], + "ifsc": r["IFSC_CODE"], + "sender_ifsc": item.get("sender_ifsc"), + "sender_name": r["REMITTER_NAME"], + "receiver_name": r["BENEFICIARY_NAME"], + "beneficiary_address": r["BENEFICIARY_ADD"], + "sender_info": item["sendeReceiverInfo"] + } + + self.repo.insert_outward_log(log_data) + self.repo.mark_processed(r["TXN_NO"]) + insert_count += 1 + + return insert_count diff --git a/repository.py b/repository.py new file mode 100644 index 0000000..8c63253 --- /dev/null +++ b/repository.py @@ -0,0 +1,60 @@ +from logging_config import get_logger + +logger = get_logger(__name__) + +class Repository: + def __init__(self, db): + self.db = db + + def get_pending_rrns_today(self): + sql = """ + SELECT r.TXN_NO, + r.RRN_TWO, + q.TXN_DATE, + q.DCCB_CODE, + q.BR_CODE, + q.SRC_AC_NO, + q.DEST_AC_NO, + q.IFSC_CODE, + q.TXN_AMT, + q.BENEFICIARY_NAME, + q.BENEFICIARY_ADD, + q.REMITTER_NAME + FROM NEFT_RTGS_TXN_RRN r + JOIN NEFT_RTGS_TXN_QUEUE q ON q.TXN_NO = r.TXN_NO + WHERE r.MARK_TYPE = 'N' + """ + return self.db.fetch_all(sql) + + def insert_outward_log(self, d): + sql = """ + INSERT INTO outward_neft_api_log ( + BANKCODE, TXNCODE, TXNIND, + JRNL_ID, REF_NO, TRAN_DATE, + TXN_AMT, RECVR_ACCT_NO, SENDER_ACCT_NO, + RECIEVER_IFSC, SENDER_IFSC, + SENDER_NAME, RECIEVER_NAME, + BENEFICIARY_ADDRESS, + SENDER_TO_RECIVER_INFO, + MSG_TYPE + ) VALUES ( + :bankcode, 'OUTWARD NEFT', 'DR', + :jrnl_id, :ref_no, :tran_date, + :txn_amt, :recv_ac, :send_ac, + :ifsc, :sender_ifsc, + :sender_name, :receiver_name, + :beneficiary_address, + :sender_info, + 'N06' + ) + """ + self.db.execute(sql, d) + + def mark_processed(self, txn_no): + self.db.execute( + "UPDATE NEFT_RTGS_TXN_RRN SET MARK_TYPE='Y' WHERE TXN_NO=:txn", + {"txn": txn_no} + ) + + def call_neft_api_txn_post(self): + return self.db.call_procedure("neft_api_txn_post") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0e3667f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +oracledb==2.0.0 +requests \ No newline at end of file diff --git a/scheduler.py b/scheduler.py new file mode 100644 index 0000000..b9f97c5 --- /dev/null +++ b/scheduler.py @@ -0,0 +1,40 @@ +import time +from datetime import datetime, date +from logging_config import get_logger + +logger = get_logger(__name__) + +class Scheduler: + def __init__(self, processor, config): + self.processor = processor + self.config = config + self.last_cycle_date = None + self.cycle_no = 0 + + def _update_cycle(self): + today = date.today() + if self.last_cycle_date != today: + self.cycle_no = 1 + self.last_cycle_date = today + else: + self.cycle_no += 1 + + def run(self): + logger.info("NEFT OUTWARD Scheduler started") + + while True: + try: + self._update_cycle() + insert_count = self.processor.process() + + if insert_count > 0: + logger.info("Calling neft_api_txn_post procedure...") + if self.processor.repo.call_neft_api_txn_post(): + logger.info("Transaction post-processing completed successfully") + else: + logger.error("Transaction post-processing failed") + + except Exception: + logger.exception("Scheduler cycle failed") + + time.sleep(self.config.poll_interval_minutes * 60) \ No newline at end of file