commit 58d8329dbd192ca1d740b87fed73cff5cf7910f7 Author: Bishwajeet Kumar Rajak Date: Wed Apr 22 01:34:12 2026 +0530 neft_outward diff --git a/__pycache__/config.cpython-314.pyc b/__pycache__/config.cpython-314.pyc new file mode 100644 index 0000000..1525439 Binary files /dev/null and b/__pycache__/config.cpython-314.pyc differ diff --git a/__pycache__/db.cpython-314.pyc b/__pycache__/db.cpython-314.pyc new file mode 100644 index 0000000..3c2918c Binary files /dev/null and b/__pycache__/db.cpython-314.pyc differ diff --git a/__pycache__/logging_config.cpython-314.pyc b/__pycache__/logging_config.cpython-314.pyc new file mode 100644 index 0000000..c946e28 Binary files /dev/null and b/__pycache__/logging_config.cpython-314.pyc differ diff --git a/api_client.py b/api_client.py new file mode 100644 index 0000000..e52795b --- /dev/null +++ b/api_client.py @@ -0,0 +1,18 @@ +import requests +from logging_config import get_logger + +logger = get_logger(__name__) + +class FetchQueueClient: + def __init__(self, config): + self.url = config.FETCH_QUEUE_URL + + def fetch(self, payload): + response = requests.post( + self.url, + json=payload, + verify=False, + timeout=20 + ) + response.raise_for_status() + return response.json() \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..2b3853b --- /dev/null +++ b/config.py @@ -0,0 +1,22 @@ +import os + +class Config: + BANK_CODE_MAPPING = { + "9": "0015", "12": "0014", "13": "0016", "14": "0009", + "15": "0005", "16": "0001", "7": "0013", "6": "0002", + "5": "0007", "17": "0020", "1": "0012", "10": "0017", + "11": "0018", "2": "0003", "4": "0004", + "8": "0006", "3": "0021" + } + + FETCH_QUEUE_URL = "https://43.225.3.224:443/IPKS_Queue_Generation/fetchAllQueue" + + def __init__(self): + self.db_user = os.getenv("DB_USER", "pacs_db") + self.db_password = os.getenv("DB_PASSWORD", "pacs_db") + self.db_dsn = os.getenv("DB_DSN", "testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB") + self.poll_interval_minutes = int(os.getenv("POLL_INTERVAL_MINUTES", "30")) + + def validate(self): + if not self.db_user or not self.db_password or not self.db_dsn: + raise RuntimeError("Database configuration missing") \ No newline at end of file diff --git a/db.py b/db.py new file mode 100644 index 0000000..4e01805 --- /dev/null +++ b/db.py @@ -0,0 +1,38 @@ +import oracledb +from logging_config import get_logger + +logger = get_logger(__name__) + +# Initialize thick mode (uses Oracle Instant Client) +oracledb.init_oracle_client() + +class OracleDB: + def __init__(self, config): + self.config = config + self.conn = None + + def connect(self): + try: + self.conn = oracledb.connect( + user=self.config.db_user, + password=self.config.db_password, + dsn=self.config.db_dsn + ) + logger.info("Connected to Oracle DB using oracledb") + except Exception as e: + logger.error("Oracle connection failed", exc_info=True) + raise + + def fetch_all(self, sql): + cur = self.conn.cursor() + cur.execute(sql) + cols = [c[0] for c in cur.description] + rows = [dict(zip(cols, r)) for r in cur.fetchall()] + cur.close() + return rows + + def execute(self, sql, params): + cur = self.conn.cursor() + cur.execute(sql, params) + self.conn.commit() + cur.close() \ No newline at end of file diff --git a/logging_config.py b/logging_config.py new file mode 100644 index 0000000..9ac480f --- /dev/null +++ b/logging_config.py @@ -0,0 +1,10 @@ +import logging + +def setup_logging(): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s - %(message)s" + ) + +def get_logger(name): + return logging.getLogger(name) \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..fba88b1 --- /dev/null +++ b/main.py @@ -0,0 +1,27 @@ +from logging_config import setup_logging +from config import Config +from db import OracleDB +from repository import Repository +from api_client import FetchQueueClient +from processor import Processor +from scheduler import Scheduler + +def main(): + setup_logging() + + config = Config() + config.validate() + + db = OracleDB(config) + db.connect() + + repo = Repository(db) + api = FetchQueueClient(config) + + processor = Processor(config, repo, api) + scheduler = Scheduler(processor, config) + + scheduler.run() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/processor.py b/processor.py new file mode 100644 index 0000000..87b894e --- /dev/null +++ b/processor.py @@ -0,0 +1,62 @@ +from datetime import datetime +from logging_config import get_logger + +logger = get_logger(__name__) + +class Processor: + def __init__(self, config, repo, api): + self.config = config + self.repo = repo + self.api = api + + def process(self): + rows = self.repo.get_pending_rrns_today() + logger.info("Pending transactions: %s", len(rows)) + + for r in rows: + bank_code = self.config.BANK_CODE_MAPPING.get(r["DCCB_CODE"]) + if not bank_code: + logger.warning("No bank mapping for DCCB=%s", r["DCCB_CODE"]) + continue + + payload = { + "bankCode": bank_code, + "rrn": r["RRN"], + "branchCode": r["BR_CODE"], + "itemType": 0, + "queueType": "02", + "reterieveTop": "20" + } + + response = self.api.fetch(payload) + items = response.get("response", {}).get("VwQueueItems", []) + + if not items or items[0].get("status") != "PROCESSED": + continue + + item = items[0] + + log_data = { + "bankcode": bank_code, + "jrnl_id": item["journalId"], + "ref_no": item.get("remarks"), + "tran_date": datetime.now().strftime("%d%m%Y"), + "txn_amt": r["TXN_AMT"], + "recv_ac": r["DEST_AC_NO"], + "send_ac": r["SRC_AC_NO"], + "ifsc": r["IFSC_CODE"], + "sender_name": r["REMITTER_NAME"], + "receiver_name": r["BENEFICIARY_NAME"], + "beneficiary_address": r["BENEFICIARY_ADD"], + "sender_info": item["sendeReceiverInfo"] + } + + # INSERT FIRST + self.repo.insert_outward_log(log_data) + + # THEN MARK AS PROCESSED + self.repo.mark_processed(r["TXN_NO"]) + + logger.info( + "Inserted outward log and marked TXN_NO=%s", r["TXN_NO"] + ) \ No newline at end of file diff --git a/repository.py b/repository.py new file mode 100644 index 0000000..c47d328 --- /dev/null +++ b/repository.py @@ -0,0 +1,49 @@ +class Repository: + def __init__(self, db): + self.db = db + + def get_pending_rrns_today(self): + sql = """ + SELECT r.TXN_NO, r.RRN, + q.DCCB_CODE, q.BR_CODE, + q.SRC_AC_NO, q.DEST_AC_NO, + q.IFSC_CODE, q.TXN_AMT, + q.BENEFICIARY_NAME, q.BENEFICIARY_ADD, + q.REMITTER_NAME + FROM NEFT_RTGS_TXN_RRN r + JOIN NEFT_RTGS_TXN_QUEUE q ON q.TXN_NO = r.TXN_NO + WHERE r.MARK_TYPE = 'N' + AND q.TXN_DATE >= TRUNC(SYSDATE) + AND q.TXN_DATE < TRUNC(SYSDATE) + 1 + """ + return self.db.fetch_all(sql) + + def insert_outward_log(self, d): + sql = """ + INSERT INTO outward_neft_api_log ( + BANKCODE, TXNCODE, TXNIND, + JRNL_ID, REF_NO, TRAN_DATE, + TXN_AMT, RECVR_ACCT_NO, SENDER_ACCT_NO, + RECIEVER_IFSC, SENDER_IFSC, + SENDER_NAME, RECIEVER_NAME, + BENEFICIARY_ADDRESS, + SENDER_TO_RECIVER_INFO, + MSG_TYPE + ) VALUES ( + :bankcode, 'OUTWARD NEFT', 'DR', + :jrnl_id, :ref_no, :tran_date, + :txn_amt, :recv_ac, :send_ac, + :ifsc, ' ', + :sender_name, :receiver_name, + :beneficiary_address, + :sender_info, + 'N06' + ) + """ + self.db.execute(sql, d) + + def mark_processed(self, txn_no): + self.db.execute( + "UPDATE NEFT_RTGS_TXN_RRN SET MARK_TYPE='Y' WHERE TXN_NO=:txn", + {"txn": txn_no} + ) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0e3667f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +oracledb==2.0.0 +requests \ No newline at end of file diff --git a/schedular.py b/schedular.py new file mode 100644 index 0000000..42ed6e9 --- /dev/null +++ b/schedular.py @@ -0,0 +1,23 @@ +import time +from logging_config import get_logger + +logger = get_logger(__name__) + +class Scheduler: + def __init__(self, processor, config): + self.processor = processor + self.config = config + + def run(self): + logger.info("Scheduler started") + while True: + try: + self.processor.process() + except Exception: + logger.exception("Processing cycle failed") + + logger.info( + "Sleeping for %s minutes", self.config.poll_interval_minutes + ) + time.sleep(self.config.poll_interval_minutes * 60) +`` \ No newline at end of file