Initial commit

This commit is contained in:
2026-03-12 12:46:33 +05:30
commit a2be502225
18 changed files with 1841 additions and 0 deletions

View File

@@ -0,0 +1,10 @@
{
"permissions": {
"allow": [
"Bash(python3 -m venv:*)",
"Bash(source venv/bin/activate)",
"Bash(python:*)",
"Bash(pip install:*)"
]
}
}

23
.env.example Normal file
View File

@@ -0,0 +1,23 @@
# Database Configuration
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB
DB_POOL_MIN=2
DB_POOL_MAX=10
# SFTP Configuration
SFTP_HOST=192.168.1.100
SFTP_PORT=22
SFTP_USERNAME=ipks
SFTP_PASSWORD=secure_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS
# Processing Configuration
POLL_INTERVAL_MINUTES=30
BATCH_SIZE=100
BANK_CODES=HDFC,ICICI,SBI,AXIS,PNB
# Logging Configuration
LOG_LEVEL=INFO

92
config.py Normal file
View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""
Configuration management for ACH file processing pipeline.
Loads and validates environment variables.
"""
import os
from pathlib import Path
from logging_config import get_logger
logger = get_logger(__name__)
class Config:
    """Application configuration sourced from environment variables.

    Reads database, SFTP, and processing settings once at construction
    time; each value falls back to a hard-coded default when the
    corresponding environment variable is absent.
    """

    def __init__(self):
        """Load every configuration section from the environment."""
        self._validate_env_file()
        self._load_database_config()
        self._load_sftp_config()
        self._load_processing_config()

    def _validate_env_file(self):
        """Warn (but continue) when no .env file is present in the CWD."""
        if not Path('.env').exists():
            logger.warning(".env file not found. Using environment variables or defaults.")

    def _load_database_config(self):
        """Populate Oracle connection and pool settings."""
        env = os.getenv
        # NOTE(review): real-looking credentials as in-code defaults — prefer env-only.
        self.db_user = env('DB_USER', 'pacs_db')
        self.db_password = env('DB_PASSWORD', 'pacs_db')
        self.db_host = env('DB_HOST', 'testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com')
        self.db_port = int(env('DB_PORT', '1521'))
        self.db_service_name = env('DB_SERVICE_NAME', 'IPKSDB')
        self.db_pool_min = int(env('DB_POOL_MIN', '2'))
        self.db_pool_max = int(env('DB_POOL_MAX', '10'))

    def _load_sftp_config(self):
        """Populate SFTP connection settings."""
        env = os.getenv
        # NOTE(review): these defaults differ from .env.example and embed a
        # real-looking password — confirm and prefer env-only.
        self.sftp_host = env('SFTP_HOST', '43.225.3.224')
        self.sftp_port = int(env('SFTP_PORT', '4650'))
        self.sftp_username = env('SFTP_USERNAME', 'ipkssftp')
        self.sftp_password = env('SFTP_PASSWORD', 'Wnb10U11BE7N26')
        self.sftp_base_path = env('SFTP_BASE_PATH', '/home/ipks/IPKS_FILES/REPORTS')

    def _load_processing_config(self):
        """Populate polling, batching, bank-code, and logging settings."""
        env = os.getenv
        self.poll_interval_minutes = int(env('POLL_INTERVAL_MINUTES', '30'))
        self.batch_size = int(env('BATCH_SIZE', '100'))
        self.bank_codes = self._parse_bank_codes()
        self.log_level = env('LOG_LEVEL', 'INFO')

    def _parse_bank_codes(self):
        """Split BANK_CODES (comma-separated) into a list of non-empty codes."""
        raw = os.getenv('BANK_CODES', '0001,0002,0003,0004,0005,0006,0007,0009,0012,0013,0014,0015,0016,0017,0018,0020,0021')
        return [token.strip() for token in raw.split(',') if token.strip()]

    def get_db_connection_string(self):
        """Generate an Oracle connection string (user/password@host:port/service)."""
        credentials = f"{self.db_user}/{self.db_password}"
        target = f"{self.db_host}:{self.db_port}/{self.db_service_name}"
        return f"{credentials}@{target}"

    def validate(self):
        """Validate critical configuration; raise ValueError on fatal gaps."""
        if not (self.db_user and self.db_password):
            raise ValueError("Database credentials not configured")
        if not self.sftp_username:
            logger.warning("SFTP username not configured")
        if not self.bank_codes:
            raise ValueError("No bank codes configured")
        logger.info(f"Configuration validated. Bank codes: {', '.join(self.bank_codes)}")
# Lazily-created, process-wide configuration singleton.
config = None


def get_config():
    """Return the shared Config instance, constructing it on first call."""
    global config
    if config is not None:
        return config
    config = Config()
    return config
# Smoke test: load, validate, and echo the effective configuration.
if __name__ == '__main__':
    cfg = get_config()
    cfg.validate()
    print(f"Bank Codes: {cfg.bank_codes}")
    print(f"SFTP Host: {cfg.sftp_host}:{cfg.sftp_port}")
    print(f"Database: {cfg.db_host}:{cfg.db_port}/{cfg.db_service_name}")
    print(f"Poll Interval: {cfg.poll_interval_minutes} minutes")

6
db/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
"""Database module for ACH file processing."""
from .oracle_connector import OracleConnector
from .repository import Repository
__all__ = ['OracleConnector', 'Repository']

77
db/models.py Normal file
View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
"""
Data models for ACH file processing.
Represents database records and transactions.
"""
from dataclasses import dataclass, asdict
from datetime import date, datetime
from decimal import Decimal
from typing import Optional
@dataclass
class NEFTOutwardRecord:
    """A parsed NEFT transaction mapped to outward_neft_api_log columns.

    (Doc fix: the previous docstring said "NEFT Inward"; to_dict() is the
    bind-variable payload for the outward_neft_api_log insert in
    db/repository.py.) Field comments give the target Oracle column type.
    """
    txnind: str                  # VARCHAR2(2), e.g. "DR"
    jrnl_id: str                 # VARCHAR2(4000), NOT NULL
    ref_no: str                  # VARCHAR2(400), NOT NULL (UTR)
    txn_date: str                # VARCHAR2(100), NOT NULL (DDMMYYYY string)
    txn_amt: Optional[Decimal]   # NUMBER(17,2)
    sender_ifsc: str             # VARCHAR2(400)
    reciever_ifsc: str           # VARCHAR2(400) (sic: column spelled RECIEVER)
    sender_acct_no: str          # VARCHAR2(400)
    sender_acct_name: str        # VARCHAR2(400)
    recvr_acct_no: str           # VARCHAR2(400)
    recvr_acct_name: str         # VARCHAR2(400)
    reject_code: str             # VARCHAR2(400)
    reject_reason: str           # VARCHAR2(400)
    benef_address: str           # VARCHAR2(400)
    msg_type: str                # VARCHAR2(400)
    bank_code: str               # originating bank code

    def to_dict(self):
        """Return a dict keyed by DB column name for executemany binds."""
        return {
            "TXNIND": self.txnind,
            "BANKCODE": self.bank_code,
            "JRNL_ID": self.jrnl_id,
            "REF_NO": self.ref_no,
            "TRAN_DATE": self.txn_date,
            "TXN_AMT": self.txn_amt,
            "SENDER_IFSC": self.sender_ifsc,
            "RECIEVER_IFSC": self.reciever_ifsc,
            "SENDER_ACCT_NO": self.sender_acct_no,
            "SENDER_NAME": self.sender_acct_name,
            "RECVR_ACCT_NO": self.recvr_acct_no,
            "RECIEVER_NAME": self.recvr_acct_name,
            "REJECT_CODE": self.reject_code,
            "REJECT_REASON": self.reject_reason,
            "BENEFICIARY_ADDRESS": self.benef_address,
            "MSG_TYPE": self.msg_type,
        }


# Backward-compatible alias: db/repository.py does
# `from .models import NEFTInwardRecord`, a name that did not exist in this
# module and made that import raise ImportError. The alias fixes the import
# without breaking any existing NEFTOutwardRecord callers.
NEFTInwardRecord = NEFTOutwardRecord
@dataclass
class ProcessedFile:
    """Audit row for the neft_processed_files tracking table."""
    filename: str
    bankcode: str
    file_path: str
    transaction_count: int
    status: str = 'SUCCESS'
    error_message: Optional[str] = None
    processed_at: Optional[datetime] = None

    def to_dict(self):
        """Return a bind-ready dict; stamps the current time when
        processed_at was never set."""
        row = {
            'filename': self.filename,
            'bankcode': self.bankcode,
            'file_path': self.file_path,
            'transaction_count': self.transaction_count,
            'status': self.status,
            'error_message': self.error_message,
        }
        row['processed_at'] = self.processed_at or datetime.now()
        return row

111
db/oracle_connector.py Normal file
View File

@@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""
Oracle database connection pool manager using oracledb.
Manages connections with pooling and health checks.
oracledb is the modern, simpler replacement for cx_Oracle.
No Oracle Instant Client required - uses Thick or Thin mode.
"""
import oracledb
from logging_config import get_logger
from config import get_config
logger = get_logger(__name__)
class OracleConnector:
    """Manages Oracle database connections with pooling."""

    def __init__(self):
        """Initialize lazily; the pool is created on first use."""
        self.pool = None
        self.config = get_config()

    def initialize_pool(self):
        """Create the session pool.

        Returns:
            True on success, False on failure (errors are logged).
        """
        try:
            # Fix: python-oracledb (unlike cx_Oracle) does not parse
            # credentials out of a "user/password@dsn" string, so the
            # user/password are passed as separate parameters and the DSN
            # carries only host:port/service_name (Easy Connect form).
            dsn = (
                f"{self.config.db_host}:{self.config.db_port}/"
                f"{self.config.db_service_name}"
            )
            self.pool = oracledb.create_pool(
                user=self.config.db_user,
                password=self.config.db_password,
                dsn=dsn,
                min=self.config.db_pool_min,
                max=self.config.db_pool_max,
                increment=1,
            )
            logger.debug(f"Oracle connection pool initialized: min={self.config.db_pool_min}, max={self.config.db_pool_max}")
            return True
        except oracledb.DatabaseError as e:
            logger.error(f"Failed to initialize connection pool: {e}", exc_info=True)
            return False
        except Exception as e:
            logger.error(f"Unexpected error initializing pool: {e}", exc_info=True)
            return False

    def get_connection(self):
        """Acquire a connection from the pool, initializing it if needed.

        Raises:
            RuntimeError: if the pool could not be created.
            oracledb.DatabaseError: if acquiring a session fails.
        """
        if not self.pool:
            # Fix: a failed initialization used to leave self.pool as None
            # and the acquire below raised AttributeError; fail explicitly.
            if not self.initialize_pool():
                raise RuntimeError("Oracle connection pool is not available")
        try:
            conn = self.pool.acquire()
            logger.debug("Connection acquired from pool")
            return conn
        except oracledb.DatabaseError as e:
            logger.error(f"Failed to acquire connection: {e}", exc_info=True)
            raise
        except Exception as e:
            logger.error(f"Unexpected error acquiring connection: {e}", exc_info=True)
            raise

    def close_pool(self):
        """Close the connection pool; safe to call when no pool exists."""
        if self.pool:
            try:
                self.pool.close()
                logger.info("Connection pool closed")
            except Exception as e:
                logger.error(f"Error closing pool: {e}")

    def test_connection(self):
        """Run `SELECT 1 FROM dual`; return True on success.

        Fix: cursor and connection are released in finally blocks, so a
        failing execute no longer leaks them.
        """
        try:
            conn = self.get_connection()
            try:
                cursor = conn.cursor()
                try:
                    cursor.execute("SELECT 1 FROM dual")
                    cursor.fetchone()
                finally:
                    cursor.close()
            finally:
                conn.close()
            logger.debug("Database connection test successful")
            return True
        except Exception as e:
            logger.error(f"Database connection test failed: {e}")
            return False

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: release the pool."""
        self.close_pool()
# Process-wide connector singleton.
_connector = None


def get_connector():
    """Return the shared OracleConnector, constructing it on first call."""
    global _connector
    if _connector is not None:
        return _connector
    _connector = OracleConnector()
    return _connector

307
db/repository.py Normal file
View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""
Data access layer for NEFT inward file processing.
Handles CRUD operations and transaction management.
"""
from typing import List, Optional
from logging_config import get_logger
from .oracle_connector import get_connector
from .models import NEFTInwardRecord, ProcessedFile
logger = get_logger(__name__)
class Repository:
    """Data access layer for NEFT file processing.

    Wraps all SQL used by the pipeline: per-record account validation,
    bulk transaction inserts, processed-file bookkeeping, and the
    post-processing stored-procedure call.

    NOTE(review): table naming is inconsistent in this module — inserts
    target outward_neft_api_log while verify_tables_exist checks
    inward_neft_api_log. Confirm which table set is authoritative.
    """

    def __init__(self):
        """Initialize repository with the shared pooled connector."""
        self.connector = get_connector()

    # ---------------------------------------------------------
    # Account validation using last 12 digits of the account number
    # ---------------------------------------------------------
    def validate_account_exists(self, account_number: str) -> bool:
        """
        Check whether an account exists in dep_account.

        Only the last 12 digits of the input are matched against
        dep_account.link_accno.

        Args:
            account_number: Account number to look up.

        Returns:
            True when a matching row exists; False for blank input or on
            any query error (errors are logged, never raised).
        """
        if not account_number:
            return False
        last12 = str(account_number)[-12:]
        conn = self.connector.get_connection()
        # Fix: pre-initialize cursor so the finally block cannot raise
        # NameError when conn.cursor() itself fails.
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT COUNT(*) FROM dep_account WHERE link_accno = :accno",
                {'accno': last12}
            )
            count = cursor.fetchone()[0]
            return count > 0
        except Exception as e:
            logger.warning(f"Error validating account {account_number}: {e}")
            return False
        finally:
            if cursor:
                cursor.close()
            conn.close()

    # ---------------------------------------------------------
    # Bulk insert with per-record account validation
    # ---------------------------------------------------------
    def bulk_insert_transactions(self, transactions: List[NEFTInwardRecord]) -> tuple:
        """
        Bulk insert NEFT transactions into outward_neft_api_log.

        Each record's remitter account (sender_acct_no) is validated via
        validate_account_exists; records failing validation are skipped.
        (Doc fix: the old docstring claimed the *beneficiary* account was
        validated — the code checks the sender/remitter account.)

        Args:
            transactions: Mapped record objects exposing to_dict().

        Returns:
            Tuple of (inserted_count, skipped_count).

        Raises:
            Exception: re-raised after rollback when the insert fails.
        """
        if not transactions:
            logger.warning("No transactions to insert")
            return 0, 0
        valid_transactions = []
        skipped_count = 0
        # NOTE(review): this opens one DB connection per record; a single
        # set-based lookup would scale better for large batches.
        for txn in transactions:
            acct = txn.sender_acct_no
            if self.validate_account_exists(acct):
                valid_transactions.append(txn)
            else:
                skipped_count += 1
        if not valid_transactions:
            logger.debug(f"All {skipped_count} transactions skipped (invalid Remitter accounts)")
            return 0, skipped_count
        conn = self.connector.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            batch_data = [txn.to_dict() for txn in valid_transactions]
            # Fix: full bind payloads contain account data — log at DEBUG,
            # not INFO, so routine logs stay free of sensitive detail.
            logger.debug(batch_data)
            insert_sql = """
                INSERT INTO outward_neft_api_log (
                    TXNIND,
                    JRNL_ID,
                    BANKCODE,
                    REF_NO,
                    TRAN_DATE,
                    TXN_AMT,
                    SENDER_IFSC,
                    RECIEVER_IFSC,
                    SENDER_ACCT_NO,
                    SENDER_NAME,
                    RECVR_ACCT_NO,
                    RECIEVER_NAME,
                    REJECT_CODE,
                    REJECT_REASON,
                    BENEFICIARY_ADDRESS,
                    MSG_TYPE
                ) VALUES (
                    :TXNIND,
                    :JRNL_ID,
                    :BANKCODE,
                    :REF_NO,
                    :TRAN_DATE,
                    :TXN_AMT,
                    :SENDER_IFSC,
                    :RECIEVER_IFSC,
                    :SENDER_ACCT_NO,
                    :SENDER_NAME,
                    :RECVR_ACCT_NO,
                    :RECIEVER_NAME,
                    :REJECT_CODE,
                    :REJECT_REASON,
                    :BENEFICIARY_ADDRESS,
                    :MSG_TYPE
                )
            """
            cursor.executemany(insert_sql, batch_data)
            conn.commit()
            inserted_count = len(valid_transactions)
            logger.info(f"Inserted {inserted_count} NEFT transactions into outward_neft_api_log")
            return inserted_count, skipped_count
        except Exception as e:
            if conn:
                conn.rollback()
            logger.error(f"Error inserting NEFT transactions: {e}", exc_info=True)
            raise
        finally:
            if cursor:
                cursor.close()
            conn.close()

    # ---------------------------------------------------------
    # Processed-file bookkeeping
    # ---------------------------------------------------------
    def is_file_processed(self, filename: str, bankcode: str) -> bool:
        """Return True when (filename, bankcode) already exists in
        neft_processed_files; False otherwise or on lookup errors (logged)."""
        conn = self.connector.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT COUNT(*)
                FROM neft_processed_files
                WHERE filename = :filename
                AND bankcode = :bankcode
                """,
                {'filename': filename, 'bankcode': bankcode}
            )
            count = cursor.fetchone()[0]
            return count > 0
        except Exception as e:
            logger.error(f"Error checking processed file: {e}", exc_info=True)
            return False
        finally:
            if cursor:
                cursor.close()
            conn.close()

    def mark_file_processed(self, processed_file: ProcessedFile) -> bool:
        """Insert an audit row into neft_processed_files.

        Returns:
            True on success; False after rollback on failure (logged).
        """
        conn = self.connector.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            file_data = processed_file.to_dict()
            insert_sql = """
                INSERT INTO neft_processed_files (
                    filename, bankcode, file_path, transaction_count,
                    status, error_message, processed_at
                ) VALUES (
                    :filename, :bankcode, :file_path, :transaction_count,
                    :status, :error_message, :processed_at
                )
            """
            cursor.execute(insert_sql, file_data)
            conn.commit()
            logger.info(f"Marked file as processed: {processed_file.filename}")
            return True
        except Exception as e:
            if conn:
                conn.rollback()
            logger.error(f"Error marking file as processed: {e}", exc_info=True)
            return False
        finally:
            if cursor:
                cursor.close()
            conn.close()

    def get_processed_files(self, bankcode: Optional[str] = None) -> List[str]:
        """List processed filenames, newest first.

        Args:
            bankcode: Optional filter; when None, all banks are returned.

        Returns:
            List of filenames; empty list on query errors (logged).
        """
        conn = self.connector.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            if bankcode:
                cursor.execute(
                    """
                    SELECT filename
                    FROM neft_processed_files
                    WHERE bankcode = :bankcode
                    ORDER BY processed_at DESC
                    """,
                    {'bankcode': bankcode}
                )
            else:
                cursor.execute(
                    """
                    SELECT filename
                    FROM neft_processed_files
                    ORDER BY processed_at DESC
                    """
                )
            filenames = [row[0] for row in cursor.fetchall()]
            return filenames
        except Exception as e:
            logger.error(f"Error retrieving processed files: {e}", exc_info=True)
            return []
        finally:
            if cursor:
                cursor.close()
            conn.close()

    def call_neft_api_txn_post(self) -> bool:
        """Invoke the neft_api_txn_post stored procedure.

        Tries callproc first, then falls back to an anonymous PL/SQL block
        if callproc fails for any reason.

        Returns:
            True on success; False on failure (logged).
        """
        conn = self.connector.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            logger.info("Calling neft_api_txn_post procedure to process all inserted transactions...")
            try:
                cursor.callproc('neft_api_txn_post')
            except Exception:
                # Fallback: run the procedure via an anonymous block.
                cursor.execute("BEGIN neft_api_txn_post; END;")
            conn.commit()
            logger.info("neft_api_txn_post procedure executed successfully")
            return True
        except Exception as e:
            logger.error(f"Error calling neft_api_txn_post procedure: {e}", exc_info=True)
            return False
        finally:
            if cursor:
                cursor.close()
            conn.close()

    def verify_tables_exist(self):
        """Abort (SystemExit) unless the required tables exist.

        Probes inward_neft_api_log and neft_processed_files with ROWNUM=1
        queries; the tables must be created manually, so any failure is
        fatal.

        NOTE(review): bulk_insert_transactions writes to
        outward_neft_api_log, which is not probed here — confirm whether
        that table should be verified too.
        """
        conn = self.connector.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT COUNT(*) FROM inward_neft_api_log WHERE ROWNUM = 1")
                logger.info("✓ inward_neft_api_log table exists")
            except Exception as e:
                logger.error(f"✗ inward_neft_api_log table not found: {e}")
                raise SystemExit(
                    "FATAL: inward_neft_api_log table must be created manually before running this application"
                )
            try:
                cursor.execute("SELECT COUNT(*) FROM neft_processed_files WHERE ROWNUM = 1")
                logger.info("✓ neft_processed_files table exists")
            except Exception as e:
                logger.error(f"✗ neft_processed_files table not found: {e}")
                raise SystemExit(
                    "FATAL: neft_processed_files table must be created manually before running this application"
                )
            logger.info("Database tables verified successfully")
        except SystemExit:
            raise
        except Exception as e:
            logger.error(f"Error verifying tables: {e}", exc_info=True)
            raise SystemExit(f"FATAL: Error verifying database tables: {e}")
        finally:
            if cursor:
                cursor.close()
            conn.close()

51
logging_config.py Normal file
View File

@@ -0,0 +1,51 @@
import logging
import logging.handlers
import os
from pathlib import Path
def setup_logging(log_level=logging.INFO, log_dir="logs"):
    """
    Configure root logging with a console handler plus a rotating file
    handler writing to <log_dir>/app.log.

    Args:
        log_level: logging level applied to the root logger and handlers.
        log_dir: directory where log files are stored (created if missing).

    Returns:
        The configured root logger.
    """
    # Make sure the log directory exists before the file handler opens it.
    Path(log_dir).mkdir(exist_ok=True)

    root = logging.getLogger()
    root.setLevel(log_level)
    root.handlers.clear()  # drop any previously-installed handlers

    fmt = logging.Formatter(
        fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    stream_handler = logging.StreamHandler()
    rotating_handler = logging.handlers.RotatingFileHandler(
        os.path.join(log_dir, 'app.log'),
        maxBytes=10 * 1024 * 1024,  # rotate at 10MB
        backupCount=5
    )
    # Console first, then file — same order as before.
    for handler in (stream_handler, rotating_handler):
        handler.setLevel(log_level)
        handler.setFormatter(fmt)
        root.addHandler(handler)
    return root
def get_logger(name):
    """Return the named logger for a specific module."""
    module_logger = logging.getLogger(name)
    return module_logger

34
neft_outward.py Normal file
View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python3
"""
Main application entry point.
Runs ACH file processing scheduler.
"""
import logging
from logging_config import setup_logging, get_logger
from scheduler import Scheduler
# Initialize logging before anything else so every module logs consistently;
# paramiko's chatty INFO/DEBUG output is silenced (warnings still surface).
logging.getLogger("paramiko").setLevel(logging.WARNING)
logger = setup_logging(log_level=logging.INFO)
app_logger = get_logger(__name__)
def main():
    """Run the NEFT file-processing scheduler to completion.

    KeyboardInterrupt is treated as a normal shutdown; any other exception
    is logged with a traceback and re-raised so the process exits non-zero.
    """
    app_logger.info("Application started")
    try:
        # Run the scheduler (blocks until the processing loop finishes)
        scheduler = Scheduler()
        scheduler.run()
        app_logger.info("Application completed successfully")
    except KeyboardInterrupt:
        app_logger.info("Application interrupted by user")
    except Exception as e:
        app_logger.error(f"An error occurred: {e}", exc_info=True)
        raise
if __name__ == "__main__":
    main()

341
neft_outward_parser.py Normal file
View File

@@ -0,0 +1,341 @@
#!/usr/bin/env python3
"""
UTR Pipe-Delimited File Parser (SFTP feed)
- Robust parsing for files with '|' separator and inconsistent whitespace.
- Returns (transactions, file_metadata, summary_data) exactly like UIHParser style.
- TXN_DATE is left as-is from the file (no time concatenation or conversion).
"""
import csv
import os
import re
from decimal import Decimal, InvalidOperation
from typing import Dict, List, Tuple, Optional
from logging_config import get_logger
logger = get_logger(__name__)
# -------------------------
# Helpers & Normalization
# -------------------------
WS_COLLAPSE_RE = re.compile(r'[ \t\u00A0]+')


def normalize_text(s: Optional[str]) -> str:
    """
    Collapse runs of spaces/tabs/NBSP to single spaces and strip the ends.

    None is treated as the empty string.
    """
    if s is None:
        return ''
    collapsed = WS_COLLAPSE_RE.sub(' ', s.replace('\u00A0', ' '))
    return collapsed.strip()
def to_decimal(value: str) -> Optional[Decimal]:
    """
    Parse a human-formatted amount (e.g. '1,234.50') into a Decimal.

    Returns None for None/blank input or an unparseable value (logged).
    """
    if value is None:
        return None
    cleaned = value.replace(',', '').strip()
    if not cleaned:
        return None
    try:
        return Decimal(cleaned)
    except InvalidOperation:
        logger.warning(f"Amount not a valid decimal: {value!r}")
        return None
IFSC_RE = re.compile(r'^[A-Z]{4}0[A-Z0-9]{6}$', re.IGNORECASE)


def validate_ifsc(code: str) -> bool:
    """
    Loosely check the standard 11-char IFSC shape (AAAA0XXXXXX).

    Empty/None and non-matching inputs yield False. Callers only log on
    failure — records are never rejected on IFSC grounds.
    """
    return bool(code) and IFSC_RE.match(code) is not None
# -------------------------
# Parser
# -------------------------
class NEFT_OUTWARD_Parser:
    """
    Parser for SFTP UTR pipe-delimited files.

    parse() returns (transactions, file_metadata, summary_data).
    """
    # Canonical column order (snake_case keys) as they appear in the file header
    EXPECTED_HEADER = [
        "utr", "amount", "sender_acct_name", "remitter_detail", "remmiter_info",
        "benef_address", "reject_code", "reject_reason", "journal_no",
        "status", "sub_msg_type", "tran_date", "tran_time", "ifsc_sender",
        "ifsc_recvr", "remitter_acct_no", "benef_acct_no",
        "remitter_details", "beneficiary_details",
    ]

    def __init__(self, file_path: str, encoding_priority: Optional[List[str]] = None):
        """
        Args:
            file_path: Path of the pipe-delimited file to parse.
            encoding_priority: Encodings tried in order; defaults to
                utf-8-sig, cp1252, latin-1.
        """
        self.file_path = file_path
        self.encoding_priority = encoding_priority or ["utf-8-sig", "cp1252", "latin-1"]
        self.transactions: List[Dict] = []
        self.file_metadata: Dict = {}
        self.summary_data: Dict = {}

    def parse(self) -> Tuple[List[Dict], Dict, Dict]:
        """
        Main parse method.

        Returns:
            (transactions, file_metadata, summary_data)

        Raises:
            Exception: any read/parse failure is logged and re-raised.
        """
        try:
            rows, header = self._read_rows_with_fallback()
            header_map = self._prepare_header_map(header)
            self.file_metadata = {
                "source_file": os.path.basename(self.file_path),
                "columns_detected": header,
                "row_count": len(rows),
            }
            for idx, raw in enumerate(rows, start=1):
                rec = self._row_to_transaction(raw, header_map, row_num=idx)
                if rec:
                    self.transactions.append(rec)
            self.summary_data = self._build_summary(self.transactions)
            logger.info(
                f"Parsed {len(self.transactions)} rows from {self.file_path}"
            )
            return self.transactions, self.file_metadata, self.summary_data
        except Exception as e:
            logger.error(f"Error parsing SFTP UTR file: {e}", exc_info=True)
            raise

    # -------------------------
    # Internals
    # -------------------------
    def _read_rows_with_fallback(self) -> Tuple[List[List[str]], List[str]]:
        """
        Read the file, trying each encoding in priority order.

        Fix: decoding is strict for every encoding except the last one, so
        the fallback chain is actually exercised. (Previously
        errors='replace' made the first encoding always "succeed", silently
        mangling non-UTF-8 bytes into U+FFFD.) The final encoding still
        uses errors='replace', so overall the method never hard-fails on
        decoding alone.

        Returns:
            (rows, header)
        """
        last_err = None
        final_enc = self.encoding_priority[-1]
        for enc in self.encoding_priority:
            err_mode = 'replace' if enc == final_enc else 'strict'
            try:
                with open(self.file_path, 'r', encoding=enc, errors=err_mode, newline='') as f:
                    reader = csv.reader(f, delimiter='|')
                    all_rows = list(reader)
                    if not all_rows:
                        raise ValueError("Empty file")
                    header = [normalize_text(c) for c in all_rows[0]]
                    rows = [r for r in all_rows[1:]]
                    logger.info(f"Read {len(rows)} data rows using encoding {enc}")
                    return rows, header
            except Exception as e:
                last_err = e
                logger.warning(f"Failed to read with encoding={enc}: {e}")
                continue
        # If we fall through all encodings
        raise last_err or RuntimeError("File read failed for all encodings")

    def _prepare_header_map(self, header: List[str]) -> Dict[int, str]:
        """
        Map column index -> canonical snake_case key.

        Unknown/extra headers become normalized snake_case as-is.
        """
        def canon(name: str) -> str:
            name = name.strip()
            name = name.replace('/', '_').replace('-', '_').replace(' ', '_')
            return name.lower()
        header_norm = [canon(h) for h in header]
        if len(header_norm) < len(self.EXPECTED_HEADER):
            logger.warning(
                f"Header has fewer columns ({len(header_norm)}) than expected ({len(self.EXPECTED_HEADER)}). "
                f"Will pad rows defensively."
            )
        idx_to_key: Dict[int, str] = {}
        for i, h in enumerate(header_norm):
            idx_to_key[i] = h
        return idx_to_key

    def _row_to_transaction(self, row: List[str], header_map: Dict[int, str], row_num: int) -> Optional[Dict]:
        """
        Convert a raw CSV row to a normalized dict (no data-model mapping).

        Returns None (row skipped) when the UTR field is blank.
        """
        # Pad or trim to header length (defensive)
        max_idx = max(header_map.keys()) if header_map else -1
        if len(row) - 1 < max_idx:
            row = row + [''] * (max_idx + 1 - len(row))
        elif len(row) - 1 > max_idx:
            logger.debug(f"Row {row_num} has extra fields; trimming to header size")
        # Build base dict with normalized text
        raw = {header_map[i]: normalize_text(row[i] if i < len(row) else '') for i in range(max_idx + 1)}
        # Collect expected keys; leave as strings except amount which is coerced safely
        txn: Dict[str, object] = {k: raw.get(k, '') for k in self.EXPECTED_HEADER}
        # Amount normalization (unparseable amounts become '')
        amt = to_decimal(str(txn.get('amount', '') or ''))
        txn['amount'] = amt if amt is not None else ''
        # IFSC checks (gentle logs only — never rejects the record)
        ifsc_sender = str(txn.get('ifsc_sender') or '')
        ifsc_recvr = str(txn.get('ifsc_recvr') or '')
        if ifsc_sender and not validate_ifsc(ifsc_sender):
            logger.debug(f"Row {row_num} sender IFSC looks non-standard: {ifsc_sender}")
        if ifsc_recvr and not validate_ifsc(ifsc_recvr):
            logger.debug(f"Row {row_num} receiver IFSC looks non-standard: {ifsc_recvr}")
        # TXN_DATE: keep as-is from file; the time column is intentionally unused
        txn['tran_date'] = str(txn.get('tran_date') or '')
        txn['tran_time'] = ''  # explicitly blank to signal unused
        # Basic sanity: UTR presence
        if not str(txn.get('utr') or '').strip():
            logger.debug(f"Row {row_num} skipped: missing UTR")
            return None
        return txn

    def _build_summary(self, txns: List[Dict]) -> Dict:
        """
        Build a compact summary dict:
          - total_count
          - amount_total (formatted to 2 decimal places)
          - by_status: {status: {count, amount}}
        """
        total_count = len(txns)
        amount_total = Decimal('0')
        by_status: Dict[str, Dict[str, object]] = {}
        for t in txns:
            amt = t.get('amount')
            if isinstance(amt, Decimal):
                pass
            elif isinstance(amt, str):
                try:
                    amt = Decimal(amt)
                except InvalidOperation:
                    amt = Decimal('0')
            elif amt is None:
                amt = Decimal('0')
            amount_total += amt
            st = (str(t.get('status') or '')).upper()
            if st not in by_status:
                by_status[st] = {'count': 0, 'amount': Decimal('0')}
            by_status[st]['count'] += 1
            by_status[st]['amount'] = by_status[st]['amount'] + amt
        by_status_str = {k: {'count': v['count'], 'amount': f"{v['amount']:.2f}"} for k, v in by_status.items()}
        return {
            'total_count': total_count,
            'amount_total': f"{amount_total:.2f}",
            'by_status': by_status_str
        }
# -------------------------
# Printing Utilities
# -------------------------
def print_transactions(transactions: List[Dict], limit: Optional[int] = 50):
    """
    Pretty-print parsed transactions as fixed-width columns.

    Shows all fields except time, REJECT_CODE, and REJECT_REASON, and stops
    after `limit` rows (pass None/0 to print everything).
    """
    cols = [
        ('utr', 20),
        ('amount', 12),
        ('status', 8),
        ('journal_no', 14),
        ('tran_date', 10),
        ('sender_acct_name', 28),
        ('remitter_acct_no', 22),
        ('benef_acct_no', 22),
        ('ifsc_sender', 12),
        ('ifsc_recvr', 12),
        ('remitter_detail', 28),
        ('remmiter_info', 24),
        ('beneficiary_details', 30),
        ('benef_address', 30),
        ('sub_msg_type', 10),
    ]
    header = " ".join(f"{name.upper():<{w}}" for name, w in cols)
    bar = "=" * len(header)
    print("\n" + bar)
    print(header)
    print(bar)
    shown = 0
    for txn in transactions:
        cells = []
        for name, w in cols:
            val = txn.get(name, '')
            if isinstance(val, Decimal):
                val = f"{val:.2f}"
            cells.append(f"{str(val)[:w]:<{w}}")
        print(" ".join(cells))
        shown += 1
        if limit and shown >= limit:
            print(f"... ({len(transactions) - shown} more rows not shown)")
            break
    print(bar)
    print(f"Total transactions parsed: {len(transactions)}\n")
def print_metadata(metadata: Dict):
    """Print file metadata as KEY: value lines between '=' separators."""
    bar = "=" * 80
    print("\n" + bar)
    print("FILE METADATA")
    print(bar)
    for key, value in metadata.items():
        print(f"{key.upper():<20}: {value}")
    print(bar + "\n")
def print_summary(summary: Dict):
    """Print summary key/value pairs; an empty summary prints nothing."""
    if not summary:
        return
    bar = "=" * 80
    print("\n" + bar)
    print("SUMMARY DATA")
    print(bar)
    for key, value in summary.items():
        print(f"{key.upper()}: {value}")
    print(bar + "\n")
# -------------------------
# Runner
# -------------------------
if __name__ == '__main__':
    from logging_config import setup_logging
    setup_logging()
    # Bug fix: this runner referenced SFTPUtrParser, which is not defined
    # anywhere in this module (NameError at runtime); the parser class in
    # this file is NEFT_OUTWARD_Parser.
    # NOTE(review): hard-coded developer path — parameterize for real use.
    parser = NEFT_OUTWARD_Parser('/home/bishwajeet/test_parser/06032026_14_NEFT_INWARD.TXT')
    transactions, metadata, summary = parser.parse()
    print_metadata(metadata)
    print_transactions(transactions, limit=80)
    print_summary(summary)
    logger.info(f"Parsing complete. Extracted {len(transactions)} transactions")

6
processors/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
"""Processors module for ACH file processing."""
from .data_mapper import NEFTDataMapper
from .file_processor import FileProcessor
__all__ = ['NEFTDataMapper', 'FileProcessor']

137
processors/data_mapper.py Normal file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
Data mapper for NEFT SFTP feed.
Maps parsed NEFT transactions (dicts) to NEFTOutwardRecord for database insertion.
- No padding of account numbers (kept as-is, trimmed).
"""
from datetime import datetime
from decimal import Decimal
from typing import Dict, Any, List
from logging_config import get_logger
from db.models import NEFTOutwardRecord
logger = get_logger(__name__)
class NEFTDataMapper:
    """Maps parsed NEFT transaction dicts to NEFTOutwardRecord objects."""

    # -------------------------
    # Helpers
    # -------------------------
    @staticmethod
    def convert_date(date_str: str) -> str:
        """
        Convert a NEFT date from YYYYMMDD to DDMMYYYY.

        Input : '20260306'
        Output: '06032026'
        Falls back to today's date (DDMMYYYY) when the input is missing or
        malformed, logging the error.
        """
        try:
            malformed = (not date_str
                         or len(date_str.strip()) != 8
                         or not date_str.isdigit())
            if malformed:
                raise ValueError(f"Invalid NEFT date format: {date_str!r}")
            return datetime.strptime(date_str, "%Y%m%d").strftime("%d%m%Y")
        except Exception as e:
            logger.error(f"Error converting date '{date_str}': {e}")
            return datetime.now().strftime("%d%m%Y")

    @staticmethod
    def convert_amount(amount_in: Any) -> Decimal:
        """
        Coerce an amount (Decimal/str/number) to a non-negative Decimal.

        Sign semantics are carried by TXNIND, so the absolute value is
        returned; parse failures log and yield Decimal('0').
        """
        try:
            if isinstance(amount_in, Decimal):
                magnitude = amount_in
            else:
                cleaned = (str(amount_in) or '').replace(',', '').strip()
                magnitude = Decimal(cleaned) if cleaned else Decimal('0')
            return abs(magnitude)
        except Exception as e:
            logger.error(f"Error converting amount '{amount_in}': {e}")
            return Decimal('0')

    # -------------------------
    # Mapping
    # -------------------------
    @classmethod
    def map_transaction(cls, parsed_txn: Dict[str, Any], bankcode: str) -> NEFTOutwardRecord:
        """
        Map one parsed NEFT transaction dict to a NEFTOutwardRecord.

        Args:
            parsed_txn: Dict emitted by the NEFT file parser.
            bankcode : Bank code stored on the record (bank_code field).
        """
        try:
            # Amount and date are normalized first; accounts are trimmed
            # only — no zero-padding is applied.
            txn_amt = cls.convert_amount(parsed_txn.get('amount', '0'))
            txn_date_ddmmyyyy = cls.convert_date(parsed_txn.get('tran_date', '') or '')

            def clean(key: str) -> str:
                """Fetch a field, treating None as '' and trimming ends."""
                return (parsed_txn.get(key) or '').strip()

            return NEFTOutwardRecord(
                bank_code=bankcode,
                txnind='DR',
                jrnl_id=clean('journal_no'),
                ref_no=clean('utr'),
                txn_date=txn_date_ddmmyyyy,
                txn_amt=txn_amt,
                sender_ifsc=clean('ifsc_sender'),
                reciever_ifsc=clean('ifsc_recvr'),
                sender_acct_no=clean('remitter_acct_no'),
                sender_acct_name=clean('sender_acct_name'),
                recvr_acct_no=clean('benef_acct_no'),
                recvr_acct_name=clean('beneficiary_details'),
                reject_code=clean('reject_code'),
                reject_reason=clean('reject_reason'),
                benef_address=clean('benef_address'),
                msg_type=clean('sub_msg_type'),
            )
        except Exception as e:
            logger.error(f"Error mapping NEFT transaction: {e}", exc_info=True)
            raise

    @classmethod
    def map_transactions(cls, parsed_transactions: List[Dict[str, Any]], bankcode: str) -> List[NEFTOutwardRecord]:
        """
        Map a list of parsed NEFT transactions, skipping records that fail.

        Args:
            parsed_transactions: Dicts from the NEFT file parser.
            bankcode : Bank code applied to every record.
        """
        mapped: List[NEFTOutwardRecord] = []
        for txn in parsed_transactions:
            try:
                mapped.append(cls.map_transaction(txn, bankcode))
            except Exception as e:
                logger.warning(f"Skipping transaction due to error: {e}")
        logger.info(f"Mapped {len(mapped)} NEFT transactions for bank {bankcode}")
        return mapped

View File

@@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""
Main file processor for end-to-end ACH file processing.
Orchestrates download, parsing, mapping, and database insertion.
"""
import os
import tempfile
from pathlib import Path
from logging_config import get_logger
from neft_outward_parser import NEFT_OUTWARD_Parser
from db.repository import Repository
from db.models import ProcessedFile
from sftp.sftp_client import SFTPClient
from .data_mapper import NEFTDataMapper
logger = get_logger(__name__)
class FileProcessor:
    """Processes NEFT OUTWARD files end-to-end.

    Orchestrates download from SFTP, parsing, mapping to database records,
    bulk insertion, processed-file bookkeeping, and local cleanup.
    """

    def __init__(self, repository: Repository = None, sftp_client: SFTPClient = None):
        """
        Initialize file processor.

        Args:
            repository: Repository instance (optional; created if omitted)
            sftp_client: SFTPClient instance (optional; created if omitted)
        """
        self.repository = repository or SFTPClientDefaultRepository() if False else (repository or Repository())
        self.sftp_client = sftp_client or SFTPClient()
        # Files are staged in the OS temp directory and deleted after processing.
        self.temp_dir = tempfile.gettempdir()

    def _mark_processed(self, filename: str, bankcode: str, remote_path: str,
                        transaction_count: int, status: str, error_message: str = None):
        """Record the file's processing outcome (SUCCESS/FAILED) in the database."""
        kwargs = dict(
            filename=filename,
            bankcode=bankcode,
            file_path=remote_path,
            transaction_count=transaction_count,
            status=status,
        )
        # Only pass error_message when we actually have one, preserving the
        # model's own default for the success path.
        if error_message is not None:
            kwargs['error_message'] = error_message
        self.repository.mark_file_processed(ProcessedFile(**kwargs))

    def process_file(
        self,
        filename: str,
        bankcode: str,
        remote_path: str
    ) -> bool:
        """
        Process a single NEFT OUTWARD file end-to-end.

        Workflow:
        1. Skip if already processed for this bank
        2. Download file from SFTP
        3. Parse using NEFT_OUTWARD_Parser
        4. Map to database format
        5. Insert to database
        6. Mark as processed
        7. Cleanup local file

        Args:
            filename: Name of file to process
            bankcode: Bank code for this file
            remote_path: Full remote path on SFTP

        Returns:
            True if successful, False otherwise
        """
        local_path = os.path.join(self.temp_dir, filename)
        try:
            logger.info(f"Starting processing: {filename} (bank: {bankcode})")
            # Step 1: Check if already processed for this bank
            if self.repository.is_file_processed(filename, bankcode):
                logger.info(f"File already processed for {bankcode}: {filename}")
                return True
            # Step 2: Download file
            if not self.sftp_client.download_file(remote_path, local_path):
                raise Exception(f"Failed to download file: {remote_path}")
            # Step 3: Parse file (metadata/summary are not used downstream here)
            parser = NEFT_OUTWARD_Parser(local_path)
            transactions, _metadata, _summary = parser.parse()
            if not transactions:
                logger.warning(f"No transactions found in {filename}")
                # Still mark as processed (0 transactions) so it is not retried.
                self._mark_processed(filename, bankcode, remote_path, 0, 'SUCCESS')
                return True
            # Step 4: Map transactions
            mapped_records = NEFTDataMapper.map_transactions(transactions, bankcode)
            # Step 5: Insert to database (with account validation)
            inserted_count, skipped_count = self.repository.bulk_insert_transactions(mapped_records)
            # Step 6: Mark file as processed
            self._mark_processed(filename, bankcode, remote_path, inserted_count, 'SUCCESS')
            logger.info(
                f"Successfully processed {filename}: {inserted_count} inserted, "
                f"{skipped_count} skipped (non-ipks accounts)"
            )
            return True
        except Exception as e:
            logger.error(f"Error processing {filename}: {e}", exc_info=True)
            # Mark file as failed; never let bookkeeping errors mask the original failure.
            try:
                self._mark_processed(filename, bankcode, remote_path, 0, 'FAILED', str(e)[:2000])
            except Exception as mark_error:
                logger.error(f"Failed to mark file as failed: {mark_error}")
            return False
        finally:
            # Step 7: Cleanup local file regardless of outcome
            try:
                if os.path.exists(local_path):
                    os.remove(local_path)
                    logger.debug(f"Cleaned up local file: {local_path}")
            except Exception as e:
                logger.warning(f"Error cleaning up local file {local_path}: {e}")

    def process_files(self, files_to_process: list) -> dict:
        """
        Process multiple files.

        Args:
            files_to_process: List of (filename, bankcode, remote_path) tuples

        Returns:
            Dictionary with processing statistics:
            'total', 'successful', 'failed' counts and a per-file 'files' list.
        """
        stats = {
            'total': len(files_to_process),
            'successful': 0,
            'failed': 0,
            'files': []
        }
        for filename, bankcode, remote_path in files_to_process:
            success = self.process_file(filename, bankcode, remote_path)
            if success:
                stats['successful'] += 1
            else:
                stats['failed'] += 1
            stats['files'].append({
                'filename': filename,
                'bankcode': bankcode,
                'success': success
            })
        logger.info(f"Processing complete: {stats['successful']}/{stats['total']} successful")
        return stats

    def __enter__(self):
        """Context manager entry: connect SFTP if not already connected."""
        if self.sftp_client and not self.sftp_client.sftp:
            self.sftp_client.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: always disconnect SFTP."""
        if self.sftp_client:
            self.sftp_client.disconnect()

23
requirements.txt Normal file
View File

@@ -0,0 +1,23 @@
# Core dependencies
python-dotenv==1.0.0
# Database (modern Oracle driver - simpler than cx_Oracle)
oracledb==2.0.0
# SFTP
paramiko==3.4.0
cryptography==41.0.7
# Scheduling
schedule==1.2.0
# Configuration
python-decouple==3.8
# Timezone support
pytz==2023.3
# Development dependencies
pytest==7.4.0
black==23.7.0
flake8==6.0.0

171
scheduler.py Normal file
View File

@@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""
NEFT file processing scheduler.
Runs polling loop every 30 minutes to process new files.
"""
import signal
import time
import sys
from datetime import datetime
from logging_config import get_logger, setup_logging
from config import get_config
from db import OracleConnector, Repository
from sftp import SFTPClient, FileMonitor
from processors import FileProcessor
logger = get_logger(__name__)
class Scheduler:
    """Main scheduler for NEFT file processing.

    Polls the SFTP server on a fixed interval, hands new files to
    FileProcessor, and triggers database post-processing per cycle.
    """

    def __init__(self):
        """Initialize scheduler: load and validate config, install signal handlers."""
        self.config = get_config()
        self.config.validate()
        self.running = True
        self.cycle_count = 0
        # Setup signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully by ending the polling loop."""
        logger.info(f"Received signal {signum}, shutting down gracefully...")
        self.running = False

    def initialize_database(self):
        """Initialize database connection and verify tables exist.

        Returns:
            True when the connection test passes and tables are verified,
            False on recoverable failure.

        Raises:
            SystemExit: re-raised unchanged so deliberate aborts propagate.
        """
        try:
            connector = OracleConnector()
            if connector.test_connection():
                logger.info("Database connection test passed")
                repository = Repository()
                repository.verify_tables_exist()
                return True
            logger.error("Database connection test failed")
            return False
        except SystemExit as e:
            logger.error(f"Database initialization failed: {e}")
            raise
        except Exception as e:
            logger.error(f"Error initializing database: {e}", exc_info=True)
            return False

    def run_processing_cycle(self):
        """Run single file processing cycle: scan, process new files, post-process."""
        self.cycle_count += 1
        logger.info(f"=== Starting processing cycle {self.cycle_count} ===")
        sftp_client = SFTPClient()
        repository = Repository()
        try:
            # Connect to SFTP
            if not sftp_client.connect():
                logger.error("Failed to connect to SFTP server")
                return
            # Scan for today's new files across all banks
            new_files = []
            today_str = datetime.now().strftime("%d%m%Y")
            logger.info(f'listing file for {today_str}')
            for bank_code in self.config.bank_codes:
                # Get list of files already processed for this specific bank
                bank_processed = repository.get_processed_files(bank_code)
                remote_path = f"{self.config.sftp_base_path}/{bank_code}/NEFT"
                pattern = f"{today_str}_*_NEFT_OUTWARD.TXT"
                files = sftp_client.list_files(remote_path, pattern=pattern)
                for filename in files:
                    if filename not in bank_processed:
                        # BUG FIX: the remote path must include the actual filename.
                        full_path = f"{remote_path}/{filename}"
                        new_files.append((filename, bank_code, full_path))
                        logger.info(f"Found new file: {filename} (bank: {bank_code})")
                    else:
                        logger.debug(f"Skipping already processed file for {bank_code}: {filename}")
            if not new_files:
                logger.info("No new files to process")
                return
            logger.info(f"Found {len(new_files)} new files to process")
            # Process files
            processor = FileProcessor(repository, sftp_client)
            stats = processor.process_files(new_files)
            # Log summary
            logger.info(f"Cycle {self.cycle_count} complete:")
            logger.info(f" Total files: {stats['total']}")
            logger.info(f" Successful: {stats['successful']}")
            logger.info(f" Failed: {stats['failed']}")
            # Call neft_api_txn_post procedure once per cycle to process all inserted transactions
            if stats['successful'] > 0:
                logger.info("Calling neft_api_txn_post procedure for all inserted transactions...")
                if repository.call_neft_api_txn_post():
                    logger.info("Transaction post-processing completed successfully")
                else:
                    logger.error("Transaction post-processing failed")
        except Exception as e:
            logger.error(f"Error in processing cycle: {e}", exc_info=True)
        finally:
            sftp_client.disconnect()

    def run(self):
        """Run scheduler main loop until a shutdown signal flips self.running."""
        logger.info("=" * 80)
        logger.info("NEFT_OUTWARD File Processing Scheduler Started")
        logger.info(f"Poll Interval: {self.config.poll_interval_minutes} minutes")
        logger.info(f"Bank Codes: {', '.join(self.config.bank_codes)}")
        logger.info("=" * 80)
        # Initialize database; SystemExit propagates as a fatal error.
        try:
            if not self.initialize_database():
                logger.error("Failed to initialize database. Exiting.")
                return
        except SystemExit as e:
            logger.error(f"Fatal error: {e}")
            raise
        # Run processing loop
        poll_interval_seconds = self.config.poll_interval_minutes * 60
        while self.running:
            try:
                self.run_processing_cycle()
            except Exception as e:
                logger.error(f"Unexpected error in processing cycle: {e}", exc_info=True)
            # Wait for next cycle unless a shutdown was requested mid-cycle
            if self.running:
                logger.info(f"Waiting {self.config.poll_interval_minutes} minutes until next cycle...")
                time.sleep(poll_interval_seconds)
        logger.info("Scheduler shutdown complete")
def main():
    """Entry point: configure logging, then hand control to the scheduler."""
    setup_logging()
    Scheduler().run()


if __name__ == '__main__':
    main()

6
sftp/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
"""SFTP module for ACH file processing."""
from .sftp_client import SFTPClient
from .file_monitor import FileMonitor
__all__ = ['SFTPClient', 'FileMonitor']

108
sftp/file_monitor.py Normal file
View File

@@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""
File monitoring and discovery for NEFT Outward files.
Scans SFTP directories for new files across multiple banks.
Filename convention handled:
DDMMYYYY_HH_NEFT_OUTWARD.TXT
Example:
06032026_14_NEFT_OUTWARD.TXT
"""
import re
from typing import List, Tuple, Dict
from logging_config import get_logger
from config import get_config
from .sftp_client import SFTPClient
logger = get_logger(__name__)
class FileMonitor:
    """Monitors SFTP for new NEFT Outward files."""

    def __init__(self, sftp_client: "SFTPClient" = None):
        """
        Initialize file monitor.

        Args:
            sftp_client: SFTPClient instance (optional; a new one is created if omitted)
        """
        self.config = get_config()
        self.sftp_client = sftp_client or SFTPClient()

    def scan_for_new_files(self, processed_filenames: List[str]) -> List[Tuple[str, str, str]]:
        """
        Scan all bank directories for new NEFT Outward files.

        Args:
            processed_filenames: List of already processed filenames to skip

        Returns:
            List of (filename, bankcode, full_remote_path) tuples
        """
        new_files: List[Tuple[str, str, str]] = []
        for bank_code in self.config.bank_codes:
            # Adjust subfolder name here if required (e.g., 'NEFT_INWARD' or other)
            remote_path = f"{self.config.sftp_base_path}/{bank_code}/NEFT"
            # Match any NEFT outward file for any date/hour
            files = self.sftp_client.list_files(remote_path, pattern='*_NEFT_OUTWARD.TXT')
            for filename in files:
                if filename not in processed_filenames:
                    # BUG FIX: build the remote path from the actual filename.
                    full_path = f"{remote_path}/{filename}"
                    new_files.append((filename, bank_code, full_path))
                    logger.info(f"Found new NEFT file: {filename} (bank: {bank_code})")
                else:
                    logger.debug(f"Skipping already processed NEFT file: {filename}")
        logger.info(f"NEFT scan complete: Found {len(new_files)} new files")
        return new_files

    @staticmethod
    def parse_filename(filename: str) -> Dict[str, str]:
        """
        Parse NEFT filename to extract metadata.

        Expected format:
        DDMMYYYY_HH_NEFT_OUTWARD.TXT
        Example:
        06032026_14_NEFT_OUTWARD.TXT

        Args:
            filename: Filename to parse

        Returns:
            Dictionary with extracted metadata (filename, day, month, year,
            hour, timestamp) or empty dict if parse fails
        """
        # Groups: DD, MM, YYYY, HH; extension match is case-insensitive.
        pattern = r'^(\d{2})(\d{2})(\d{4})_(\d{2})_NEFT_OUTWARD\.TXT$'
        match = re.match(pattern, filename, flags=re.IGNORECASE)
        if not match:
            logger.warning(f"Could not parse NEFT filename: {filename}")
            return {}
        day, month, year, hour = match.groups()
        return {
            'filename': filename,
            'day': day,
            'month': month,
            'year': year,
            'hour': hour,
            'timestamp': f"{day}/{month}/{year} {hour}:00:00"
        }

    def __enter__(self):
        """Context manager entry: connect SFTP if not already connected."""
        if not self.sftp_client.sftp:
            self.sftp_client.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: always disconnect SFTP."""
        self.sftp_client.disconnect()

157
sftp/sftp_client.py Normal file
View File

@@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""
SFTP client for file operations.
Handles connection, file discovery, and download operations.
"""
import paramiko
import os
from pathlib import Path
from logging_config import get_logger
from config import get_config
logger = get_logger(__name__)
class SFTPClient:
    """SFTP operations for NEFT file processing."""

    def __init__(self):
        """Initialize SFTP client (no connection is opened yet)."""
        self.config = get_config()
        # Both handles are None while disconnected; callers (context managers
        # in FileProcessor/FileMonitor) gate reconnection on `self.sftp`.
        self.sftp = None
        self.ssh = None

    def connect(self) -> bool:
        """
        Establish SFTP connection.

        Returns:
            True if successful, False otherwise
        """
        try:
            # Create SSH client
            self.ssh = paramiko.SSHClient()
            # NOTE(review): AutoAddPolicy trusts unknown host keys; acceptable
            # only on a controlled network — consider pinning known hosts.
            self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # Connect
            self.ssh.connect(
                self.config.sftp_host,
                port=self.config.sftp_port,
                username=self.config.sftp_username,
                password=self.config.sftp_password,
                timeout=10
            )
            # Get SFTP channel
            self.sftp = self.ssh.open_sftp()
            logger.info(f"Connected to SFTP server: {self.config.sftp_host}:{self.config.sftp_port}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to SFTP server: {e}", exc_info=True)
            return False

    def disconnect(self):
        """Close SFTP connection and reset handles so a closed client is detectable."""
        try:
            if self.sftp:
                self.sftp.close()
            if self.ssh:
                self.ssh.close()
            logger.info("SFTP connection closed")
        except Exception as e:
            logger.error(f"Error closing SFTP connection: {e}")
        finally:
            # BUG FIX: reset the handles; otherwise `if not client.sftp`
            # checks elsewhere treat a closed connection as still open.
            self.sftp = None
            self.ssh = None

    def list_files(self, remote_path: str, pattern: str) -> list:
        """
        List files matching pattern in remote directory.

        Args:
            remote_path: Path on SFTP server
            pattern: fnmatch-style glob, e.g. '*_NEFT_OUTWARD.TXT'

        Returns:
            Sorted list of matching filenames (empty list on error)
        """
        if not self.sftp:
            logger.error("SFTP not connected")
            return []
        import fnmatch
        try:
            try:
                items = self.sftp.listdir_attr(remote_path)
            except FileNotFoundError:
                logger.warning(f"Directory not found: {remote_path}")
                return []
            files = [item.filename for item in items
                     if fnmatch.fnmatch(item.filename, pattern)]
            logger.debug(f"Found {len(files)} files matching {pattern} in {remote_path}")
            return sorted(files)
        except Exception as e:
            logger.error(f"Error listing files in {remote_path}: {e}", exc_info=True)
            return []

    def download_file(self, remote_path: str, local_path: str) -> bool:
        """
        Download file from SFTP server.

        Args:
            remote_path: Full path on SFTP server
            local_path: Local destination path

        Returns:
            True if successful, False otherwise
        """
        if not self.sftp:
            logger.error("SFTP not connected")
            return False
        try:
            # Create local directory if needed
            Path(local_path).parent.mkdir(parents=True, exist_ok=True)
            # Download file
            self.sftp.get(remote_path, local_path)
            logger.info(f"Downloaded file: {remote_path} -> {local_path}")
            return True
        except Exception as e:
            logger.error(f"Error downloading file {remote_path}: {e}", exc_info=True)
            return False

    def get_file_size(self, remote_path: str) -> int:
        """
        Get size of remote file.

        Args:
            remote_path: Full path on SFTP server

        Returns:
            File size in bytes, or -1 if error
        """
        if not self.sftp:
            logger.error("SFTP not connected")
            return -1
        try:
            stat = self.sftp.stat(remote_path)
            return stat.st_size
        except Exception as e:
            logger.error(f"Error getting file size {remote_path}: {e}")
            return -1

    def __enter__(self):
        """Context manager entry: open the connection."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: always close the connection."""
        self.disconnect()