From 81ad34fd652ce40ce6a66741f1105f3f589a6c2d Mon Sep 17 00:00:00 2001 From: asif Date: Thu, 5 Feb 2026 11:58:59 +0530 Subject: [PATCH] added bankwise file monitoring and calling the ach_api_txn_post procedure after each run --- db/repository.py | 23 +++++++++++++++++++++++ scheduler.py | 34 ++++++++++++++++++++++++++-------- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/db/repository.py b/db/repository.py index ca6099c..e6a4225 100644 --- a/db/repository.py +++ b/db/repository.py @@ -205,6 +205,29 @@ class Repository: cursor.close() conn.close() + def call_ach_api_txn_post(self) -> bool: + """ + Call the ach_api_txn_post procedure to process inserted transactions. + Should be called once per processing cycle after all files are inserted. + + Returns: + True if procedure executed successfully, False otherwise + """ + conn = self.connector.get_connection() + try: + cursor = conn.cursor() + logger.info("Calling ach_api_txn_post procedure to process all inserted transactions...") + cursor.execute("CALL ach_api_txn_post()") + conn.commit() + logger.info("ach_api_txn_post procedure executed successfully") + return True + except Exception as e: + logger.error(f"Error calling ach_api_txn_post procedure: {e}", exc_info=True) + return False + finally: + cursor.close() + conn.close() + def verify_tables_exist(self): """ Verify that required database tables exist. diff --git a/scheduler.py b/scheduler.py index da8c853..d40536f 100644 --- a/scheduler.py +++ b/scheduler.py @@ -68,20 +68,30 @@ class Scheduler: logger.error("Failed to connect to SFTP server") return - # Get list of already processed files - processed_files = set() - for bank_code in self.config.bank_codes: - bank_processed = repository.get_processed_files(bank_code) - processed_files.update(bank_processed) - - # Scan for new files + # Scan for new files across all banks monitor = FileMonitor(sftp_client) - new_files = monitor.scan_for_new_files(list(processed_files)) + new_files = [] + + for bank_code in self.config.bank_codes: + # Get list of files already processed for this specific bank + bank_processed = repository.get_processed_files(bank_code) + remote_path = f"{self.config.sftp_base_path}/{bank_code}/NACH" + files = sftp_client.list_files(remote_path, pattern='ACH_*.txt') + + for filename in files: + if filename not in bank_processed: + full_path = f"{remote_path}/{filename}" + new_files.append((filename, bank_code, full_path)) + logger.info(f"Found new file: {filename} (bank: {bank_code})") + else: + logger.debug(f"Skipping already processed file for {bank_code}: {filename}") if not new_files: logger.info("No new files to process") return + logger.info(f"Found {len(new_files)} new files to process") + # Process files processor = FileProcessor(repository, sftp_client) stats = processor.process_files(new_files) @@ -92,6 +102,14 @@ class Scheduler: logger.info(f" Successful: {stats['successful']}") logger.info(f" Failed: {stats['failed']}") + # Call ach_api_txn_post procedure once per cycle to process all inserted transactions + if stats['successful'] > 0: + logger.info("Calling ach_api_txn_post procedure for all inserted transactions...") + if repository.call_ach_api_txn_post(): + logger.info("Transaction post-processing completed successfully") + else: + logger.error("Transaction post-processing failed") + except Exception as e: logger.error(f"Error in processing cycle: {e}", exc_info=True)