neft_outward

This commit is contained in:
2026-04-22 01:34:12 +05:30
commit 58d8329dbd
12 changed files with 251 additions and 0 deletions
Binary file not shown.
Binary file not shown.
Binary file not shown.
+18
View File
@@ -0,0 +1,18 @@
import requests
from logging_config import get_logger
logger = get_logger(__name__)
class FetchQueueClient:
def __init__(self, config):
self.url = config.FETCH_QUEUE_URL
def fetch(self, payload):
response = requests.post(
self.url,
json=payload,
verify=False,
timeout=20
)
response.raise_for_status()
return response.json()
+22
View File
@@ -0,0 +1,22 @@
import os
class Config:
BANK_CODE_MAPPING = {
"9": "0015", "12": "0014", "13": "0016", "14": "0009",
"15": "0005", "16": "0001", "7": "0013", "6": "0002",
"5": "0007", "17": "0020", "1": "0012", "10": "0017",
"11": "0018", "2": "0003", "4": "0004",
"8": "0006", "3": "0021"
}
FETCH_QUEUE_URL = "https://43.225.3.224:443/IPKS_Queue_Generation/fetchAllQueue"
def __init__(self):
self.db_user = os.getenv("DB_USER", "pacs_db")
self.db_password = os.getenv("DB_PASSWORD", "pacs_db")
self.db_dsn = os.getenv("DB_DSN", "testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB")
self.poll_interval_minutes = int(os.getenv("POLL_INTERVAL_MINUTES", "30"))
def validate(self):
if not self.db_user or not self.db_password or not self.db_dsn:
raise RuntimeError("Database configuration missing")
+38
View File
@@ -0,0 +1,38 @@
import oracledb
from logging_config import get_logger
logger = get_logger(__name__)
# Initialize thick mode (uses Oracle Instant Client)
oracledb.init_oracle_client()
class OracleDB:
def __init__(self, config):
self.config = config
self.conn = None
def connect(self):
try:
self.conn = oracledb.connect(
user=self.config.db_user,
password=self.config.db_password,
dsn=self.config.db_dsn
)
logger.info("Connected to Oracle DB using oracledb")
except Exception as e:
logger.error("Oracle connection failed", exc_info=True)
raise
def fetch_all(self, sql):
cur = self.conn.cursor()
cur.execute(sql)
cols = [c[0] for c in cur.description]
rows = [dict(zip(cols, r)) for r in cur.fetchall()]
cur.close()
return rows
def execute(self, sql, params):
cur = self.conn.cursor()
cur.execute(sql, params)
self.conn.commit()
cur.close()
+10
View File
@@ -0,0 +1,10 @@
import logging
def setup_logging():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s"
)
def get_logger(name):
return logging.getLogger(name)
+27
View File
@@ -0,0 +1,27 @@
from logging_config import setup_logging
from config import Config
from db import OracleDB
from repository import Repository
from api_client import FetchQueueClient
from processor import Processor
from scheduler import Scheduler
def main():
setup_logging()
config = Config()
config.validate()
db = OracleDB(config)
db.connect()
repo = Repository(db)
api = FetchQueueClient(config)
processor = Processor(config, repo, api)
scheduler = Scheduler(processor, config)
scheduler.run()
if __name__ == "__main__":
main()
+62
View File
@@ -0,0 +1,62 @@
from datetime import datetime
from logging_config import get_logger
logger = get_logger(__name__)
class Processor:
def __init__(self, config, repo, api):
self.config = config
self.repo = repo
self.api = api
def process(self):
rows = self.repo.get_pending_rrns_today()
logger.info("Pending transactions: %s", len(rows))
for r in rows:
bank_code = self.config.BANK_CODE_MAPPING.get(r["DCCB_CODE"])
if not bank_code:
logger.warning("No bank mapping for DCCB=%s", r["DCCB_CODE"])
continue
payload = {
"bankCode": bank_code,
"rrn": r["RRN"],
"branchCode": r["BR_CODE"],
"itemType": 0,
"queueType": "02",
"reterieveTop": "20"
}
response = self.api.fetch(payload)
items = response.get("response", {}).get("VwQueueItems", [])
if not items or items[0].get("status") != "PROCESSED":
continue
item = items[0]
log_data = {
"bankcode": bank_code,
"jrnl_id": item["journalId"],
"ref_no": item.get("remarks"),
"tran_date": datetime.now().strftime("%d%m%Y"),
"txn_amt": r["TXN_AMT"],
"recv_ac": r["DEST_AC_NO"],
"send_ac": r["SRC_AC_NO"],
"ifsc": r["IFSC_CODE"],
"sender_name": r["REMITTER_NAME"],
"receiver_name": r["BENEFICIARY_NAME"],
"beneficiary_address": r["BENEFICIARY_ADD"],
"sender_info": item["sendeReceiverInfo"]
}
# INSERT FIRST
self.repo.insert_outward_log(log_data)
# THEN MARK AS PROCESSED
self.repo.mark_processed(r["TXN_NO"])
logger.info(
"Inserted outward log and marked TXN_NO=%s", r["TXN_NO"]
)
+49
View File
@@ -0,0 +1,49 @@
class Repository:
def __init__(self, db):
self.db = db
def get_pending_rrns_today(self):
sql = """
SELECT r.TXN_NO, r.RRN,
q.DCCB_CODE, q.BR_CODE,
q.SRC_AC_NO, q.DEST_AC_NO,
q.IFSC_CODE, q.TXN_AMT,
q.BENEFICIARY_NAME, q.BENEFICIARY_ADD,
q.REMITTER_NAME
FROM NEFT_RTGS_TXN_RRN r
JOIN NEFT_RTGS_TXN_QUEUE q ON q.TXN_NO = r.TXN_NO
WHERE r.MARK_TYPE = 'N'
AND q.TXN_DATE >= TRUNC(SYSDATE)
AND q.TXN_DATE < TRUNC(SYSDATE) + 1
"""
return self.db.fetch_all(sql)
def insert_outward_log(self, d):
sql = """
INSERT INTO outward_neft_api_log (
BANKCODE, TXNCODE, TXNIND,
JRNL_ID, REF_NO, TRAN_DATE,
TXN_AMT, RECVR_ACCT_NO, SENDER_ACCT_NO,
RECIEVER_IFSC, SENDER_IFSC,
SENDER_NAME, RECIEVER_NAME,
BENEFICIARY_ADDRESS,
SENDER_TO_RECIVER_INFO,
MSG_TYPE
) VALUES (
:bankcode, 'OUTWARD NEFT', 'DR',
:jrnl_id, :ref_no, :tran_date,
:txn_amt, :recv_ac, :send_ac,
:ifsc, ' ',
:sender_name, :receiver_name,
:beneficiary_address,
:sender_info,
'N06'
)
"""
self.db.execute(sql, d)
def mark_processed(self, txn_no):
self.db.execute(
"UPDATE NEFT_RTGS_TXN_RRN SET MARK_TYPE='Y' WHERE TXN_NO=:txn",
{"txn": txn_no}
)
+2
View File
@@ -0,0 +1,2 @@
oracledb==2.0.0
requests
+23
View File
@@ -0,0 +1,23 @@
import time
from logging_config import get_logger
logger = get_logger(__name__)
class Scheduler:
def __init__(self, processor, config):
self.processor = processor
self.config = config
def run(self):
logger.info("Scheduler started")
while True:
try:
self.processor.process()
except Exception:
logger.exception("Processing cycle failed")
logger.info(
"Sleeping for %s minutes", self.config.poll_interval_minutes
)
time.sleep(self.config.poll_interval_minutes * 60)
``