neft_inward
This commit is contained in:
6
processors/__init__.py
Normal file
6
processors/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Processors module for ACH file processing."""
|
||||
|
||||
from .data_mapper import NEFTDataMapper
|
||||
from .file_processor import FileProcessor
|
||||
|
||||
__all__ = ['NEFTDataMapper', 'FileProcessor']
|
||||
183
processors/data_mapper.py
Normal file
183
processors/data_mapper.py
Normal file
@@ -0,0 +1,183 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Data mapper for NEFT SFTP feed.
|
||||
Maps parsed NEFT transactions (dicts) to NEFTInwardRecord for database insertion.
|
||||
- No padding of account numbers (kept as-is, trimmed).
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from decimal import Decimal
|
||||
from typing import Dict, Any, List
|
||||
|
||||
from logging_config import get_logger
|
||||
from db.models import NEFTInwardRecord
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class NEFTDataMapper:
|
||||
"""Maps parsed NEFT transactions to NEFTInwardRecord objects."""
|
||||
|
||||
# -------------------------
|
||||
# Helpers
|
||||
# -------------------------
|
||||
|
||||
@staticmethod
|
||||
def convert_date(date_str: str) -> str:
|
||||
"""
|
||||
Convert NEFT date (YYYYMMDD) to DDMMYYYY.
|
||||
|
||||
Input : '20260306'
|
||||
Output: '06032026'
|
||||
|
||||
On error, returns today's date in DDMMYYYY format.
|
||||
"""
|
||||
try:
|
||||
if not date_str or len(date_str.strip()) != 8 or not date_str.isdigit():
|
||||
raise ValueError(f"Invalid NEFT date format: {date_str!r}")
|
||||
dt = datetime.strptime(date_str, "%Y%m%d")
|
||||
return dt.strftime("%d%m%Y")
|
||||
except Exception as e:
|
||||
logger.error(f"Error converting date '{date_str}': {e}")
|
||||
return datetime.now().strftime("%d%m%Y")
|
||||
|
||||
@staticmethod
|
||||
def calculate_txnind(amount_in: Any) -> str:
|
||||
"""
|
||||
Calculate transaction indicator from amount.
|
||||
'CR' for credit (>= 0), 'DR' for debit (< 0).
|
||||
|
||||
Accepts Decimal or string. Defaults to 'CR' on error.
|
||||
"""
|
||||
try:
|
||||
if isinstance(amount_in, Decimal):
|
||||
amount = amount_in
|
||||
else:
|
||||
txt = (str(amount_in) or "").strip()
|
||||
amount = Decimal(txt) if txt else Decimal("0")
|
||||
return 'DR' if amount < 0 else 'CR'
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating TXNIND for amount '{amount_in}': {e}")
|
||||
return 'CR'
|
||||
|
||||
@staticmethod
|
||||
def process_status(status: str) -> str:
|
||||
"""
|
||||
Normalize status field.
|
||||
|
||||
- If contains 'processed' (case-insensitive) -> 'Processed'
|
||||
- If equals 'PROS' (common NEFT code) -> 'Processed'
|
||||
- If equals 'WAIT' -> 'Waiting'
|
||||
- Else return original status (trimmed)
|
||||
"""
|
||||
try:
|
||||
if not status:
|
||||
return ''
|
||||
s = status.strip()
|
||||
sl = s.lower()
|
||||
if 'processed' in sl or s.upper() == 'PROS':
|
||||
return 'Processed'
|
||||
if s.upper() == 'WAIT':
|
||||
return 'Waiting'
|
||||
return s
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing status: {e}")
|
||||
return status
|
||||
|
||||
@staticmethod
|
||||
def convert_amount(amount_in: Any) -> Decimal:
|
||||
"""
|
||||
Convert amount to Decimal and return absolute value.
|
||||
Use TXNIND to capture the sign semantics.
|
||||
"""
|
||||
try:
|
||||
if isinstance(amount_in, Decimal):
|
||||
val = amount_in
|
||||
else:
|
||||
txt = (str(amount_in) or '').replace(',', '').strip()
|
||||
val = Decimal(txt) if txt else Decimal('0')
|
||||
return abs(val)
|
||||
except Exception as e:
|
||||
logger.error(f"Error converting amount '{amount_in}': {e}")
|
||||
return Decimal('0')
|
||||
|
||||
# -------------------------
|
||||
# Mapping
|
||||
# -------------------------
|
||||
|
||||
@classmethod
|
||||
def map_transaction(cls, parsed_txn: Dict[str, Any]) -> NEFTInwardRecord:
|
||||
"""
|
||||
Map a single parsed NEFT transaction (dict) to NEFTInwardRecord.
|
||||
|
||||
Expects keys as emitted by SFTPUtrParser:
|
||||
- utr, amount, journal_no, status, sub_msg_type
|
||||
- tran_date (YYYYMMDD), tran_time (ignored here)
|
||||
- ifsc_sender, ifsc_recvr
|
||||
- remitter_acct_no, sender_acct_name
|
||||
- remitter_detail, remmiter_info
|
||||
- benef_acct_no, beneficiary_details
|
||||
- benef_address
|
||||
- reject_code, reject_reason (optional)
|
||||
"""
|
||||
try:
|
||||
# Amount handling
|
||||
amount_in = parsed_txn.get('amount', '0')
|
||||
txn_amt = cls.convert_amount(amount_in)
|
||||
txnind = cls.calculate_txnind(amount_in)
|
||||
|
||||
# Date handling
|
||||
txn_date_raw = parsed_txn.get('tran_date', '') or ''
|
||||
txn_date_ddmmyyyy = cls.convert_date(txn_date_raw)
|
||||
|
||||
# Account numbers: NO padding, just trim
|
||||
sender_acct = (parsed_txn.get('remitter_acct_no') or '').strip()
|
||||
recvr_acct = (parsed_txn.get('benef_acct_no') or '').strip()
|
||||
|
||||
# Status normalization
|
||||
status_norm = cls.process_status(parsed_txn.get('status', ''))
|
||||
|
||||
# Receiver account name: best available proxy is beneficiary_details
|
||||
recvr_acct_name = (parsed_txn.get('beneficiary_details') or '').strip()
|
||||
|
||||
record = NEFTInwardRecord(
|
||||
txnind=txnind,
|
||||
jrnl_id=(parsed_txn.get('journal_no') or '').strip(),
|
||||
ref_no=(parsed_txn.get('utr') or '').strip(),
|
||||
txn_date=txn_date_ddmmyyyy,
|
||||
txn_amt=txn_amt,
|
||||
sender_ifsc=(parsed_txn.get('ifsc_sender') or '').strip(),
|
||||
reciever_ifsc=(parsed_txn.get('ifsc_recvr') or '').strip(),
|
||||
sender_acct_no=sender_acct,
|
||||
sender_acct_name=(parsed_txn.get('sender_acct_name') or '').strip(),
|
||||
remitter_detail=(parsed_txn.get('remitter_detail') or '').strip(),
|
||||
remitter_info=(parsed_txn.get('remmiter_info') or '').strip(),
|
||||
recvr_acct_no=recvr_acct,
|
||||
recvr_acct_name=recvr_acct_name,
|
||||
status=status_norm,
|
||||
reject_code=(parsed_txn.get('reject_code') or '').strip(),
|
||||
reject_reason=(parsed_txn.get('reject_reason') or '').strip(),
|
||||
benef_address=(parsed_txn.get('benef_address') or '').strip(),
|
||||
msg_type=(parsed_txn.get('sub_msg_type') or '').strip(),
|
||||
)
|
||||
|
||||
return record
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error mapping NEFT transaction: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
@classmethod
|
||||
def map_transactions(cls, parsed_transactions: List[Dict[str, Any]]) -> List[NEFTInwardRecord]:
|
||||
"""
|
||||
Map a list of parsed NEFT transactions to NEFTInwardRecord objects.
|
||||
"""
|
||||
records: List[NEFTInwardRecord] = []
|
||||
for txn in parsed_transactions:
|
||||
try:
|
||||
rec = cls.map_transaction(txn)
|
||||
records.append(rec)
|
||||
except Exception as e:
|
||||
logger.warning(f"Skipping transaction due to error: {e}")
|
||||
logger.info(f"Mapped {len(records)} NEFT transactions")
|
||||
return records
|
||||
189
processors/file_processor.py
Normal file
189
processors/file_processor.py
Normal file
@@ -0,0 +1,189 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Main file processor for end-to-end ACH file processing.
|
||||
Orchestrates download, parsing, mapping, and database insertion.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from logging_config import get_logger
|
||||
from neft_inward_parser import NEFT_INWARD_Parser
|
||||
from db.repository import Repository
|
||||
from db.models import ProcessedFile
|
||||
from sftp.sftp_client import SFTPClient
|
||||
from .data_mapper import NEFTDataMapper
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class FileProcessor:
|
||||
"""Processes NEFT INWARD files end-to-end."""
|
||||
|
||||
def __init__(self, repository: Repository = None, sftp_client: SFTPClient = None):
|
||||
"""
|
||||
Initialize file processor.
|
||||
|
||||
Args:
|
||||
repository: Repository instance (optional)
|
||||
sftp_client: SFTPClient instance (optional)
|
||||
"""
|
||||
self.repository = repository or Repository()
|
||||
self.sftp_client = sftp_client or SFTPClient()
|
||||
self.temp_dir = tempfile.gettempdir()
|
||||
|
||||
def process_file(
|
||||
self,
|
||||
filename: str,
|
||||
bankcode: str,
|
||||
remote_path: str
|
||||
) -> bool:
|
||||
"""
|
||||
Process a single ACH file end-to-end.
|
||||
|
||||
Workflow:
|
||||
1. Download file from SFTP
|
||||
2. Parse using ACHParser
|
||||
3. Map to database format
|
||||
4. Insert to database
|
||||
5. Mark as processed
|
||||
6. Cleanup local file
|
||||
|
||||
Args:
|
||||
filename: Name of file to process
|
||||
bankcode: Bank code for this file
|
||||
remote_path: Full remote path on SFTP
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
local_path = os.path.join(self.temp_dir, filename)
|
||||
|
||||
try:
|
||||
logger.info(f"Starting processing: {filename} (bank: {bankcode})")
|
||||
|
||||
# Step 1: Check if already processed for this bank
|
||||
if self.repository.is_file_processed(filename, bankcode):
|
||||
logger.info(f"File already processed for {bankcode}: {filename}")
|
||||
return True
|
||||
|
||||
# Step 2: Download file
|
||||
if not self.sftp_client.download_file(remote_path, local_path):
|
||||
raise Exception(f"Failed to download file: {remote_path}")
|
||||
|
||||
# Step 3: Parse file
|
||||
#parser = ACHParser(local_path)
|
||||
|
||||
# Choose parser by filename prefix
|
||||
|
||||
parser = NEFT_INWARD_Parser(local_path)
|
||||
# if filename.startswith('ACH_'):
|
||||
# parser = ACHParser(local_path)
|
||||
# elif filename.startswith('UIH_'):
|
||||
# parser = UIHParser(local_path)
|
||||
# else:
|
||||
# logger.warning(f"Unknown file type for parser: {filename}")
|
||||
# return False
|
||||
|
||||
transactions, metadata, summary = parser.parse()
|
||||
|
||||
if not transactions:
|
||||
logger.warning(f"No transactions found in {filename}")
|
||||
# Still mark as processed but with 0 transactions
|
||||
processed_file = ProcessedFile(
|
||||
filename=filename,
|
||||
bankcode=bankcode,
|
||||
file_path=remote_path,
|
||||
transaction_count=0,
|
||||
status='SUCCESS'
|
||||
)
|
||||
self.repository.mark_file_processed(processed_file)
|
||||
return True
|
||||
|
||||
# Step 4: Map transactions
|
||||
mapped_records = NEFTDataMapper.map_transactions(transactions, bankcode)
|
||||
|
||||
# Step 5: Insert to database (with account validation)
|
||||
inserted_count = self.repository.bulk_insert_transactions(mapped_records)
|
||||
|
||||
# Step 6: Mark file as processed
|
||||
processed_file = ProcessedFile(
|
||||
filename=filename,
|
||||
bankcode=bankcode,
|
||||
file_path=remote_path,
|
||||
transaction_count=inserted_count,
|
||||
status='SUCCESS'
|
||||
)
|
||||
self.repository.mark_file_processed(processed_file)
|
||||
|
||||
logger.info(f"Successfully processed {filename}: {inserted_count} inserted")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing {filename}: {e}", exc_info=True)
|
||||
|
||||
# Mark file as failed
|
||||
try:
|
||||
processed_file = ProcessedFile(
|
||||
filename=filename,
|
||||
bankcode=bankcode,
|
||||
file_path=remote_path,
|
||||
transaction_count=0,
|
||||
status='FAILED',
|
||||
error_message=str(e)[:2000]
|
||||
)
|
||||
self.repository.mark_file_processed(processed_file)
|
||||
except Exception as mark_error:
|
||||
logger.error(f"Failed to mark file as failed: {mark_error}")
|
||||
|
||||
return False
|
||||
|
||||
finally:
|
||||
# Cleanup local file
|
||||
try:
|
||||
if os.path.exists(local_path):
|
||||
os.remove(local_path)
|
||||
logger.debug(f"Cleaned up local file: {local_path}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error cleaning up local file {local_path}: {e}")
|
||||
|
||||
def process_files(self, files_to_process: list) -> dict:
|
||||
"""
|
||||
Process multiple files.
|
||||
|
||||
Args:
|
||||
files_to_process: List of (filename, bankcode, remote_path) tuples
|
||||
|
||||
Returns:
|
||||
Dictionary with processing statistics
|
||||
"""
|
||||
stats = {
|
||||
'total': len(files_to_process),
|
||||
'successful': 0,
|
||||
'failed': 0,
|
||||
'files': []
|
||||
}
|
||||
|
||||
for filename, bankcode, remote_path in files_to_process:
|
||||
success = self.process_file(filename, bankcode, remote_path)
|
||||
stats['successful'] += 1 if success else 0
|
||||
stats['failed'] += 0 if success else 1
|
||||
stats['files'].append({
|
||||
'filename': filename,
|
||||
'bankcode': bankcode,
|
||||
'success': success
|
||||
})
|
||||
|
||||
logger.info(f"Processing complete: {stats['successful']}/{stats['total']} successful")
|
||||
return stats
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager entry."""
|
||||
if self.sftp_client and not self.sftp_client.sftp:
|
||||
self.sftp_client.connect()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit."""
|
||||
if self.sftp_client:
|
||||
self.sftp_client.disconnect()
|
||||
Reference in New Issue
Block a user