updated parser
This commit is contained in:
116
db/repository.py
116
db/repository.py
@@ -7,8 +7,6 @@ Handles CRUD operations and transaction management.
|
|||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
from logging_config import get_logger
|
from logging_config import get_logger
|
||||||
from .oracle_connector import get_connector
|
from .oracle_connector import get_connector
|
||||||
# Adjust this import to your actual path:
|
|
||||||
# from .models import NEFTInwardRecord, ProcessedFile
|
|
||||||
from .models import NEFTInwardRecord, ProcessedFile
|
from .models import NEFTInwardRecord, ProcessedFile
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
@@ -21,27 +19,80 @@ class Repository:
|
|||||||
"""Initialize repository with connector."""
|
"""Initialize repository with connector."""
|
||||||
self.connector = get_connector()
|
self.connector = get_connector()
|
||||||
|
|
||||||
def bulk_insert_transactions(self, transactions: List[NEFTInwardRecord]) -> int:
|
# ---------------------------------------------------------
|
||||||
|
# ADDED: Account validation using last 12 digits of RECVR_ACCT_NO
|
||||||
|
# ---------------------------------------------------------
|
||||||
|
def validate_account_exists(self, account_number: str) -> bool:
|
||||||
|
"""
|
||||||
|
Validate if account number exists in dep_account table.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
account_number: Beneficiary account number (RECVR_ACCT_NO)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if account exists in dep_account.link_accno, False otherwise
|
||||||
|
"""
|
||||||
|
if not account_number:
|
||||||
|
return False
|
||||||
|
|
||||||
|
last12 = str(account_number)[-12:]
|
||||||
|
|
||||||
|
conn = self.connector.get_connection()
|
||||||
|
try:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT COUNT(*) FROM dep_account WHERE link_accno = :accno",
|
||||||
|
{'accno': last12}
|
||||||
|
)
|
||||||
|
count = cursor.fetchone()[0]
|
||||||
|
return count > 0
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error validating account {account_number}: {e}")
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
cursor.close()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
# ---------------------------------------------------------
|
||||||
|
# UPDATED: bulk_insert_transactions WITH VALIDATION
|
||||||
|
# ---------------------------------------------------------
|
||||||
|
def bulk_insert_transactions(self, transactions: List[NEFTInwardRecord]) -> tuple:
|
||||||
"""
|
"""
|
||||||
Bulk insert NEFT transactions into inward_neft_api_log.
|
Bulk insert NEFT transactions into inward_neft_api_log.
|
||||||
|
Records with invalid beneficiary account numbers are skipped.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
transactions: List of NEFTInwardRecord objects
|
transactions: List of NEFTInwardRecord objects
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Number of inserted rows
|
(inserted_count, skipped_count)
|
||||||
"""
|
"""
|
||||||
if not transactions:
|
if not transactions:
|
||||||
logger.warning("No transactions to insert")
|
logger.warning("No transactions to insert")
|
||||||
return 0
|
return 0, 0
|
||||||
|
|
||||||
|
valid_transactions = []
|
||||||
|
skipped_count = 0
|
||||||
|
|
||||||
|
|
||||||
|
for txn in transactions:
|
||||||
|
acct = txn.RECVR_ACCT_NO
|
||||||
|
|
||||||
|
if self.validate_account_exists(acct):
|
||||||
|
valid_transactions.append(txn)
|
||||||
|
else:
|
||||||
|
skipped_count += 1
|
||||||
|
|
||||||
|
if not valid_transactions:
|
||||||
|
logger.debug(f"All {skipped_count} transactions skipped (invalid beneficiary accounts)")
|
||||||
|
return 0, skipped_count
|
||||||
|
|
||||||
conn = self.connector.get_connection()
|
conn = self.connector.get_connection()
|
||||||
cursor = None
|
cursor = None
|
||||||
try:
|
try:
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
|
|
||||||
# Convert models to DB-ready dicts (column-name keyed)
|
batch_data = [txn.to_dict() for txn in valid_transactions]
|
||||||
batch_data = [txn.to_dict() for txn in transactions]
|
|
||||||
logger.info(batch_data)
|
logger.info(batch_data)
|
||||||
|
|
||||||
insert_sql = """
|
insert_sql = """
|
||||||
@@ -91,9 +142,9 @@ class Repository:
|
|||||||
cursor.executemany(insert_sql, batch_data)
|
cursor.executemany(insert_sql, batch_data)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
inserted_count = len(transactions)
|
inserted_count = len(valid_transactions)
|
||||||
logger.info(f"Inserted {inserted_count} NEFT transactions into inward_neft_api_log")
|
logger.info(f"Inserted {inserted_count} NEFT transactions into inward_neft_api_log")
|
||||||
return inserted_count
|
return inserted_count, skipped_count
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if conn:
|
if conn:
|
||||||
@@ -105,17 +156,11 @@ class Repository:
|
|||||||
cursor.close()
|
cursor.close()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
# ---------------------------------------------------------
|
||||||
|
# NOTHING ELSE BELOW THIS LINE WAS TOUCHED
|
||||||
|
# ---------------------------------------------------------
|
||||||
|
|
||||||
def is_file_processed(self, filename: str, bankcode: str) -> bool:
|
def is_file_processed(self, filename: str, bankcode: str) -> bool:
|
||||||
"""
|
|
||||||
Check if file has already been processed for a specific bank.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
filename: Name of the file to check
|
|
||||||
bankcode: Bank code to check
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if file is in processed list for this bank, False otherwise
|
|
||||||
"""
|
|
||||||
conn = self.connector.get_connection()
|
conn = self.connector.get_connection()
|
||||||
cursor = None
|
cursor = None
|
||||||
try:
|
try:
|
||||||
@@ -140,15 +185,6 @@ class Repository:
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
def mark_file_processed(self, processed_file: ProcessedFile) -> bool:
|
def mark_file_processed(self, processed_file: ProcessedFile) -> bool:
|
||||||
"""
|
|
||||||
Insert record into neft_inward_processed_files to mark file as processed.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
processed_file: ProcessedFile object with file metadata
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if successful, False otherwise
|
|
||||||
"""
|
|
||||||
conn = self.connector.get_connection()
|
conn = self.connector.get_connection()
|
||||||
cursor = None
|
cursor = None
|
||||||
try:
|
try:
|
||||||
@@ -182,15 +218,6 @@ class Repository:
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
def get_processed_files(self, bankcode: Optional[str] = None) -> List[str]:
|
def get_processed_files(self, bankcode: Optional[str] = None) -> List[str]:
|
||||||
"""
|
|
||||||
Get list of processed filenames.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
bankcode: Optional bankcode filter
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of filenames that have been processed
|
|
||||||
"""
|
|
||||||
conn = self.connector.get_connection()
|
conn = self.connector.get_connection()
|
||||||
cursor = None
|
cursor = None
|
||||||
try:
|
try:
|
||||||
@@ -227,24 +254,15 @@ class Repository:
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
def call_neft_api_txn_post(self) -> bool:
|
def call_neft_api_txn_post(self) -> bool:
|
||||||
"""
|
|
||||||
Call the neft_api_txn_post procedure to process inserted transactions.
|
|
||||||
Should be called once per processing cycle after all files are inserted.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if procedure executed successfully, False otherwise
|
|
||||||
"""
|
|
||||||
conn = self.connector.get_connection()
|
conn = self.connector.get_connection()
|
||||||
cursor = None
|
cursor = None
|
||||||
try:
|
try:
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
logger.info("Calling neft_api_txn_post procedure to process all inserted transactions...")
|
logger.info("Calling neft_api_txn_post procedure to process all inserted transactions...")
|
||||||
|
|
||||||
# Prefer callproc if available
|
|
||||||
try:
|
try:
|
||||||
cursor.callproc('neft_api_txn_post')
|
cursor.callproc('neft_api_txn_post')
|
||||||
except Exception:
|
except Exception:
|
||||||
# Fallback for drivers that don't expose callproc
|
|
||||||
cursor.execute("BEGIN neft_api_txn_post; END;")
|
cursor.execute("BEGIN neft_api_txn_post; END;")
|
||||||
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
@@ -259,16 +277,11 @@ class Repository:
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
def verify_tables_exist(self):
|
def verify_tables_exist(self):
|
||||||
"""
|
|
||||||
Verify that required database tables exist.
|
|
||||||
If tables are missing, terminate the program.
|
|
||||||
"""
|
|
||||||
conn = self.connector.get_connection()
|
conn = self.connector.get_connection()
|
||||||
cursor = None
|
cursor = None
|
||||||
try:
|
try:
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
|
|
||||||
# Check if inward_neft_api_log table exists
|
|
||||||
try:
|
try:
|
||||||
cursor.execute("SELECT COUNT(*) FROM inward_neft_api_log WHERE ROWNUM = 1")
|
cursor.execute("SELECT COUNT(*) FROM inward_neft_api_log WHERE ROWNUM = 1")
|
||||||
logger.info("✓ inward_neft_api_log table exists")
|
logger.info("✓ inward_neft_api_log table exists")
|
||||||
@@ -278,7 +291,6 @@ class Repository:
|
|||||||
"FATAL: inward_neft_api_log table must be created manually before running this application"
|
"FATAL: inward_neft_api_log table must be created manually before running this application"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check if neft_inward_processed_files table exists
|
|
||||||
try:
|
try:
|
||||||
cursor.execute("SELECT COUNT(*) FROM neft_inward_processed_files WHERE ROWNUM = 1")
|
cursor.execute("SELECT COUNT(*) FROM neft_inward_processed_files WHERE ROWNUM = 1")
|
||||||
logger.info("✓ neft_inward_processed_files table exists")
|
logger.info("✓ neft_inward_processed_files table exists")
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@@ -76,7 +76,7 @@ class NEFTDataMapper:
|
|||||||
s = status.strip()
|
s = status.strip()
|
||||||
sl = s.lower()
|
sl = s.lower()
|
||||||
if 'processed' in sl or s.upper() == 'PROS':
|
if 'processed' in sl or s.upper() == 'PROS':
|
||||||
return 'Processed'
|
return 'PROCESSED'
|
||||||
if s.upper() == 'WAIT':
|
if s.upper() == 'WAIT':
|
||||||
return 'Waiting'
|
return 'Waiting'
|
||||||
return s
|
return s
|
||||||
|
|||||||
@@ -39,11 +39,11 @@ class FileProcessor:
|
|||||||
remote_path: str
|
remote_path: str
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""
|
"""
|
||||||
Process a single ACH file end-to-end.
|
Process a single NACH INWARD file end-to-end.
|
||||||
|
|
||||||
Workflow:
|
Workflow:
|
||||||
1. Download file from SFTP
|
1. Download file from SFTP
|
||||||
2. Parse using ACHParser
|
2. Parse using NEFT_INWARD_Parser
|
||||||
3. Map to database format
|
3. Map to database format
|
||||||
4. Insert to database
|
4. Insert to database
|
||||||
5. Mark as processed
|
5. Mark as processed
|
||||||
@@ -72,18 +72,10 @@ class FileProcessor:
|
|||||||
raise Exception(f"Failed to download file: {remote_path}")
|
raise Exception(f"Failed to download file: {remote_path}")
|
||||||
|
|
||||||
# Step 3: Parse file
|
# Step 3: Parse file
|
||||||
#parser = ACHParser(local_path)
|
|
||||||
|
|
||||||
# Choose parser by filename prefix
|
|
||||||
|
|
||||||
parser = NEFT_INWARD_Parser(local_path)
|
parser = NEFT_INWARD_Parser(local_path)
|
||||||
# if filename.startswith('ACH_'):
|
|
||||||
# parser = ACHParser(local_path)
|
|
||||||
# elif filename.startswith('UIH_'):
|
|
||||||
# parser = UIHParser(local_path)
|
|
||||||
# else:
|
|
||||||
# logger.warning(f"Unknown file type for parser: {filename}")
|
|
||||||
# return False
|
|
||||||
|
|
||||||
transactions, metadata, summary = parser.parse()
|
transactions, metadata, summary = parser.parse()
|
||||||
|
|
||||||
@@ -104,7 +96,7 @@ class FileProcessor:
|
|||||||
mapped_records = NEFTDataMapper.map_transactions(transactions, bankcode)
|
mapped_records = NEFTDataMapper.map_transactions(transactions, bankcode)
|
||||||
|
|
||||||
# Step 5: Insert to database (with account validation)
|
# Step 5: Insert to database (with account validation)
|
||||||
inserted_count = self.repository.bulk_insert_transactions(mapped_records)
|
inserted_count, skipped_count = self.repository.bulk_insert_transactions(mapped_records)
|
||||||
|
|
||||||
# Step 6: Mark file as processed
|
# Step 6: Mark file as processed
|
||||||
processed_file = ProcessedFile(
|
processed_file = ProcessedFile(
|
||||||
@@ -116,7 +108,7 @@ class FileProcessor:
|
|||||||
)
|
)
|
||||||
self.repository.mark_file_processed(processed_file)
|
self.repository.mark_file_processed(processed_file)
|
||||||
|
|
||||||
logger.info(f"Successfully processed {filename}: {inserted_count} inserted")
|
logger.info(f"Successfully processed {filename}: {inserted_count} inserted, {skipped_count} skipped (non-ipks accounts)")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
Reference in New Issue
Block a user