# ACH File Processing Pipeline - Implementation Guide

## Project Structure

```
ach_ui_dbtl_file_based/
├── config.py               # Configuration management
├── scheduler.py            # 30-minute polling scheduler
├── main.py                 # Application entry point
├── ach_parser.py           # Existing ACH parser
├── logging_config.py       # Existing logging setup
├── db/
│   ├── __init__.py
│   ├── oracle_connector.py # Database connection pooling
│   ├── models.py           # Data models
│   └── repository.py       # Data access layer
├── sftp/
│   ├── __init__.py
│   ├── sftp_client.py      # SFTP operations
│   └── file_monitor.py     # File discovery
├── processors/
│   ├── __init__.py
│   ├── data_mapper.py      # Field transformation
│   └── file_processor.py   # File processing orchestration
├── tests/
│   ├── __init__.py
│   ├── test_data_mapper.py
│   └── test_file_monitor.py
├── docker-compose.yml      # Mock SFTP server
├── requirements.txt        # Dependencies
├── .env.example            # Configuration template
└── .env                    # Configuration (created)
```

## Implementation Summary

### Phase 1: Complete ✅
- Configuration management (`config.py`)
- Updated `requirements.txt` with new dependencies
- Created `.env` and `.env.example`

### Phase 2: Complete ✅
- Database module (`db/`)
  - `oracle_connector.py` - Connection pooling
  - `models.py` - Data models
  - `repository.py` - CRUD operations
- Supports batch inserts and duplicate detection

### Phase 3: Complete ✅
- SFTP module (`sftp/`)
  - `sftp_client.py` - File operations
  - `file_monitor.py` - Multi-bank file discovery
- Supports file listing, download, and filename parsing

### Phase 4: Complete ✅
- Processing module (`processors/`)
  - `data_mapper.py` - Field transformation
  - `file_processor.py` - End-to-end processing
- Transaction safety with database commit/rollback

### Phase 5: Complete ✅
- `scheduler.py` - 30-minute polling with graceful shutdown
- `main.py` - Updated entry point

### Phase 6: Complete ✅
- Error handling throughout all modules
- Duplicate detection by filename
- Failed file tracking in database

## Key Features

### 1. Field Mapping

Transforms parser output to database format:

- `remarks` → `narration`
- `sys` → `status`
- `jrnl_no` → `jrnl_id`
- `date` (DD/MM/YY) → `tran_date` (DATE)
- `cust_acct` → `cbs_acct`
- `amount` → `tran_amt` (absolute value)
- `amount` → `TXNIND` (`CR` for >= 0, `DR` for < 0)
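
The mapping above can be sketched as follows. This is an illustrative version, not the actual `data_mapper.py` code; input key names assume the parser output fields listed above:

```python
from datetime import datetime
from decimal import Decimal

def map_transaction(parsed: dict) -> dict:
    """Illustrative sketch of the parser-to-database field mapping."""
    amount = Decimal(str(parsed["amount"]))
    return {
        "narration": parsed["remarks"],
        "status": parsed["sys"],
        "jrnl_id": parsed["jrnl_no"],
        # DD/MM/YY string -> Python date for the Oracle DATE column
        "tran_date": datetime.strptime(parsed["date"], "%d/%m/%y").date(),
        "cbs_acct": parsed["cust_acct"],
        "tran_amt": abs(amount),                     # stored as absolute value
        "txnind": "CR" if amount >= 0 else "DR",     # sign decides CR/DR
    }
```

For example, an input with `amount` of `-150.25` maps to `tran_amt` 150.25 with `txnind` `DR`.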

### 2. Duplicate Detection

Files are tracked in the `ach_processed_files` table with:

- Unique constraint on filename
- Bank code, file path, and transaction count
- Status and error message fields

### 3. Error Handling

- SFTP connection failures → logged and retried
- Parse errors → file marked as failed
- Database errors → transaction rollback
- Graceful shutdown on SIGTERM/SIGINT

### 4. Batch Processing

- Configurable batch size (default: 100)
- Reduces database round-trips
- Transaction safety
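
The batching pattern can be sketched as below. This is illustrative, shown with `sqlite3` for portability (the project targets Oracle); the table and column names are reduced for the example, and the real `repository.py` may differ:

```python
import sqlite3

def insert_in_batches(conn, rows, batch_size=100):
    """Insert rows in fixed-size batches inside one transaction.
    All batches commit together; any failure rolls back the whole file."""
    cur = conn.cursor()
    try:
        for i in range(0, len(rows), batch_size):
            cur.executemany(
                "INSERT INTO ach_api_log (jrnl_id, tran_amt) VALUES (?, ?)",
                rows[i:i + batch_size],
            )
        conn.commit()
        return len(rows)
    except Exception:
        conn.rollback()
        raise

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ach_api_log (jrnl_id TEXT, tran_amt REAL)")
inserted = insert_in_batches(conn, [(f"J{i}", 10.0) for i in range(250)])
```

With 250 rows and a batch size of 100, this issues three `executemany` calls (100, 100, 50) but only one commit.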

## Configuration

### Environment Variables (.env)

```
# Database
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB

# SFTP
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USERNAME=ipks
SFTP_PASSWORD=ipks_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS

# Processing
POLL_INTERVAL_MINUTES=30
BATCH_SIZE=100
BANK_CODES=HDFC,ICICI,SBI,AXIS,PNB

# Logging
LOG_LEVEL=INFO
```

## Setup Instructions

### 1. Install Dependencies

```bash
pip install -r requirements.txt
```

### 2. Oracle Client Setup (Required)

```bash
# Download and install Oracle Instant Client
wget https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
unzip instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
sudo mkdir -p /opt/oracle
sudo mv instantclient_21_12 /opt/oracle/
echo '/opt/oracle/instantclient_21_12' | sudo tee /etc/ld.so.conf.d/oracle.conf
sudo ldconfig
export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12:$LD_LIBRARY_PATH
```

### 3. Database Setup

Before running, ensure these tables exist in Oracle:

```sql
-- ACH transaction log (existing table - must already exist)
CREATE TABLE ach_api_log (
    id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    narration VARCHAR2(500),
    status VARCHAR2(100),
    bankcode VARCHAR2(20),
    jrnl_id VARCHAR2(50),
    tran_date DATE,
    cbs_acct VARCHAR2(50),
    tran_amt NUMBER(15, 2),
    TXNIND VARCHAR2(2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_ach_jrnl_id ON ach_api_log(jrnl_id);
CREATE INDEX idx_ach_bankcode ON ach_api_log(bankcode);

-- Processed files log (created by the application)
CREATE TABLE ach_processed_files (
    id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    filename VARCHAR2(500) UNIQUE NOT NULL,
    bankcode VARCHAR2(20) NOT NULL,
    file_path VARCHAR2(1000),
    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    transaction_count NUMBER,
    status VARCHAR2(20) DEFAULT 'SUCCESS',
    error_message VARCHAR2(2000)
);

-- No separate index on filename is needed: the UNIQUE constraint already
-- creates one, and a second CREATE INDEX on the same column would fail
-- with ORA-01408.
```

### 4. Configuration

Copy the template, then edit `.env` with your environment's values:

```bash
cp .env.example .env
# Edit .env with production values
```

## Testing

### Unit Tests

```bash
# Run all tests
pytest tests/ -v

# Run specific test file
pytest tests/test_data_mapper.py -v

# Run with coverage
pytest tests/ --cov=processors --cov=db --cov=sftp
```

### Integration Testing with Mock SFTP

#### Option 1: Docker (Recommended)

```bash
# Create SFTP directory structure
mkdir -p sftp_data/HDFC/NACH
mkdir -p sftp_data/ICICI/NACH
mkdir -p sftp_data/SBI/NACH

# Copy sample ACH file
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/

# Start mock SFTP server
docker-compose up -d

# Verify connection
sftp -P 2222 ipks@127.0.0.1
# Password: ipks_password
# Commands: ls, cd, etc.

# Run application
python main.py

# Stop SFTP server
docker-compose down
```

#### Option 2: Manual SFTP Setup

If you have your own SFTP server, update `.env`:

```bash
SFTP_HOST=your.sftp.server
SFTP_PORT=22
SFTP_USERNAME=your_user
SFTP_PASSWORD=your_password
```

## Running the Application

### Development Mode (Manual)

```bash
python main.py
```

The scheduler will:

1. Connect to the database and SFTP
2. Scan all bank directories every 30 minutes
3. Download new ACH files
4. Parse transactions
5. Insert them into the database
6. Mark files as processed
7. Clean up local files

### Production Mode (Systemd Service)

Create `/etc/systemd/system/ach_processor.service`:

```ini
[Unit]
Description=ACH File Processor
After=network.target

[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/ach_processor
Environment="PATH=/opt/ach_processor/venv/bin"
# systemd does not expand $LD_LIBRARY_PATH inside Environment=,
# so list the full path explicitly
Environment="LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12"
ExecStart=/opt/ach_processor/venv/bin/python main.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
```

Then:

```bash
sudo systemctl daemon-reload
sudo systemctl enable ach_processor
sudo systemctl start ach_processor
sudo systemctl status ach_processor

# View logs
journalctl -u ach_processor -f
```

## Verification Checklist

Before deployment, verify:

- [ ] Oracle Instant Client installed and `LD_LIBRARY_PATH` set
- [ ] Oracle database accessible (test with SQL*Plus)
- [ ] `ach_api_log` table exists and is accessible
- [ ] SFTP credentials configured correctly
- [ ] Mock SFTP server running (for testing)
- [ ] Sample ACH file in test SFTP directory
- [ ] Unit tests passing: `pytest tests/ -v`
- [ ] Application can connect to the database
- [ ] Application can connect to SFTP
- [ ] Application processes the sample file successfully
- [ ] Duplicate detection prevents reprocessing
- [ ] Log files are created in the `logs/` directory
- [ ] Graceful shutdown works with Ctrl+C

## Troubleshooting

### Database Connection Issues

```bash
# Test Oracle connection
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB

# Check LD_LIBRARY_PATH
echo $LD_LIBRARY_PATH

# Verify cx_Oracle installation
python -c "import cx_Oracle; print(cx_Oracle.version)"
```

### SFTP Connection Issues

```bash
# Test SFTP connection manually
sftp -P 2222 ipks@127.0.0.1

# Enable debug logging: set LOG_LEVEL=DEBUG in .env
```

### File Processing Issues

Check logs:

```bash
tail -f logs/app.log

# Look for:
# - "Connected to SFTP server"
# - "Found X files matching pattern"
# - "Successfully processed"
# - Error messages with stack traces
```

## Module Documentation

### config.py
Loads and validates environment variables from the `.env` file.
- `get_config()` - Get global Config instance
- `config.validate()` - Validate required settings
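
A minimal sketch of this load-and-validate pattern, assuming the `.env` keys shown in the Configuration section have already been loaded into the process environment (e.g. via `python-dotenv`). The attribute names and required-field choices here are illustrative; the real `config.py` may differ:

```python
import os
from dataclasses import dataclass, field

@dataclass
class Config:
    """Illustrative sketch: read settings from the environment and
    validate the required ones."""
    db_user: str = field(default_factory=lambda: os.environ.get("DB_USER", ""))
    sftp_host: str = field(default_factory=lambda: os.environ.get("SFTP_HOST", ""))
    poll_interval_minutes: int = field(
        default_factory=lambda: int(os.environ.get("POLL_INTERVAL_MINUTES", "30")))
    bank_codes: list = field(
        default_factory=lambda: os.environ.get("BANK_CODES", "").split(","))

    def validate(self):
        # Fail fast on missing required settings rather than at first use
        required = [("DB_USER", self.db_user), ("SFTP_HOST", self.sftp_host)]
        missing = [name for name, value in required if not value]
        if missing:
            raise ValueError(f"Missing required settings: {', '.join(missing)}")
```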

### db/oracle_connector.py
Manages Oracle database connection pooling.
- `OracleConnector` class with connection pool management
- `get_connector()` - Get global connector instance
- Supports context manager usage

### db/repository.py
Data access layer with CRUD operations.
- `bulk_insert_transactions()` - Batch insert to `ach_api_log`
- `is_file_processed()` - Check duplicate by filename
- `mark_file_processed()` - Track processed files
- `get_processed_files()` - List processed filenames
- `create_tables()` - Initialize database schema
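
The two duplicate-detection operations can be sketched as below, shown with `sqlite3` for portability (the project uses Oracle); the table layout follows the `ach_processed_files` DDL in the Database Setup section, and the real `repository.py` may differ:

```python
import sqlite3

def is_file_processed(conn, filename):
    """Return True if a row for this filename already exists."""
    row = conn.execute(
        "SELECT 1 FROM ach_processed_files WHERE filename = ?", (filename,)
    ).fetchone()
    return row is not None

def mark_file_processed(conn, filename, bankcode, status="SUCCESS"):
    """Record a processed file; the UNIQUE constraint on filename
    rejects accidental double-inserts."""
    conn.execute(
        "INSERT INTO ach_processed_files (filename, bankcode, status) "
        "VALUES (?, ?, ?)",
        (filename, bankcode, status),
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ach_processed_files "
             "(filename TEXT UNIQUE NOT NULL, bankcode TEXT, status TEXT)")
mark_file_processed(conn, "ACH_99944_19012026103217_001.txt", "HDFC")
```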

### sftp/sftp_client.py
SFTP client for file operations.
- `connect()` / `disconnect()` - Connection management
- `list_files()` - Find files by pattern
- `download_file()` - Download from SFTP
- `get_file_size()` - Check file size

### sftp/file_monitor.py
File discovery and monitoring.
- `scan_for_new_files()` - Find new files across banks
- `parse_filename()` - Extract metadata from filename
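
Filename metadata extraction can be sketched as below. The convention `ACH_<id>_<DDMMYYYYHHMMSS>_<seq>.txt` is an assumption inferred from the sample file `ACH_99944_19012026103217_001.txt`; the real `parse_filename()` may use a different pattern and field names:

```python
import re
from datetime import datetime
from typing import Optional

# Assumed convention: ACH_<id>_<DDMMYYYYHHMMSS>_<seq>.txt
FILENAME_RE = re.compile(
    r"^ACH_(?P<file_id>\d+)_(?P<timestamp>\d{14})_(?P<sequence>\d{3})\.txt$")

def parse_filename(filename: str) -> Optional[dict]:
    """Return metadata extracted from an ACH filename, or None if it
    does not match the expected pattern."""
    m = FILENAME_RE.match(filename)
    if not m:
        return None
    return {
        "file_id": m.group("file_id"),
        "timestamp": datetime.strptime(m.group("timestamp"), "%d%m%Y%H%M%S"),
        "sequence": int(m.group("sequence")),
    }
```

Returning `None` for non-matching names lets the monitor skip unrelated files in the same directory instead of failing the cycle.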

### processors/data_mapper.py
Field transformation and mapping.
- `convert_date()` - DD/MM/YY → date
- `calculate_txnind()` - CR/DR logic
- `convert_amount()` - String → Decimal
- `map_transaction()` - Single transaction mapping
- `map_transactions()` - Batch mapping

### processors/file_processor.py
End-to-end file processing orchestration.
- `process_file()` - Download → Parse → Map → Insert → Mark
- `process_files()` - Process multiple files with stats

### scheduler.py
Main polling scheduler.
- `run()` - Start scheduler loop
- `run_processing_cycle()` - Execute one processing cycle
- Graceful shutdown on SIGTERM/SIGINT
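
The graceful-shutdown pattern can be sketched as below: the signal handler only sets a flag, so the current processing cycle finishes before the loop exits. This is an illustrative structure, not the actual `scheduler.py`:

```python
import signal

class Scheduler:
    """Illustrative sketch of polling with graceful shutdown."""
    def __init__(self):
        self.running = True
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

    def _handle_signal(self, signum, frame):
        # Set a flag instead of exiting immediately, so the in-flight
        # cycle completes and files are not left half-processed.
        self.running = False

    def run(self, run_cycle, wait):
        while self.running:
            run_cycle()
            wait()  # e.g. sleep in short increments, rechecking self.running
```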

## Performance Considerations

1. **Batch Inserts**: Configured to insert 100 records per batch
   - Adjust `BATCH_SIZE` in `.env` for your database capacity

2. **Connection Pooling**: Min=2, Max=10 connections
   - Adjust `DB_POOL_MIN`/`DB_POOL_MAX` for concurrent load

3. **Polling Interval**: Default 30 minutes
   - Change `POLL_INTERVAL_MINUTES` for more frequent checks

4. **SFTP Timeout**: 10 seconds for connection
   - Modify in `sftp_client.py` if needed

## Log Output Example

```
2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:00 - scheduler - INFO - ACH File Processing Scheduler Started
2026-01-30 12:00:00 - scheduler - INFO - Poll Interval: 30 minutes
2026-01-30 12:00:00 - scheduler - INFO - Bank Codes: HDFC, ICICI, SBI
2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:01 - db.oracle_connector - INFO - Oracle connection pool initialized
2026-01-30 12:00:01 - db.oracle_connector - INFO - Database connection test successful
2026-01-30 12:00:01 - scheduler - INFO - === Starting processing cycle 1 ===
2026-01-30 12:00:02 - sftp.sftp_client - INFO - Connected to SFTP server
2026-01-30 12:00:03 - sftp.file_monitor - INFO - Found 2 new files
2026-01-30 12:00:05 - processors.file_processor - INFO - Successfully processed ACH_99944_19012026103217_001.txt
2026-01-30 12:00:05 - scheduler - INFO - Cycle 1 complete: Total: 2, Successful: 2, Failed: 0
```

## Future Enhancements

1. **Parallel File Processing**: Process multiple files concurrently
2. **Dead Letter Queue**: Store failed files for manual review
3. **Email Notifications**: Alert on processing errors
4. **Database Auditing**: Track all changes with timestamps
5. **File Archival**: Archive processed files to S3 or backup storage
6. **Metrics Export**: Prometheus metrics for monitoring

## Support

For issues or questions:

1. Check logs in `logs/app.log`
2. Set `LOG_LEVEL=DEBUG` in `.env` for verbose output
3. Review the traceback for specific errors
4. Check database connectivity with `sqlplus`
5. Test SFTP with the `sftp` command-line tool