Files
rtgs_inward_file_based/rtgs_inward_parser.py
2026-03-15 16:09:28 +05:30

344 lines
11 KiB
Python

#!/usr/bin/env python3
"""
UTR Pipe-Delimited File Parser (SFTP feed)
- Robust parsing for files with '|' separator and inconsistent whitespace.
- Returns (transactions, file_metadata, summary_data) exactly like UIHParser style.
- TXN_DATE is left as-is from the file (no time concatenation or conversion).
"""
import csv
import os
import re
from decimal import Decimal, InvalidOperation
from typing import Dict, List, Tuple, Optional
from logging_config import get_logger
logger = get_logger(__name__)
# -------------------------
# Helpers & Normalization
# -------------------------
WS_COLLAPSE_RE = re.compile(r'[ \t\u00A0]+')


def normalize_text(s: Optional[str]) -> str:
    """
    Collapse runs of spaces/tabs/NBSP into single spaces and strip the ends.

    ``None`` is treated as the empty string.
    """
    if s is None:
        return ''
    collapsed = WS_COLLAPSE_RE.sub(' ', s.replace('\u00A0', ' '))
    return collapsed.strip()
def to_decimal(value: str) -> Optional[Decimal]:
    """
    Parse a numeric string (thousands commas allowed) into a Decimal.

    Returns None for None, blank, or unparseable input; unparseable
    values are additionally logged at WARNING level.
    """
    if value is None:
        return None
    cleaned = value.replace(',', '').strip()
    if not cleaned:
        return None
    try:
        return Decimal(cleaned)
    except InvalidOperation:
        logger.warning(f"Amount not a valid decimal: {value!r}")
        return None
IFSC_RE = re.compile(r'^[A-Z]{4}0[A-Z0-9]{6}$', re.IGNORECASE)


def validate_ifsc(code: str) -> bool:
    """
    Lenient IFSC shape check: 11 chars, AAAA0XXXXXX, case-insensitive.

    Diagnostic only — a False result never causes a record to be rejected.
    """
    return bool(code) and IFSC_RE.match(code) is not None
# -------------------------
# Parser
# -------------------------
class RTGS_INWARD_Parser:
    """
    Parser for SFTP UTR pipe-delimited files.

    Reads a '|'-delimited file whose first row is the header, normalizes
    whitespace in every cell, coerces the amount column to Decimal, and
    aggregates a per-status summary.

    Returns: (transactions, file_metadata, summary_data)
    """

    # Canonical order (maps to snake_case keys) as they appear in the file header
    EXPECTED_HEADER = [
        "utr", "amount", "sender_acct_name", "remitter_detail", "remmiter_info",
        "benef_address", "reject_code", "reject_reason", "journal_no",
        "status", "sub_msg_type", "tran_date", "tran_time", "ifsc_sender",
        "ifsc_recvr", "remitter_acct_no", "benef_acct_no",
        "remitter_details", "beneficiary_details",
    ]

    def __init__(self, file_path: str, encoding_priority: Optional[List[str]] = None):
        """
        Args:
            file_path: Path of the pipe-delimited feed file.
            encoding_priority: Encodings tried in order when reading;
                defaults to ["utf-8-sig", "cp1252", "latin-1"].
        """
        self.file_path = file_path
        self.encoding_priority = encoding_priority or ["utf-8-sig", "cp1252", "latin-1"]
        self.transactions: List[Dict] = []
        self.file_metadata: Dict = {}
        self.summary_data: Dict = {}

    def parse(self) -> Tuple[List[Dict], Dict, Dict]:
        """
        Main parse method: returns (transactions, file_metadata, summary_data).

        Raises:
            Exception: any read/parse failure is logged with traceback and
                re-raised to the caller.
        """
        try:
            rows, header = self._read_rows_with_fallback()
            header_map = self._prepare_header_map(header)
            self.file_metadata = {
                "source_file": os.path.basename(self.file_path),
                "columns_detected": header,
                "row_count": len(rows),
            }
            for idx, raw in enumerate(rows, start=1):
                rec = self._row_to_transaction(raw, header_map, row_num=idx)
                if rec:
                    self.transactions.append(rec)
            self.summary_data = self._build_summary(self.transactions)
            logger.info(
                f"Parsed {len(self.transactions)} rows from {self.file_path}"
            )
            return self.transactions, self.file_metadata, self.summary_data
        except Exception as e:
            logger.error(f"Error parsing SFTP UTR file: {e}", exc_info=True)
            raise

    # -------------------------
    # Internals
    # -------------------------
    def _read_rows_with_fallback(self) -> Tuple[List[List[str]], List[str]]:
        """
        Try each configured encoding in turn; return (rows, header) from the
        first successful read. Re-raises the last error if every encoding
        fails. errors='replace' means a "success" may still contain U+FFFD.
        """
        last_err = None
        for enc in self.encoding_priority:
            try:
                with open(self.file_path, 'r', encoding=enc, errors='replace', newline='') as f:
                    reader = csv.reader(f, delimiter='|')
                    all_rows = list(reader)
                if not all_rows:
                    raise ValueError("Empty file")
                header = [normalize_text(c) for c in all_rows[0]]
                rows = all_rows[1:]  # fix: plain slice instead of a copy comprehension
                logger.info(f"Read {len(rows)} data rows using encoding {enc}")
                return rows, header
            except Exception as e:
                last_err = e
                logger.warning(f"Failed to read with encoding={enc}: {e}")
                continue
        # If we fall through all encodings
        raise last_err or RuntimeError("File read failed for all encodings")

    def _prepare_header_map(self, header: List[str]) -> Dict[int, str]:
        """
        Map column index -> canonical snake_case key.
        Unknown/extra headers become normalized snake_case as-is.
        """
        def canon(name: str) -> str:
            # lower-case and fold separators so keys are stable snake_case
            return name.strip().replace('/', '_').replace('-', '_').replace(' ', '_').lower()

        header_norm = [canon(h) for h in header]
        if len(header_norm) < len(self.EXPECTED_HEADER):
            logger.warning(
                f"Header has fewer columns ({len(header_norm)}) than expected ({len(self.EXPECTED_HEADER)}). "
                f"Will pad rows defensively."
            )
        return dict(enumerate(header_norm))

    def _row_to_transaction(self, row: List[str], header_map: Dict[int, str], row_num: int) -> Optional[Dict]:
        """
        Convert a raw CSV row into a normalized dict (no data-model mapping here).

        Returns None when the row has no UTR; such rows are skipped.
        """
        # Pad or trim to header length (defensive)
        max_idx = max(header_map.keys()) if header_map else -1
        if len(row) - 1 < max_idx:
            row = row + [''] * (max_idx + 1 - len(row))
        elif len(row) - 1 > max_idx:
            logger.debug(f"Row {row_num} has extra fields; trimming to header size")
            row = row[:max_idx + 1]  # bug fix: previously logged "trimming" but never trimmed
        # Build base dict with normalized text
        raw = {header_map[i]: normalize_text(row[i] if i < len(row) else '') for i in range(max_idx + 1)}
        # Collect expected keys; leave as strings except amount where we coerce safely
        txn: Dict[str, object] = {k: raw.get(k, '') for k in self.EXPECTED_HEADER}
        # Keep the raw string amount alongside the Decimal-coerced one
        txn['creditor_amt'] = raw.get('amount', '')
        # Amount normalization: Decimal on success, '' when blank/unparseable
        amt = to_decimal(str(txn.get('amount', '') or ''))
        txn['amount'] = amt if amt is not None else ''
        # IFSC checks (gentle logs only; never rejects the record)
        ifsc_sender = str(txn.get('ifsc_sender') or '')
        ifsc_recvr = str(txn.get('ifsc_recvr') or '')
        if ifsc_sender and not validate_ifsc(ifsc_sender):
            logger.debug(f"Row {row_num} sender IFSC looks non-standard: {ifsc_sender}")
        if ifsc_recvr and not validate_ifsc(ifsc_recvr):
            logger.debug(f"Row {row_num} receiver IFSC looks non-standard: {ifsc_recvr}")
        # TXN_DATE: keep as-is from file; ignore time entirely
        txn['tran_date'] = str(txn.get('tran_date') or '')
        txn['tran_time'] = ''  # explicitly blank to signal unused
        # Basic sanity: UTR presence
        if not str(txn.get('utr') or '').strip():
            logger.debug(f"Row {row_num} skipped: missing UTR")
            return None
        return txn

    def _build_summary(self, txns: List[Dict]) -> Dict:
        """
        Build compact summary:
        - total_count
        - amount_total (string, 2 decimal places)
        - by_status: {STATUS: {'count': int, 'amount': '0.00'}}
        """
        total_count = len(txns)
        amount_total = Decimal('0')
        by_status: Dict[str, Dict[str, object]] = {}
        for t in txns:
            amt = t.get('amount')
            # 'amount' may be Decimal, a string (possibly ''), or missing
            if isinstance(amt, Decimal):
                pass
            elif isinstance(amt, str):
                try:
                    amt = Decimal(amt)
                except InvalidOperation:
                    amt = Decimal('0')
            elif amt is None:
                amt = Decimal('0')
            amount_total += amt
            st = (str(t.get('status') or '')).upper()
            if st not in by_status:
                by_status[st] = {'count': 0, 'amount': Decimal('0')}
            by_status[st]['count'] += 1
            by_status[st]['amount'] = by_status[st]['amount'] + amt
        by_status_str = {k: {'count': v['count'], 'amount': f"{v['amount']:.2f}"} for k, v in by_status.items()}
        return {
            'total_count': total_count,
            'amount_total': f"{amount_total:.2f}",
            'by_status': by_status_str
        }
# -------------------------
# Printing Utilities
# -------------------------
def print_transactions(transactions: List[Dict], limit: Optional[int] = 50):
    """
    Console print (raw transaction dict view similar to UIH print).

    Shows every parsed field except tran_time, reject_code, reject_reason
    and remitter_details. Decimal amounts render with 2 decimal places;
    each value is truncated to its column width.

    Args:
        transactions: Parsed transaction dicts.
        limit: Max rows to print; falsy (None/0) means no limit.
    """
    cols = [
        ('utr', 20),
        ('amount', 12),
        ('status', 8),
        ('journal_no', 14),
        ('tran_date', 10),
        ('sender_acct_name', 28),
        ('remitter_acct_no', 22),
        ('benef_acct_no', 22),
        ('ifsc_sender', 12),
        ('ifsc_recvr', 12),
        ('remitter_detail', 28),
        ('remmiter_info', 24),
        ('beneficiary_details', 30),
        ('benef_address', 30),
        ('sub_msg_type', 10),
    ]
    header = " ".join(f"{name.upper():<{w}}" for name, w in cols)
    print("\n" + "=" * len(header))
    print(header)
    print("=" * len(header))
    shown = 0
    for txn in transactions:
        row = []
        for name, w in cols:
            val = txn.get(name, '')
            if isinstance(val, Decimal):
                val = f"{val:.2f}"
            row.append(f"{str(val)[:w]:<{w}}")
        print(" ".join(row))
        shown += 1
        if limit and shown >= limit:
            remaining = len(transactions) - shown
            # Bug fix: only mention hidden rows when some actually remain
            # (previously printed "... (0 more rows not shown)" when
            # len(transactions) == limit).
            if remaining:
                print(f"... ({remaining} more rows not shown)")
            break
    print("=" * len(header))
    print(f"Total transactions parsed: {len(transactions)}\n")
def print_metadata(metadata: Dict):
    """Print file metadata (UIH-like)."""
    bar = "=" * 80
    print("\n" + bar)
    print("FILE METADATA")
    print(bar)
    for key, value in metadata.items():
        print(f"{key.upper():<20}: {value}")
    print(bar + "\n")
def print_summary(summary: Dict):
    """Print summary data."""
    if not summary:
        # Empty/missing summary prints nothing at all.
        return
    bar = "=" * 80
    print("\n" + bar)
    print("SUMMARY DATA")
    print(bar)
    for key, value in summary.items():
        print(f"{key.upper()}: {value}")
    print(bar + "\n")
# -------------------------
# Runner
# -------------------------
if __name__ == '__main__':
    from logging_config import setup_logging

    setup_logging()
    # Bug fix: the parser class defined above is RTGS_INWARD_Parser;
    # the previous name SFTPUtrParser does not exist (NameError at runtime).
    parser = RTGS_INWARD_Parser('/home/bishwajeet/test_parser/06032026_14_NEFT_INWARD.TXT')
    transactions, metadata, summary = parser.parse()
    print_metadata(metadata)
    print_transactions(transactions, limit=80)
    print_summary(summary)
    logger.info(f"Parsing complete. Extracted {len(transactions)} transactions")