From 8ac16da50339d544180ed455efd850118fe55723 Mon Sep 17 00:00:00 2001 From: asif Date: Tue, 3 Feb 2026 15:25:54 +0530 Subject: [PATCH] added validation to consider only ipks mirror account numbers --- db/repository.py | 57 +++++++++++++++++++++++++++++++----- processors/file_processor.py | 6 ++-- 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/db/repository.py b/db/repository.py index bcfa6b8..aad1ae3 100644 --- a/db/repository.py +++ b/db/repository.py @@ -19,26 +19,67 @@ class Repository: """Initialize repository with connector.""" self.connector = get_connector() - def bulk_insert_transactions(self, transactions: List[TransactionRecord]) -> int: + def validate_account_exists(self, account_number: str) -> bool: """ - Bulk insert transaction records into ach_api_log_temp git. + Validate if account number exists in dep_account table. + + Args: + account_number: Account number to validate (cbs_acct) + + Returns: + True if account exists in dep_account.link_accno, False otherwise + """ + conn = self.connector.get_connection() + try: + cursor = conn.cursor() + cursor.execute( + "SELECT COUNT(*) FROM dep_account WHERE link_accno = :accno", + {'accno': account_number} + ) + count = cursor.fetchone()[0] + return count > 0 + except Exception as e: + logger.warning(f"Error validating account {account_number}: {e}") + return False + finally: + cursor.close() + conn.close() + + def bulk_insert_transactions(self, transactions: List[TransactionRecord]) -> tuple: + """ + Bulk insert transaction records into ach_api_log. + Records with invalid account numbers are silently skipped. Args: transactions: List of TransactionRecord objects Returns: - Number of inserted records + Tuple of (inserted_count, skipped_count) """ if not transactions: logger.warning("No transactions to insert") - return 0 + return 0, 0 + + # Validate accounts and filter out invalid ones + valid_transactions = [] + skipped_count = 0 + + for txn in transactions: + if self.validate_account_exists(txn.cbs_acct): + valid_transactions.append(txn) + else: + skipped_count += 1 + + if not valid_transactions: + logger.info(f"All {skipped_count} transactions skipped (invalid accounts)") + return 0, skipped_count conn = self.connector.get_connection() try: cursor = conn.cursor() # Prepare batch data - batch_data = [txn.to_dict() for txn in transactions] + batch_data = [txn.to_dict() for txn in valid_transactions] # Execute batch insert insert_sql = """ @@ -54,9 +95,9 @@ class Repository: cursor.executemany(insert_sql, batch_data) conn.commit() - count = len(transactions) - logger.info(f"Successfully inserted {count} transactions into ach_api_log_temp") - return count + inserted_count = len(valid_transactions) + logger.info(f"Inserted {inserted_count} transactions, skipped {skipped_count} (invalid accounts)") + return inserted_count, skipped_count except Exception as e: conn.rollback() diff --git a/processors/file_processor.py b/processors/file_processor.py index 943818e..b01e2f4 100644 --- a/processors/file_processor.py +++ b/processors/file_processor.py @@ -93,8 +93,8 @@ class FileProcessor: mapped_records = DataMapper.map_transactions(transactions, bankcode) logger.info(f"Mapped {len(mapped_records)} transactions") - # Step 5: Insert to database - inserted_count = self.repository.bulk_insert_transactions(mapped_records) + # Step 5: Insert to database (with account validation) + inserted_count, skipped_count = self.repository.bulk_insert_transactions(mapped_records) # Step 6: Mark file as processed processed_file = ProcessedFile( @@ -106,7 +106,7 @@ class FileProcessor: ) self.repository.mark_file_processed(processed_file) - logger.info(f"Successfully processed {filename}: {inserted_count} transactions inserted") + logger.info(f"Successfully processed {filename}: {inserted_count} inserted, {skipped_count} skipped (invalid accounts)") return True except Exception as e: