diff --git a/upi.py b/upi.py new file mode 100644 index 0000000..4e204f5 --- /dev/null +++ b/upi.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +""" +Main application entry point. +Runs ACH file processing scheduler. +""" + +import logging +from logging_config import setup_logging, get_logger +from scheduler import Scheduler + +# Initialize logging +logging.getLogger("paramiko").setLevel(logging.WARNING) +logger = setup_logging(log_level=logging.INFO) +app_logger = get_logger(__name__) + + +def main(): + """Main application function.""" + app_logger.info("Application started") + + try: + # Run the scheduler + scheduler = Scheduler() + scheduler.run() + app_logger.info("Application completed successfully") + except KeyboardInterrupt: + app_logger.info("Application interrupted by user") + except Exception as e: + app_logger.error(f"An error occurred: {e}", exc_info=True) + raise + + +if __name__ == "__main__": + main() +#!/usr/bin/env python3 +""" +Main application entry point. +Runs ACH file processing scheduler. +""" + +import logging +from logging_config import setup_logging, get_logger +from scheduler import Scheduler + +# Initialize logging +logging.getLogger("paramiko").setLevel(logging.WARNING) +logger = setup_logging(log_level=logging.INFO) +app_logger = get_logger(__name__) + + +def main(): + """Main application function.""" + app_logger.info("Application started") + + try: + # Run the scheduler + scheduler = Scheduler() + scheduler.run() + app_logger.info("Application completed successfully") + except KeyboardInterrupt: + app_logger.info("Application interrupted by user") + except Exception as e: + app_logger.error(f"An error occurred: {e}", exc_info=True) + raise + + +if __name__ == "__main__": + main() + diff --git a/upi_parser.py b/upi_parser.py new file mode 100644 index 0000000..034feb2 --- /dev/null +++ b/upi_parser.py @@ -0,0 +1,270 @@ +import os +import logging +from typing import List, Dict, Tuple, Optional +from decimal import Decimal, InvalidOperation + +# ------------------------- +# Logger +# ------------------------- +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# ------------------------- +# Helpers +# ------------------------- +def normalize_text(val: str) -> str: + return str(val or "").strip() + + +def to_decimal(val: str) -> Optional[Decimal]: + try: + return Decimal(val.strip()) + except Exception: + return None + + +# ------------------------- +# Parser Class +# ------------------------- +class UPIParser: + """ + Parser for UPI Bank Switch Report (NPCI Cycle) + """ + + EXPECTED_HEADER = [ + "rrn", + "type", + "txn_datetime", + "credit_account", + "status", + "amount", + "note", + ] + + def __init__(self, file_path: str): + self.file_path = file_path + self.transactions: List[Dict] = [] + self.file_metadata: Dict = {} + self.summary_data: Dict = {} + + # ------------------------- + # MAIN PARSE + # ------------------------- + def parse(self) -> Tuple[List[Dict], Dict, Dict]: + try: + rows = self._read_rows() + + self.file_metadata = { + "source_file": os.path.basename(self.file_path), + "row_count": len(rows), + "columns": self.EXPECTED_HEADER, + } + + for idx, row in enumerate(rows, start=1): + txn = self._row_to_transaction(row, idx) + if txn: + self.transactions.append(txn) + + self.summary_data = self._build_summary(self.transactions) + + logger.info(f"Parsed {len(self.transactions)} rows from {self.file_path}") + return self.transactions, self.file_metadata, self.summary_data + + except Exception as e: + logger.error(f"Error parsing file: {e}", exc_info=True) + raise + + # ------------------------- + # READ FILE + # ------------------------- + def _read_rows(self) -> List[List[str]]: + rows = [] + + with open(self.file_path, 'r', encoding='utf-8', errors='replace') as f: + for line in f: + line = line.strip() + + # Skip unwanted lines + if ( + not line + or "UPI BANK SWITCH REPORT" in line + or line.startswith("===") + or line.startswith("---") + or line.startswith("RRN") + ): + continue + + parts = line.split('|') + + if len(parts) < 7: + logger.debug(f"Skipping malformed row: {line}") + continue + + rows.append(parts[:7]) + + return rows + + # ------------------------- + # ROW -> TRANSACTION + # ------------------------- + def _row_to_transaction(self, row: List[str], row_num: int) -> Optional[Dict]: + + txn = { + "rrn": normalize_text(row[0]), + "type": normalize_text(row[1]), + "txn_datetime": normalize_text(row[2]), + "credit_account": normalize_text(row[3]), + "status": normalize_text(row[4]), + "amount": "", + "note": normalize_text(row[6]), + } + + # Amount conversion + amt = to_decimal(row[5]) + txn["amount"] = amt if amt is not None else "" + + # Split datetime + if txn["txn_datetime"]: + try: + date, time = txn["txn_datetime"].split(" ") + txn["txn_date"] = date + txn["txn_time"] = time + except Exception: + txn["txn_date"] = txn["txn_datetime"] + txn["txn_time"] = "" + + if not txn["rrn"]: + logger.debug(f"Skipping row {row_num}: Missing RRN") + return None + + return txn + + # ------------------------- + # SUMMARY + # ------------------------- + def _build_summary(self, txns: List[Dict]) -> Dict: + total_count = len(txns) + total_amount = Decimal("0") + + by_status: Dict[str, Dict] = {} + by_type: Dict[str, Dict] = {} + + for t in txns: + amt = t.get("amount") or Decimal("0") + + if isinstance(amt, str): + try: + amt = Decimal(amt) + except: + amt = Decimal("0") + + total_amount += amt + + status = t.get("status", "").upper() + txn_type = t.get("type", "").upper() + + # Status grouping + if status not in by_status: + by_status[status] = {"count": 0, "amount": Decimal("0")} + by_status[status]["count"] += 1 + by_status[status]["amount"] += amt + + # Type grouping + if txn_type not in by_type: + by_type[txn_type] = {"count": 0, "amount": Decimal("0")} + by_type[txn_type]["count"] += 1 + by_type[txn_type]["amount"] += amt + + # Convert Decimal → string + by_status_final = { + k: {"count": v["count"], "amount": f"{v['amount']:.2f}"} + for k, v in by_status.items() + } + + by_type_final = { + k: {"count": v["count"], "amount": f"{v['amount']:.2f}"} + for k, v in by_type.items() + } + + return { + "total_count": total_count, + "total_amount": f"{total_amount:.2f}", + "by_status": by_status_final, + "by_type": by_type_final, + } + + +# ------------------------- +# PRINT FUNCTIONS +# ------------------------- +def print_transactions(transactions: List[Dict], limit: Optional[int] = 50): + + headers = [ + ("RRN", 15), + ("TYPE", 8), + ("DATE", 10), + ("TIME", 10), + ("ACCOUNT", 20), + ("STATUS", 10), + ("AMOUNT", 12), + ("NOTE", 25), + ] + + header_line = " ".join([f"{h:<{w}}" for h, w in headers]) + print("\n" + "=" * len(header_line)) + print(header_line) + print("=" * len(header_line)) + + for i, txn in enumerate(transactions): + row = [ + txn.get("rrn", ""), + txn.get("type", ""), + txn.get("txn_date", ""), + txn.get("txn_time", ""), + txn.get("credit_account", ""), + txn.get("status", ""), + f"{txn.get('amount')}" if txn.get("amount") else "", + txn.get("note", ""), + ] + + print(" ".join(f"{str(val)[:w]:<{w}}" for val, (h, w) in zip(row, headers))) + + if limit and i + 1 >= limit: + print(f"... ({len(transactions) - limit} more rows)") + break + + print("=" * len(header_line)) + print(f"Total: {len(transactions)} records\n") + + +def print_metadata(metadata: Dict): + print("\n===== FILE METADATA =====") + for k, v in metadata.items(): + print(f"{k.upper():20}: {v}") + print("=========================\n") + + +def print_summary(summary: Dict): + print("\n===== SUMMARY =====") + for k, v in summary.items(): + print(f"{k.upper()}: {v}") + print("===================\n") + + +# ------------------------- +# MAIN RUNNER +# ------------------------- +if __name__ == "__main__": + + file_path = r"C:\Users\2780475\Desktop\Test\TUMLUK_31052026_8C_UPI.txt.gz" + + parser = UPISwitchParser(file_path) + + transactions, metadata, summary = parser.parse() + + print_metadata(metadata) + print_transactions(transactions, limit=50) + print_summary(summary) + + logger.info(f"Parsing complete. Extracted {len(transactions)} transactions")