
ACH File Processing Pipeline - Implementation Guide

Project Structure

ach_ui_dbtl_file_based/
├── config.py                  # Configuration management
├── scheduler.py               # 30-minute polling scheduler
├── main.py                    # Application entry point
├── ach_parser.py             # Existing ACH parser
├── logging_config.py         # Existing logging setup
├── db/
│   ├── __init__.py
│   ├── oracle_connector.py   # Database connection pooling
│   ├── models.py             # Data models
│   └── repository.py         # Data access layer
├── sftp/
│   ├── __init__.py
│   ├── sftp_client.py        # SFTP operations
│   └── file_monitor.py       # File discovery
├── processors/
│   ├── __init__.py
│   ├── data_mapper.py        # Field transformation
│   └── file_processor.py     # File processing orchestration
├── tests/
│   ├── __init__.py
│   ├── test_data_mapper.py
│   └── test_file_monitor.py
├── docker-compose.yml         # Mock SFTP server
├── requirements.txt          # Dependencies
├── .env.example              # Configuration template
└── .env                      # Configuration (created)

Implementation Summary

Phase 1: Complete

  • Configuration management (config.py)
  • Updated requirements.txt with new dependencies
  • Created .env and .env.example

Phase 2: Complete

  • Database module (db/)
    • oracle_connector.py - Connection pooling
    • models.py - Data models
    • repository.py - CRUD operations
  • Supports batch inserts and duplicate detection

Phase 3: Complete

  • SFTP module (sftp/)
    • sftp_client.py - File operations
    • file_monitor.py - Multi-bank file discovery
  • Supports file listing, downloading, and filename parsing

Phase 4: Complete

  • Processing module (processors/)
    • data_mapper.py - Field transformation
    • file_processor.py - End-to-end processing
  • Transaction safety with database commit/rollback

Phase 5: Complete

  • scheduler.py - 30-minute polling with graceful shutdown
  • main.py - Updated entry point

Phase 6: Complete

  • Error handling throughout all modules
  • Duplicate detection by filename
  • Failed file tracking in database

Key Features

1. Field Mapping

Transforms parser output to database format:

  • remarks → narration
  • sys → status
  • jrnl_no → jrnl_id
  • date (DD/MM/YY) → tran_date (DATE)
  • cust_acct → cbs_acct
  • amount → tran_amt (absolute value)
  • amount → TXNIND ('CR' for >= 0, 'DR' for < 0)
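The mapping rules above can be sketched as follows. The function names mirror those documented for processors/data_mapper.py later in this guide, but the bodies here are illustrative, not the actual implementation:

```python
from datetime import datetime, date
from decimal import Decimal

def convert_date(value: str) -> date:
    """Parse the parser's DD/MM/YY string into a DATE-compatible value."""
    return datetime.strptime(value, "%d/%m/%y").date()

def calculate_txnind(amount: Decimal) -> str:
    """'CR' for zero or positive amounts, 'DR' for negative ones."""
    return "CR" if amount >= 0 else "DR"

def map_transaction(raw: dict) -> dict:
    """Transform one parsed record into ach_api_log column names."""
    amount = Decimal(str(raw["amount"]))
    return {
        "narration": raw["remarks"],
        "status": raw["sys"],
        "jrnl_id": raw["jrnl_no"],
        "tran_date": convert_date(raw["date"]),
        "cbs_acct": raw["cust_acct"],
        "tran_amt": abs(amount),          # absolute value is stored
        "TXNIND": calculate_txnind(amount),
    }
```

Note that TXNIND is derived from the signed amount before the absolute value is taken, so a negative input still yields 'DR'.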

2. Duplicate Detection

Files are tracked in the ach_processed_files table with:

  • Unique constraint on filename
  • Bank code, file path, transaction count
  • Status and error message fields
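The duplicate check itself reduces to one lookup against that table. A sketch of what repository.py's is_file_processed() can look like (illustrative; the real query may differ):

```python
def is_file_processed(cursor, filename: str) -> bool:
    """Return True if this filename was already recorded in ach_processed_files."""
    cursor.execute(
        "SELECT COUNT(*) FROM ach_processed_files WHERE filename = :filename",
        {"filename": filename},
    )
    return cursor.fetchone()[0] > 0
```

The unique constraint on filename is the real safety net: even if two cycles race past this check, the second insert fails rather than creating a duplicate row.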

3. Error Handling

  • SFTP connection failures → logged and retried
  • Parse errors → file marked as failed
  • Database errors → transaction rollback
  • Graceful shutdown on SIGTERM/SIGINT
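The "logged and retried" behaviour for SFTP failures can be sketched as a small helper. This is illustrative; the retry logic actually lives inside the modules themselves, and the helper name is hypothetical:

```python
import logging
import time

logger = logging.getLogger(__name__)

def with_retries(operation, attempts=3, delay_seconds=1.0):
    """Run `operation`, logging and retrying on failure.

    Returns the operation's result, or re-raises the last error
    once all attempts are exhausted.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as exc:
            logger.warning("Attempt %d/%d failed: %s", attempt, attempts, exc)
            if attempt == attempts:
                raise
            time.sleep(delay_seconds)
```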

4. Batch Processing

  • Configurable batch size (default: 100)
  • Reduces database round-trips
  • Transaction safety
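The batching pattern can be sketched like this. The column list matches the ach_api_log DDL shown under Database Setup; the bind names and function signature are illustrative, and `cursor.executemany` is the standard cx_Oracle bulk-insert call:

```python
def chunked(rows, batch_size=100):
    """Yield successive batches of at most `batch_size` rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

INSERT_SQL = """
    INSERT INTO ach_api_log
        (narration, status, bankcode, jrnl_id, tran_date, cbs_acct, tran_amt, TXNIND)
    VALUES
        (:narration, :status, :bankcode, :jrnl_id, :tran_date, :cbs_acct, :tran_amt, :txnind)
"""

def bulk_insert(connection, rows, batch_size=100):
    """Insert rows in batches; commit once at the end, roll back on any failure."""
    cursor = connection.cursor()
    try:
        for batch in chunked(rows, batch_size):
            cursor.executemany(INSERT_SQL, batch)
        connection.commit()
    except Exception:
        connection.rollback()
        raise
```

Committing once per file rather than per batch is what gives the all-or-nothing transaction safety described above.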

Configuration

Environment Variables (.env)

# Database
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB

# SFTP
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USERNAME=ipks
SFTP_PASSWORD=ipks_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS

# Processing
POLL_INTERVAL_MINUTES=30
BATCH_SIZE=100
BANK_CODES=HDFC,ICICI,SBI,AXIS,PNB

# Logging
LOG_LEVEL=INFO

Setup Instructions

1. Install Dependencies

pip install -r requirements.txt

2. Oracle Client Setup (Required)

# Download and install Oracle Instant Client
wget https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
unzip instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
sudo mv instantclient_21_12 /opt/oracle/
echo '/opt/oracle/instantclient_21_12' | sudo tee /etc/ld.so.conf.d/oracle.conf
sudo ldconfig
export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12:$LD_LIBRARY_PATH

3. Database Setup

Before running, ensure these tables exist in Oracle:

-- ACH transaction log (existing table - must already exist)
CREATE TABLE ach_api_log (
    id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    narration VARCHAR2(500),
    status VARCHAR2(100),
    bankcode VARCHAR2(20),
    jrnl_id VARCHAR2(50),
    tran_date DATE,
    cbs_acct VARCHAR2(50),
    tran_amt NUMBER(15, 2),
    TXNIND VARCHAR2(2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_ach_jrnl_id ON ach_api_log(jrnl_id);
CREATE INDEX idx_ach_bankcode ON ach_api_log(bankcode);

-- Processed files log (created by application)
CREATE TABLE ach_processed_files (
    id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    filename VARCHAR2(500) UNIQUE NOT NULL,
    bankcode VARCHAR2(20) NOT NULL,
    file_path VARCHAR2(1000),
    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    transaction_count NUMBER,
    status VARCHAR2(20) DEFAULT 'SUCCESS',
    error_message VARCHAR2(2000)
);

CREATE INDEX idx_processed_filename ON ach_processed_files(filename);

4. Configuration

Edit .env with your environment:

cp .env.example .env
# Edit .env with production values

Testing

Unit Tests

# Run all tests
pytest tests/ -v

# Run specific test file
pytest tests/test_data_mapper.py -v

# Run with coverage
pytest tests/ --cov=processors --cov=db --cov=sftp

Option 1: Integration Testing with Mock SFTP

# Create SFTP directory structure
mkdir -p sftp_data/HDFC/NACH
mkdir -p sftp_data/ICICI/NACH
mkdir -p sftp_data/SBI/NACH

# Copy sample ACH file
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/

# Start mock SFTP server
docker-compose up -d

# Verify connection
sftp -P 2222 ipks@127.0.0.1
# Password: ipks_password
# Commands: ls, cd, etc.

# Run application
python main.py

# Stop SFTP server
docker-compose down

Option 2: Manual SFTP Setup

If you have your own SFTP server, update .env:

SFTP_HOST=your.sftp.server
SFTP_PORT=22
SFTP_USERNAME=your_user
SFTP_PASSWORD=your_password

Running the Application

Development Mode (Manual)

python main.py

The scheduler will:

  1. Connect to database and SFTP
  2. Scan all bank directories every 30 minutes
  3. Download new ACH files
  4. Parse transactions
  5. Insert to database
  6. Mark files as processed
  7. Clean up local files
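The cycle above runs inside a simple polling loop. A minimal sketch (the real scheduler.py adds the SFTP/database wiring and signal handling; the cycle count return value here exists only to make the sketch easy to exercise):

```python
import threading

def run_scheduler(run_processing_cycle, poll_interval_minutes=30, stop_event=None):
    """Run processing cycles until `stop_event` is set.

    `run_processing_cycle` is a zero-argument callable that performs one
    scan/download/parse/insert pass. Returns the number of cycles run.
    """
    stop_event = stop_event or threading.Event()
    cycle = 0
    while not stop_event.is_set():
        cycle += 1
        run_processing_cycle()
        # Event.wait returns early when the event is set, giving a
        # responsive shutdown instead of a blind time.sleep().
        stop_event.wait(timeout=poll_interval_minutes * 60)
    return cycle
```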

Production Mode (Systemd Service)

Create /etc/systemd/system/ach_processor.service:

[Unit]
Description=ACH File Processor
After=network.target

[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/ach_processor
Environment="PATH=/opt/ach_processor/venv/bin"
Environment="LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12"
ExecStart=/opt/ach_processor/venv/bin/python main.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

Then:

sudo systemctl daemon-reload
sudo systemctl enable ach_processor
sudo systemctl start ach_processor
sudo systemctl status ach_processor

# View logs
journalctl -u ach_processor -f

Verification Checklist

Before deployment, verify:

  • Oracle Instant Client installed and LD_LIBRARY_PATH set
  • Oracle database accessible (test with SQL*Plus)
  • ach_api_log table exists and is accessible
  • SFTP credentials configured correctly
  • Mock SFTP server running (for testing)
  • Sample ACH file in test SFTP directory
  • Unit tests passing: pytest tests/ -v
  • Application can connect to database
  • Application can connect to SFTP
  • Application processes sample file successfully
  • Duplicate detection prevents reprocessing
  • Log files are created in logs/ directory
  • Graceful shutdown works with CTRL+C

Troubleshooting

Database Connection Issues

# Test Oracle connection
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB

# Check LD_LIBRARY_PATH
echo $LD_LIBRARY_PATH

# Verify cx_Oracle installation
python -c "import cx_Oracle; print(cx_Oracle.version)"

SFTP Connection Issues

# Test SFTP connection manually
sftp -P 2222 ipks@127.0.0.1

# Enable debug in logs
# Change LOG_LEVEL=DEBUG in .env

File Processing Issues

Check logs:

tail -f logs/app.log

# Look for:
# - "Connected to SFTP server"
# - "Found X files matching pattern"
# - "Successfully processed"
# - Error messages with stack traces

Module Documentation

config.py

Loads and validates environment variables from .env file.

  • get_config() - Get global Config instance
  • config.validate() - Validate required settings
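A sketch of how config.py can load and validate settings from the environment. The variable names follow the .env template above; only a few fields are shown and the actual implementation may differ:

```python
import os
from dataclasses import dataclass, field

@dataclass
class Config:
    db_user: str = field(default_factory=lambda: os.environ.get("DB_USER", ""))
    sftp_host: str = field(default_factory=lambda: os.environ.get("SFTP_HOST", ""))
    poll_interval_minutes: int = field(
        default_factory=lambda: int(os.environ.get("POLL_INTERVAL_MINUTES", "30")))
    batch_size: int = field(
        default_factory=lambda: int(os.environ.get("BATCH_SIZE", "100")))
    bank_codes: list = field(
        default_factory=lambda: [
            c.strip() for c in os.environ.get("BANK_CODES", "").split(",") if c.strip()])

    def validate(self):
        """Raise if a required setting is missing."""
        missing = [name for name, value in
                   [("DB_USER", self.db_user), ("SFTP_HOST", self.sftp_host)]
                   if not value]
        if missing:
            raise ValueError(f"Missing required settings: {', '.join(missing)}")
```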

db/oracle_connector.py

Manages Oracle database connection pooling.

  • OracleConnector class with connection pool management
  • get_connector() - Get global connector instance
  • Supports context manager usage
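The context-manager usage can be sketched as an acquire/release wrapper. `pool` stands in for whatever pool object the connector wraps (cx_Oracle's SessionPool exposes acquire()/release() with this shape); the function name is illustrative:

```python
from contextlib import contextmanager

@contextmanager
def pooled_connection(pool):
    """Borrow a connection from the pool and always return it,
    even when the body raises."""
    connection = pool.acquire()
    try:
        yield connection
    finally:
        pool.release(connection)
```

Typical usage: `with pooled_connection(pool) as conn: ...` guarantees the connection goes back to the pool on both success and failure.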

db/repository.py

Data access layer with CRUD operations.

  • bulk_insert_transactions() - Batch insert to ach_api_log
  • is_file_processed() - Check duplicate by filename
  • mark_file_processed() - Track processed files
  • get_processed_files() - List processed filenames
  • create_tables() - Initialize database schema

sftp/sftp_client.py

SFTP client for file operations.

  • connect() / disconnect() - Connection management
  • list_files() - Find files by pattern
  • download_file() - Download from SFTP
  • get_file_size() - Check file size
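The pattern-matching part of list_files() can be sketched with the stdlib fnmatch module. The helper name and default pattern are assumptions; in the real client the `entries` list would come from an SFTP directory listing (e.g. paramiko's SFTPClient.listdir()):

```python
from fnmatch import fnmatch

def filter_ach_files(entries, pattern="ACH_*.txt"):
    """Keep only directory entries whose names match `pattern`
    (shell-style wildcards)."""
    return [name for name in entries if fnmatch(name, pattern)]
```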

sftp/file_monitor.py

File discovery and monitoring.

  • scan_for_new_files() - Find new files across banks
  • parse_filename() - Extract metadata from filename
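A sketch of parse_filename(), assuming the naming convention of the sample file used elsewhere in this guide (ACH_&lt;code&gt;_&lt;DDMMYYYYHHMMSS&gt;_&lt;sequence&gt;.txt); the field names in the returned dict are illustrative:

```python
import re
from datetime import datetime

FILENAME_RE = re.compile(
    r"^ACH_(?P<code>\d+)_(?P<timestamp>\d{14})_(?P<sequence>\d{3})\.txt$")

def parse_filename(filename):
    """Extract metadata from names like ACH_99944_19012026103217_001.txt.

    Assumes the 14-digit timestamp is DDMMYYYYHHMMSS; returns None when
    the name does not match the expected pattern.
    """
    match = FILENAME_RE.match(filename)
    if not match:
        return None
    return {
        "code": match.group("code"),
        "timestamp": datetime.strptime(match.group("timestamp"), "%d%m%Y%H%M%S"),
        "sequence": match.group("sequence"),
    }
```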

processors/data_mapper.py

Field transformation and mapping.

  • convert_date() - DD/MM/YY → date
  • calculate_txnind() - CR/DR logic
  • convert_amount() - String → Decimal
  • map_transaction() - Single transaction mapping
  • map_transactions() - Batch mapping

processors/file_processor.py

End-to-end file processing orchestration.

  • process_file() - Download → Parse → Map → Insert → Mark
  • process_files() - Process multiple files with stats

scheduler.py

Main polling scheduler.

  • run() - Start scheduler loop
  • run_processing_cycle() - Execute one processing cycle
  • Graceful shutdown on SIGTERM/SIGINT
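The graceful-shutdown wiring can be sketched as follows (illustrative; the handler flips an event that the scheduler loop checks between cycles, so the current cycle finishes cleanly before exit):

```python
import signal
import threading

shutdown_event = threading.Event()

def handle_shutdown(signum, frame):
    """Ask the scheduler loop to finish its current cycle and exit."""
    shutdown_event.set()

def install_signal_handlers():
    """Route SIGTERM (systemd stop) and SIGINT (CTRL+C) to the handler."""
    signal.signal(signal.SIGTERM, handle_shutdown)
    signal.signal(signal.SIGINT, handle_shutdown)
```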

Performance Considerations

  1. Batch Inserts: Configured to insert 100 records per batch

    • Adjust BATCH_SIZE in .env for your database capacity
  2. Connection Pooling: Min=2, Max=10 connections

    • Adjust DB_POOL_MIN/MAX for concurrent load
  3. Polling Interval: Default 30 minutes

    • Change POLL_INTERVAL_MINUTES for more frequent checks
  4. SFTP Timeout: 10 seconds for connection

    • Modify in sftp_client.py if needed

Log Output Example

2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:00 - scheduler - INFO - ACH File Processing Scheduler Started
2026-01-30 12:00:00 - scheduler - INFO - Poll Interval: 30 minutes
2026-01-30 12:00:00 - scheduler - INFO - Bank Codes: HDFC, ICICI, SBI
2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:01 - db.oracle_connector - INFO - Oracle connection pool initialized
2026-01-30 12:00:01 - db.oracle_connector - INFO - Database connection test successful
2026-01-30 12:00:01 - scheduler - INFO - === Starting processing cycle 1 ===
2026-01-30 12:00:02 - sftp.sftp_client - INFO - Connected to SFTP server
2026-01-30 12:00:03 - sftp.file_monitor - INFO - Found 2 new files
2026-01-30 12:00:05 - processors.file_processor - INFO - Successfully processed ACH_99944_19012026103217_001.txt
2026-01-30 12:00:05 - scheduler - INFO - Cycle 1 complete: Total: 2, Successful: 2, Failed: 0

Future Enhancements

  1. Parallel File Processing: Process multiple files concurrently
  2. Dead Letter Queue: Store failed files for manual review
  3. Email Notifications: Alert on processing errors
  4. Database Auditing: Track all changes with timestamps
  5. File Archival: Archive processed files to S3 or backup storage
  6. Metrics Export: Prometheus metrics for monitoring

Support

For issues or questions:

  1. Check logs in logs/app.log
  2. Enable LOG_LEVEL=DEBUG in .env
  3. Review traceback for specific errors
  4. Check database connectivity with sqlplus
  5. Test SFTP with sftp command-line tool