ACH File Processing Pipeline - Implementation Guide
Project Structure
ach_ui_dbtl_file_based/
├── config.py # Configuration management
├── scheduler.py # 30-minute polling scheduler
├── main.py # Application entry point
├── ach_parser.py # Existing ACH parser
├── logging_config.py # Existing logging setup
├── db/
│ ├── __init__.py
│ ├── oracle_connector.py # Database connection pooling
│ ├── models.py # Data models
│ └── repository.py # Data access layer
├── sftp/
│ ├── __init__.py
│ ├── sftp_client.py # SFTP operations
│ └── file_monitor.py # File discovery
├── processors/
│ ├── __init__.py
│ ├── data_mapper.py # Field transformation
│ └── file_processor.py # File processing orchestration
├── tests/
│ ├── __init__.py
│ ├── test_data_mapper.py
│ └── test_file_monitor.py
├── docker-compose.yml # Mock SFTP server
├── requirements.txt # Dependencies
├── .env.example # Configuration template
└── .env # Configuration (created)
Implementation Summary
Phase 1: Complete ✅
- Configuration management (config.py)
- Updated requirements.txt with new dependencies
- Created .env and .env.example
Phase 2: Complete ✅
- Database module (db/)
  - oracle_connector.py - Connection pooling
  - models.py - Data models
  - repository.py - CRUD operations
- Supports batch inserts and duplicate detection
Phase 3: Complete ✅
- SFTP module (sftp/)
  - sftp_client.py - File operations
  - file_monitor.py - Multi-bank file discovery
- Supports file listing, download, and parsing filenames
Phase 4: Complete ✅
- Processing module (processors/)
  - data_mapper.py - Field transformation
  - file_processor.py - End-to-end processing
- Transaction safety with database commit/rollback
Phase 5: Complete ✅
- scheduler.py - 30-minute polling with graceful shutdown
- main.py - Updated entry point
Phase 6: Complete ✅
- Error handling throughout all modules
- Duplicate detection by filename
- Failed file tracking in database
Key Features
1. Field Mapping
Transforms parser output to database format:
- remarks → narration
- sys → status
- jrnl_no → jrnl_id
- date (DD/MM/YY) → tran_date (DATE)
- cust_acct → cbs_acct
- amount → tran_amt (absolute value)
- amount → TXNIND ('CR' for >=0, 'DR' for <0)
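The mapping above can be sketched roughly as follows. This is a minimal illustration, not the actual data_mapper.py: it assumes the parser yields one dict of strings per transaction, and the bankcode argument and function signature are hypothetical.

```python
from datetime import date, datetime
from decimal import Decimal


def map_transaction(parsed: dict, bankcode: str) -> dict:
    """Map one parsed ACH record to ach_api_log column names (illustrative)."""
    # Assumption: the parser emits the amount as a signed numeric string.
    amount = Decimal(str(parsed["amount"]))
    return {
        "narration": parsed["remarks"],
        "status": parsed["sys"],
        "bankcode": bankcode,
        "jrnl_id": parsed["jrnl_no"],
        # DD/MM/YY -> datetime.date (%y maps 00-68 to 2000-2068)
        "tran_date": datetime.strptime(parsed["date"], "%d/%m/%y").date(),
        "cbs_acct": parsed["cust_acct"],
        # The sign is carried by TXNIND, so the stored amount is absolute.
        "tran_amt": abs(amount),
        "TXNIND": "CR" if amount >= 0 else "DR",
    }
```

Using Decimal rather than float avoids rounding drift before the value reaches the NUMBER(15, 2) column.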
2. Duplicate Detection
Files are tracked in ach_processed_files table with:
- Unique constraint on filename
- Bank code, file path, transaction count
- Status and error message fields
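As a rough sketch of how the unique constraint on filename supports duplicate detection, the example below uses an in-memory SQLite database as a stand-in for Oracle so that it runs without a server. The function names mirror those listed for db/repository.py, but the signatures and the reduced table layout are assumptions.

```python
import sqlite3

# Stand-in for the Oracle connection; only the columns needed here are created.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ach_processed_files ("
    " filename TEXT UNIQUE NOT NULL,"
    " bankcode TEXT NOT NULL,"
    " status TEXT DEFAULT 'SUCCESS')"
)


def is_file_processed(conn, filename: str) -> bool:
    """Return True if the filename is already recorded."""
    row = conn.execute(
        "SELECT 1 FROM ach_processed_files WHERE filename = ?", (filename,)
    ).fetchone()
    return row is not None


def mark_file_processed(conn, filename: str, bankcode: str) -> bool:
    """Record the file; return False if the unique constraint rejects it."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO ach_processed_files (filename, bankcode) VALUES (?, ?)",
                (filename, bankcode),
            )
        return True
    except sqlite3.IntegrityError:
        return False
```

Relying on the constraint in addition to the lookup means a file cannot be recorded twice even if two checks race each other.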
3. Error Handling
- SFTP connection failures → logged and retried
- Parse errors → file marked as failed
- Database errors → transaction rollback
- Graceful shutdown on SIGTERM/SIGINT
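The "logged and retried" behaviour for SFTP failures could look something like the helper below. The guide does not state the retry count or delay, so the defaults here, along with the helper name with_retries, are purely illustrative.

```python
import logging
import time

logger = logging.getLogger("sftp.retry_example")


def with_retries(operation, attempts=3, delay_seconds=5.0, sleep=time.sleep):
    """Call operation(); on OSError, log the failure and retry.

    Re-raises the last error once `attempts` calls have failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except OSError as exc:  # includes ConnectionError and socket timeouts
            logger.warning("Attempt %d/%d failed: %s", attempt, attempts, exc)
            if attempt == attempts:
                raise
            sleep(delay_seconds)
```

The sleep parameter is injectable so the helper can be exercised in tests without real delays.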
4. Batch Processing
- Configurable batch size (default: 100)
- Reduces database round-trips
- Transaction safety
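One way to split mapped rows into batches of BATCH_SIZE is a small generator like the one below. The helper name chunked is hypothetical and is not taken from the project code.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def chunked(rows: Iterable[dict], batch_size: int = 100) -> Iterator[List[dict]]:
    """Yield successive lists of at most batch_size rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch
```

Each batch can then be handed to a single DB-API cursor.executemany() call, with the commit issued only after all batches for a file succeed, so a failure in any batch rolls back the whole file.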
Configuration
Environment Variables (.env)
# Database
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB
# SFTP
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USERNAME=ipks
SFTP_PASSWORD=ipks_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS
# Processing
POLL_INTERVAL_MINUTES=30
BATCH_SIZE=100
BANK_CODES=HDFC,ICICI,SBI,AXIS,PNB
# Logging
LOG_LEVEL=INFO
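A minimal sketch of how these variables might be read and validated is shown below. It is not the actual config.py: the Config fields, the load_config name, and the choice of required keys are assumptions, and only a subset of the variables is covered.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping


@dataclass(frozen=True)
class Config:
    db_user: str
    db_host: str
    db_port: int
    poll_interval_minutes: int
    batch_size: int
    bank_codes: List[str]


def load_config(env: Mapping[str, str] = os.environ) -> Config:
    """Build a Config from environment-style key/value pairs."""
    # Assumption: DB_USER and DB_HOST are mandatory; the rest have defaults.
    missing = [key for key in ("DB_USER", "DB_HOST") if not env.get(key)]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    return Config(
        db_user=env["DB_USER"],
        db_host=env["DB_HOST"],
        db_port=int(env.get("DB_PORT", "1521")),
        poll_interval_minutes=int(env.get("POLL_INTERVAL_MINUTES", "30")),
        batch_size=int(env.get("BATCH_SIZE", "100")),
        # "HDFC, ICICI ,SBI" -> ["HDFC", "ICICI", "SBI"]
        bank_codes=[c.strip() for c in env.get("BANK_CODES", "").split(",") if c.strip()],
    )
```

Passing the mapping as a parameter keeps the loader testable without touching the real process environment.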
Setup Instructions
1. Install Dependencies
pip install -r requirements.txt
2. Oracle Client Setup (Required)
# Download and install Oracle Instant Client
wget https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
unzip instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
sudo mkdir -p /opt/oracle
sudo mv instantclient_21_12 /opt/oracle/
echo '/opt/oracle/instantclient_21_12' | sudo tee /etc/ld.so.conf.d/oracle.conf
sudo ldconfig
export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12:$LD_LIBRARY_PATH
3. Database Setup
Before running, ensure these tables exist in Oracle:
-- ACH transaction log (existing table - must already exist)
CREATE TABLE ach_api_log (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
narration VARCHAR2(500),
status VARCHAR2(100),
bankcode VARCHAR2(20),
jrnl_id VARCHAR2(50),
tran_date DATE,
cbs_acct VARCHAR2(50),
tran_amt NUMBER(15, 2),
TXNIND VARCHAR2(2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_ach_jrnl_id ON ach_api_log(jrnl_id);
CREATE INDEX idx_ach_bankcode ON ach_api_log(bankcode);
-- Processed files log (created by application)
CREATE TABLE ach_processed_files (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
filename VARCHAR2(500) UNIQUE NOT NULL,
bankcode VARCHAR2(20) NOT NULL,
file_path VARCHAR2(1000),
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
transaction_count NUMBER,
status VARCHAR2(20) DEFAULT 'SUCCESS',
error_message VARCHAR2(2000)
);
-- No separate index on filename is needed: the UNIQUE constraint already creates one,
-- and a second index on the same column list fails with ORA-01408.
4. Configuration
Copy the template, then edit .env for your environment:
cp .env.example .env
# Edit .env with production values
Testing
Unit Tests
# Run all tests
pytest tests/ -v
# Run specific test file
pytest tests/test_data_mapper.py -v
# Run with coverage
pytest tests/ --cov=processors --cov=db --cov=sftp
Integration Testing with Mock SFTP
Option 1: Docker (Recommended)
# Create SFTP directory structure
mkdir -p sftp_data/HDFC/NACH
mkdir -p sftp_data/ICICI/NACH
mkdir -p sftp_data/SBI/NACH
# Copy sample ACH file
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/
# Start mock SFTP server
docker-compose up -d
# Verify connection
sftp -P 2222 ipks@127.0.0.1
# Password: ipks_password
# Commands: ls, cd, etc.
# Run application
python main.py
# Stop SFTP server
docker-compose down
Option 2: Manual SFTP Setup
If you have your own SFTP server, update .env:
SFTP_HOST=your.sftp.server
SFTP_PORT=22
SFTP_USERNAME=your_user
SFTP_PASSWORD=your_password
Running the Application
Development Mode (Manual)
python main.py
The scheduler will:
- Connect to database and SFTP
- Scan all bank directories every 30 minutes
- Download new ACH files
- Parse transactions
- Insert to database
- Mark files as processed
- Clean up local files
Production Mode (Systemd Service)
Create /etc/systemd/system/ach_processor.service:
[Unit]
Description=ACH File Processor
After=network.target
[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/ach_processor
Environment="PATH=/opt/ach_processor/venv/bin"
Environment="LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12"
ExecStart=/opt/ach_processor/venv/bin/python main.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
Then:
sudo systemctl daemon-reload
sudo systemctl enable ach_processor
sudo systemctl start ach_processor
sudo systemctl status ach_processor
# View logs
journalctl -u ach_processor -f
Verification Checklist
Before deployment, verify:
- Oracle Instant Client installed and LD_LIBRARY_PATH set
- Oracle database accessible (test with SQL*Plus)
- ach_api_log table exists and is accessible
- SFTP credentials configured correctly
- Mock SFTP server running (for testing)
- Sample ACH file in test SFTP directory
- Unit tests passing: pytest tests/ -v
- Application can connect to database
- Application can connect to SFTP
- Application processes sample file successfully
- Duplicate detection prevents reprocessing
- Log files are created in logs/ directory
- Graceful shutdown works with CTRL+C
Troubleshooting
Database Connection Issues
# Test Oracle connection
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB
# Check LD_LIBRARY_PATH
echo $LD_LIBRARY_PATH
# Verify cx_Oracle installation
python -c "import cx_Oracle; print(cx_Oracle.version)"
SFTP Connection Issues
# Test SFTP connection manually
sftp -P 2222 ipks@127.0.0.1
# Enable debug in logs
# Change LOG_LEVEL=DEBUG in .env
File Processing Issues
Check logs:
tail -f logs/app.log
# Look for:
# - "Connected to SFTP server"
# - "Found X files matching pattern"
# - "Successfully processed"
# - Error messages with stack traces
Module Documentation
config.py
Loads and validates environment variables from .env file.
- get_config() - Get global Config instance
- config.validate() - Validate required settings
db/oracle_connector.py
Manages Oracle database connection pooling.
- OracleConnector class with connection pool management
- get_connector() - Get global connector instance
- Supports context manager usage
db/repository.py
Data access layer with CRUD operations.
- bulk_insert_transactions() - Batch insert to ach_api_log
- is_file_processed() - Check duplicate by filename
- mark_file_processed() - Track processed files
- get_processed_files() - List processed filenames
- create_tables() - Initialize database schema
sftp/sftp_client.py
SFTP client for file operations.
- connect()/disconnect() - Connection management
- list_files() - Find files by pattern
- download_file() - Download from SFTP
- get_file_size() - Check file size
sftp/file_monitor.py
File discovery and monitoring.
- scan_for_new_files() - Find new files across banks
- parse_filename() - Extract metadata from filename
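For illustration, filename parsing might look like the sketch below. The layout ACH_<identifier>_<DDMMYYYYHHMMSS>_<sequence>.txt is inferred only from the sample file ACH_99944_19012026103217_001.txt used elsewhere in this guide. The meaning of each part and the returned keys are assumptions, so check them against the real file_monitor.py.

```python
import re
from datetime import datetime
from typing import Optional

# Assumed layout: ACH_<digits>_<14-digit timestamp>_<sequence>.txt
FILENAME_RE = re.compile(r"^ACH_(\d+)_(\d{14})_(\d+)\.txt$")


def parse_filename(filename: str) -> Optional[dict]:
    """Return metadata parsed from the filename, or None if it does not match."""
    match = FILENAME_RE.match(filename)
    if match is None:
        return None
    identifier, stamp, sequence = match.groups()
    try:
        # Assumption: the 14 digits are DDMMYYYYHHMMSS.
        timestamp = datetime.strptime(stamp, "%d%m%Y%H%M%S")
    except ValueError:
        return None  # digits present but not a valid date/time
    return {"identifier": identifier, "timestamp": timestamp, "sequence": int(sequence)}
```

Returning None for non-matching names lets the monitor skip unrelated files in the same directory without raising.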
processors/data_mapper.py
Field transformation and mapping.
- convert_date() - DD/MM/YY → date
- calculate_txnind() - CR/DR logic
- convert_amount() - String → Decimal
- map_transaction() - Single transaction mapping
- map_transactions() - Batch mapping
processors/file_processor.py
End-to-end file processing orchestration.
- process_file() - Download → Parse → Map → Insert → Mark
- process_files() - Process multiple files with stats
scheduler.py
Main polling scheduler.
- run() - Start scheduler loop
- run_processing_cycle() - Execute one processing cycle
- Graceful shutdown on SIGTERM/SIGINT
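A polling loop with graceful shutdown can be sketched with a threading.Event, as below. This is one common pattern and not necessarily how scheduler.py is written. The names request_shutdown and install_signal_handlers and the run() signature are hypothetical.

```python
import signal
import threading

stop_event = threading.Event()


def request_shutdown(signum, frame):
    """Signal handler: ask the loop to exit after the current cycle."""
    stop_event.set()


def install_signal_handlers():
    """Register the handler for SIGTERM and SIGINT (call from the main thread)."""
    signal.signal(signal.SIGTERM, request_shutdown)
    signal.signal(signal.SIGINT, request_shutdown)


def run(cycle, poll_interval_seconds: float, stop=stop_event) -> int:
    """Call cycle() repeatedly until stop is set; return the number of cycles run."""
    cycles = 0
    while not stop.is_set():
        cycle()
        cycles += 1
        # wait() returns as soon as the event is set, so shutdown
        # does not have to sit out the remaining poll interval.
        stop.wait(poll_interval_seconds)
    return cycles
```

In main.py this would be used as install_signal_handlers() followed by run(run_processing_cycle, 30 * 60). Waiting on the event instead of calling time.sleep() is what keeps a 30-minute interval from delaying shutdown.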
Performance Considerations
- Batch Inserts: Configured to insert 100 records per batch
  - Adjust BATCH_SIZE in .env for your database capacity
- Connection Pooling: Min=2, Max=10 connections
  - Adjust DB_POOL_MIN/MAX for concurrent load
- Polling Interval: Default 30 minutes
  - Change POLL_INTERVAL_MINUTES for more frequent checks
- SFTP Timeout: 10 seconds for connection
  - Modify in sftp_client.py if needed
Log Output Example
2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:00 - scheduler - INFO - ACH File Processing Scheduler Started
2026-01-30 12:00:00 - scheduler - INFO - Poll Interval: 30 minutes
2026-01-30 12:00:00 - scheduler - INFO - Bank Codes: HDFC, ICICI, SBI
2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:01 - db.oracle_connector - INFO - Oracle connection pool initialized
2026-01-30 12:00:01 - db.oracle_connector - INFO - Database connection test successful
2026-01-30 12:00:01 - scheduler - INFO - === Starting processing cycle 1 ===
2026-01-30 12:00:02 - sftp.sftp_client - INFO - Connected to SFTP server
2026-01-30 12:00:03 - sftp.file_monitor - INFO - Found 2 new files
2026-01-30 12:00:05 - processors.file_processor - INFO - Successfully processed ACH_99944_19012026103217_001.txt
2026-01-30 12:00:05 - scheduler - INFO - Cycle 1 complete: Total: 2, Successful: 2, Failed: 0
Future Enhancements
- Parallel File Processing: Process multiple files concurrently
- Dead Letter Queue: Store failed files for manual review
- Email Notifications: Alert on processing errors
- Database Auditing: Track all changes with timestamps
- File Archival: Archive processed files to S3 or backup storage
- Metrics Export: Prometheus metrics for monitoring
Support
For issues or questions:
- Check logs in logs/app.log
- Enable LOG_LEVEL=DEBUG in .env
- Review traceback for specific errors
- Check database connectivity with sqlplus
- Test SFTP with sftp command-line tool