184 lines
6.7 KiB
Python
184 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Data mapper for NEFT SFTP feed.
|
|
Maps parsed NEFT transactions (dicts) to NEFTInwardRecord for database insertion.
|
|
- No padding of account numbers (kept as-is, trimmed).
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from typing import Dict, Any, List
|
|
|
|
from logging_config import get_logger
|
|
from db.models import NEFTInwardRecord # Ensure NEFTInwardRecord has bank_code field
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class NEFTDataMapper:
|
|
"""Maps parsed NEFT transactions to NEFTInwardRecord objects."""
|
|
|
|
# -------------------------
|
|
# Helpers
|
|
# -------------------------
|
|
|
|
@staticmethod
|
|
def convert_date(date_str: str) -> str:
|
|
"""
|
|
Convert NEFT date (YYYYMMDD) to DDMMYYYY.
|
|
|
|
Input : '20260306'
|
|
Output: '06032026'
|
|
|
|
On error, returns today's date in DDMMYYYY format.
|
|
"""
|
|
try:
|
|
if not date_str or len(date_str.strip()) != 8 or not date_str.isdigit():
|
|
raise ValueError(f"Invalid NEFT date format: {date_str!r}")
|
|
dt = datetime.strptime(date_str, "%Y%m%d")
|
|
return dt.strftime("%d%m%Y")
|
|
except Exception as e:
|
|
logger.error(f"Error converting date '{date_str}': {e}")
|
|
return datetime.now().strftime("%d%m%Y")
|
|
|
|
@staticmethod
|
|
def calculate_txnind(amount_in: Any) -> str:
|
|
"""
|
|
Calculate transaction indicator from amount.
|
|
'CR' for credit (>= 0), 'DR' for debit (< 0).
|
|
|
|
Accepts Decimal or string. Defaults to 'CR' on error.
|
|
"""
|
|
try:
|
|
if isinstance(amount_in, Decimal):
|
|
amount = amount_in
|
|
else:
|
|
txt = (str(amount_in) or "").strip()
|
|
amount = Decimal(txt) if txt else Decimal("0")
|
|
return 'DR' if amount < 0 else 'CR'
|
|
except Exception as e:
|
|
logger.error(f"Error calculating TXNIND for amount '{amount_in}': {e}")
|
|
return 'CR'
|
|
|
|
@staticmethod
|
|
def process_status(status: str) -> str:
|
|
"""
|
|
Normalize status field.
|
|
|
|
- If contains 'processed' (case-insensitive) -> 'Processed'
|
|
- If equals 'PROS' (common NEFT code) -> 'Processed'
|
|
- If equals 'WAIT' -> 'Waiting'
|
|
- Else return original status (trimmed)
|
|
"""
|
|
try:
|
|
if not status:
|
|
return ''
|
|
s = status.strip()
|
|
sl = s.lower()
|
|
if 'processed' in sl or s.upper() == 'PROS':
|
|
return 'PROCESSED'
|
|
if s.upper() == 'WAIT':
|
|
return 'Waiting'
|
|
return s
|
|
except Exception as e:
|
|
logger.error(f"Error processing status: {e}")
|
|
return status
|
|
|
|
@staticmethod
|
|
def convert_amount(amount_in: Any) -> Decimal:
|
|
"""
|
|
Convert amount to Decimal and return absolute value.
|
|
Use TXNIND to capture the sign semantics.
|
|
"""
|
|
try:
|
|
if isinstance(amount_in, Decimal):
|
|
val = amount_in
|
|
else:
|
|
txt = (str(amount_in) or '').replace(',', '').strip()
|
|
val = Decimal(txt) if txt else Decimal('0')
|
|
return abs(val)
|
|
except Exception as e:
|
|
logger.error(f"Error converting amount '{amount_in}': {e}")
|
|
return Decimal('0')
|
|
|
|
# -------------------------
|
|
# Mapping
|
|
# -------------------------
|
|
|
|
@classmethod
|
|
def map_transaction(cls, parsed_txn: Dict[str, Any], bankcode: str) -> NEFTInwardRecord:
|
|
"""
|
|
Map a single parsed NEFT transaction (dict) to NEFTInwardRecord.
|
|
|
|
Args:
|
|
parsed_txn: Dict emitted by SFTPUtrParser
|
|
bankcode : Bank code for this transaction (mapped to NEFTInwardRecord.bank_code)
|
|
"""
|
|
try:
|
|
# Amount handling
|
|
amount_in = parsed_txn.get('amount', '0')
|
|
txn_amt = cls.convert_amount(amount_in)
|
|
txnind = cls.calculate_txnind(amount_in)
|
|
|
|
# Date handling
|
|
txn_date_raw = parsed_txn.get('tran_date', '') or ''
|
|
txn_date_ddmmyyyy = cls.convert_date(txn_date_raw)
|
|
|
|
# Account numbers: NO padding, just trim
|
|
sender_acct = (parsed_txn.get('remitter_acct_no') or '').strip()
|
|
recvr_acct = (parsed_txn.get('benef_acct_no') or '').strip()
|
|
|
|
# Status normalization
|
|
status_norm = cls.process_status(parsed_txn.get('status', ''))
|
|
|
|
# Receiver account name: best available proxy is beneficiary_details
|
|
recvr_acct_name = (parsed_txn.get('beneficiary_details') or '').strip()
|
|
|
|
record = NEFTInwardRecord(
|
|
|
|
bank_code=bankcode,
|
|
txnind=txnind,
|
|
jrnl_id=(parsed_txn.get('journal_no') or '').strip(),
|
|
ref_no=(parsed_txn.get('utr') or '').strip(),
|
|
txn_date=txn_date_ddmmyyyy,
|
|
txn_amt=txn_amt,
|
|
sender_ifsc=(parsed_txn.get('ifsc_sender') or '').strip(),
|
|
reciever_ifsc=(parsed_txn.get('ifsc_recvr') or '').strip(),
|
|
sender_acct_no=sender_acct,
|
|
sender_acct_name=(parsed_txn.get('sender_acct_name') or '').strip(),
|
|
remitter_detail=(parsed_txn.get('remitter_detail') or '').strip(),
|
|
remitter_info=(parsed_txn.get('remmiter_info') or '').strip(),
|
|
recvr_acct_no=recvr_acct,
|
|
recvr_acct_name=recvr_acct_name,
|
|
status=status_norm,
|
|
reject_code=(parsed_txn.get('reject_code') or '').strip(),
|
|
reject_reason=(parsed_txn.get('reject_reason') or '').strip(),
|
|
benef_address=(parsed_txn.get('benef_address') or '').strip(),
|
|
msg_type=(parsed_txn.get('sub_msg_type') or '').strip(),
|
|
)
|
|
|
|
return record
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error mapping NEFT transaction: {e}", exc_info=True)
|
|
raise
|
|
|
|
@classmethod
|
|
def map_transactions(cls, parsed_transactions: List[Dict[str, Any]], bankcode: str) -> List[NEFTInwardRecord]:
|
|
"""
|
|
Map a list of parsed NEFT transactions to NEFTInwardRecord objects.
|
|
|
|
Args:
|
|
parsed_transactions: List of dicts from SFTPUtrParser
|
|
bankcode : Bank code to be applied to each record
|
|
"""
|
|
records: List[NEFTInwardRecord] = []
|
|
for txn in parsed_transactions:
|
|
try:
|
|
rec = cls.map_transaction(txn, bankcode)
|
|
records.append(rec)
|
|
except Exception as e:
|
|
logger.warning(f"Skipping transaction due to error: {e}")
|
|
logger.info(f"Mapped {len(records)} NEFT transactions for bank {bankcode}")
|
|
return records
|