This commit is contained in:
2026-05-05 23:18:54 +05:30
commit fe29ff0c57
13 changed files with 345 additions and 0 deletions
Binary file not shown.
Binary file not shown.
Binary file not shown.
+18
View File
@@ -0,0 +1,18 @@
import requests
from logging_config import get_logger
logger = get_logger(__name__)
class FetchQueueClient:
def __init__(self, config):
self.url = config.FETCH_QUEUE_URL
def fetch(self, payload):
response = requests.post(
self.url,
json=payload,
verify=False,
timeout=20
)
response.raise_for_status()
return response.json()
+22
View File
@@ -0,0 +1,22 @@
import os
class Config:
BANK_CODE_MAPPING = {
"9": "0015", "12": "0014", "13": "0016", "14": "0009",
"15": "0005", "16": "0001", "7": "0013", "6": "0002",
"5": "0007", "17": "0020", "1": "0012", "10": "0017",
"11": "0018", "2": "0003", "4": "0004",
"8": "0006", "3": "0021"
}
FETCH_QUEUE_URL = "https://43.225.3.224:443/IPKS_Queue_Generation/fetchAllQueue"
def __init__(self):
self.db_user = os.getenv("DB_USER", "pacs_db")
self.db_password = os.getenv("DB_PASSWORD", "pacs_db")
self.db_dsn = os.getenv("DB_DSN", "testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB")
self.poll_interval_minutes = int(os.getenv("POLL_INTERVAL_MINUTES", "30"))
def validate(self):
if not self.db_user or not self.db_password or not self.db_dsn:
raise RuntimeError("Database configuration missing")
+69
View File
@@ -0,0 +1,69 @@
from logging_config import get_logger
logger = get_logger(__name__)
class OracleDB:
def __init__(self, connector):
self.connector = connector
def fetch_all(self, sql):
conn = cur = None
try:
conn = self.connector.get_connection()
cur = conn.cursor()
cur.execute(sql)
cols = [c[0] for c in cur.description]
return [dict(zip(cols, r)) for r in cur.fetchall()]
except Exception:
logger.error("Error executing fetch_all", exc_info=True)
raise
finally:
if cur:
cur.close()
if conn:
conn.close()
def execute(self, sql, params):
conn = cur = None
try:
conn = self.connector.get_connection()
cur = conn.cursor()
cur.execute(sql, params)
conn.commit()
except Exception:
if conn:
conn.rollback()
logger.error("Error executing SQL", exc_info=True)
raise
finally:
if cur:
cur.close()
if conn:
conn.close()
def call_procedure(self, proc_name):
conn = cur = None
try:
conn = self.connector.get_connection()
cur = conn.cursor()
logger.info("Calling stored procedure: %s", proc_name)
try:
cur.callproc(proc_name)
except Exception:
cur.execute(f"BEGIN {proc_name}; END;")
conn.commit()
logger.info("Stored procedure %s executed successfully", proc_name)
return True
except Exception as e:
logger.error(
"Stored procedure %s failed: %s",
proc_name,
str(e),
exc_info=True
)
return False
finally:
if cur:
cur.close()
if conn:
conn.close()
+10
View File
@@ -0,0 +1,10 @@
import logging
def setup_logging():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s"
)
def get_logger(name):
return logging.getLogger(name)
+26
View File
@@ -0,0 +1,26 @@
from logging_config import setup_logging
from config import Config
from oracle_connector import OracleConnector
from db import OracleDB
from repository import Repository
from api_client import FetchQueueClient
from processor import Processor
from scheduler import Scheduler
def main():
setup_logging()
config = Config()
config.validate()
connector = OracleConnector(config)
db = OracleDB(connector)
repo = Repository(db)
api = FetchQueueClient(config)
processor = Processor(config, repo, api)
scheduler = Scheduler(processor, config)
scheduler.run()
if __name__ == "__main__":
main()
+37
View File
@@ -0,0 +1,37 @@
import oracledb
from logging_config import get_logger
logger = get_logger(__name__)
class OracleConnector:
def __init__(self, config):
self.config = config
self.pool = None
def initialize_pool(self):
try:
self.pool = oracledb.create_pool(
user=self.config.db_user,
password=self.config.db_password,
dsn=self.config.db_dsn,
min=1,
max=5,
increment=1
)
logger.info("Oracle connection pool initialized")
except Exception:
logger.error("Failed to initialize Oracle pool", exc_info=True)
raise
def get_connection(self):
if not self.pool:
self.initialize_pool()
return self.pool.acquire()
def close(self):
if self.pool:
try:
self.pool.close()
logger.info("Oracle pool closed")
except Exception:
logger.error("Error closing pool", exc_info=True)
+61
View File
@@ -0,0 +1,61 @@
from datetime import datetime
from logging_config import get_logger
logger = get_logger(__name__)
class Processor:
def __init__(self, config, repo, api):
self.config = config
self.repo = repo
self.api = api
def process(self):
rows = self.repo.get_pending_rrns_today()
logger.info("Pending transactions count: %s", len(rows))
insert_count = 0
for r in rows:
bank_code = self.config.BANK_CODE_MAPPING.get(r["DCCB_CODE"])
if not bank_code:
continue
payload = {
"bankCode": bank_code,
"rrn": r["RRN_TWO"],
"branchCode": r["BR_CODE"],
"itemType": 0,
"queueType": "02",
"reterieveTop": "20"
}
response = self.api.fetch(payload)
items = response.get("response", {}).get("VwQueueItems", [])
if not items or items[0].get("status") != "PROCESSED":
continue
tran_date_str = r["TXN_DATE"].strftime("%d%m%Y")
item = items[0]
log_data = {
"bankcode": bank_code,
"jrnl_id": item["journalId"],
"ref_no": item.get("remarks"),
"tran_date": tran_date_str,
"txn_amt": r["TXN_AMT"],
"recv_ac": r["DEST_AC_NO"],
"send_ac": r["SRC_AC_NO"],
"ifsc": r["IFSC_CODE"],
"sender_ifsc": item.get("sender_ifsc"),
"sender_name": r["REMITTER_NAME"],
"receiver_name": r["BENEFICIARY_NAME"],
"beneficiary_address": r["BENEFICIARY_ADD"],
"sender_info": item["sendeReceiverInfo"]
}
self.repo.insert_outward_log(log_data)
self.repo.mark_processed(r["TXN_NO"])
insert_count += 1
return insert_count
+60
View File
@@ -0,0 +1,60 @@
from logging_config import get_logger
logger = get_logger(__name__)
class Repository:
def __init__(self, db):
self.db = db
def get_pending_rrns_today(self):
sql = """
SELECT r.TXN_NO,
r.RRN_TWO,
q.TXN_DATE,
q.DCCB_CODE,
q.BR_CODE,
q.SRC_AC_NO,
q.DEST_AC_NO,
q.IFSC_CODE,
q.TXN_AMT,
q.BENEFICIARY_NAME,
q.BENEFICIARY_ADD,
q.REMITTER_NAME
FROM NEFT_RTGS_TXN_RRN r
JOIN NEFT_RTGS_TXN_QUEUE q ON q.TXN_NO = r.TXN_NO
WHERE r.MARK_TYPE = 'N'
"""
return self.db.fetch_all(sql)
def insert_outward_log(self, d):
sql = """
INSERT INTO outward_neft_api_log (
BANKCODE, TXNCODE, TXNIND,
JRNL_ID, REF_NO, TRAN_DATE,
TXN_AMT, RECVR_ACCT_NO, SENDER_ACCT_NO,
RECIEVER_IFSC, SENDER_IFSC,
SENDER_NAME, RECIEVER_NAME,
BENEFICIARY_ADDRESS,
SENDER_TO_RECIVER_INFO,
MSG_TYPE
) VALUES (
:bankcode, 'OUTWARD NEFT', 'DR',
:jrnl_id, :ref_no, :tran_date,
:txn_amt, :recv_ac, :send_ac,
:ifsc, :sender_ifsc,
:sender_name, :receiver_name,
:beneficiary_address,
:sender_info,
'N06'
)
"""
self.db.execute(sql, d)
def mark_processed(self, txn_no):
self.db.execute(
"UPDATE NEFT_RTGS_TXN_RRN SET MARK_TYPE='Y' WHERE TXN_NO=:txn",
{"txn": txn_no}
)
def call_neft_api_txn_post(self):
return self.db.call_procedure("neft_api_txn_post")
+2
View File
@@ -0,0 +1,2 @@
oracledb==2.0.0
requests
+40
View File
@@ -0,0 +1,40 @@
import time
from datetime import datetime, date
from logging_config import get_logger
logger = get_logger(__name__)
class Scheduler:
def __init__(self, processor, config):
self.processor = processor
self.config = config
self.last_cycle_date = None
self.cycle_no = 0
def _update_cycle(self):
today = date.today()
if self.last_cycle_date != today:
self.cycle_no = 1
self.last_cycle_date = today
else:
self.cycle_no += 1
def run(self):
logger.info("NEFT OUTWARD Scheduler started")
while True:
try:
self._update_cycle()
insert_count = self.processor.process()
if insert_count > 0:
logger.info("Calling neft_api_txn_post procedure...")
if self.processor.repo.call_neft_api_txn_post():
logger.info("Transaction post-processing completed successfully")
else:
logger.error("Transaction post-processing failed")
except Exception:
logger.exception("Scheduler cycle failed")
time.sleep(self.config.poll_interval_minutes * 60)