#!/usr/bin/env python3
"""
Configuration management for the NEFT/ACH file processing pipeline.

Loads settings from environment variables (optionally populated via a .env
file) and validates them before use.
"""

import os
from pathlib import Path

from logging_config import get_logger

logger = get_logger(__name__)


class Config:
    """Application configuration sourced from environment variables."""

    def __init__(self):
        """Load all configuration sections from the environment."""
        self._validate_env_file()
        self._load_database_config()
        self._load_sftp_config()
        self._load_processing_config()

    def _validate_env_file(self):
        """Warn (but do not fail) when no .env file is present."""
        if not Path('.env').exists():
            # FIX: this message was previously split across a raw newline,
            # which is a syntax error inside a single-quoted string literal.
            logger.warning(".env file not found. Using environment variables or defaults.")

    def _load_database_config(self):
        """Load Oracle connection settings.

        NOTE(security): real-looking credentials and hosts are used as
        fallback defaults below. They are preserved for backward
        compatibility, but should be removed so that missing environment
        variables fail fast instead of silently using committed secrets.
        """
        self.db_user = os.getenv('DB_USER', 'pacs_db')
        self.db_password = os.getenv('DB_PASSWORD', 'pacs_db')
        self.db_host = os.getenv('DB_HOST', 'testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com')
        self.db_port = int(os.getenv('DB_PORT', '1521'))
        self.db_service_name = os.getenv('DB_SERVICE_NAME', 'IPKSDB')
        self.db_pool_min = int(os.getenv('DB_POOL_MIN', '2'))
        self.db_pool_max = int(os.getenv('DB_POOL_MAX', '10'))

    def _load_sftp_config(self):
        """Load SFTP settings.

        NOTE(security): the hard-coded default SFTP password below is a
        committed secret and should be rotated and removed.
        """
        self.sftp_host = os.getenv('SFTP_HOST', '43.225.3.224')
        self.sftp_port = int(os.getenv('SFTP_PORT', '4650'))
        self.sftp_username = os.getenv('SFTP_USERNAME', 'ipkssftp')
        self.sftp_password = os.getenv('SFTP_PASSWORD', 'Wnb10U11BE7N26')
        self.sftp_base_path = os.getenv('SFTP_BASE_PATH', '/home/ipks/IPKS_FILES/REPORTS')

    def _load_processing_config(self):
        """Load polling/batching settings."""
        self.poll_interval_minutes = int(os.getenv('POLL_INTERVAL_MINUTES', '30'))
        self.batch_size = int(os.getenv('BATCH_SIZE', '100'))
        self.bank_codes = self._parse_bank_codes()
        self.log_level = os.getenv('LOG_LEVEL', 'INFO')

    def _parse_bank_codes(self):
        """Parse bank codes from the comma-separated BANK_CODES variable."""
        codes_str = os.getenv('BANK_CODES', '0001,0002,0003,0004,0005,0006,0007,0009,0012,0013,0014,0015,0016,0017,0018,0020,0021')
        return [code.strip() for code in codes_str.split(',') if code.strip()]

    def get_db_connection_string(self):
        """Build an Oracle easy-connect string (user/password@host:port/service)."""
        return f"{self.db_user}/{self.db_password}@{self.db_host}:{self.db_port}/{self.db_service_name}"

    def validate(self):
        """Validate critical configuration.

        Raises:
            ValueError: if database credentials or bank codes are missing.
        """
        if not self.db_user or not self.db_password:
            raise ValueError("Database credentials not configured")
        if not self.sftp_username:
            logger.warning("SFTP username not configured")
        # FIX: this condition was previously broken across a raw newline
        # between 'not' and 'self.bank_codes'.
        if not self.bank_codes:
            raise ValueError("No bank codes configured")
        logger.info(f"Configuration validated. Bank codes: {', '.join(self.bank_codes)}")


# Global config instance (lazy-initialized by get_config()).
config = None


def get_config():
    """Get or create the global Config instance."""
    global config
    if config is None:
        config = Config()
    return config


if __name__ == '__main__':
    cfg = get_config()
    cfg.validate()
    print(f"Bank Codes: {cfg.bank_codes}")
    print(f"SFTP Host: {cfg.sftp_host}:{cfg.sftp_port}")
    print(f"Database: {cfg.db_host}:{cfg.db_port}/{cfg.db_service_name}")
    print(f"Poll Interval: {cfg.poll_interval_minutes} minutes")
+""" + +from dataclasses import dataclass, asdict +from datetime import date, datetime +from decimal import Decimal +from typing import Optional + + +@dataclass +class NEFTOutwardRecord: + """Represents a parsed NEFT Inward transaction mapped to DB columns.""" + + txnind: str # VARCHAR2(2), default "DR" + jrnl_id: str # VARCHAR2(4000), NOT NULL + ref_no: str # VARCHAR2(400), NOT NULL + txn_date: str # VARCHAR2(100), NOT NULL + txn_amt: Optional[Decimal] # NUMBER(17,2) + sender_ifsc: str # VARCHAR2(400) + reciever_ifsc: str # VARCHAR2(400) + sender_acct_no: str # VARCHAR2(400) + sender_acct_name: str # VARCHAR2(400) + recvr_acct_no: str # VARCHAR2(400) + recvr_acct_name: str # VARCHAR2(400) + reject_code: str # VARCHAR2(400) + reject_reason: str # VARCHAR2(400) + benef_address: str # VARCHAR2(400) + msg_type: str # VARCHAR2(400) + bank_code: str + + def to_dict(self): + """Convert to dictionary for DB insertion.""" + return { + "TXNIND": self.txnind, + "BANKCODE": self.bank_code, + "JRNL_ID": self.jrnl_id, + "REF_NO": self.ref_no, + "TRAN_DATE": self.txn_date, + "TXN_AMT": self.txn_amt, + "SENDER_IFSC": self.sender_ifsc, + "RECIEVER_IFSC": self.reciever_ifsc, + "SENDER_ACCT_NO": self.sender_acct_no, + "SENDER_NAME": self.sender_acct_name, + "RECVR_ACCT_NO": self.recvr_acct_no, + "RECIEVER_NAME": self.recvr_acct_name, + "REJECT_CODE": self.reject_code, + "REJECT_REASON": self.reject_reason, + "BENEFICIARY_ADDRESS": self.benef_address, + "MSG_TYPE": self.msg_type, + } + + +@dataclass +class ProcessedFile: + """Represents a processed file record for ach_processed_files table.""" + filename: str + bankcode: str + file_path: str + transaction_count: int + status: str = 'SUCCESS' + error_message: Optional[str] = None + processed_at: Optional[datetime] = None + + def to_dict(self): + """Convert to dictionary for database insertion.""" + return { + 'filename': self.filename, + 'bankcode': self.bankcode, + 'file_path': self.file_path, + 'transaction_count': 
self.transaction_count, + 'status': self.status, + 'error_message': self.error_message, + 'processed_at': self.processed_at or datetime.now(), + } diff --git a/db/oracle_connector.py b/db/oracle_connector.py new file mode 100644 index 0000000..15ab91b --- /dev/null +++ b/db/oracle_connector.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +""" +Oracle database connection pool manager using oracledb. +Manages connections with pooling and health checks. + +oracledb is the modern, simpler replacement for cx_Oracle. +No Oracle Instant Client required - uses Thick or Thin mode. +""" + +import oracledb +from logging_config import get_logger +from config import get_config + +logger = get_logger(__name__) + + +class OracleConnector: + """Manages Oracle database connections with pooling.""" + + def __init__(self): + """Initialize connection pool.""" + self.pool = None + self.config = get_config() + + def initialize_pool(self): + """Create connection pool.""" + try: + # Build connection string for oracledb + # Format: user/password@host:port/service_name + connection_string = ( + f"{self.config.db_user}/{self.config.db_password}@" + f"{self.config.db_host}:{self.config.db_port}/{self.config.db_service_name}" + ) + + # Create connection pool using oracledb API + # Note: oracledb uses 'min' and 'max' for pool sizing + self.pool = oracledb.create_pool( + dsn=connection_string, + min=self.config.db_pool_min, + max=self.config.db_pool_max, + increment=1, + ) + + logger.debug(f"Oracle connection pool initialized: min={self.config.db_pool_min}, max={self.config.db_pool_max}") + return True + except oracledb.DatabaseError as e: + logger.error(f"Failed to initialize connection pool: {e}", exc_info=True) + return False + except Exception as e: + logger.error(f"Unexpected error initializing pool: {e}", exc_info=True) + return False + + def get_connection(self): + """Get connection from pool.""" + if not self.pool: + self.initialize_pool() + + try: + conn = self.pool.acquire() + 
logger.debug("Connection acquired from pool") + return conn + except oracledb.DatabaseError as e: + logger.error(f"Failed to acquire connection: {e}", exc_info=True) + raise + except Exception as e: + logger.error(f"Unexpected error acquiring connection: {e}", exc_info=True) + raise + + def close_pool(self): + """Close connection pool.""" + if self.pool: + try: + self.pool.close() + logger.info("Connection pool closed") + except Exception as e: + logger.error(f"Error closing pool: {e}") + + def test_connection(self): + """Test database connectivity.""" + try: + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute("SELECT 1 FROM dual") + result = cursor.fetchone() + cursor.close() + conn.close() + logger.debug("Database connection test successful") + return True + except Exception as e: + logger.error(f"Database connection test failed: {e}") + return False + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.close_pool() + + +# Global connector instance +_connector = None + + +def get_connector(): + """Get or create global connector instance.""" + global _connector + if _connector is None: + _connector = OracleConnector() + return _connector diff --git a/db/repository.py b/db/repository.py new file mode 100644 index 0000000..cb448c8 --- /dev/null +++ b/db/repository.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +""" +Data access layer for NEFT inward file processing. +Handles CRUD operations and transaction management. 
+""" + +from typing import List, Optional +from logging_config import get_logger +from .oracle_connector import get_connector +from .models import NEFTInwardRecord, ProcessedFile + +logger = get_logger(__name__) + + +class Repository: + """Data access layer for NEFT inward processing.""" + + def __init__(self): + """Initialize repository with connector.""" + self.connector = get_connector() + + # --------------------------------------------------------- + # ADDED: Account validation using last 12 digits of RECVR_ACCT_NO + # --------------------------------------------------------- + def validate_account_exists(self, account_number: str) -> bool: + """ + Validate if account number exists in dep_account table. + + Args: + account_number: Beneficiary account number (RECVR_ACCT_NO) + + Returns: + True if account exists in dep_account.link_accno, False otherwise + """ + if not account_number: + return False + + last12 = str(account_number)[-12:] + + conn = self.connector.get_connection() + try: + cursor = conn.cursor() + cursor.execute( + "SELECT COUNT(*) FROM dep_account WHERE link_accno = :accno", + {'accno': last12} + ) + count = cursor.fetchone()[0] + return count > 0 + except Exception as e: + logger.warning(f"Error validating account {account_number}: {e}") + return False + finally: + cursor.close() + conn.close() + + # --------------------------------------------------------- + # UPDATED: bulk_insert_transactions WITH VALIDATION + # --------------------------------------------------------- + def bulk_insert_transactions(self, transactions: List[NEFTInwardRecord]) -> tuple: + """ + Bulk insert NEFT transactions into inward_neft_api_log. + Records with invalid beneficiary account numbers are skipped. 
+ + Args: + transactions: List of NEFTOutwardRecord objects + + Returns: + (inserted_count, skipped_count) + """ + if not transactions: + logger.warning("No transactions to insert") + return 0, 0 + + valid_transactions = [] + skipped_count = 0 + + + for txn in transactions: + acct = txn.sender_acct_no + + if self.validate_account_exists(acct): + valid_transactions.append(txn) + else: + skipped_count += 1 + + if not valid_transactions: + logger.debug(f"All {skipped_count} transactions skipped (invalid Remitter accounts)") + return 0, skipped_count + + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + + batch_data = [txn.to_dict() for txn in valid_transactions] + logger.info(batch_data) + + insert_sql = """ + INSERT INTO outward_neft_api_log ( + TXNIND, + JRNL_ID, + BANKCODE, + REF_NO, + TRAN_DATE, + TXN_AMT, + SENDER_IFSC, + RECIEVER_IFSC, + SENDER_ACCT_NO, + SENDER_NAME, + RECVR_ACCT_NO, + RECIEVER_NAME, + REJECT_CODE, + REJECT_REASON, + BENEFICIARY_ADDRESS, + MSG_TYPE + ) VALUES ( + :TXNIND, + :JRNL_ID, + :BANKCODE, + :REF_NO, + :TRAN_DATE, + :TXN_AMT, + :SENDER_IFSC, + :RECIEVER_IFSC, + :SENDER_ACCT_NO, + :SENDER_NAME, + :RECVR_ACCT_NO, + :RECIEVER_NAME, + :REJECT_CODE, + :REJECT_REASON, + :BENEFICIARY_ADDRESS, + :MSG_TYPE + ) + """ + + cursor.executemany(insert_sql, batch_data) + conn.commit() + + inserted_count = len(valid_transactions) + logger.info(f"Inserted {inserted_count} NEFT transactions into outward_neft_api_log") + return inserted_count, skipped_count + + except Exception as e: + if conn: + conn.rollback() + logger.error(f"Error inserting NEFT transactions: {e}", exc_info=True) + raise + finally: + if cursor: + cursor.close() + conn.close() + + # --------------------------------------------------------- + # NOTHING ELSE BELOW THIS LINE WAS TOUCHED + # --------------------------------------------------------- + + def is_file_processed(self, filename: str, bankcode: str) -> bool: + conn = 
self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + cursor.execute( + """ + SELECT COUNT(*) + FROM neft_processed_files + WHERE filename = :filename + AND bankcode = :bankcode + """, + {'filename': filename, 'bankcode': bankcode} + ) + count = cursor.fetchone()[0] + return count > 0 + except Exception as e: + logger.error(f"Error checking processed file: {e}", exc_info=True) + return False + finally: + if cursor: + cursor.close() + conn.close() + + def mark_file_processed(self, processed_file: ProcessedFile) -> bool: + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + + file_data = processed_file.to_dict() + insert_sql = """ + INSERT INTO neft_processed_files ( + filename, bankcode, file_path, transaction_count, + status, error_message, processed_at + ) VALUES ( + :filename, :bankcode, :file_path, :transaction_count, + :status, :error_message, :processed_at + ) + """ + + cursor.execute(insert_sql, file_data) + conn.commit() + + logger.info(f"Marked file as processed: {processed_file.filename}") + return True + + except Exception as e: + if conn: + conn.rollback() + logger.error(f"Error marking file as processed: {e}", exc_info=True) + return False + finally: + if cursor: + cursor.close() + conn.close() + + def get_processed_files(self, bankcode: Optional[str] = None) -> List[str]: + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + + if bankcode: + cursor.execute( + """ + SELECT filename + FROM neft_processed_files + WHERE bankcode = :bankcode + ORDER BY processed_at DESC + """, + {'bankcode': bankcode} + ) + else: + cursor.execute( + """ + SELECT filename + FROM neft_processed_files + ORDER BY processed_at DESC + """ + ) + + filenames = [row[0] for row in cursor.fetchall()] + return filenames + + except Exception as e: + logger.error(f"Error retrieving processed files: {e}", exc_info=True) + return [] + finally: + if cursor: + cursor.close() + conn.close() + 
+ def call_neft_api_txn_post(self) -> bool: + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + logger.info("Calling neft_api_txn_post procedure to process all inserted transactions...") + + try: + cursor.callproc('neft_api_txn_post') + except Exception: + cursor.execute("BEGIN neft_api_txn_post; END;") + + conn.commit() + logger.info("neft_api_txn_post procedure executed successfully") + return True + except Exception as e: + logger.error(f"Error calling neft_api_txn_post procedure: {e}", exc_info=True) + return False + finally: + if cursor: + cursor.close() + conn.close() + + def verify_tables_exist(self): + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + + try: + cursor.execute("SELECT COUNT(*) FROM inward_neft_api_log WHERE ROWNUM = 1") + logger.info("✓ inward_neft_api_log table exists") + except Exception as e: + logger.error(f"✗ inward_neft_api_log table not found: {e}") + raise SystemExit( + "FATAL: inward_neft_api_log table must be created manually before running this application" + ) + + try: + cursor.execute("SELECT COUNT(*) FROM neft_processed_files WHERE ROWNUM = 1") + logger.info("✓ neft_processed_files table exists") + except Exception as e: + logger.error(f"✗ neft_processed_files table not found: {e}") + raise SystemExit( + "FATAL: neft_processed_files table must be created manually before running this application" + ) + + logger.info("Database tables verified successfully") + + except SystemExit: + raise + except Exception as e: + logger.error(f"Error verifying tables: {e}", exc_info=True) + raise SystemExit(f"FATAL: Error verifying database tables: {e}") + finally: + if cursor: + cursor.close() + conn.close() \ No newline at end of file diff --git a/logging_config.py b/logging_config.py new file mode 100644 index 0000000..1b22df3 --- /dev/null +++ b/logging_config.py @@ -0,0 +1,51 @@ +import logging +import logging.handlers +import os +from pathlib import Path + +def 
def setup_logging(log_level=logging.INFO, log_dir="logs"):
    """
    Configure root logging with console and rotating-file handlers.

    Safe to call more than once: existing handlers are cleared first, so
    repeated calls do not duplicate output.

    Args:
        log_level: logging level for the root logger and both handlers
                   (default: logging.INFO).
        log_dir: directory for log files; created if missing.

    Returns:
        The configured root logger.
    """
    # Ensure the log directory exists.
    log_path = Path(log_dir)
    log_path.mkdir(exist_ok=True)

    logger = logging.getLogger()
    logger.setLevel(log_level)

    # Clear existing handlers so repeated setup calls stay idempotent.
    logger.handlers.clear()

    formatter = logging.Formatter(
        fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Console handler.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(log_level)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # Rotating file handler.
    # FIX: use pathlib consistently (the directory above already uses Path;
    # the file path previously mixed in os.path.join).
    file_handler = logging.handlers.RotatingFileHandler(
        log_path / 'app.log',
        maxBytes=10 * 1024 * 1024,  # rotate at 10 MB
        backupCount=5
    )
    file_handler.setLevel(log_level)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    return logger


def get_logger(name):
    """Get a logger instance for a specific module."""
    return logging.getLogger(name)
+""" + +import logging +from logging_config import setup_logging, get_logger +from scheduler import Scheduler + +# Initialize logging +logging.getLogger("paramiko").setLevel(logging.WARNING) +logger = setup_logging(log_level=logging.INFO) +app_logger = get_logger(__name__) + + +def main(): + """Main application function.""" + app_logger.info("Application started") + + try: + # Run the scheduler + scheduler = Scheduler() + scheduler.run() + app_logger.info("Application completed successfully") + except KeyboardInterrupt: + app_logger.info("Application interrupted by user") + except Exception as e: + app_logger.error(f"An error occurred: {e}", exc_info=True) + raise + + +if __name__ == "__main__": + main() diff --git a/neft_outward_parser.py b/neft_outward_parser.py new file mode 100644 index 0000000..adcc310 --- /dev/null +++ b/neft_outward_parser.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python3 +""" +UTR Pipe-Delimited File Parser (SFTP feed) +- Robust parsing for files with '|' separator and inconsistent whitespace. +- Returns (transactions, file_metadata, summary_data) exactly like UIHParser style. +- TXN_DATE is left as-is from the file (no time concatenation or conversion). +""" + +import csv +import os +import re +from decimal import Decimal, InvalidOperation +from typing import Dict, List, Tuple, Optional + +from logging_config import get_logger + +logger = get_logger(__name__) + + +# ------------------------- +# Helpers & Normalization +# ------------------------- + +WS_COLLAPSE_RE = re.compile(r'[ \t\u00A0]+') + + +def normalize_text(s: Optional[str]) -> str: + """ + Normalize internal whitespace to single spaces, strip ends. + Keep None as ''. 
+ """ + if s is None: + return '' + s = s.replace('\u00A0', ' ') + s = WS_COLLAPSE_RE.sub(' ', s) + return s.strip() + + +def to_decimal(value: str) -> Optional[Decimal]: + if value is None: + return None + v = value.replace(',', '').strip() + if v == '': + return None + try: + return Decimal(v) + except InvalidOperation: + logger.warning(f"Amount not a valid decimal: {value!r}") + return None + + +IFSC_RE = re.compile(r'^[A-Z]{4}0[A-Z0-9]{6}$', re.IGNORECASE) + + +def validate_ifsc(code: str) -> bool: + """ + Gentle IFSC validation: standard format is 11 chars (AAAA0XXXXXX). + Returns False if it doesn't match; NEVER rejects the record. + """ + if not code: + return False + return bool(IFSC_RE.match(code)) + + +# ------------------------- +# Parser +# ------------------------- + +class NEFT_OUTWARD_Parser: + """ + Parser for SFTP UTR pipe-delimited files. + Returns: (transactions, file_metadata, summary_data) + """ + + # Canonical order (maps to snake_case keys) as they appear in the file header + EXPECTED_HEADER = [ + "utr", "amount", "sender_acct_name", "remitter_detail", "remmiter_info", + "benef_address", "reject_code", "reject_reason", "journal_no", + "status", "sub_msg_type", "tran_date", "tran_time", "ifsc_sender", + "ifsc_recvr", "remitter_acct_no", "benef_acct_no", + "remitter_details", "beneficiary_details", + ] + + def __init__(self, file_path: str, encoding_priority: Optional[List[str]] = None): + self.file_path = file_path + self.encoding_priority = encoding_priority or ["utf-8-sig", "cp1252", "latin-1"] + self.transactions: List[Dict] = [] + self.file_metadata: Dict = {} + self.summary_data: Dict = {} + + def parse(self) -> Tuple[List[Dict], Dict, Dict]: + """ + Main parse method: returns (transactions, file_metadata, summary_data) + """ + try: + rows, header = self._read_rows_with_fallback() + header_map = self._prepare_header_map(header) + + + self.file_metadata = { + "source_file": os.path.basename(self.file_path), + "columns_detected": header, + 
"row_count": len(rows), + } + + for idx, raw in enumerate(rows, start=1): + rec = self._row_to_transaction(raw, header_map, row_num=idx) + if rec: + self.transactions.append(rec) + + self.summary_data = self._build_summary(self.transactions) + + logger.info( + f"Parsed {len(self.transactions)} rows from {self.file_path}" + ) + return self.transactions, self.file_metadata, self.summary_data + + except Exception as e: + logger.error(f"Error parsing SFTP UTR file: {e}", exc_info=True) + raise + + # ------------------------- + # Internals + # ------------------------- + + def _read_rows_with_fallback(self) -> Tuple[List[List[str]], List[str]]: + """ + Try multiple encodings. Return (rows, header) + """ + last_err = None + for enc in self.encoding_priority: + try: + with open(self.file_path, 'r', encoding=enc, errors='replace', newline='') as f: + reader = csv.reader(f, delimiter='|') + all_rows = list(reader) + if not all_rows: + raise ValueError("Empty file") + + header = [normalize_text(c) for c in all_rows[0]] + rows = [r for r in all_rows[1:]] + + logger.info(f"Read {len(rows)} data rows using encoding {enc}") + return rows, header + except Exception as e: + last_err = e + logger.warning(f"Failed to read with encoding={enc}: {e}") + continue + # If we fall through all encodings + raise last_err or RuntimeError("File read failed for all encodings") + + def _prepare_header_map(self, header: List[str]) -> Dict[int, str]: + """ + Map column index -> canonical snake_case key. + Unknown/extra headers become normalized snake_case as-is. + """ + def canon(name: str) -> str: + name = name.strip() + name = name.replace('/', '_').replace('-', '_').replace(' ', '_') + return name.lower() + + header_norm = [canon(h) for h in header] + + if len(header_norm) < len(self.EXPECTED_HEADER): + logger.warning( + f"Header has fewer columns ({len(header_norm)}) than expected ({len(self.EXPECTED_HEADER)}). " + f"Will pad rows defensively." 
+ ) + + idx_to_key: Dict[int, str] = {} + for i, h in enumerate(header_norm): + idx_to_key[i] = h + + return idx_to_key + + def _row_to_transaction(self, row: List[str], header_map: Dict[int, str], row_num: int) -> Optional[Dict]: + """ + Convert raw CSV row to a normalized dict (no data-model mapping here). + """ + # Pad or trim to header length (defensive) + max_idx = max(header_map.keys()) if header_map else -1 + if len(row) - 1 < max_idx: + row = row + [''] * (max_idx + 1 - len(row)) + elif len(row) - 1 > max_idx: + logger.debug(f"Row {row_num} has extra fields; trimming to header size") + + # Build base dict with normalized text + raw = {header_map[i]: normalize_text(row[i] if i < len(row) else '') for i in range(max_idx + 1)} + + # Collect expected keys; leave as strings except amount where we coerce safely + txn: Dict[str, object] = {k: raw.get(k, '') for k in self.EXPECTED_HEADER} + + # Amount normalization + amt = to_decimal(str(txn.get('amount', '') or '')) + txn['amount'] = amt if amt is not None else '' + + # IFSC checks (gentle logs only) + ifsc_sender = str(txn.get('ifsc_sender') or '') + ifsc_recvr = str(txn.get('ifsc_recvr') or '') + if ifsc_sender and not validate_ifsc(ifsc_sender): + logger.debug(f"Row {row_num} sender IFSC looks non-standard: {ifsc_sender}") + if ifsc_recvr and not validate_ifsc(ifsc_recvr): + logger.debug(f"Row {row_num} receiver IFSC looks non-standard: {ifsc_recvr}") + + # TXN_DATE: keep as-is from file; ignore time entirely + txn['tran_date'] = str(txn.get('tran_date') or '') + txn['tran_time'] = '' # explicitly blank to signal unused + + # Basic sanity: UTR presence + if not str(txn.get('utr') or '').strip(): + logger.debug(f"Row {row_num} skipped: missing UTR") + return None + + return txn + + def _build_summary(self, txns: List[Dict]) -> Dict: + """ + Build compact summary: + - total_count + - amount_total + - by_status: count, amount + """ + total_count = len(txns) + amount_total = Decimal('0') + by_status: Dict[str, 
Dict[str, object]] = {} + + for t in txns: + amt = t.get('amount') + if isinstance(amt, Decimal): + pass + elif isinstance(amt, str): + try: + amt = Decimal(amt) + except InvalidOperation: + amt = Decimal('0') + elif amt is None: + amt = Decimal('0') + + amount_total += amt + + st = (str(t.get('status') or '')).upper() + if st not in by_status: + by_status[st] = {'count': 0, 'amount': Decimal('0')} + by_status[st]['count'] += 1 + by_status[st]['amount'] = by_status[st]['amount'] + amt + + by_status_str = {k: {'count': v['count'], 'amount': f"{v['amount']:.2f}"} for k, v in by_status.items()} + + return { + 'total_count': total_count, + 'amount_total': f"{amount_total:.2f}", + 'by_status': by_status_str + } + + +# ------------------------- +# Printing Utilities +# ------------------------- + +def print_transactions(transactions: List[Dict], limit: Optional[int] = 50): + """ + Console print (raw transaction dict view similar to UIH print). + Includes all fields except time, REJECT_CODE, and REJECT_REASON. + """ + cols = [ + ('utr', 20), + ('amount', 12), + ('status', 8), + ('journal_no', 14), + ('tran_date', 10), + ('sender_acct_name', 28), + ('remitter_acct_no', 22), + ('benef_acct_no', 22), + ('ifsc_sender', 12), + ('ifsc_recvr', 12), + ('remitter_detail', 28), + ('remmiter_info', 24), + ('beneficiary_details', 30), + ('benef_address', 30), + ('sub_msg_type', 10), + ] + header = " ".join([f"{name.upper():<{w}}" for name, w in cols]) + print("\n" + "=" * len(header)) + print(header) + print("=" * len(header)) + + shown = 0 + for txn in transactions: + row = [] + for name, w in cols: + val = txn.get(name, '') + if isinstance(val, Decimal): + val = f"{val:.2f}" + row.append(f"{str(val)[:w]:<{w}}") + print(" ".join(row)) + shown += 1 + if limit and shown >= limit: + print(f"... 
({len(transactions) - shown} more rows not shown)") + break + + print("=" * len(header)) + print(f"Total transactions parsed: {len(transactions)}\n") + + +def print_metadata(metadata: Dict): + """Print file metadata (UIH-like).""" + print("\n" + "=" * 80) + print("FILE METADATA") + print("=" * 80) + for key, value in metadata.items(): + print(f"{key.upper():<20}: {value}") + print("=" * 80 + "\n") + + +def print_summary(summary: Dict): + """Print summary data.""" + if summary: + print("\n" + "=" * 80) + print("SUMMARY DATA") + print("=" * 80) + for key, value in summary.items(): + print(f"{key.upper()}: {value}") + print("=" * 80 + "\n") + + +# ------------------------- +# Runner +# ------------------------- + +if __name__ == '__main__': + from logging_config import setup_logging + + setup_logging() + + parser = SFTPUtrParser('/home/bishwajeet/test_parser/06032026_14_NEFT_INWARD.TXT') + transactions, metadata, summary = parser.parse() + + print_metadata(metadata) + print_transactions(transactions, limit=80) + print_summary(summary) + + logger.info(f"Parsing complete. Extracted {len(transactions)} transactions") \ No newline at end of file diff --git a/processors/__init__.py b/processors/__init__.py new file mode 100644 index 0000000..057259b --- /dev/null +++ b/processors/__init__.py @@ -0,0 +1,6 @@ +"""Processors module for ACH file processing.""" + +from .data_mapper import NEFTDataMapper +from .file_processor import FileProcessor + +__all__ = ['NEFTDataMapper', 'FileProcessor'] diff --git a/processors/data_mapper.py b/processors/data_mapper.py new file mode 100644 index 0000000..0ad1b7b --- /dev/null +++ b/processors/data_mapper.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +""" +Data mapper for NEFT SFTP feed. +Maps parsed NEFT transactions (dicts) to NEFTOutwardRecord for database insertion. +- No padding of account numbers (kept as-is, trimmed). 
+""" + +from datetime import datetime +from decimal import Decimal +from typing import Dict, Any, List + +from logging_config import get_logger +from db.models import NEFTOutwardRecord + +logger = get_logger(__name__) + + +class NEFTDataMapper: + """Maps parsed NEFT transactions to NEFTOutwardRecord objects.""" + + # ------------------------- + # Helpers + # ------------------------- + + @staticmethod + def convert_date(date_str: str) -> str: + """ + Convert NEFT date (YYYYMMDD) to DDMMYYYY. + + Input : '20260306' + Output: '06032026' + + On error, returns today's date in DDMMYYYY format. + """ + try: + if not date_str or len(date_str.strip()) != 8 or not date_str.isdigit(): + raise ValueError(f"Invalid NEFT date format: {date_str!r}") + dt = datetime.strptime(date_str, "%Y%m%d") + return dt.strftime("%d%m%Y") + except Exception as e: + logger.error(f"Error converting date '{date_str}': {e}") + return datetime.now().strftime("%d%m%Y") + + + + + @staticmethod + def convert_amount(amount_in: Any) -> Decimal: + """ + Convert amount to Decimal and return absolute value. + Use TXNIND to capture the sign semantics. + """ + try: + if isinstance(amount_in, Decimal): + val = amount_in + else: + txt = (str(amount_in) or '').replace(',', '').strip() + val = Decimal(txt) if txt else Decimal('0') + return abs(val) + except Exception as e: + logger.error(f"Error converting amount '{amount_in}': {e}") + return Decimal('0') + + # ------------------------- + # Mapping + # ------------------------- + + @classmethod + def map_transaction(cls, parsed_txn: Dict[str, Any], bankcode: str) -> NEFTOutwardRecord: + """ + Map a single parsed NEFT transaction (dict) to NEFTOutwardRecord. 
+ + Args: + parsed_txn: Dict emitted by SFTPUtrParser + bankcode : Bank code for this transaction (mapped to NEFTOutwardRecord.bank_code) + """ + try: + # Amount handling + amount_in = parsed_txn.get('amount', '0') + txn_amt = cls.convert_amount(amount_in) + txnind = 'DR' + + # Date handling + txn_date_raw = parsed_txn.get('tran_date', '') or '' + txn_date_ddmmyyyy = cls.convert_date(txn_date_raw) + + # Account numbers: NO padding, just trim + sender_acct = (parsed_txn.get('remitter_acct_no') or '').strip() + recvr_acct = (parsed_txn.get('benef_acct_no') or '').strip() + + + recvr_acct_name = (parsed_txn.get('beneficiary_details') or '').strip() + + record = NEFTOutwardRecord( + + bank_code=bankcode, + txnind=txnind, + jrnl_id=(parsed_txn.get('journal_no') or '').strip(), + ref_no=(parsed_txn.get('utr') or '').strip(), + txn_date=txn_date_ddmmyyyy, + txn_amt=txn_amt, + sender_ifsc=(parsed_txn.get('ifsc_sender') or '').strip(), + reciever_ifsc=(parsed_txn.get('ifsc_recvr') or '').strip(), + sender_acct_no=sender_acct, + sender_acct_name=(parsed_txn.get('sender_acct_name') or '').strip(), + recvr_acct_no=recvr_acct, + recvr_acct_name=recvr_acct_name, + reject_code=(parsed_txn.get('reject_code') or '').strip(), + reject_reason=(parsed_txn.get('reject_reason') or '').strip(), + benef_address=(parsed_txn.get('benef_address') or '').strip(), + msg_type=(parsed_txn.get('sub_msg_type') or '').strip(), + ) + + return record + + except Exception as e: + logger.error(f"Error mapping NEFT transaction: {e}", exc_info=True) + raise + + @classmethod + def map_transactions(cls, parsed_transactions: List[Dict[str, Any]], bankcode: str) -> List[NEFTOutwardRecord]: + """ + Map a list of parsed NEFT transactions to NEFTOutwardRecord objects. 
#!/usr/bin/env python3
"""
Main file processor for end-to-end NEFT file processing.
Orchestrates download, parsing, mapping, and database insertion.
"""

import os
import tempfile
from pathlib import Path
from logging_config import get_logger
from neft_outward_parser import NEFT_OUTWARD_Parser
from db.repository import Repository
from db.models import ProcessedFile
from sftp.sftp_client import SFTPClient
from .data_mapper import NEFTDataMapper

logger = get_logger(__name__)


class FileProcessor:
    """Processes NEFT OUTWARD files end-to-end."""

    def __init__(self, repository: Repository = None, sftp_client: SFTPClient = None):
        """
        Initialize file processor.

        Args:
            repository: Repository instance (optional; created on demand)
            sftp_client: SFTPClient instance (optional; created on demand)
        """
        self.repository = repository or Repository()
        self.sftp_client = sftp_client or SFTPClient()
        # Downloads land in the OS temp dir and are removed after processing.
        self.temp_dir = tempfile.gettempdir()

    def _mark_processed(self, filename: str, bankcode: str, remote_path: str,
                        transaction_count: int, status: str,
                        error_message: str = None) -> None:
        """Persist a ProcessedFile row recording this file's outcome."""
        kwargs = dict(
            filename=filename,
            bankcode=bankcode,
            file_path=remote_path,
            transaction_count=transaction_count,
            status=status,
        )
        # Only failed files carry an error message (capped by the caller).
        if error_message is not None:
            kwargs['error_message'] = error_message
        self.repository.mark_file_processed(ProcessedFile(**kwargs))

    def process_file(
        self,
        filename: str,
        bankcode: str,
        remote_path: str
    ) -> bool:
        """
        Process a single NEFT OUTWARD file end-to-end.

        Workflow:
            1. Skip if already processed for this bank
            2. Download file from SFTP
            3. Parse using NEFT_OUTWARD_Parser
            4. Map to database format
            5. Insert to database and mark file processed
            6. Cleanup local file

        Args:
            filename: Name of file to process
            bankcode: Bank code for this file
            remote_path: Full remote path on SFTP

        Returns:
            True if successful, False otherwise
        """
        local_path = os.path.join(self.temp_dir, filename)

        try:
            # BUG FIX: log messages previously interpolated the literal text
            # "(unknown)" instead of the actual filename.
            logger.info(f"Starting processing: {filename} (bank: {bankcode})")

            # Step 1: Check if already processed for this bank
            if self.repository.is_file_processed(filename, bankcode):
                logger.info(f"File already processed for {bankcode}: {filename}")
                return True

            # Step 2: Download file
            if not self.sftp_client.download_file(remote_path, local_path):
                raise Exception(f"Failed to download file: {remote_path}")

            # Step 3: Parse file
            parser = NEFT_OUTWARD_Parser(local_path)
            transactions, metadata, summary = parser.parse()

            if not transactions:
                logger.warning(f"No transactions found in {filename}")
                # Still mark as processed (0 transactions) so the file is not
                # re-downloaded every cycle.
                self._mark_processed(filename, bankcode, remote_path, 0, 'SUCCESS')
                return True

            # Step 4: Map transactions
            mapped_records = NEFTDataMapper.map_transactions(transactions, bankcode)

            # Step 5: Insert to database (with account validation)
            inserted_count, skipped_count = self.repository.bulk_insert_transactions(mapped_records)

            # Step 6: Mark file as processed
            self._mark_processed(filename, bankcode, remote_path, inserted_count, 'SUCCESS')

            logger.info(
                f"Successfully processed {filename}: {inserted_count} inserted, "
                f"{skipped_count} skipped (non-ipks accounts)"
            )
            return True

        except Exception as e:
            logger.error(f"Error processing {filename}: {e}", exc_info=True)

            # Record the failure; error text is truncated to fit the column.
            try:
                self._mark_processed(
                    filename, bankcode, remote_path, 0, 'FAILED',
                    error_message=str(e)[:2000]
                )
            except Exception as mark_error:
                logger.error(f"Failed to mark file as failed: {mark_error}")

            return False

        finally:
            # Cleanup local file regardless of outcome.
            try:
                if os.path.exists(local_path):
                    os.remove(local_path)
                    logger.debug(f"Cleaned up local file: {local_path}")
            except Exception as e:
                logger.warning(f"Error cleaning up local file {local_path}: {e}")

    def process_files(self, files_to_process: list) -> dict:
        """
        Process multiple files.

        Args:
            files_to_process: List of (filename, bankcode, remote_path) tuples

        Returns:
            Dictionary with processing statistics: total/successful/failed
            counts plus a per-file result list.
        """
        stats = {
            'total': len(files_to_process),
            'successful': 0,
            'failed': 0,
            'files': []
        }

        for filename, bankcode, remote_path in files_to_process:
            success = self.process_file(filename, bankcode, remote_path)
            if success:
                stats['successful'] += 1
            else:
                stats['failed'] += 1
            stats['files'].append({
                'filename': filename,
                'bankcode': bankcode,
                'success': success
            })

        logger.info(f"Processing complete: {stats['successful']}/{stats['total']} successful")
        return stats

    def __enter__(self):
        """Context manager entry: connect SFTP if not already connected."""
        if self.sftp_client and not self.sftp_client.sftp:
            self.sftp_client.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: always disconnect SFTP."""
        if self.sftp_client:
            self.sftp_client.disconnect()
#!/usr/bin/env python3
"""
NEFT file processing scheduler.
Runs polling loop every 30 minutes to process new files.
"""

import signal
import time
import sys
from datetime import datetime
from logging_config import get_logger, setup_logging
from config import get_config
from db import OracleConnector, Repository
from sftp import SFTPClient, FileMonitor
from processors import FileProcessor

logger = get_logger(__name__)


class Scheduler:
    """Main scheduler for NEFT file processing."""

    def __init__(self):
        """Initialize scheduler: load config, validate, install signal handlers."""
        self.config = get_config()
        self.config.validate()
        self.running = True
        self.cycle_count = 0

        # Graceful shutdown on SIGTERM (service stop) and SIGINT (Ctrl-C).
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully by ending the main loop."""
        logger.info(f"Received signal {signum}, shutting down gracefully...")
        self.running = False

    def initialize_database(self):
        """
        Initialize database connection and verify tables exist.

        Returns:
            True when the connection test passes and tables verify,
            False otherwise. SystemExit is re-raised so fatal
            configuration errors abort startup.
        """
        try:
            connector = OracleConnector()
            if connector.test_connection():
                logger.info("Database connection test passed")
                repository = Repository()
                repository.verify_tables_exist()
                return True
            else:
                logger.error("Database connection test failed")
                return False
        except SystemExit as e:
            logger.error(f"Database initialization failed: {e}")
            raise
        except Exception as e:
            logger.error(f"Error initializing database: {e}", exc_info=True)
            return False

    def run_processing_cycle(self):
        """Run a single file processing cycle: scan, process, post-process."""
        self.cycle_count += 1
        logger.info(f"=== Starting processing cycle {self.cycle_count} ===")

        sftp_client = SFTPClient()
        repository = Repository()

        try:
            # Connect to SFTP
            if not sftp_client.connect():
                logger.error("Failed to connect to SFTP server")
                return

            # Scan for new files across all banks. Only today's files are
            # considered (filenames are prefixed DDMMYYYY).
            # NOTE: removed an unused `FileMonitor(sftp_client)` instance that
            # was constructed here but never used; this loop lists files
            # directly via the SFTP client.
            new_files = []
            today_str = datetime.now().strftime("%d%m%Y")
            logger.info(f'listing file for {today_str}')

            for bank_code in self.config.bank_codes:
                # Get list of files already processed for this specific bank
                bank_processed = repository.get_processed_files(bank_code)
                remote_path = f"{self.config.sftp_base_path}/{bank_code}/NEFT"

                pattern = f"{today_str}_*_NEFT_OUTWARD.TXT"
                files = sftp_client.list_files(remote_path, pattern=pattern)

                for filename in files:
                    if filename not in bank_processed:
                        # BUG FIX: the remote path was previously built with the
                        # literal text "(unknown)" instead of the filename, so
                        # every download targeted a non-existent remote file.
                        full_path = f"{remote_path}/{filename}"
                        new_files.append((filename, bank_code, full_path))
                        logger.info(f"Found new file: {filename} (bank: {bank_code})")
                    else:
                        logger.debug(f"Skipping already processed file for {bank_code}: {filename}")

            if not new_files:
                logger.info("No new files to process")
                return

            logger.info(f"Found {len(new_files)} new files to process")

            # Process files
            processor = FileProcessor(repository, sftp_client)
            stats = processor.process_files(new_files)

            # Log summary
            logger.info(f"Cycle {self.cycle_count} complete:")
            logger.info(f"  Total files: {stats['total']}")
            logger.info(f"  Successful: {stats['successful']}")
            logger.info(f"  Failed: {stats['failed']}")

            # Call neft_api_txn_post once per cycle to post-process all
            # inserted transactions.
            if stats['successful'] > 0:
                logger.info("Calling neft_api_txn_post procedure for all inserted transactions...")
                if repository.call_neft_api_txn_post():
                    logger.info("Transaction post-processing completed successfully")
                else:
                    logger.error("Transaction post-processing failed")

        except Exception as e:
            logger.error(f"Error in processing cycle: {e}", exc_info=True)

        finally:
            sftp_client.disconnect()

    def run(self):
        """Run the scheduler main loop until a shutdown signal arrives."""
        logger.info("="*80)
        logger.info("NEFT_OUTWARD File Processing Scheduler Started")
        logger.info(f"Poll Interval: {self.config.poll_interval_minutes} minutes")
        logger.info(f"Bank Codes: {', '.join(self.config.bank_codes)}")
        logger.info("="*80)

        # Initialize database; SystemExit propagates as a fatal error.
        try:
            if not self.initialize_database():
                logger.error("Failed to initialize database. Exiting.")
                return
        except SystemExit as e:
            logger.error(f"Fatal error: {e}")
            raise

        # Run processing loop
        poll_interval_seconds = self.config.poll_interval_minutes * 60

        while self.running:
            try:
                self.run_processing_cycle()
            except Exception as e:
                logger.error(f"Unexpected error in processing cycle: {e}", exc_info=True)

            # Wait for next cycle
            if self.running:
                logger.info(f"Waiting {self.config.poll_interval_minutes} minutes until next cycle...")
                time.sleep(poll_interval_seconds)

        logger.info("Scheduler shutdown complete")


def main():
    """Main entry point: configure logging and start the scheduler."""
    setup_logging()

    scheduler = Scheduler()
    scheduler.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python3
"""
File monitoring and discovery for NEFT Outward files.
Scans SFTP directories for new files across multiple banks.

Filename convention handled:
    DDMMYYYY_HH_NEFT_OUTWARD.TXT
Example:
    06032026_14_NEFT_OUTWARD.TXT
"""

import re
from typing import List, Tuple, Dict
from logging_config import get_logger
from config import get_config
from .sftp_client import SFTPClient

logger = get_logger(__name__)


class FileMonitor:
    """Monitors SFTP for new NEFT Outward files."""

    def __init__(self, sftp_client: SFTPClient = None):
        """
        Initialize file monitor.

        Args:
            sftp_client: SFTPClient instance (optional; created on demand)
        """
        self.config = get_config()
        self.sftp_client = sftp_client or SFTPClient()

    def scan_for_new_files(self, processed_filenames: List[str]) -> List[Tuple[str, str, str]]:
        """
        Scan all bank directories for new NEFT files.

        Args:
            processed_filenames: List of already processed filenames to skip

        Returns:
            List of (filename, bankcode, full_remote_path) tuples
        """
        new_files: List[Tuple[str, str, str]] = []

        for bank_code in self.config.bank_codes:
            # Adjust subfolder name here if required (e.g., 'NEFT_INWARD' or other)
            remote_path = f"{self.config.sftp_base_path}/{bank_code}/NEFT"

            # Match any NEFT outward file for any date/hour
            files = self.sftp_client.list_files(remote_path, pattern='*_NEFT_OUTWARD.TXT')

            for filename in files:
                if filename not in processed_filenames:
                    # BUG FIX: the remote path was previously built with the
                    # literal text "(unknown)" instead of the filename.
                    full_path = f"{remote_path}/{filename}"
                    new_files.append((filename, bank_code, full_path))
                    logger.info(f"Found new NEFT file: {filename} (bank: {bank_code})")
                else:
                    logger.debug(f"Skipping already processed NEFT file: {filename}")

        logger.info(f"NEFT scan complete: Found {len(new_files)} new files")
        return new_files

    @staticmethod
    def parse_filename(filename: str) -> Dict[str, str]:
        """
        Parse NEFT filename to extract metadata.

        Expected format:
            DDMMYYYY_HH_NEFT_OUTWARD.TXT
        Example:
            06032026_14_NEFT_OUTWARD.TXT

        Args:
            filename: Filename to parse

        Returns:
            Dictionary with extracted metadata or empty dict if parse fails
        """
        # Groups: DD, MM, YYYY, HH
        pattern = r'^(\d{2})(\d{2})(\d{4})_(\d{2})_NEFT_OUTWARD\.TXT$'
        match = re.match(pattern, filename, flags=re.IGNORECASE)

        if not match:
            # BUG FIX: warning previously logged the literal "(unknown)".
            logger.warning(f"Could not parse NEFT filename: {filename}")
            return {}

        day, month, year, hour = match.groups()

        return {
            'filename': filename,
            'day': day,
            'month': month,
            'year': year,
            'hour': hour,
            'timestamp': f"{day}/{month}/{year} {hour}:00:00"
        }

    def __enter__(self):
        """Context manager entry: connect SFTP if not already connected."""
        if not self.sftp_client.sftp:
            self.sftp_client.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: always disconnect."""
        self.sftp_client.disconnect()


#!/usr/bin/env python3
"""
SFTP client for file operations.
Handles connection, file discovery, and download operations.
"""

import fnmatch
import os

import paramiko
from pathlib import Path
from logging_config import get_logger
from config import get_config

logger = get_logger(__name__)


class SFTPClient:
    """SFTP operations for NEFT file processing."""

    def __init__(self):
        """Initialize SFTP client (no connection is opened yet)."""
        self.config = get_config()
        self.sftp = None
        self.ssh = None

    def connect(self) -> bool:
        """
        Establish SFTP connection.

        Returns:
            True if successful, False otherwise
        """
        try:
            # Create SSH client
            self.ssh = paramiko.SSHClient()
            # NOTE(review): AutoAddPolicy trusts any host key on first contact,
            # which permits man-in-the-middle attacks. Consider a known-hosts
            # file with RejectPolicy for production.
            self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            # Connect
            self.ssh.connect(
                self.config.sftp_host,
                port=self.config.sftp_port,
                username=self.config.sftp_username,
                password=self.config.sftp_password,
                timeout=10
            )

            # Get SFTP channel
            self.sftp = self.ssh.open_sftp()
            logger.info(f"Connected to SFTP server: {self.config.sftp_host}:{self.config.sftp_port}")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to SFTP server: {e}", exc_info=True)
            return False

    def disconnect(self):
        """Close SFTP channel and SSH transport."""
        try:
            if self.sftp:
                self.sftp.close()
            if self.ssh:
                self.ssh.close()
            logger.info("SFTP connection closed")
        except Exception as e:
            logger.error(f"Error closing SFTP connection: {e}")

    def list_files(self, remote_path: str, pattern: str) -> list:
        """
        List files matching pattern in remote directory.

        Args:
            remote_path: Path on SFTP server
            pattern: fnmatch-style glob that filenames must match
                (this parameter was previously undocumented)

        Returns:
            Sorted list of matching filenames (empty list on error)
        """
        if not self.sftp:
            logger.error("SFTP not connected")
            return []

        try:
            try:
                items = self.sftp.listdir_attr(remote_path)
            except FileNotFoundError:
                logger.warning(f"Directory not found: {remote_path}")
                return []

            # FIX: fnmatch was imported inside this method on every call;
            # the import is now at module level.
            files = [item.filename for item in items
                     if fnmatch.fnmatch(item.filename, pattern)]

            logger.debug(f"Found {len(files)} files matching {pattern} in {remote_path}")
            return sorted(files)

        except Exception as e:
            logger.error(f"Error listing files in {remote_path}: {e}", exc_info=True)
            return []

    def download_file(self, remote_path: str, local_path: str) -> bool:
        """
        Download file from SFTP server.

        Args:
            remote_path: Full path on SFTP server
            local_path: Local destination path

        Returns:
            True if successful, False otherwise
        """
        if not self.sftp:
            logger.error("SFTP not connected")
            return False

        try:
            # Create local directory if needed
            Path(local_path).parent.mkdir(parents=True, exist_ok=True)

            # Download file
            self.sftp.get(remote_path, local_path)
            logger.info(f"Downloaded file: {remote_path} -> {local_path}")
            return True

        except Exception as e:
            logger.error(f"Error downloading file {remote_path}: {e}", exc_info=True)
            return False

    def get_file_size(self, remote_path: str) -> int:
        """
        Get size of remote file.

        Args:
            remote_path: Full path on SFTP server

        Returns:
            File size in bytes, or -1 if error
        """
        if not self.sftp:
            logger.error("SFTP not connected")
            return -1

        try:
            return self.sftp.stat(remote_path).st_size
        except Exception as e:
            logger.error(f"Error getting file size {remote_path}: {e}")
            return -1

    def __enter__(self):
        """Context manager entry: open the connection."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the connection."""
        self.disconnect()