#!/usr/bin/env python3 """ UTR Pipe-Delimited File Parser (SFTP feed) - Robust parsing for files with '|' separator and inconsistent whitespace. - Returns (transactions, file_metadata, summary_data) exactly like UIHParser style. - TXN_DATE is left as-is from the file (no time concatenation or conversion). """ import csv import os import re from decimal import Decimal, InvalidOperation from typing import Dict, List, Tuple, Optional from logging_config import get_logger logger = get_logger(__name__) # ------------------------- # Helpers & Normalization # ------------------------- WS_COLLAPSE_RE = re.compile(r'[ \t\u00A0]+') def normalize_text(s: Optional[str]) -> str: """ Normalize internal whitespace to single spaces, strip ends. Keep None as ''. """ if s is None: return '' s = s.replace('\u00A0', ' ') s = WS_COLLAPSE_RE.sub(' ', s) return s.strip() def to_decimal(value: str) -> Optional[Decimal]: if value is None: return None v = value.replace(',', '').strip() if v == '': return None try: return Decimal(v) except InvalidOperation: logger.warning(f"Amount not a valid decimal: {value!r}") return None IFSC_RE = re.compile(r'^[A-Z]{4}0[A-Z0-9]{6}$', re.IGNORECASE) def validate_ifsc(code: str) -> bool: """ Gentle IFSC validation: standard format is 11 chars (AAAA0XXXXXX). Returns False if it doesn't match; NEVER rejects the record. """ if not code: return False return bool(IFSC_RE.match(code)) # ------------------------- # Parser # ------------------------- class RTGS_INWARD_Parser: """ Parser for SFTP UTR pipe-delimited files. Returns: (transactions, file_metadata, summary_data) """ # Canonical order (maps to snake_case keys) as they appear in the file header EXPECTED_HEADER = [ "utr", "amount", "sender_acct_name", "remitter_detail", "remmiter_info", "benef_address", "reject_code", "reject_reason", "journal_no", "status", "sub_msg_type", "tran_date", "tran_time", "ifsc_sender", "ifsc_recvr", "remitter_acct_no", "benef_acct_no", "remitter_details", "beneficiary_details", ] def __init__(self, file_path: str, encoding_priority: Optional[List[str]] = None): self.file_path = file_path self.encoding_priority = encoding_priority or ["utf-8-sig", "cp1252", "latin-1"] self.transactions: List[Dict] = [] self.file_metadata: Dict = {} self.summary_data: Dict = {} def parse(self) -> Tuple[List[Dict], Dict, Dict]: """ Main parse method: returns (transactions, file_metadata, summary_data) """ try: rows, header = self._read_rows_with_fallback() header_map = self._prepare_header_map(header) self.file_metadata = { "source_file": os.path.basename(self.file_path), "columns_detected": header, "row_count": len(rows), } for idx, raw in enumerate(rows, start=1): rec = self._row_to_transaction(raw, header_map, row_num=idx) if rec: self.transactions.append(rec) self.summary_data = self._build_summary(self.transactions) logger.info( f"Parsed {len(self.transactions)} rows from {self.file_path}" ) return self.transactions, self.file_metadata, self.summary_data except Exception as e: logger.error(f"Error parsing SFTP UTR file: {e}", exc_info=True) raise # ------------------------- # Internals # ------------------------- def _read_rows_with_fallback(self) -> Tuple[List[List[str]], List[str]]: """ Try multiple encodings. Return (rows, header) """ last_err = None for enc in self.encoding_priority: try: with open(self.file_path, 'r', encoding=enc, errors='replace', newline='') as f: reader = csv.reader(f, delimiter='|') all_rows = list(reader) if not all_rows: raise ValueError("Empty file") header = [normalize_text(c) for c in all_rows[0]] rows = [r for r in all_rows[1:]] logger.info(f"Read {len(rows)} data rows using encoding {enc}") return rows, header except Exception as e: last_err = e logger.warning(f"Failed to read with encoding={enc}: {e}") continue # If we fall through all encodings raise last_err or RuntimeError("File read failed for all encodings") def _prepare_header_map(self, header: List[str]) -> Dict[int, str]: """ Map column index -> canonical snake_case key. Unknown/extra headers become normalized snake_case as-is. """ def canon(name: str) -> str: name = name.strip() name = name.replace('/', '_').replace('-', '_').replace(' ', '_') return name.lower() header_norm = [canon(h) for h in header] if len(header_norm) < len(self.EXPECTED_HEADER): logger.warning( f"Header has fewer columns ({len(header_norm)}) than expected ({len(self.EXPECTED_HEADER)}). " f"Will pad rows defensively." ) idx_to_key: Dict[int, str] = {} for i, h in enumerate(header_norm): idx_to_key[i] = h return idx_to_key def _row_to_transaction(self, row: List[str], header_map: Dict[int, str], row_num: int) -> Optional[Dict]: """ Convert raw CSV row to a normalized dict (no data-model mapping here). """ # Pad or trim to header length (defensive) max_idx = max(header_map.keys()) if header_map else -1 if len(row) - 1 < max_idx: row = row + [''] * (max_idx + 1 - len(row)) elif len(row) - 1 > max_idx: logger.debug(f"Row {row_num} has extra fields; trimming to header size") # Build base dict with normalized text raw = {header_map[i]: normalize_text(row[i] if i < len(row) else '') for i in range(max_idx + 1)} # Collect expected keys; leave as strings except amount where we coerce safely txn: Dict[str, object] = {k: raw.get(k, '') for k in self.EXPECTED_HEADER} txn['creditor_amt'] = raw.get('amount', '') # Amount normalization amt = to_decimal(str(txn.get('amount', '') or '')) txn['amount'] = amt if amt is not None else '' # IFSC checks (gentle logs only) ifsc_sender = str(txn.get('ifsc_sender') or '') ifsc_recvr = str(txn.get('ifsc_recvr') or '') if ifsc_sender and not validate_ifsc(ifsc_sender): logger.debug(f"Row {row_num} sender IFSC looks non-standard: {ifsc_sender}") if ifsc_recvr and not validate_ifsc(ifsc_recvr): logger.debug(f"Row {row_num} receiver IFSC looks non-standard: {ifsc_recvr}") # TXN_DATE: keep as-is from file; ignore time entirely txn['tran_date'] = str(txn.get('tran_date') or '') txn['tran_time'] = '' # explicitly blank to signal unused # Basic sanity: UTR presence if not str(txn.get('utr') or '').strip(): logger.debug(f"Row {row_num} skipped: missing UTR") return None return txn def _build_summary(self, txns: List[Dict]) -> Dict: """ Build compact summary: - total_count - amount_total - by_status: count, amount """ total_count = len(txns) amount_total = Decimal('0') by_status: Dict[str, Dict[str, object]] = {} for t in txns: amt = t.get('amount') if isinstance(amt, Decimal): pass elif isinstance(amt, str): try: amt = Decimal(amt) except InvalidOperation: amt = Decimal('0') elif amt is None: amt = Decimal('0') amount_total += amt st = (str(t.get('status') or '')).upper() if st not in by_status: by_status[st] = {'count': 0, 'amount': Decimal('0')} by_status[st]['count'] += 1 by_status[st]['amount'] = by_status[st]['amount'] + amt by_status_str = {k: {'count': v['count'], 'amount': f"{v['amount']:.2f}"} for k, v in by_status.items()} return { 'total_count': total_count, 'amount_total': f"{amount_total:.2f}", 'by_status': by_status_str } # ------------------------- # Printing Utilities # ------------------------- def print_transactions(transactions: List[Dict], limit: Optional[int] = 50): """ Console print (raw transaction dict view similar to UIH print). Includes all fields except time, REJECT_CODE, and REJECT_REASON. """ cols = [ ('utr', 20), ('amount', 12), ('status', 8), ('journal_no', 14), ('tran_date', 10), ('sender_acct_name', 28), ('remitter_acct_no', 22), ('benef_acct_no', 22), ('ifsc_sender', 12), ('ifsc_recvr', 12), ('remitter_detail', 28), ('remmiter_info', 24), ('beneficiary_details', 30), ('benef_address', 30), ('sub_msg_type', 10), ] header = " ".join([f"{name.upper():<{w}}" for name, w in cols]) print("\n" + "=" * len(header)) print(header) print("=" * len(header)) shown = 0 for txn in transactions: row = [] for name, w in cols: val = txn.get(name, '') if isinstance(val, Decimal): val = f"{val:.2f}" row.append(f"{str(val)[:w]:<{w}}") print(" ".join(row)) shown += 1 if limit and shown >= limit: print(f"... ({len(transactions) - shown} more rows not shown)") break print("=" * len(header)) print(f"Total transactions parsed: {len(transactions)}\n") def print_metadata(metadata: Dict): """Print file metadata (UIH-like).""" print("\n" + "=" * 80) print("FILE METADATA") print("=" * 80) for key, value in metadata.items(): print(f"{key.upper():<20}: {value}") print("=" * 80 + "\n") def print_summary(summary: Dict): """Print summary data.""" if summary: print("\n" + "=" * 80) print("SUMMARY DATA") print("=" * 80) for key, value in summary.items(): print(f"{key.upper()}: {value}") print("=" * 80 + "\n") # ------------------------- # Runner # ------------------------- if __name__ == '__main__': from logging_config import setup_logging setup_logging() parser = SFTPUtrParser('/home/bishwajeet/test_parser/06032026_14_NEFT_INWARD.TXT') transactions, metadata, summary = parser.parse() print_metadata(metadata) print_transactions(transactions, limit=80) print_summary(summary) logger.info(f"Parsing complete. Extracted {len(transactions)} transactions")