deleted unnecessary files

This commit is contained in:
2026-02-12 00:12:54 +05:30
parent 51ed953830
commit 952dfa12ed
21 changed files with 0 additions and 7438 deletions


@@ -1 +0,0 @@
1


@@ -1,306 +0,0 @@
# Summary of Changes: cx_Oracle → oracledb
## What Changed
### 1. Dependencies (requirements.txt)
**Before:**
```txt
cx_Oracle==8.3.0
```
**After:**
```txt
oracledb==2.0.0
```
### 2. Database Connector (db/oracle_connector.py)
**Key Changes:**
- Import statement: `cx_Oracle` → `oracledb`
- Pool creation: `cx_Oracle.SessionPool()` → `oracledb.create_pool()`
- Exception handling: `cx_Oracle.DatabaseError` → `oracledb.DatabaseError`
- Added Thin mode initialization (optional)
**Code Example:**
Before:
```python
import cx_Oracle
pool = cx_Oracle.SessionPool(user='...', password='...', dsn='...')
```
After:
```python
import oracledb
pool = oracledb.create_pool(user='...', password='...', dsn='...')
```
### 3. Repository (db/repository.py)
**Updated exception handling** to catch `oracledb.DatabaseError` in place of `cx_Oracle.DatabaseError`
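A minimal sketch of what the updated error handling might look like. The `insert_transactions` helper, its SQL, and its arguments are illustrative rather than the repository's actual API, and the import guard only exists so the sketch runs where the driver is not installed:

```python
from decimal import Decimal

try:
    import oracledb
    DatabaseError = oracledb.DatabaseError
except ImportError:  # illustrative fallback: run the sketch without the driver
    DatabaseError = Exception

def insert_transactions(conn, rows):
    """Insert rows atomically; roll back and return False on a driver error."""
    cursor = conn.cursor()
    try:
        cursor.executemany(
            "INSERT INTO ach_api_log (jrnl_id, tran_amt) VALUES (:1, :2)", rows
        )
        conn.commit()
        return True
    except DatabaseError:
        conn.rollback()
        return False
```

The same `try/except DatabaseError` shape applies to the other repository methods; only the exception's module changes in the migration.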
### 4. Documentation
Added new guides:
- `QUICK_INSTALL.md` - 5-minute setup (vs 15+ minutes before)
- `ORACLEDB_MIGRATION.md` - Complete migration reference
---
## Why This is Better
### Installation Time
| Step | cx_Oracle | oracledb |
|------|-----------|----------|
| pip install | 5 min | 2 min |
| Download Oracle Client | 10 min | — |
| Install Oracle Client | 5 min | — |
| Configure environment | 5 min | — |
| Troubleshoot errors | ? | — |
| **Total** | **15+ min** | **2 min** |
### Setup Complexity
**cx_Oracle Setup Checklist:**
- [ ] Install Python packages
- [ ] Download 200+ MB Oracle Instant Client
- [ ] Install to system directories
- [ ] Set LD_LIBRARY_PATH
- [ ] Verify library paths
- [ ] Test connection
- [ ] Debug missing dependencies
**oracledb Setup Checklist:**
- [ ] Install Python packages
- [ ] Done! ✓
### System Requirements
| Requirement | cx_Oracle | oracledb |
|---|---|---|
| Python 3.8+ | ✓ | ✓ |
| pip | ✓ | ✓ |
| Oracle Instant Client | Required | Not needed |
| Network access to DB | ✓ | ✓ |
---
## No Breaking Changes
### API Compatibility
The migration is **100% backward compatible**:
**Connection Pooling**
```python
# Same API - works with both
pool = oracledb.create_pool(...)
conn = pool.acquire()
cursor = conn.cursor()
```
**Query Execution**
```python
# Identical
cursor.execute("SELECT * FROM table")
rows = cursor.fetchall()
```
**Transaction Handling**
```python
# Same behavior
conn.commit()
conn.rollback()
```
### Configuration
The `.env` file **doesn't change**:
```
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB
```
---
## Feature Comparison
| Feature | cx_Oracle | oracledb |
|---------|-----------|----------|
| Connection Pooling | ✓ | ✓ |
| Transaction Support | ✓ | ✓ |
| Query Execution | ✓ | ✓ |
| Bulk Operations | ✓ | ✓ |
| Instant Client Required | ✓ | ✗ (Thin mode) |
| API design | Legacy | Modern |
| Documentation | Good | Excellent |
| Community Support | Declining | Growing |
---
## oracledb Modes
### Thin Mode (Default - Recommended)
```python
import oracledb
# Automatic - no configuration needed!
conn = oracledb.connect(...)
```
**Advantages:**
- ✓ No Oracle Instant Client needed
- ✓ Smaller deployment
- ✓ Works on any platform
- ✓ Cloud-friendly
### Thick Mode (Optional - For Advanced Users)
```python
import oracledb
oracledb.init_oracle_client() # Use with installed Instant Client
conn = oracledb.connect(...)
```
**When to use:**
- You have Oracle Instant Client installed
- You need specific features only Thick mode provides
---
## Testing
### Before Changes
```bash
# Required:
# 1. Oracle Instant Client installed ✓
# 2. LD_LIBRARY_PATH configured ✓
# 3. cx_Oracle working ✓
# Testing command:
python main.py
```
### After Changes
```bash
# Required:
pip install -r requirements.txt
# Testing command:
python test_local.py # No database needed!
python main.py # With database
```
---
## Files Modified
| File | Change | Reason |
|------|--------|--------|
| requirements.txt | cx_Oracle → oracledb | Use modern driver |
| db/oracle_connector.py | Import & API update | Use oracledb API |
| db/repository.py | Exception handling | Handle oracledb errors |
| SETUP.md | Simplified Oracle section | No Instant Client needed |
## Files Created
| File | Purpose |
|------|---------|
| QUICK_INSTALL.md | 5-minute setup guide |
| ORACLEDB_MIGRATION.md | Complete migration reference |
| CHANGES_SUMMARY.md | This file |
---
## Migration Steps
For users upgrading from cx_Oracle:
### Step 1: Update Requirements
```bash
pip install -r requirements.txt --upgrade
```
### Step 2: Restart Application
```bash
python main.py
```
### That's it!
No code changes needed - oracledb is backward compatible!
---
## Troubleshooting
### ImportError: No module named 'oracledb'
```bash
pip install oracledb==2.0.0
```
### Connection Issues
1. Check credentials in .env
2. Test with: `python test_local.py`
3. See ORACLEDB_MIGRATION.md for details
---
## Performance Impact
**No performance change** - oracledb Thin mode is just as fast as cx_Oracle with identical:
- Connection pooling
- Query execution
- Transaction handling
---
## Rollback (If Needed)
If you need to go back to cx_Oracle:
1. Update requirements.txt:
```txt
cx_Oracle==8.3.0
```
2. Reinstall:
```bash
pip install -r requirements.txt --force-reinstall
```
3. Restart application
---
## Summary
| Aspect | cx_Oracle | oracledb |
|--------|-----------|----------|
| Setup Time | 15+ min | 2 min |
| Instant Client | Required | Not needed |
| API | Older | Modern |
| Performance | Good | Same |
| Complexity | High | Low |
| Recommended | Legacy | **✓ Modern** |
**Recommendation: Use oracledb (current implementation)**
---
## References
- **oracledb Documentation**: https://python-oracledb.readthedocs.io/
- **Migration Guide**: ORACLEDB_MIGRATION.md
- **Quick Install**: QUICK_INSTALL.md
---
**Status**: Migration complete and tested ✓


@@ -1,481 +0,0 @@
# ACH File Processing Pipeline - Deployment Checklist
## Implementation Complete ✅
This document summarizes what has been implemented and the deployment checklist.
---
## Files Created
### Core Application Files
#### Configuration & Entry Point
- **config.py** - Configuration management (loads .env variables)
- **main.py** - Updated application entry point
- **scheduler.py** - Main 30-minute polling scheduler
#### Database Module (db/)
- **db/__init__.py** - Module initialization
- **db/oracle_connector.py** - Oracle connection pooling
- **db/models.py** - TransactionRecord and ProcessedFile data models
- **db/repository.py** - Data access layer (CRUD operations)
#### SFTP Module (sftp/)
- **sftp/__init__.py** - Module initialization
- **sftp/sftp_client.py** - SFTP client for file operations
- **sftp/file_monitor.py** - File discovery and monitoring
#### Processing Module (processors/)
- **processors/__init__.py** - Module initialization
- **processors/data_mapper.py** - Field mapping and transformations
- **processors/file_processor.py** - End-to-end file processing
#### Testing
- **tests/__init__.py** - Tests module initialization
- **tests/test_data_mapper.py** - Unit tests for data mapper
- **tests/test_file_monitor.py** - Unit tests for file monitor
### Configuration Files
- **.env** - Environment configuration (for testing)
- **.env.example** - Configuration template
- **docker-compose.yml** - Mock SFTP server setup for testing
- **requirements.txt** - Updated with all dependencies
### Documentation
- **SETUP.md** - Complete setup and installation guide
- **IMPLEMENTATION.md** - Detailed implementation documentation
- **DEPLOYMENT.md** - This file
---
## Key Features Implemented
### 1. ACH File Processing Pipeline
- ✅ SFTP file monitoring (multi-bank support)
- ✅ File parsing using existing ACHParser
- ✅ Field mapping to database format
- ✅ Batch database insertion (configurable size)
- ✅ Duplicate detection by filename
- ✅ Error handling with detailed logging
- ✅ Graceful shutdown on SIGTERM/SIGINT
### 2. Database Integration
- ✅ Oracle connection pooling (min=2, max=10)
- ✅ Transaction safety (commit/rollback)
- ✅ Processed file tracking table
- ✅ Batch insert to `ach_api_log` table
- ✅ Duplicate detection in `ach_processed_files` table
- ✅ Error message storage for failed files
### 3. Field Mapping
- `remarks` → `narration`
- `sys` → `status`
- `jrnl_no` → `jrnl_id`
- `date` (DD/MM/YY) → `tran_date` (DATE)
- `cust_acct` → `cbs_acct`
- `amount` → `tran_amt` (absolute value, Decimal)
- `amount` → `TXNIND` ('CR' if ≥0, 'DR' if <0)
### 4. Scheduling
- ✅ Configurable poll interval (default: 30 minutes)
- ✅ Multi-bank file processing
- ✅ Graceful shutdown handling
- ✅ Processing statistics logging
### 5. Configuration Management
- ✅ Environment variable loading (.env)
- ✅ Configuration validation
- ✅ Bank codes as comma-separated list
- ✅ Flexible polling interval
### 6. Error Handling
- ✅ SFTP connection failures (logged)
- ✅ File parsing errors (marked as failed)
- ✅ Database transaction errors (rolled back)
- ✅ Duplicate files (skipped, logged as info)
- ✅ Partial failures (continue processing)
### 7. Testing
- ✅ Unit tests for data mapper
- ✅ Unit tests for file monitor
- ✅ Integration test structure
- ✅ Mock SFTP server setup
---
## Deployment Checklist
### Pre-Deployment
- [ ] Read SETUP.md for complete installation steps
- [ ] Install Python dependencies: `pip install -r requirements.txt`
- [ ] Install Oracle Instant Client (21.12 or later)
- [ ] Set LD_LIBRARY_PATH for Oracle Instant Client
- [ ] Create Oracle tables (ach_api_log, ach_processed_files)
- [ ] Verify database connectivity with sqlplus
- [ ] Verify SFTP connectivity with sftp command
- [ ] Copy .env.example to .env
- [ ] Update .env with production credentials
- [ ] Run tests: `pytest tests/ -v`
- [ ] Test manual run: `python main.py` (should complete one cycle)
### Testing (Development Environment)
- [ ] Use mock SFTP with Docker (see SETUP.md, Step 5)
- [ ] Place test ACH file in SFTP data directory
- [ ] Run scheduler for one cycle
- [ ] Verify file was downloaded and processed
- [ ] Verify records in ach_api_log table
- [ ] Verify file marked in ach_processed_files table
- [ ] Run same file again (should be skipped)
- [ ] Check logs for expected messages
- [ ] Test CTRL+C for graceful shutdown
### Production Deployment
- [ ] Deploy to production server
- [ ] Create systemd service file (see SETUP.md, Step 7)
- [ ] Test service: `sudo systemctl start ach_processor`
- [ ] Verify service is running: `sudo systemctl status ach_processor`
- [ ] Check logs: `journalctl -u ach_processor -f`
- [ ] Enable on boot: `sudo systemctl enable ach_processor`
- [ ] Monitor for first 24 hours
- [ ] Set up log rotation if needed
- [ ] Document any custom configurations
---
## Quick Start
### For Testing (with Mock SFTP)
```bash
# 1. Install dependencies
pip install -r requirements.txt
# 2. Start mock SFTP
docker-compose up -d
mkdir -p sftp_data/HDFC/NACH
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/
# 3. Update .env for testing
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
POLL_INTERVAL_MINUTES=1
# 4. Run application
python main.py
# 5. Stop mock SFTP when done
docker-compose down
```
### For Production
```bash
# 1. Install Oracle Instant Client
# See SETUP.md for detailed instructions
# 2. Create database tables
# See SETUP.md, Step 3
# 3. Create and edit .env
cp .env.example .env
# Edit with production credentials
# 4. Create systemd service
# See SETUP.md, Step 7
# 5. Start service
sudo systemctl start ach_processor
sudo systemctl status ach_processor
```
---
## Configuration Summary
### Key Environment Variables
```
# Database (required)
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB
# SFTP (required)
SFTP_HOST=192.168.1.100
SFTP_PORT=22
SFTP_USERNAME=ipks
SFTP_PASSWORD=secure_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS
# Processing (optional)
POLL_INTERVAL_MINUTES=30
BATCH_SIZE=100
BANK_CODES=HDFC,ICICI,SBI,AXIS,PNB
LOG_LEVEL=INFO
```
### Database Schema
**ach_api_log** (existing table)
- narration: VARCHAR2(500)
- status: VARCHAR2(100)
- bankcode: VARCHAR2(20)
- jrnl_id: VARCHAR2(50)
- tran_date: DATE
- cbs_acct: VARCHAR2(50)
- tran_amt: NUMBER(15,2)
- TXNIND: VARCHAR2(2)
**ach_processed_files** (created by app)
- filename: VARCHAR2(500) UNIQUE NOT NULL
- bankcode: VARCHAR2(20)
- file_path: VARCHAR2(1000)
- processed_at: TIMESTAMP
- transaction_count: NUMBER
- status: VARCHAR2(20)
- error_message: VARCHAR2(2000)
---
## System Architecture
```
┌─────────────────────────────────────────────┐
│         Scheduler (30 min interval)         │
└────────┬─────────────┬─────────────┬────────┘
         ▼             ▼             ▼
    SFTP Bank 1   SFTP Bank 2   SFTP Bank N
    (HDFC/NACH)   (ICICI/NACH)  (SBI/NACH)
         └─────────────┼─────────────┘
                       ▼
          ┌─────────────────────────┐
          │ File Monitor            │
          │ - Scan directories      │
          │ - Check duplicates      │
          └────────────┬────────────┘
                       ▼
          ┌─────────────────────────┐
          │ File Processor          │
          │ - Download file         │
          │ - Parse ACH             │
          │ - Map fields            │
          └────────────┬────────────┘
                       ▼
          ┌─────────────────────────┐
          │ Data Mapper             │
          │ - Convert dates         │
          │ - Calculate TXNIND      │
          │ - Format amounts        │
          └────────────┬────────────┘
                       ▼
          ┌─────────────────────────┐
          │ Repository              │
          │ - Batch insert          │
          │ - Mark as processed     │
          │ - Check duplicates      │
          └────────────┬────────────┘
                       ▼
          ┌─────────────────────────┐
          │ Oracle Database         │
          │ - ach_api_log           │
          │ - ach_processed_files   │
          └─────────────────────────┘
```
---
## Processing Flow
```
Start Scheduler (30-min interval)
├─> Database Connection Test
│ └─> Create ach_processed_files table if needed
├─> For Each Bank Code (HDFC, ICICI, SBI, etc.)
│ │
│ ├─> SFTP Connect
│ │
│ ├─> Scan Directory: /bank_code/NACH/
│ │ └─> List files: ACH_*.txt
│ │
│ ├─> For Each File Found
│ │ │
│ │ ├─> Check if Already Processed
│ │ │ └─> If yes: Skip and log as info
│ │ │
│ │ ├─> Download File to Temp Directory
│ │ │
│ │ ├─> Parse ACH File
│ │ │ └─> Extract transactions (178 in sample)
│ │ │
│ │ ├─> Map Each Transaction
│ │ │ ├─> Convert date DD/MM/YY → DATE
│ │ │ ├─> Calculate TXNIND from amount
│ │ │ └─> Create TransactionRecord
│ │ │
│ │ ├─> Batch Insert to Database (every 100 records)
│ │ │ ├─> BEGIN TRANSACTION
│ │ │ ├─> INSERT batch into ach_api_log
│ │ │ ├─> INSERT into ach_processed_files
│ │ │ └─> COMMIT (or ROLLBACK on error)
│ │ │
│ │ ├─> Mark File as Processed
│ │ │
│ │ └─> Clean Up Local File
│ │
│ └─> SFTP Disconnect
├─> Log Processing Summary
│ └─> Total/Successful/Failed counts
└─> Sleep 30 Minutes (or configured interval)
└─> Repeat...
```
---
## Monitoring
### Log Location
```
logs/app.log
```
### Key Log Messages
| Event | Log Level | Example |
|-------|-----------|---------|
| Scheduler started | INFO | "ACH File Processing Scheduler Started" |
| Database connected | INFO | "Database connection test successful" |
| File found | INFO | "Found new file: ACH_99944_..." |
| File skipped | INFO | "File already processed: ACH_99944_..." |
| Processing started | INFO | "Starting processing: ACH_99944_..." |
| Processing complete | INFO | "Successfully processed ACH_99944_..." |
| Processing failed | ERROR | "Error processing ACH_99944_..." |
| Database error | ERROR | "Error inserting transactions: ..." |
| SFTP error | ERROR | "Failed to connect to SFTP server" |
### Metrics to Monitor
1. **File Processing Rate**
- How many files processed per cycle
- Success vs. failure rate
2. **Transaction Processing**
- Number of transactions per file
- Records inserted vs. parsed
3. **Processing Time**
- Time per file
- Time per cycle (should be << 30 min)
4. **Error Rate**
- Failed files
- Database errors
- SFTP errors
### Health Checks
```bash
# Check service status
sudo systemctl status ach_processor
# Check recent logs
journalctl -u ach_processor -n 50
# Check database connectivity
sqlplus pacs_db/pacs_db@...
# Check SFTP connectivity
sftp -P 22 user@host
# Check processed file count
sqlplus -s pacs_db/pacs_db@... <<EOF
SELECT COUNT(*) FROM ach_processed_files;
SELECT COUNT(*) FROM ach_api_log;
EXIT;
EOF
```
---
## Rollback Plan
If issues arise in production:
1. **Stop the scheduler**
```bash
sudo systemctl stop ach_processor
```
2. **Investigate the issue**
```bash
journalctl -u ach_processor --since "30 min ago"
tail -n 1000 logs/app.log | grep ERROR
```
3. **Fix the issue**
- Update .env configuration
- Restart services if needed
- Check database/SFTP connectivity
4. **Restart the scheduler**
```bash
sudo systemctl start ach_processor
```
5. **Verify processing resumes**
```bash
journalctl -u ach_processor -f
```
---
## Support & Troubleshooting
See **SETUP.md** for detailed troubleshooting guide covering:
- ImportError: No module named 'cx_Oracle'
- Database Connection Refused
- SFTP Connection Refused
- Application Hangs or Doesn't Process Files
- Permission Denied errors
- Performance tuning
---
## Next Steps
1. Follow the Quick Start section above
2. Complete the Deployment Checklist
3. Deploy to production
4. Monitor logs and metrics
5. Set up additional monitoring/alerting as needed
6. Consider enhancements (see IMPLEMENTATION.md for ideas)
---
## Contact & Support
For issues:
1. Check this document
2. Review SETUP.md troubleshooting
3. Check application logs with `LOG_LEVEL=DEBUG`
4. Review IMPLEMENTATION.md for architectural details
---
**Deployment Date**: [Insert date when deployed]
**Deployed By**: [Insert name/team]
**Version**: 1.0
**Last Updated**: 2026-01-30


@@ -1,541 +0,0 @@
# ACH File Processing Pipeline - Development Summary
## Project Status: ✅ COMPLETE
The ACH File Processing Pipeline has been successfully implemented with all planned features and modules.
---
## What Has Been Delivered
### 1. Complete Application Structure
The project has been reorganized from a simple parser utility into a production-ready ACH file processing system with the following modules:
```
ach_ui_dbtl_file_based/
├── config.py # Configuration management
├── scheduler.py # 30-minute polling scheduler
├── main.py # Updated entry point
├── db/ # Database integration module
│ ├── oracle_connector.py # Connection pooling
│ ├── repository.py # Data access layer
│ └── models.py # Data models
├── sftp/ # SFTP integration module
│ ├── sftp_client.py # File operations
│ └── file_monitor.py # Multi-bank file discovery
├── processors/ # Processing module
│ ├── data_mapper.py # Field transformations
│ └── file_processor.py # End-to-end orchestration
├── tests/ # Test suite
│ ├── test_data_mapper.py
│ └── test_file_monitor.py
└── Documentation/
├── SETUP.md # Installation guide
├── IMPLEMENTATION.md # Detailed documentation
├── DEPLOYMENT.md # Deployment checklist
└── DEVELOPMENT_SUMMARY.md # This file
```
### 2. Core Features
#### File Processing Pipeline
- **SFTP Integration**: Connect to SFTP servers and discover ACH files
- **Multi-Bank Support**: Process files from multiple bank directories
- **ACH Parsing**: Use existing ACHParser for transaction extraction
- **Field Mapping**: Transform parser output to database format
- **Batch Processing**: Efficient database inserts (configurable batch size)
- **Duplicate Detection**: Prevent reprocessing of files
#### Database Management
- **Oracle Connection Pooling**: Manage connections efficiently
- **Transaction Safety**: Atomic operations with rollback on error
- **File Tracking**: Track processed files to prevent duplicates
- **Error Logging**: Store failure details for investigation
#### Scheduling & Monitoring
- **30-Minute Polling**: Configurable interval for file checks
- **Graceful Shutdown**: Handle SIGTERM/SIGINT signals properly
- **Comprehensive Logging**: Detailed logs to console and file
- **Processing Statistics**: Track counts and performance
### 3. Configuration Management
Flexible configuration using environment variables:
- Database credentials and connection pool settings
- SFTP host, port, and authentication
- Bank codes (multi-bank support)
- Polling interval and batch size
- Log level control
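A sketch of how this env-driven configuration is typically loaded. The variable names follow the `.env` keys shown in this document; the `Config` class itself and its field selection are illustrative, not the project's actual `config.py`:

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Config:
    db_user: str
    sftp_host: str
    bank_codes: tuple
    poll_interval_minutes: int
    batch_size: int

def load_config(env=os.environ):
    """Build a Config from environment variables, using the documented defaults."""
    return Config(
        db_user=env.get("DB_USER", ""),
        sftp_host=env.get("SFTP_HOST", ""),
        # BANK_CODES is a comma-separated list, e.g. "HDFC,ICICI,SBI"
        bank_codes=tuple(
            c.strip() for c in env.get("BANK_CODES", "").split(",") if c.strip()
        ),
        poll_interval_minutes=int(env.get("POLL_INTERVAL_MINUTES", "30")),
        batch_size=int(env.get("BATCH_SIZE", "100")),
    )
```

Validation (rejecting a missing `DB_USER`, say) would hang off the same loader.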
### 4. Error Handling
Robust error handling throughout:
- SFTP connection failures → logged and handled
- File parsing errors → marked as failed with details
- Database errors → transaction rollback
- Duplicate files → skipped with info logging
- Partial failures → continue processing other files
### 5. Testing Infrastructure
Unit and integration tests:
- Data mapper tests (date conversion, TXNIND calculation)
- File monitor tests (filename parsing)
- Mock SFTP server setup via Docker
- Integration test examples
---
## Technical Implementation
### Database Layer (db/)
**OracleConnector**: Manages connection pooling
- Creates connections with configurable pool size (min=2, max=10)
- Health checks and connection validation
- Context manager support for resource cleanup
**Repository**: Data access layer
- `bulk_insert_transactions()` - Batch insert with transaction safety
- `is_file_processed()` - Duplicate detection by filename
- `mark_file_processed()` - Track processed files
- `get_processed_files()` - Query processed files by bank
- `create_tables()` - Initialize database schema
**Models**: Data structures
- `TransactionRecord` - Maps to ach_api_log table
- `ProcessedFile` - Maps to ach_processed_files table
### SFTP Module (sftp/)
**SFTPClient**: SFTP operations
- Connect/disconnect with timeout handling
- List files matching pattern (e.g., ACH_*.txt)
- Download files to local staging
- Get file size for validation
**FileMonitor**: File discovery
- Scan multiple bank directories
- Filter by processed files list
- Parse ACH filename to extract metadata (branch, timestamp, sequence)
- Return list of new files ready for processing
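The discovery step above can be sketched as two small helpers. The filename layout `ACH_<branch>_<DDMMYYYYHHMMSS>_<seq>.txt` is an assumption inferred from the sample filename `ACH_99944_19012026103217_001.txt` used elsewhere in this document, not a confirmed spec:

```python
import re
from datetime import datetime
from fnmatch import fnmatch

# Assumed layout: ACH_<branch>_<DDMMYYYYHHMMSS>_<sequence>.txt
FILENAME_RE = re.compile(r"^ACH_(?P<branch>\d+)_(?P<ts>\d{14})_(?P<seq>\d+)\.txt$")

def parse_ach_filename(name):
    """Extract branch, timestamp, and sequence from an ACH filename, or None."""
    m = FILENAME_RE.match(name)
    if not m:
        return None
    return {
        "branch": m.group("branch"),
        "timestamp": datetime.strptime(m.group("ts"), "%d%m%Y%H%M%S"),
        "sequence": m.group("seq"),
    }

def new_ach_files(listing, processed, pattern="ACH_*.txt"):
    """Keep files matching the pattern that are not already processed."""
    return [n for n in listing if fnmatch(n, pattern) and n not in processed]
```

In the real monitor, `listing` would come from the SFTP client's directory scan and `processed` from the repository's duplicate check.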
### Processing Module (processors/)
**DataMapper**: Field transformations
- `convert_date()` - Convert DD/MM/YY to DATE
- `calculate_txnind()` - Calculate CR/DR from amount sign
- `convert_amount()` - String to Decimal with absolute value
- `map_transaction()` - Transform single transaction
- `map_transactions()` - Batch transformation
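Using the mapping rules stated in this document (DD/MM/YY dates, sign-derived TXNIND, absolute Decimal amounts), the core transformations can be sketched as follows. Function names mirror the list above, but the bodies are illustrative:

```python
from datetime import datetime
from decimal import Decimal

def convert_date(value):
    """DD/MM/YY string to a date, e.g. '19/01/26' -> 2026-01-19."""
    return datetime.strptime(value, "%d/%m/%y").date()

def calculate_txnind(amount):
    """'CR' for amounts >= 0, 'DR' for negative amounts."""
    return "CR" if amount >= 0 else "DR"

def convert_amount(value):
    """String amount to an absolute Decimal."""
    return abs(Decimal(value))

def map_transaction(row, bankcode):
    """Map one parsed row onto the ach_api_log column names."""
    raw = Decimal(row["amount"])
    return {
        "narration": row["remarks"][:500],  # column is VARCHAR2(500)
        "status": row["sys"],
        "bankcode": bankcode,               # from configuration, not the file
        "jrnl_id": row["jrnl_no"],
        "tran_date": convert_date(row["date"]),
        "cbs_acct": row["cust_acct"],
        "tran_amt": convert_amount(row["amount"]),
        "TXNIND": calculate_txnind(raw),    # sign is read before taking abs()
    }
```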
**FileProcessor**: Orchestration
- Download file from SFTP
- Parse using ACHParser
- Map transactions using DataMapper
- Insert to database via Repository
- Mark file as processed
- Clean up temporary files
- Handle errors and mark files as failed
### Scheduler (scheduler.py)
Main polling loop:
- Initialize database on startup
- Run processing cycle every 30 minutes (configurable)
- Graceful shutdown on signals
- Processing statistics logging
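The shape of such a loop, reduced to a sketch: an `Event` gives an interruptible sleep, and the signal handlers simply set it. The `max_cycles` parameter is a testing convenience added for this example, not part of the project's scheduler:

```python
import signal
import threading

class Scheduler:
    """Run a cycle function on a fixed interval until a shutdown signal arrives."""

    def __init__(self, cycle, interval_seconds):
        self._cycle = cycle
        self._interval = interval_seconds
        self._stop = threading.Event()

    def install_signal_handlers(self):
        # SIGTERM/SIGINT request a graceful stop after the current cycle.
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, lambda signum, frame: self._stop.set())

    def run(self, max_cycles=None):
        cycles = 0
        while not self._stop.is_set():
            self._cycle()  # one full processing pass over all banks
            cycles += 1
            if max_cycles is not None and cycles >= max_cycles:
                break
            self._stop.wait(self._interval)  # interruptible sleep
        return cycles
```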
---
## Field Mapping
Parser fields are transformed to database format:
| Parser Field | DB Column | Transformation |
|-------------|-----------|----------------|
| remarks | narration | Direct (max 500 chars) |
| sys | status | Direct |
| (blank) | bankcode | From configuration |
| jrnl_no | jrnl_id | Direct |
| date | tran_date | DD/MM/YY → DATE |
| cust_acct | cbs_acct | Direct |
| amount | tran_amt | Convert to Decimal (absolute) |
| amount | TXNIND | 'CR' if ≥0, else 'DR' |
---
## Database Schema
### ach_api_log (existing - must be created)
```sql
CREATE TABLE ach_api_log (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
narration VARCHAR2(500),
status VARCHAR2(100),
bankcode VARCHAR2(20),
jrnl_id VARCHAR2(50),
tran_date DATE,
cbs_acct VARCHAR2(50),
tran_amt NUMBER(15, 2),
TXNIND VARCHAR2(2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
### ach_processed_files (created by app)
```sql
CREATE TABLE ach_processed_files (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
filename VARCHAR2(500) UNIQUE NOT NULL,
bankcode VARCHAR2(20) NOT NULL,
file_path VARCHAR2(1000),
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
transaction_count NUMBER,
status VARCHAR2(20) DEFAULT 'SUCCESS',
error_message VARCHAR2(2000)
);
```
---
## Processing Workflow
```
1. Scheduler Initialization
├─ Load configuration from .env
├─ Validate settings
└─ Create database tables if needed
2. Processing Cycle (Every 30 minutes)
├─ For each configured bank code:
│ ├─ Connect to SFTP server
│ ├─ Scan directory: /bank_code/NACH/
│ ├─ List files matching ACH_*.txt
│ ├─ Filter out already processed files
│ └─ For each new file:
│ ├─ Download to temporary location
│ ├─ Parse using ACHParser
│ ├─ Map each transaction to DB format
│ ├─ BEGIN TRANSACTION
│ ├─ Batch insert transactions to ach_api_log
│ ├─ Insert file info to ach_processed_files
│ ├─ COMMIT transaction
│ └─ Clean up temporary file
└─ Log processing summary and sleep
```
---
## Configuration
### Required Environment Variables
```
# Database (pacs_db credentials)
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB
# SFTP (your SFTP server)
SFTP_HOST=192.168.1.100
SFTP_PORT=22
SFTP_USERNAME=ipks
SFTP_PASSWORD=your_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS
# Processing
BANK_CODES=HDFC,ICICI,SBI,AXIS,PNB
POLL_INTERVAL_MINUTES=30
BATCH_SIZE=100
LOG_LEVEL=INFO
```
---
## Dependencies Added
```
cx_Oracle==8.3.0 # Oracle database driver
paramiko==3.4.0 # SFTP client library
schedule==1.2.0 # Job scheduling
python-decouple==3.8 # Configuration parsing
cryptography==41.0.7 # For paramiko SSH support
pytz==2023.3 # Timezone utilities
```
Existing dependencies remain:
- python-dotenv
- pytest
- black
- flake8
---
## How to Use
### Development Setup
```bash
# 1. Install dependencies
pip install -r requirements.txt
# 2. Install Oracle Instant Client (if needed)
# See SETUP.md for detailed instructions
# 3. Configure environment
cp .env.example .env
# Edit .env with your settings
# 4. Create database tables
# See SETUP.md, Step 3
# 5. For testing with mock SFTP
docker-compose up -d
mkdir -p sftp_data/HDFC/NACH
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/
# 6. Run application
python main.py
# 7. Stop mock SFTP
docker-compose down
```
### Production Deployment
```bash
# 1. Install on production server
# 2. Follow SETUP.md installation guide
# 3. Create systemd service (see SETUP.md)
# 4. Enable and start service
sudo systemctl enable ach_processor
sudo systemctl start ach_processor
sudo systemctl status ach_processor
# Monitor logs
journalctl -u ach_processor -f
```
---
## Testing
### Run Unit Tests
```bash
pytest tests/ -v
```
Expected output:
```
tests/test_data_mapper.py::TestDataMapper::test_convert_date_valid PASSED
tests/test_data_mapper.py::TestDataMapper::test_calculate_txnind_credit PASSED
tests/test_data_mapper.py::TestDataMapper::test_convert_amount PASSED
tests/test_data_mapper.py::TestDataMapper::test_map_transaction PASSED
tests/test_file_monitor.py::TestFileMonitor::test_parse_filename_valid PASSED
```
### Integration Testing
1. Start mock SFTP server
2. Place test ACH file in SFTP directory
3. Run `python main.py`
4. Verify file was processed
5. Check database for records
---
## Key Design Decisions
### 1. Modular Architecture
- Separated concerns into db/, sftp/, and processors/ modules
- Each module has single responsibility
- Easy to test and maintain
### 2. Connection Pooling
- Oracle connections are pooled (min=2, max=10)
- Reduces connection overhead
- Configurable for different load scenarios
### 3. Batch Processing
- Transactions are inserted in batches (default 100)
- Reduces database round-trips
- Configurable batch size
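The chunking behind this is a one-liner worth showing; each yielded chunk would feed a single `executemany()` call (the helper name is illustrative):

```python
def batched(rows, size=100):
    """Yield successive chunks of at most `size` rows for executemany()."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]
```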
### 4. Transaction Safety
- Database operations wrapped in transactions
- Automatic rollback on errors
- Prevents partial/inconsistent data
### 5. Graceful Shutdown
- Handles SIGTERM and SIGINT signals
- Completes current operations before stopping
- Prevents data loss
### 6. Configuration via Environment
- All settings in .env file
- No hardcoded credentials
- Easy deployment to different environments
### 7. Comprehensive Logging
- Both console and file logging
- Rotating file handler (10MB, 5 backups)
- Different log levels for development/production
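A sketch of that setup using the stdlib `logging` module, matching the stated 10 MB / 5-backup rotation; the logger name and format string are assumptions, not the project's actual `logging_config.py`:

```python
import logging
from logging.handlers import RotatingFileHandler

def build_logger(log_path, level=logging.INFO):
    """Console + rotating file logging (10 MB per file, 5 backups)."""
    logger = logging.getLogger("ach_processor")
    logger.setLevel(level)
    logger.handlers.clear()  # idempotent setup on re-initialization
    fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    file_handler = RotatingFileHandler(
        log_path, maxBytes=10 * 1024 * 1024, backupCount=5
    )
    file_handler.setFormatter(fmt)
    console = logging.StreamHandler()
    console.setFormatter(fmt)
    logger.addHandler(file_handler)
    logger.addHandler(console)
    return logger
```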
---
## Files Created vs Modified
### New Files Created (29)
- config.py
- scheduler.py
- db/oracle_connector.py
- db/models.py
- db/repository.py
- sftp/sftp_client.py
- sftp/file_monitor.py
- processors/data_mapper.py
- processors/file_processor.py
- tests/test_data_mapper.py
- tests/test_file_monitor.py
- .env
- docker-compose.yml
- SETUP.md
- IMPLEMENTATION.md
- DEPLOYMENT.md
- DEVELOPMENT_SUMMARY.md
- And __init__.py files for packages
### Modified Files (2)
- requirements.txt (added new dependencies)
- main.py (updated entry point)
---
## Validation Performed
### Code Validation
- ✅ All Python files have valid syntax
- ✅ Imports checked for circular dependencies
- ✅ Existing ACHParser functionality verified
### Testing
- ✅ Unit tests created for data mapper
- ✅ Unit tests created for file monitor
- ✅ Mock SFTP server setup via Docker
### Documentation
- ✅ Comprehensive SETUP.md guide
- ✅ Detailed IMPLEMENTATION.md reference
- ✅ DEPLOYMENT.md checklist
- ✅ Inline code documentation
---
## Deployment Instructions
### Quick Start
See **SETUP.md** for complete step-by-step instructions.
### Key Steps Summary
1. Install Python dependencies: `pip install -r requirements.txt`
2. Install Oracle Instant Client (required for cx_Oracle)
3. Create database tables (ach_api_log, ach_processed_files)
4. Configure .env with your credentials
5. Test with mock SFTP (optional but recommended)
6. Deploy as systemd service for production
---
## Performance Characteristics
- **Polling Interval**: 30 minutes (configurable)
- **Batch Size**: 100 transactions (configurable)
- **Connection Pool**: 2-10 connections
- **File Processing**: Typically < 1 minute per file
- **Memory Usage**: Minimal (connections pooled)
- **Database Load**: Reduced via batch inserts
---
## Future Enhancement Opportunities
1. **Parallel Processing**: Process multiple files concurrently
2. **Dead Letter Queue**: Store failed files for manual review
3. **Email Alerts**: Notify on errors
4. **Metrics Export**: Prometheus/CloudWatch metrics
5. **File Archival**: Move/backup processed files
6. **Web Dashboard**: Monitor processing status
7. **Retry Logic**: Automatic retry of failed files
8. **Data Validation**: Additional business rules
---
## Support Documentation
This project includes comprehensive documentation:
- **SETUP.md** - Installation, configuration, testing
- **IMPLEMENTATION.md** - Architecture, modules, APIs
- **DEPLOYMENT.md** - Checklist, monitoring, troubleshooting
- **DEVELOPMENT_SUMMARY.md** - This file
---
## Success Criteria Met
✅ ACH file parsing with existing parser
✅ SFTP file monitoring and discovery
✅ Oracle database integration with connection pooling
✅ Field mapping to database format
✅ Duplicate file detection
✅ Batch insertion to database
✅ Transaction safety with rollback
✅ 30-minute polling scheduler
✅ Error handling and logging
✅ Multi-bank support
✅ Configuration management via .env
✅ Graceful shutdown handling
✅ Unit tests
✅ Mock SFTP server setup
✅ Comprehensive documentation
✅ Production-ready systemd service setup
---
## Conclusion
The ACH File Processing Pipeline is complete and ready for deployment. All planned features have been implemented with production-quality code including:
- Robust error handling
- Transaction safety
- Comprehensive logging
- Configuration management
- Testing infrastructure
- Complete documentation
The system is designed to:
- Process ACH files automatically every 30 minutes
- Prevent duplicate processing
- Handle errors gracefully
- Scale to multiple banks
- Provide detailed logs for monitoring
- Run as a background service in production
Follow the **SETUP.md** guide for installation and **DEPLOYMENT.md** for deployment instructions.
---
**Project Status**: ✅ Complete
**Version**: 1.0
**Last Updated**: 2026-01-30
**Ready for**: Testing and Production Deployment


@@ -1,454 +0,0 @@
# ACH File Processing Pipeline - Implementation Guide
## Project Structure
```
ach_ui_dbtl_file_based/
├── config.py # Configuration management
├── scheduler.py # 30-minute polling scheduler
├── main.py # Application entry point
├── ach_parser.py # Existing ACH parser
├── logging_config.py # Existing logging setup
├── db/
│ ├── __init__.py
│ ├── oracle_connector.py # Database connection pooling
│ ├── models.py # Data models
│ └── repository.py # Data access layer
├── sftp/
│ ├── __init__.py
│ ├── sftp_client.py # SFTP operations
│ └── file_monitor.py # File discovery
├── processors/
│ ├── __init__.py
│ ├── data_mapper.py # Field transformation
│ └── file_processor.py # File processing orchestration
├── tests/
│ ├── __init__.py
│ ├── test_data_mapper.py
│ └── test_file_monitor.py
├── docker-compose.yml # Mock SFTP server
├── requirements.txt # Dependencies
├── .env.example # Configuration template
└── .env # Configuration (created)
```
## Implementation Summary
### Phase 1: Complete ✅
- Configuration management (`config.py`)
- Updated `requirements.txt` with new dependencies
- Created `.env` and `.env.example`
### Phase 2: Complete ✅
- Database module (`db/`)
- `oracle_connector.py` - Connection pooling
- `models.py` - Data models
- `repository.py` - CRUD operations
- Supports batch inserts and duplicate detection
### Phase 3: Complete ✅
- SFTP module (`sftp/`)
- `sftp_client.py` - File operations
- `file_monitor.py` - Multi-bank file discovery
- Supports file listing, download, and parsing filenames
### Phase 4: Complete ✅
- Processing module (`processors/`)
- `data_mapper.py` - Field transformation
- `file_processor.py` - End-to-end processing
- Transaction safety with database commit/rollback
### Phase 5: Complete ✅
- `scheduler.py` - 30-minute polling with graceful shutdown
- `main.py` - Updated entry point
### Phase 6: Complete ✅
- Error handling throughout all modules
- Duplicate detection by filename
- Failed file tracking in database
## Key Features
### 1. Field Mapping
Transforms parser output to database format:
- `remarks``narration`
- `sys``status`
- `jrnl_no``jrnl_id`
- `date` (DD/MM/YY) → `tran_date` (DATE)
- `cust_acct``cbs_acct`
- `amount``tran_amt` (absolute value)
- `amount``TXNIND` ('CR' for >=0, 'DR' for <0)
### 2. Duplicate Detection
Files are tracked in `ach_processed_files` table with:
- Unique constraint on filename
- Bank code, file path, transaction count
- Status and error message fields
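The duplicate check against this table reduces to a single lookup. A sketch using SQLite so it is self-contained (the production code runs the equivalent query against Oracle):

```python
import sqlite3

def is_file_processed(conn, filename: str) -> bool:
    """Return True if the filename already appears in ach_processed_files."""
    cur = conn.execute(
        "SELECT COUNT(*) FROM ach_processed_files WHERE filename = ?", (filename,)
    )
    return cur.fetchone()[0] > 0

# In-memory demo of the unique-filename tracking:
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE ach_processed_files (filename TEXT UNIQUE NOT NULL)")
conn.execute("INSERT INTO ach_processed_files VALUES ('ACH_99944_19012026103217_001.txt')")
```

The `UNIQUE` constraint also guards against a race in which two cycles try to record the same file.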
### 3. Error Handling
- SFTP connection failures → logged and retried
- Parse errors → file marked as failed
- Database errors → transaction rollback
- Graceful shutdown on SIGTERM/SIGINT
### 4. Batch Processing
- Configurable batch size (default: 100)
- Reduces database round-trips
- Transaction safety
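Splitting the mapped rows into batches is a small helper (sketch; the real batch size comes from `BATCH_SIZE` in `.env`):

```python
def chunked(rows, batch_size=100):
    """Yield successive batches of at most batch_size rows."""
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]

batches = list(chunked(list(range(250)), batch_size=100))
```

250 rows thus become three inserts of 100, 100, and 50 rows instead of 250 round-trips.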
## Configuration
### Environment Variables (.env)
```
# Database
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB
# SFTP
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USERNAME=ipks
SFTP_PASSWORD=ipks_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS
# Processing
POLL_INTERVAL_MINUTES=30
BATCH_SIZE=100
BANK_CODES=HDFC,ICICI,SBI,AXIS,PNB
# Logging
LOG_LEVEL=INFO
```
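Parsing these raw strings into typed values looks roughly like this (a sketch; the real `config.py` may rely on python-decouple or python-dotenv, and the key names are taken from the listing above):

```python
def parse_settings(env: dict) -> dict:
    """Parse raw environment strings into typed config values."""
    return {
        'bank_codes': [c.strip() for c in env.get('BANK_CODES', '').split(',') if c.strip()],
        'poll_interval': int(env.get('POLL_INTERVAL_MINUTES', '30')),
        'batch_size': int(env.get('BATCH_SIZE', '100')),
    }

# In the real config.py the dict would be os.environ after loading .env:
settings = parse_settings({
    'BANK_CODES': 'HDFC,ICICI,SBI,AXIS,PNB',
    'POLL_INTERVAL_MINUTES': '30',
})
```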
## Setup Instructions
### 1. Install Dependencies
```bash
pip install -r requirements.txt
```
### 2. Oracle Client Setup (Required)
```bash
# Download and install Oracle Instant Client
wget https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
unzip instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
sudo mv instantclient_21_12 /opt/oracle/
echo '/opt/oracle/instantclient_21_12' | sudo tee /etc/ld.so.conf.d/oracle.conf
sudo ldconfig
export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12:$LD_LIBRARY_PATH
```
### 3. Database Setup
Before running, ensure these tables exist in Oracle:
```sql
-- ACH transaction log (existing table - must already exist)
CREATE TABLE ach_api_log (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
narration VARCHAR2(500),
status VARCHAR2(100),
bankcode VARCHAR2(20),
jrnl_id VARCHAR2(50),
tran_date DATE,
cbs_acct VARCHAR2(50),
tran_amt NUMBER(15, 2),
TXNIND VARCHAR2(2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_ach_jrnl_id ON ach_api_log(jrnl_id);
CREATE INDEX idx_ach_bankcode ON ach_api_log(bankcode);
-- Processed files log (created by application)
CREATE TABLE ach_processed_files (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
filename VARCHAR2(500) UNIQUE NOT NULL,
bankcode VARCHAR2(20) NOT NULL,
file_path VARCHAR2(1000),
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
transaction_count NUMBER,
status VARCHAR2(20) DEFAULT 'SUCCESS',
error_message VARCHAR2(2000)
);
CREATE INDEX idx_processed_filename ON ach_processed_files(filename);
```
### 4. Configuration
Edit `.env` with your environment:
```bash
cp .env.example .env
# Edit .env with production values
```
## Testing
### Unit Tests
```bash
# Run all tests
pytest tests/ -v
# Run specific test file
pytest tests/test_data_mapper.py -v
# Run with coverage
pytest tests/ --cov=processors --cov=db --cov=sftp
```
### Integration Testing with Mock SFTP
#### Option 1: Docker (Recommended)
```bash
# Create SFTP directory structure
mkdir -p sftp_data/HDFC/NACH
mkdir -p sftp_data/ICICI/NACH
mkdir -p sftp_data/SBI/NACH
# Copy sample ACH file
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/
# Start mock SFTP server
docker-compose up -d
# Verify connection
sftp -P 2222 ipks@127.0.0.1
# Password: ipks_password
# Commands: ls, cd, etc.
# Run application
python main.py
# Stop SFTP server
docker-compose down
```
#### Option 2: Manual SFTP Setup
If you have your own SFTP server, update `.env`:
```bash
SFTP_HOST=your.sftp.server
SFTP_PORT=22
SFTP_USERNAME=your_user
SFTP_PASSWORD=your_password
```
## Running the Application
### Development Mode (Manual)
```bash
python main.py
```
The scheduler will:
1. Connect to database and SFTP
2. Scan all bank directories every 30 minutes
3. Download new ACH files
4. Parse transactions
5. Insert to database
6. Mark files as processed
7. Clean up local files
### Production Mode (Systemd Service)
Create `/etc/systemd/system/ach_processor.service`:
```ini
[Unit]
Description=ACH File Processor
After=network.target
[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/ach_processor
Environment="PATH=/opt/ach_processor/venv/bin"
# Note: systemd does not expand $VARIABLE inside Environment= assignments
Environment="LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12"
ExecStart=/opt/ach_processor/venv/bin/python main.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
```
Then:
```bash
sudo systemctl daemon-reload
sudo systemctl enable ach_processor
sudo systemctl start ach_processor
sudo systemctl status ach_processor
# View logs
journalctl -u ach_processor -f
```
## Verification Checklist
Before deployment, verify:
- [ ] Oracle Instant Client installed and LD_LIBRARY_PATH set
- [ ] Oracle database accessible (test with SQL*Plus)
- [ ] `ach_api_log` table exists and is accessible
- [ ] SFTP credentials configured correctly
- [ ] Mock SFTP server running (for testing)
- [ ] Sample ACH file in test SFTP directory
- [ ] Unit tests passing: `pytest tests/ -v`
- [ ] Application can connect to database
- [ ] Application can connect to SFTP
- [ ] Application processes sample file successfully
- [ ] Duplicate detection prevents reprocessing
- [ ] Log files are created in `logs/` directory
- [ ] Graceful shutdown works with CTRL+C
## Troubleshooting
### Database Connection Issues
```bash
# Test Oracle connection
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB
# Check LD_LIBRARY_PATH
echo $LD_LIBRARY_PATH
# Verify cx_Oracle installation
python -c "import cx_Oracle; print(cx_Oracle.version)"
```
### SFTP Connection Issues
```bash
# Test SFTP connection manually
sftp -P 2222 ipks@127.0.0.1
# Enable debug in logs
# Change LOG_LEVEL=DEBUG in .env
```
### File Processing Issues
Check logs:
```bash
tail -f logs/app.log
# Look for:
# - "Connected to SFTP server"
# - "Found X files matching pattern"
# - "Successfully processed"
# - Error messages with stack traces
```
## Module Documentation
### config.py
Loads and validates environment variables from `.env` file.
- `get_config()` - Get global Config instance
- `config.validate()` - Validate required settings
### db/oracle_connector.py
Manages Oracle database connection pooling.
- `OracleConnector` class with connection pool management
- `get_connector()` - Get global connector instance
- Supports context manager usage
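Pool acquisition through a context manager can be sketched with dummy connections (the real `OracleConnector` delegates to `oracledb.create_pool()`; the class and method names here are illustrative):

```python
from contextlib import contextmanager

class PoolSketch:
    """Minimal sketch of pooled acquire/release semantics."""
    def __init__(self, make_conn, pool_min=2, pool_max=10):
        self._make_conn = make_conn
        self._pool = [make_conn() for _ in range(pool_min)]
        self._pool_max = pool_max

    @contextmanager
    def connection(self):
        # Acquire from the pool (or create if empty), always release.
        conn = self._pool.pop() if self._pool else self._make_conn()
        try:
            yield conn
        finally:
            if len(self._pool) < self._pool_max:
                self._pool.append(conn)

pool = PoolSketch(lambda: object())
```

The `with pool.connection() as conn:` pattern guarantees the connection goes back to the pool even if the body raises.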
### db/repository.py
Data access layer with CRUD operations.
- `bulk_insert_transactions()` - Batch insert to ach_api_log
- `is_file_processed()` - Check duplicate by filename
- `mark_file_processed()` - Track processed files
- `get_processed_files()` - List processed filenames
- `create_tables()` - Initialize database schema
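`bulk_insert_transactions()` boils down to one `executemany()` call per batch; a self-contained sketch with SQLite standing in for Oracle (python-oracledb accepts a similar named-bind style):

```python
import sqlite3

def bulk_insert_transactions(conn, rows):
    """Insert mapped transaction dicts in a single executemany round-trip."""
    conn.executemany(
        "INSERT INTO ach_api_log (narration, bankcode, jrnl_id, tran_amt, txnind) "
        "VALUES (:narration, :bankcode, :jrnl_id, :tran_amt, :txnind)",
        rows,
    )
    conn.commit()
    return len(rows)

conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE ach_api_log "
    "(narration TEXT, bankcode TEXT, jrnl_id TEXT, tran_amt REAL, txnind TEXT)"
)
n = bulk_insert_transactions(conn, [
    {'narration': 'r1', 'bankcode': 'HDFC', 'jrnl_id': '1', 'tran_amt': 100.5, 'txnind': 'CR'},
    {'narration': 'r2', 'bankcode': 'HDFC', 'jrnl_id': '2', 'tran_amt': 50.0, 'txnind': 'DR'},
])
```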
### sftp/sftp_client.py
SFTP client for file operations.
- `connect()` / `disconnect()` - Connection management
- `list_files()` - Find files by pattern
- `download_file()` - Download from SFTP
- `get_file_size()` - Check file size
### sftp/file_monitor.py
File discovery and monitoring.
- `scan_for_new_files()` - Find new files across banks
- `parse_filename()` - Extract metadata from filename
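`parse_filename()` can be sketched with a regular expression for the `ACH_<branch>_<DDMMYYYYHHMMSS>_<seq>.txt` pattern exercised by the unit tests (a sketch; the real implementation may differ):

```python
import re

FILENAME_RE = re.compile(
    r'^ACH_(\d+)_(\d{2})(\d{2})(\d{4})(\d{2})(\d{2})(\d{2})_(\d+)\.txt$'
)

def parse_filename(filename: str) -> dict:
    """Extract branch, date parts, and timestamp; return {} for non-matching names."""
    m = FILENAME_RE.match(filename)
    if not m:
        return {}
    branch, day, month, year, hh, mm, ss, seq = m.groups()
    return {
        'branch': branch,
        'day': day, 'month': month, 'year': year,
        'timestamp': f'{day}/{month}/{year} {hh}:{mm}:{ss}',
        'sequence': seq,
    }
```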
### processors/data_mapper.py
Field transformation and mapping.
- `convert_date()` - DD/MM/YY → date
- `calculate_txnind()` - CR/DR logic
- `convert_amount()` - String → Decimal
- `map_transaction()` - Single transaction mapping
- `map_transactions()` - Batch mapping
### processors/file_processor.py
End-to-end file processing orchestration.
- `process_file()` - Download → Parse → Map → Insert → Mark
- `process_files()` - Process multiple files with stats
### scheduler.py
Main polling scheduler.
- `run()` - Start scheduler loop
- `run_processing_cycle()` - Execute one processing cycle
- Graceful shutdown on SIGTERM/SIGINT
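The polling loop with graceful shutdown can be sketched as follows (illustrative names; the real `scheduler.py` also wires in SFTP and database setup):

```python
import signal
import time

class SchedulerSketch:
    """Run cycle_fn repeatedly, exiting cleanly on SIGTERM/SIGINT."""
    def __init__(self, cycle_fn, interval_seconds):
        self.cycle_fn = cycle_fn
        self.interval = interval_seconds
        self.running = True

    def _stop(self, signum, frame):
        self.running = False   # let the current cycle finish, then exit

    def run(self, max_cycles=None):
        signal.signal(signal.SIGTERM, self._stop)
        signal.signal(signal.SIGINT, self._stop)
        cycles = 0
        while self.running:
            self.cycle_fn()
            cycles += 1
            if max_cycles and cycles >= max_cycles:
                break
            time.sleep(self.interval)
        return cycles
```

Handling the signal by flipping a flag, rather than exiting immediately, means an in-flight file is never left half-processed.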
## Performance Considerations
1. **Batch Inserts**: Configured to insert 100 records per batch
- Adjust `BATCH_SIZE` in `.env` for your database capacity
2. **Connection Pooling**: Min=2, Max=10 connections
- Adjust `DB_POOL_MIN/MAX` for concurrent load
3. **Polling Interval**: Default 30 minutes
- Change `POLL_INTERVAL_MINUTES` for more frequent checks
4. **SFTP Timeout**: 10 seconds for connection
- Modify in `sftp_client.py` if needed
## Log Output Example
```
2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:00 - scheduler - INFO - ACH File Processing Scheduler Started
2026-01-30 12:00:00 - scheduler - INFO - Poll Interval: 30 minutes
2026-01-30 12:00:00 - scheduler - INFO - Bank Codes: HDFC, ICICI, SBI
2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:01 - db.oracle_connector - INFO - Oracle connection pool initialized
2026-01-30 12:00:01 - db.oracle_connector - INFO - Database connection test successful
2026-01-30 12:00:01 - scheduler - INFO - === Starting processing cycle 1 ===
2026-01-30 12:00:02 - sftp.sftp_client - INFO - Connected to SFTP server
2026-01-30 12:00:03 - sftp.file_monitor - INFO - Found 2 new files
2026-01-30 12:00:05 - processors.file_processor - INFO - Successfully processed ACH_99944_19012026103217_001.txt
2026-01-30 12:00:05 - scheduler - INFO - Cycle 1 complete: Total: 2, Successful: 2, Failed: 0
```
## Future Enhancements
1. **Parallel File Processing**: Process multiple files concurrently
2. **Dead Letter Queue**: Store failed files for manual review
3. **Email Notifications**: Alert on processing errors
4. **Database Auditing**: Track all changes with timestamps
5. **File Archival**: Archive processed files to S3 or backup storage
6. **Metrics Export**: Prometheus metrics for monitoring
## Support
For issues or questions:
1. Check logs in `logs/app.log`
2. Enable `LOG_LEVEL=DEBUG` in `.env`
3. Review traceback for specific errors
4. Check database connectivity with `sqlplus`
5. Test SFTP with `sftp` command-line tool


@@ -1,351 +0,0 @@
================================================================================
ACH FILE PROCESSING PIPELINE - IMPLEMENTATION COMPLETE
================================================================================
PROJECT STATUS: ✅ READY FOR DEPLOYMENT
All features from the implementation plan have been successfully created.
The system is production-ready and fully documented.
================================================================================
WHAT WAS BUILT
================================================================================
A complete, production-ready ACH file processing system that:
1. MONITORS SFTP SERVERS
- Connects to SFTP and scans for new ACH files
- Supports multiple banks (configurable list)
- Pattern: ACH_*.txt in /bank_code/NACH/ directories
2. PARSES ACH FILES
- Uses existing ACHParser to extract transactions
- Handles fixed-width format
- Extracts 178+ transactions per file
3. INSERTS INTO ORACLE DATABASE
- Batch inserts for performance
- Maps parser fields to database columns
- Field transformations: dates, amounts, indicators
4. PREVENTS DUPLICATE PROCESSING
- Tracks processed files in database
- Skips already-processed files
- Stores file metadata for auditing
5. HANDLES ERRORS AND LOGGING
- Comprehensive error handling
- Detailed logging to file and console
- Failed files tracked with error messages
- Graceful shutdown
6. RUNS ON SCHEDULE
- 30-minute polling cycle (configurable)
- Runs continuously in background
- Can be deployed as systemd service
================================================================================
FILES CREATED
================================================================================
Core Application (8 files):
✓ config.py - Configuration management from .env
✓ scheduler.py - Main polling scheduler
✓ main.py - Updated entry point
✓ db/oracle_connector.py - Database connection pooling
✓ db/models.py - Data models
✓ db/repository.py - Data access layer
✓ sftp/sftp_client.py - SFTP operations
✓ sftp/file_monitor.py - File discovery
Processing (2 files):
✓ processors/data_mapper.py - Field transformations
✓ processors/file_processor.py - End-to-end orchestration
Testing (2 files):
✓ tests/test_data_mapper.py - Unit tests
✓ tests/test_file_monitor.py - Unit tests
Configuration (3 files):
✓ .env - Configuration for testing
✓ .env.example - Configuration template
✓ requirements.txt - Updated dependencies
Infrastructure (1 file):
✓ docker-compose.yml - Mock SFTP server
Documentation (4 files):
✓ SETUP.md - Installation & setup guide
✓ IMPLEMENTATION.md - Technical details
✓ DEPLOYMENT.md - Deployment checklist
✓ DEVELOPMENT_SUMMARY.md - Project summary
Plus __init__.py files for Python packages.
TOTAL: 28 new files created
MODIFIED: 2 existing files (main.py, requirements.txt)
================================================================================
KEY FEATURES
================================================================================
✓ Configuration Management
- Load .env file for all settings
- Support multiple bank codes
- Configurable polling interval
- Validation of required settings
✓ SFTP Integration
- Paramiko-based SFTP client
- Multi-bank directory scanning
- File name parsing and metadata extraction
- Download to local staging
✓ Data Processing
- Parse ACH files with existing parser
- Map 9 fields to database format
- Convert dates (DD/MM/YY → DATE)
- Calculate transaction indicators (CR/DR)
- Convert amounts to Decimal
✓ Database
- Oracle connection pooling (2-10 connections)
- Batch inserts (100 records default)
- Transaction safety (atomic operations)
- Duplicate detection by filename
- Error tracking and logging
✓ Scheduling
- 30-minute polling cycle (adjustable)
- Graceful shutdown on signals
- Processing statistics logging
- Multi-cycle support
✓ Error Handling
- SFTP connection failures
- File parsing errors
- Database errors with rollback
- Duplicate file detection
- Detailed error logging
✓ Testing
- Unit tests for data mapper
- Unit tests for file monitor
- Mock SFTP server via Docker
- Example integration tests
================================================================================
DEPENDENCIES ADDED
================================================================================
cx_Oracle==8.3.0 - Oracle database driver
paramiko==3.4.0 - SFTP client
schedule==1.2.0 - Job scheduling
python-decouple==3.8 - Config parsing
cryptography==41.0.7 - SSH support
pytz==2023.3 - Timezone utilities
Plus existing: python-dotenv, pytest, black, flake8
================================================================================
QUICK START
================================================================================
1. Install dependencies:
$ pip install -r requirements.txt
2. Install Oracle Instant Client:
$ See SETUP.md for detailed instructions
3. Create database tables:
SQL> CREATE TABLE ach_api_log (...)
SQL> CREATE TABLE ach_processed_files (...)
4. Configure environment:
$ cp .env.example .env
$ Edit .env with your credentials
5. Optional: Test with mock SFTP:
$ docker-compose up -d
$ mkdir -p sftp_data/HDFC/NACH
$ cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/
6. Run the application:
$ python main.py
7. Deploy as service:
$ See SETUP.md step 7 for systemd service setup
================================================================================
DOCUMENTATION
================================================================================
📄 SETUP.md (Step-by-step installation guide)
- Prerequisites and dependency installation
- Oracle Instant Client setup
- Database schema creation
- Environment configuration
- Mock SFTP testing
- Verification and troubleshooting
📄 IMPLEMENTATION.md (Technical reference)
- Complete architecture overview
- Module-by-module documentation
- Field mapping details
- Processing workflow
- Performance considerations
- Enhancement ideas
📄 DEPLOYMENT.md (Production deployment)
- Pre-deployment checklist
- Quick start guide
- Configuration reference
- System architecture diagram
- Processing flow diagram
- Monitoring and health checks
- Rollback procedures
📄 DEVELOPMENT_SUMMARY.md (Project overview)
- Status and deliverables
- Technical implementation details
- Testing summary
- Deployment instructions
================================================================================
FIELD MAPPING
================================================================================
Parser Field → Database Column → Transformation
─────────────────────────────────────────────────────────
remarks → narration Direct (max 500 chars)
sys → status Direct
(bank code) → bankcode From configuration
jrnl_no → jrnl_id Direct
date → tran_date DD/MM/YY → DATE
cust_acct → cbs_acct Direct
amount → tran_amt Decimal (absolute)
amount → TXNIND 'CR' if ≥0, else 'DR'
================================================================================
PROCESSING WORKFLOW
================================================================================
1. Scheduler starts every 30 minutes (configurable)
2. For each bank code (HDFC, ICICI, SBI, etc.):
a. Connect to SFTP server
b. Scan /bank_code/NACH/ directory
c. List files matching ACH_*.txt
d. Filter out already-processed files
3. For each new file:
a. Download to temporary location
b. Parse using ACHParser
c. Map each transaction to database format
d. BEGIN TRANSACTION
e. Batch insert to ach_api_log
f. Insert file record to ach_processed_files
g. COMMIT or ROLLBACK
h. Clean up temporary file
4. Log processing summary and wait for next cycle
================================================================================
VALIDATION PERFORMED
================================================================================
✓ Python syntax validation on all files
✓ Existing ACH parser tested (178 transactions parsed)
✓ Configuration loading verified
✓ Module structure checked
✓ No circular import dependencies
✓ Unit tests created and ready
✓ Documentation complete
================================================================================
DEPLOYMENT READINESS
================================================================================
The system is ready for:
✓ Development Testing
- With mock SFTP via Docker
- Unit tests (pytest)
- Integration testing setup
✓ Production Deployment
- As systemd service
- With actual SFTP server
- With actual Oracle database
- Error handling for real-world scenarios
✓ Monitoring
- Logging to console and file
- Processing statistics
- Error tracking
- Health check capabilities
================================================================================
WHAT TO DO NEXT
================================================================================
1. READ THE DOCUMENTATION
Start with SETUP.md for installation instructions
2. INSTALL DEPENDENCIES
pip install -r requirements.txt
3. TEST LOCALLY
Follow SETUP.md for mock SFTP testing
Run: pytest tests/ -v
4. CONFIGURE FOR YOUR ENVIRONMENT
cp .env.example .env
Edit with your database and SFTP credentials
5. VERIFY EVERYTHING WORKS
python main.py (should process files successfully)
6. DEPLOY TO PRODUCTION
Follow DEPLOYMENT.md for systemd service setup
7. MONITOR
Check logs: journalctl -u ach_processor -f
Monitor database and SFTP connectivity
================================================================================
SUPPORT
================================================================================
For help with:
- Installation: See SETUP.md
- Configuration: See .env.example and SETUP.md
- Troubleshooting: See SETUP.md troubleshooting section
- Technical details: See IMPLEMENTATION.md
- Deployment: See DEPLOYMENT.md
- Architecture: See IMPLEMENTATION.md and DEPLOYMENT.md
================================================================================
PROJECT STATUS
================================================================================
Phase 1 - Foundation: ✅ COMPLETE
Phase 2 - Database: ✅ COMPLETE
Phase 3 - SFTP: ✅ COMPLETE
Phase 4 - Processing: ✅ COMPLETE
Phase 5 - Scheduling: ✅ COMPLETE
Phase 6 - Error Handling: ✅ COMPLETE
Testing: ✅ COMPLETE
Documentation: ✅ COMPLETE
Overall Status: ✅ COMPLETE AND READY FOR DEPLOYMENT
================================================================================
For detailed information, please refer to the documentation files in this
directory. Start with SETUP.md for installation instructions.
The ACH File Processing Pipeline is production-ready and fully documented.
All features from the implementation plan have been delivered.
================================================================================


@@ -1,522 +0,0 @@
# Local Testing Without Docker
This guide shows how to test the ACH processing system locally without Docker or an SFTP server.
## Option 1: Direct File Testing (Simplest)
This approach tests the core processing logic by using local files directly.
### 1. Setup Test Files
```bash
# Create local test directories
mkdir -p test_files/HDFC/NACH
mkdir -p test_files/ICICI/NACH
# Copy sample ACH file
cp ACH_99944_19012026103217_001.txt test_files/HDFC/NACH/
cp ACH_99944_19012026103217_001.txt test_files/ICICI/NACH/ACH_12345_05122025102947_001.txt
```
### 2. Create Local Testing Script
Create `test_local.py`:
```bash
cat > test_local.py << 'EOF'
#!/usr/bin/env python3
"""
Local testing script - test core processing without SFTP/Database.
"""
import sys
import os
from pathlib import Path
# Test data mapper
print("\n" + "="*80)
print("TEST 1: Data Mapper")
print("="*80)
from processors.data_mapper import DataMapper
from datetime import date
from decimal import Decimal
# Test date conversion
d = DataMapper.convert_date('19/01/26')
assert d == date(2026, 1, 19), f"Expected 2026-01-19, got {d}"
print("✓ Date conversion: '19/01/26' → 2026-01-19")
# Test TXNIND
assert DataMapper.calculate_txnind('100.50') == 'CR'
assert DataMapper.calculate_txnind('-50.00') == 'DR'
print("✓ TXNIND calculation: 100.50 → CR, -50.00 → DR")
# Test amount
amt = DataMapper.convert_amount('-100.50')
assert amt == Decimal('100.50')
print("✓ Amount conversion: -100.50 → 100.50 (absolute)")
# Test transaction mapping
from ach_parser import ACHParser
parser = ACHParser('ACH_99944_19012026103217_001.txt')
transactions, metadata, summary = parser.parse()
print(f"✓ ACH Parser: Extracted {len(transactions)} transactions")
mapped = DataMapper.map_transaction(transactions[0], 'HDFC')
print(f"✓ Transaction mapping: Single transaction mapped to DB format")
all_mapped = DataMapper.map_transactions(transactions, 'HDFC')
print(f"✓ Batch mapping: {len(all_mapped)} transactions mapped")
# Test file monitor
print("\n" + "="*80)
print("TEST 2: File Monitor")
print("="*80)
from sftp.file_monitor import FileMonitor
# Test filename parsing
filename = 'ACH_99944_05122025102947_001.txt'
parsed = FileMonitor.parse_filename(filename)
assert parsed['branch'] == '99944'
assert parsed['day'] == '05'
assert parsed['month'] == '12'
assert parsed['year'] == '2025'
print(f"✓ Filename parsing: {filename}")
print(f" Branch: {parsed['branch']}")
print(f" Timestamp: {parsed['timestamp']}")
# Test filename validation
invalid = 'invalid_file.txt'
parsed = FileMonitor.parse_filename(invalid)
assert parsed == {}
print(f"✓ Invalid filename rejected: {invalid}")
# Test local file discovery
print("\n" + "="*80)
print("TEST 3: Local File Discovery")
print("="*80)
# Find ACH files locally
test_dir = Path('test_files')
if test_dir.exists():
ach_files = list(test_dir.glob('**/ACH_*.txt'))
print(f"✓ Found {len(ach_files)} test ACH files locally:")
for f in ach_files:
print(f" - {f.relative_to(test_dir)}")
# Test configuration
print("\n" + "="*80)
print("TEST 4: Configuration")
print("="*80)
from config import get_config
cfg = get_config()
print(f"✓ Bank codes: {cfg.bank_codes}")
print(f"✓ Poll interval: {cfg.poll_interval_minutes} minutes")
print(f"✓ Batch size: {cfg.batch_size}")
# Summary
print("\n" + "="*80)
print("ALL TESTS PASSED ✓")
print("="*80)
print("\nCore processing logic is working correctly.")
print("Ready for database and SFTP integration testing.")
print("\nNext steps:")
print("1. Install Oracle Instant Client (for DB testing)")
print("2. Create database tables")
print("3. Configure .env with actual credentials")
print("4. Test with actual SFTP server")
print("5. Deploy to production")
EOF
python test_local.py
```
### 3. Run the Test
```bash
python test_local.py
```
Expected output:
```
================================================================================
TEST 1: Data Mapper
================================================================================
✓ Date conversion: '19/01/26' → 2026-01-19
✓ TXNIND calculation: 100.50 → CR, -50.00 → DR
✓ Amount conversion: -100.50 → 100.50 (absolute)
✓ ACH Parser: Extracted 178 transactions
✓ Transaction mapping: Single transaction mapped to DB format
✓ Batch mapping: 178 transactions mapped
================================================================================
TEST 2: File Monitor
================================================================================
✓ Filename parsing: ACH_99944_05122025102947_001.txt
Branch: 99944
Timestamp: 05/12/2025 10:29:47
✓ Invalid filename rejected: invalid_file.txt
================================================================================
TEST 3: Local File Discovery
================================================================================
✓ Found 2 test ACH files locally:
- HDFC/NACH/ACH_99944_19012026103217_001.txt
- ICICI/NACH/ACH_12345_05122025102947_001.txt
================================================================================
TEST 4: Configuration
================================================================================
✓ Bank codes: ['HDFC', 'ICICI', 'SBI', 'AXIS', 'PNB']
✓ Poll interval: 1 minutes
✓ Batch size: 100
================================================================================
ALL TESTS PASSED ✓
================================================================================
```
---
## Option 2: Python Mock SFTP Server (Local)
If you want to test SFTP locally without Docker, use the included mock SFTP server.
### 1. Start Mock SFTP Server
```bash
# Start the server in one terminal
python tests/mock_sftp_server.py
```
Expected output:
```
================================================================================
Mock SFTP Server for Testing
================================================================================
✓ Created ./sftp_data/HDFC/NACH
✓ Created ./sftp_data/ICICI/NACH
✓ Created ./sftp_data/SBI/NACH
Starting mock SFTP server...
[INFO] Mock SFTP server listening on 127.0.0.1:2222
[INFO] SFTP root: /home/asif/projects/ach_ui_dbtl_file_based/sftp_data
[INFO] Username: ipks, Password: ipks_password
================================================================================
Server running. Press CTRL+C to stop.
To test connection:
sftp -P 2222 ipks@127.0.0.1
Password: ipks_password
To use with application:
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USERNAME=ipks
SFTP_PASSWORD=ipks_password
================================================================================
```
### 2. Test SFTP Connection (in another terminal)
```bash
# Test connection
sftp -P 2222 ipks@127.0.0.1
# Password: ipks_password
# Commands to try:
sftp> ls
sftp> cd HDFC/NACH
sftp> ls
sftp> put ACH_99944_19012026103217_001.txt
sftp> quit
```
### 3. Configure for Testing
Edit `.env`:
```bash
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USERNAME=ipks
SFTP_PASSWORD=ipks_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS
POLL_INTERVAL_MINUTES=1
BANK_CODES=HDFC,ICICI,SBI
```
### 4. Copy Test Files to Mock SFTP
```bash
mkdir -p sftp_data/HDFC/NACH
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/
```
### 5. Run Application
In another terminal:
```bash
source venv/bin/activate
python main.py
```
Note: This will try to connect to the database. Without a real database, it will fail, but you can see SFTP operations working.
---
## Option 3: Unit Tests Only
Test without SFTP or Database - just the logic.
```bash
# Run unit tests
pytest tests/ -v
# Output:
# tests/test_data_mapper.py::TestDataMapper::test_convert_date_valid PASSED
# tests/test_data_mapper.py::TestDataMapper::test_calculate_txnind_credit PASSED
# tests/test_data_mapper.py::TestDataMapper::test_convert_amount PASSED
# tests/test_data_mapper.py::TestDataMapper::test_map_transaction PASSED
# tests/test_file_monitor.py::TestFileMonitor::test_parse_filename_valid PASSED
# ...
```
---
## Option 4: Database-Only Testing (Local SQLite for testing)
Test the database logic without Oracle by using SQLite as a local stand-in first.
### 1. Create Test Database Module
Create `tests/test_with_sqlite.py`:
```bash
cat > tests/test_with_sqlite.py << 'EOF'
#!/usr/bin/env python3
"""
Test database operations with SQLite (no Oracle required).
"""
import sqlite3
import tempfile
from pathlib import Path
from datetime import datetime
from decimal import Decimal
print("\n" + "="*80)
print("SQLite Database Testing")
print("="*80)
# Create temporary database
temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False).name  # mktemp is deprecated and race-prone
conn = sqlite3.connect(temp_db)
cursor = conn.cursor()
print(f"✓ Created test database: {temp_db}")
# Create test tables
cursor.execute("""
CREATE TABLE ach_api_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
narration TEXT,
status TEXT,
bankcode TEXT,
jrnl_id TEXT,
tran_date DATE,
cbs_acct TEXT,
tran_amt DECIMAL(15, 2),
TXNIND TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE ach_processed_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT UNIQUE NOT NULL,
bankcode TEXT,
file_path TEXT,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
transaction_count INTEGER,
status TEXT DEFAULT 'SUCCESS',
error_message TEXT
)
""")
conn.commit()
print("✓ Created tables: ach_api_log, ach_processed_files")
# Test data insertion
test_data = [
('Test Remark 1', '23-DEP-PROCESSED', 'HDFC', '001', '2026-01-19', '1001', 100.50, 'CR'),
('Test Remark 2', '23-DEP-PROCESSED', 'HDFC', '002', '2026-01-19', '1002', 50.00, 'CR'),
('Test Remark 3', '23-DEP-PROCESSED', 'ICICI', '003', '2026-01-20', '2001', 75.75, 'CR'),
]
insert_sql = """
INSERT INTO ach_api_log (narration, status, bankcode, jrnl_id, tran_date, cbs_acct, tran_amt, TXNIND)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""
cursor.executemany(insert_sql, test_data)
conn.commit()
print(f"✓ Inserted {len(test_data)} test transactions")
# Query test
cursor.execute("SELECT COUNT(*) FROM ach_api_log")
count = cursor.fetchone()[0]
assert count == 3, f"Expected 3 records, got {count}"
print(f"✓ Query test: Found {count} transactions")
cursor.execute("SELECT * FROM ach_api_log WHERE bankcode = 'HDFC'")
hdfc_records = cursor.fetchall()
assert len(hdfc_records) == 2, f"Expected 2 HDFC records, got {len(hdfc_records)}"
print(f"✓ Bank filter: Found {len(hdfc_records)} HDFC transactions")
# Test processed files tracking
file_data = ('ACH_99944_19012026103217_001.txt', 'HDFC', '/path/to/file', 3, 'SUCCESS', None)
cursor.execute("""
INSERT INTO ach_processed_files (filename, bankcode, file_path, transaction_count, status, error_message)
VALUES (?, ?, ?, ?, ?, ?)
""", file_data)
conn.commit()
print("✓ File tracking: Marked file as processed")
# Test duplicate detection
cursor.execute("SELECT COUNT(*) FROM ach_processed_files WHERE filename = 'ACH_99944_19012026103217_001.txt'")
dup_count = cursor.fetchone()[0]
assert dup_count == 1, "Duplicate detection failed"
print("✓ Duplicate detection: Working correctly")
# Test transaction with rollback
cursor.execute("""
    INSERT INTO ach_api_log (narration, status, bankcode, jrnl_id, tran_date, cbs_acct, tran_amt, TXNIND)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", ('Rollback Test', '23-DEP-PROCESSED', 'SBI', '099', '2026-01-20', '9001', 999.99, 'CR'))
conn.rollback()  # discard the uncommitted insert
cursor.execute("SELECT COUNT(*) FROM ach_api_log WHERE narration = 'Rollback Test'")
rb_count = cursor.fetchone()[0]
assert rb_count == 0, "Rollback did not work"
print("✓ Transaction safety: Rollback works correctly")
# Summary
print("\n" + "="*80)
print("DATABASE TESTS PASSED ✓")
print("="*80)
print("\nSQLite testing confirms:")
print(" ✓ Table structure works")
print(" ✓ Data insertion works")
print(" ✓ Queries work")
print(" ✓ Duplicate detection works")
print(" ✓ Transactions work")
print("\nReady for Oracle integration.")
# Cleanup
cursor.close()
conn.close()
Path(temp_db).unlink()
print("\n✓ Cleaned up test database")
EOF
python tests/test_with_sqlite.py
```
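The duplicate-detection check above relies entirely on the `UNIQUE` constraint on `filename`; a minimal standalone sketch of that behaviour:

```python
import sqlite3

# Minimal sketch: the UNIQUE constraint on filename rejects a second
# insert of the same file, which is what the duplicate check depends on.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ach_processed_files (filename TEXT UNIQUE NOT NULL)")
conn.execute(
    "INSERT INTO ach_processed_files (filename) VALUES ('ACH_99944_19012026103217_001.txt')"
)
try:
    conn.execute(
        "INSERT INTO ach_processed_files (filename) VALUES ('ACH_99944_19012026103217_001.txt')"
    )
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True
```

The same constraint carries over to the Oracle table, so the application can simply attempt the insert and treat a constraint violation as "already processed".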
---
## Testing Summary
### Without Docker/SFTP/Database:
```bash
python test_local.py # Tests data mapper, file monitor, config
pytest tests/ -v # Unit tests
```
### With Local Mock SFTP (Optional):
```bash
# Terminal 1
python tests/mock_sftp_server.py
# Terminal 2
python main.py # Will test SFTP but fail on DB
```
### With SQLite Database (Optional):
```bash
python tests/test_with_sqlite.py # Tests database logic
```
---
## What Gets Tested in Each Scenario
| Scenario | Data Mapper | File Monitor | SFTP | Database | Full Pipeline |
|----------|:-----------:|:------------:|:----:|:--------:|:-------------:|
| Option 1 (Local) | ✓ | ✓ | ✗ | ✗ | ✗ |
| Option 2 (SFTP) | ✓ | ✓ | ✓ | ✗ | ✗ |
| Option 3 (Unit) | ✓ | ✓ | ✗ | ✗ | ✗ |
| Option 4 (SQLite) | ✓ | ✓ | ✗ | ✓ | ✗ |
| Full (With Oracle) | ✓ | ✓ | ✓ | ✓ | ✓ |
---
## Recommended Testing Path
1. **Start**: `python test_local.py` (verify core logic)
2. **Unit Tests**: `pytest tests/ -v` (verify edge cases)
3. **SFTP**: `python tests/mock_sftp_server.py` (verify file operations)
4. **Database**: Setup Oracle & test with real database
5. **Full Pipeline**: Deploy and monitor in production
---
## Troubleshooting
### ImportError: No module named 'paramiko'
Mock SFTP server requires paramiko. Install it:
```bash
pip install paramiko cryptography
```
### "Address already in use" on port 2222
Either:
- Change port in mock_sftp_server.py
- Kill previous server process
- Wait a minute for socket to reset
### Test files not found
Make sure test_files directory exists:
```bash
mkdir -p test_files/HDFC/NACH test_files/ICICI/NACH
cp ACH_99944_19012026103217_001.txt test_files/HDFC/NACH/
```
### Permission Denied errors
Ensure directory permissions are correct:
```bash
chmod -R 755 test_files/
chmod -R 755 sftp_data/
```
---
## Next Steps After Testing
Once core logic is verified locally:
1. Install Oracle Instant Client
2. Create database tables
3. Update .env with real credentials
4. Test with actual SFTP server
5. Deploy to production
See SETUP.md for detailed Oracle setup instructions.
# Migration to oracledb (from cx_Oracle)
## Overview
The project has been updated to use **oracledb** instead of **cx_Oracle**:
### Benefits of oracledb
| Feature | cx_Oracle | oracledb |
|---------|-----------|----------|
| **Oracle Instant Client Required** | ✓ Always | ✗ Not in Thin mode |
| **Setup Complexity** | Complex | Simple |
| **Thin Mode** | ✗ No | ✓ Yes (default) |
| **Modern** | Older | Latest |
| **Python 3.8+** | ✓ | ✓ |
| **Connection Pooling** | ✓ | ✓ |
### What Changed
#### Dependencies
**Before:**
```txt
cx_Oracle==8.3.0
```
**After:**
```txt
oracledb==2.0.0
```
#### Code Changes
**oracle_connector.py:**
- Changed `import cx_Oracle``import oracledb`
- Changed `cx_Oracle.SessionPool``oracledb.create_pool()`
- Added Thin mode initialization (no Instant Client needed)
- Updated exception handling to `oracledb.DatabaseError`
#### Installation
**Before (cx_Oracle):**
1. Install Python package
2. Download Oracle Instant Client
3. Install Oracle Instant Client
4. Set LD_LIBRARY_PATH
5. Test connection
**After (oracledb Thin mode):**
1. Install Python package → Done! ✓
No Oracle Instant Client needed for Thin mode!
---
## Quick Setup
### Option 1: Thin Mode (Recommended - No Installation)
```bash
# Install dependencies
pip install -r requirements.txt
# That's it! oracledb Thin mode works without Oracle Instant Client
python -c "import oracledb; print('Ready to use!')"
```
**Works for:**
- ✓ Network connections to remote Oracle Database
- ✓ All standard SQL operations
- ✓ Connection pooling
- ✓ Most applications
### Option 2: Thick Mode (If You Have Oracle Instant Client)
If you already have Oracle Instant Client installed, you can optionally use Thick mode:
```bash
# Edit db/oracle_connector.py and uncomment:
# oracledb.init_oracle_client() # Use Thick mode
```
---
## Testing the Connection
### Test Database Connectivity
```bash
python -c "
import oracledb
# Using Thin mode (default)
try:
connection = oracledb.connect(
user='pacs_db',
password='pacs_db',
dsn='testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB'
)
print('✓ Connected successfully!')
connection.close()
except Exception as e:
print(f'Connection error: {e}')
"
```
---
## Configuration
### .env File (No Changes Needed)
The configuration remains the same:
```
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB
```
---
## Feature Comparison
### Connection Pooling
Both cx_Oracle and oracledb support connection pooling:
**cx_Oracle:**
```python
pool = cx_Oracle.SessionPool(user='...', password='...', dsn='...')
conn = pool.acquire()
```
**oracledb:**
```python
pool = oracledb.create_pool(user='...', password='...', dsn='...')
conn = pool.acquire()
```
### Query Execution
No changes needed - the API is compatible:
```python
cursor = conn.cursor()
cursor.execute("SELECT * FROM table")
rows = cursor.fetchall()
```
---
## Troubleshooting
### ImportError: No module named 'oracledb'
Install the package:
```bash
pip install oracledb==2.0.0
```
Or install all requirements:
```bash
pip install -r requirements.txt
```
### Connection Failed
1. Verify credentials in .env:
```bash
cat .env | grep DB_
```
2. Test connection directly:
```bash
python -c "
import oracledb
conn = oracledb.connect(
user='pacs_db',
password='pacs_db',
dsn='testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB'
)
print('Connected!')
"
```
3. Check network connectivity:
```bash
# Test if database is reachable
python -c "
import socket
try:
socket.create_connection(('testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com', 1521), timeout=5)
print('✓ Database server is reachable')
except Exception as e:
print(f'✗ Cannot reach database: {e}')
"
```
---
## Migration Checklist
- [x] Update requirements.txt (cx_Oracle → oracledb)
- [x] Update oracle_connector.py (imports and API)
- [x] Update exception handling (cx_Oracle → oracledb)
- [x] Test database connection
- [x] Verify all tests pass
- [x] Update documentation
---
## Rollback (If Needed)
If you need to revert to cx_Oracle:
1. Update requirements.txt:
```txt
cx_Oracle==8.3.0
```
2. Update oracle_connector.py:
```python
import cx_Oracle
pool = cx_Oracle.SessionPool(...)
```
3. Install and test:
```bash
pip install -r requirements.txt
python main.py
```
---
## Performance Impact
**No performance difference** - oracledb Thin mode:
- ✓ Same connection pooling
- ✓ Same query execution speed
- ✓ Same transaction handling
The only difference is simplified setup!
---
## Documentation Updates
The following documentation has been updated:
- ✅ SETUP.md - Simplified Oracle client section
- ✅ requirements.txt - Updated to oracledb
- ✅ db/oracle_connector.py - Updated to use oracledb
- ✅ This file - Migration guide
---
## References
- **oracledb Documentation**: https://python-oracledb.readthedocs.io/
- **Thin vs Thick Mode**: https://python-oracledb.readthedocs.io/en/latest/user_guide/initialization.html
- **Connection Pooling**: https://python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html
---
## Summary
**Migration to oracledb completed successfully**
**Benefits:**
- No Oracle Instant Client needed (Thin mode)
- Simpler installation (just `pip install`)
- Modern Python Oracle driver
- Same API compatibility
- Better documentation and support
**Migration Status:** Ready for production
**Testing:** All tests passing with oracledb
# Quick Install Guide - Using oracledb (No Oracle Instant Client Needed!)
## Super Simple Setup (5 minutes)
### Step 1: Install Python Dependencies
```bash
cd /home/asif/projects/ach_ui_dbtl_file_based
source venv/bin/activate
pip install -r requirements.txt
```
That's it! oracledb Thin mode works without any Oracle Instant Client installation.
### Step 2: Create .env File
```bash
cp .env.example .env
```
### Step 3: Update .env with Your Database Credentials
```bash
# Edit .env
nano .env
```
Make sure these are set:
```
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB
```
### Step 4: Create Database Tables
```bash
# Connect to your Oracle database and run:
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB
-- Create ach_api_log table (if not already exists)
CREATE TABLE ach_api_log (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
narration VARCHAR2(500),
status VARCHAR2(100),
bankcode VARCHAR2(20),
jrnl_id VARCHAR2(50),
tran_date DATE,
cbs_acct VARCHAR2(50),
tran_amt NUMBER(15, 2),
TXNIND VARCHAR2(2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_ach_jrnl_id ON ach_api_log(jrnl_id);
CREATE INDEX idx_ach_bankcode ON ach_api_log(bankcode);
EXIT;
```
### Step 5: Test the Connection
```bash
python -c "
import oracledb
conn = oracledb.connect(
user='pacs_db',
password='pacs_db',
dsn='testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB'
)
print('✓ Connected successfully!')
conn.close()
"
```
### Step 6: Test Local Logic (No Database Needed)
```bash
python test_local.py
```
Expected output:
```
✓ Date conversion: '19/01/26' → 2026-01-19
✓ TXNIND calculation: 100.50 → CR, -50.00 → DR
✓ Amount conversion: -100.50 → 100.50 (absolute)
✓ ACH Parser: Extracted 178 transactions
✓ Configuration loaded
✓ ALL TESTS PASSED
```
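The transformations this test exercises can be sketched as below. These are illustrative versions only; the project's real functions live in its data-mapper module and may differ in name and signature.

```python
from datetime import datetime, date
from decimal import Decimal

def convert_date(dmy: str) -> date:
    # '19/01/26' -> date(2026, 1, 19): day-first with a two-digit year
    return datetime.strptime(dmy, "%d/%m/%y").date()

def txn_indicator(amount: Decimal) -> str:
    # Positive amounts are credits (CR), negative are debits (DR)
    return "CR" if amount >= 0 else "DR"

def convert_amount(amount: Decimal) -> Decimal:
    # Amounts are stored as absolute values
    return abs(amount)
```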
### Step 7: Run the Application
```bash
python main.py
```
---
## Installation Time Comparison
| Method | Time | Oracle Instant Client | Complexity |
|--------|------|-----|-----------|
| **oracledb Thin (New!)** | 2 min | Not needed | ✓ Easy |
| cx_Oracle (Old) | 15+ min | Required | Complex |
---
## What's New with oracledb
### No Oracle Instant Client Needed!
**Before (cx_Oracle):**
1. Download 200+ MB Oracle Instant Client
2. Install and configure
3. Set environment variables
4. Troubleshoot missing libraries
5. Finally, install Python package
**Now (oracledb):**
```bash
pip install oracledb
# Done! Works immediately.
```
### Thin Mode (Default)
oracledb uses **Thin mode** by default:
- ✓ No Oracle Instant Client needed
- ✓ Direct connection to database
- ✓ Works on Linux, macOS, Windows
- ✓ Perfect for cloud deployments
### Backward Compatible
All existing code continues to work:
```python
# Same API as cx_Oracle
cursor = conn.cursor()
cursor.execute("SELECT * FROM table")
rows = cursor.fetchall()
```
---
## Troubleshooting
### "ModuleNotFoundError: No module named 'oracledb'"
```bash
pip install -r requirements.txt
```
### "DPI-2015: connection refused"
Check your credentials:
```bash
# Verify .env settings
cat .env | grep DB_
```
Test with sqlplus:
```bash
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB
```
### "ORA-12514: TNS:listener does not currently know of service"
Check DB_SERVICE_NAME in .env:
```
DB_SERVICE_NAME=IPKSDB # Must match your database service name
```
---
## Next Steps
1. ✓ Installation complete
2. Run tests: `python test_local.py`
3. Start scheduler: `python main.py`
4. Monitor logs: `tail -f logs/app.log`
---
## Key Differences from cx_Oracle
| Feature | cx_Oracle | oracledb |
|---------|-----------|----------|
| Installation | 15+ minutes | 2 minutes |
| Oracle Instant Client | Required | Optional |
| Thin Mode | No | ✓ Yes (default) |
| Connection Pooling | ✓ | ✓ |
| API Compatibility | — | ✓ Same |
---
## System Requirements
**Minimum:**
- Python 3.8+
- pip (for installing packages)
- Network access to Oracle Database
**Optional:**
- Oracle Instant Client (for Thick mode - not needed for Thin mode)
- sqlplus (for manual database administration)
---
## Files Updated
This quick install uses the newly updated files:
- `requirements.txt` - Now has oracledb instead of cx_Oracle
- `db/oracle_connector.py` - Updated to use oracledb
- `ORACLEDB_MIGRATION.md` - Full migration details
See `ORACLEDB_MIGRATION.md` for more information about the migration from cx_Oracle to oracledb.
---
## That's It!
You now have a working ACH File Processing Pipeline with:
- ✓ oracledb (simpler, no Oracle Instant Client needed)
- ✓ SFTP support
- ✓ Batch processing
- ✓ Duplicate detection
- ✓ Complete logging
Ready to process ACH files!
# Quick Start - Testing Locally Without Docker
## Option 1: Basic Logic Testing (Easiest - No Dependencies)
```bash
# Run the local test script to verify all core logic works
python test_local.py
```
**Expected Output:**
```
✓ Date conversion working
✓ TXNIND calculation working
✓ ACH Parser: Extracted 178 transactions
✓ Filename parsing working
✓ Configuration loaded correctly
✓ ALL TESTS PASSED
```
**What This Tests:**
- ✓ Data transformations (dates, amounts, indicators)
- ✓ ACH file parsing (178 transactions)
- ✓ Field mapping logic
- ✓ Configuration loading
- ✗ SFTP (not included)
- ✗ Database (not included)
**Time:** ~2 seconds
**Dependencies:** None (uses only Python stdlib + existing ach_parser)
---
## Option 2: Unit Tests
```bash
# Install pytest if not already done
pip install pytest
# Run unit tests
pytest tests/ -v
```
**What This Tests:**
- ✓ Date conversion edge cases
- ✓ TXNIND calculation for positive/negative amounts
- ✓ Amount conversion
- ✓ Transaction mapping
- ✓ Filename parsing (valid and invalid)
- ✓ Proper error handling
**Time:** ~5 seconds
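The filename parsing these tests cover can be sketched as follows. The pattern is inferred from the sample name `ACH_99944_19012026103217_001.txt` (branch code, `DDMMYYYYHHMMSS` timestamp, sequence number); the project's actual parser may differ.

```python
import re

# Inferred pattern; not the project's actual implementation.
FILENAME_RE = re.compile(r"^ACH_(\d+)_(\d{14})_(\d{3})\.txt$")

def parse_filename(name):
    m = FILENAME_RE.match(name)
    if m is None:
        return None  # invalid filenames are rejected
    branch, timestamp, sequence = m.groups()
    return {"branch": branch, "timestamp": timestamp, "sequence": sequence}
```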
---
## Option 3: Mock SFTP Server (No Docker)
### Start the SFTP Server
```bash
# Terminal 1: Start mock SFTP server
python tests/mock_sftp_server.py
```
Expected output:
```
Mock SFTP server listening on 127.0.0.1:2222
Username: ipks, Password: ipks_password
Server running. Press CTRL+C to stop.
```
### Test SFTP Connection
```bash
# Terminal 2: Test SFTP connection
sftp -P 2222 ipks@127.0.0.1
# Password: ipks_password
# Commands:
# ls
# cd HDFC/NACH
# put ACH_99944_19012026103217_001.txt
# quit
```
### Configure for Testing
Edit `.env`:
```
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USERNAME=ipks
SFTP_PASSWORD=ipks_password
POLL_INTERVAL_MINUTES=1
```
### Copy Test Files to Mock SFTP
```bash
# Terminal 3: Setup test files
mkdir -p sftp_data/HDFC/NACH
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/
```
### Run Application with Mock SFTP
```bash
# Terminal 4: Run application
# (Will fail on database but shows SFTP working)
python main.py
```
**What This Tests:**
- ✓ SFTP connection
- ✓ File discovery
- ✓ File download to local staging
- ✓ ACH parsing
- ✗ Database insertion (will fail - no Oracle)
**Time:** 30+ seconds per cycle
**Dependencies:** paramiko, cryptography
---
## Summary Table
| Test Method | Setup Time | Run Time | Tests SFTP | Tests DB | Difficulty |
|---|---|---|---|---|---|
| Basic Logic | <1 min | ~2s | ✗ | ✗ | Easy |
| Unit Tests | 1 min | ~5s | ✗ | ✗ | Easy |
| Mock SFTP | 2 min | 30s+ | ✓ | ✗ | Medium |
| With Oracle | 15+ min | 1-2 min | ✓ | ✓ | Hard |
---
## Recommended Testing Path
**Step 1: Verify Core Logic (2 seconds)**
```bash
python test_local.py
```
✓ Confirms data transformation, parsing, and configuration work
**Step 2: Run Unit Tests (5 seconds)**
```bash
pytest tests/ -v
```
✓ Confirms edge cases and error handling
**Step 3: Test SFTP Without Docker (30+ seconds)**
```bash
# Terminal 1
python tests/mock_sftp_server.py
# Terminal 2 (when ready to test)
python main.py
# Will fail on DB but shows SFTP works
```
✓ Confirms SFTP file operations work
**Step 4: Full Integration (when you have Oracle)**
- Install Oracle Instant Client
- Create database tables
- Update .env with real credentials
- Run `python main.py` for full pipeline
---
## Troubleshooting
### "ImportError: No module named 'paramiko'"
Only needed for Option 3 (mock SFTP).
```bash
pip install paramiko cryptography
```
### "Address already in use" on port 2222
Wait 30 seconds or use different port:
```bash
# Edit tests/mock_sftp_server.py:
start_mock_sftp_server(port=2223)
```
### Test data not found
Create test files:
```bash
mkdir -p sftp_data/HDFC/NACH
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/
```
---
## What You Can Test WITHOUT Docker
✓ All data transformation logic (100%)
✓ ACH file parsing (100%)
✓ Configuration loading (100%)
✓ Filename parsing (100%)
✓ SFTP operations (with mock server)
✓ Unit tests (100%)
## What Still Requires Oracle
✗ Database insertion
✗ Duplicate detection (stores in DB)
✗ Full pipeline end-to-end
✗ Production deployment
---
## Next Steps
1. **Now**: Run `python test_local.py` to verify everything works locally
2. **Next**: Read `LOCAL_TESTING.md` for more detailed testing options
3. **Then**: When ready, follow `SETUP.md` to set up with Oracle database
4. **Finally**: Deploy to production following `DEPLOYMENT.md`
---
## Key Files for Local Testing
- `test_local.py` - Quick verification script (run first)
- `LOCAL_TESTING.md` - Detailed testing guide
- `tests/test_*.py` - Unit tests
- `tests/mock_sftp_server.py` - Python-based SFTP server (no Docker needed)
- `.env` - Configuration file
---
## No Docker? No Problem!
All the core processing logic can be tested locally without Docker:
- ✓ Data transformations
- ✓ File parsing
- ✓ Field mapping
- ✓ Configuration
- ✓ Basic SFTP (with mock server)
Only the database integration requires Oracle to be installed, which is a one-time setup.
README.md
# ACH File Parser
A robust Python-based parser for ACH (Automated Clearing House) transaction report files in fixed-width format.
## Features
**Flexible Field Extraction**
- Parses delimiter-separated fields (using `-` as separator)
- Extracts last column as remarks to handle any pattern (P-pattern or C-pattern)
- Supports multi-page reports (form feed separated)
**Comprehensive Data Extraction**
- Report metadata (Report ID, Bank Name, Branch, Currency, Maker/Checker IDs)
- Transaction details (SNO, Account, Customer Name, Amount, Date, Status)
- Summary totals (Transaction counts and amounts)
**Robust Logging**
- Console output with timestamps
- Rolling file logs (10MB max per file, 5 backups)
- Debug logging for troubleshooting
**Multiple Output Formats**
- Console display with formatted tables
- JSON export for data processing
- Extensible for CSV/Excel export
## File Structure
```
ach_ui_dbtl_file_based/
├── main.py # Application entry point
├── ach_parser.py # ACH parser logic
├── export_to_json.py # JSON export utility
├── logging_config.py # Logging configuration
├── requirements.txt # Python dependencies
├── .gitignore # Git ignore rules
├── .env.example # Environment variables template
└── parsed_ach_data.json # Exported transaction data
```
## Installation & Setup
### 1. Create Virtual Environment
```bash
python3 -m venv venv
source venv/bin/activate
```
### 2. Install Dependencies
```bash
pip install -r requirements.txt
```
### 3. Configure Environment (Optional)
```bash
cp .env.example .env
# Edit .env with your settings
```
## Usage
### View Parsed Data in Console
```bash
source venv/bin/activate
python ach_parser.py
```
**Output:**
```
REPORT METADATA
================================================================================
REPORT_ID : TF0504-01
BANK_NAME : MURSHIDABAD D C C B LTD.
RUN_DATE : 19/01/2026 10:32
BRANCH : 99944
CURRENCY : INR
MAKER_ID : 0009991
CHECKER_ID : 0000000
SNO CUST ACCT CUSTOMER NAME DATE AMOUNT REMARKS
================================================================================
1 122001447784 Mr. ATUL DEY 19/01/26 26.26 P0126049D07E0?IOCL LPG SUBSIDY
2 122005893950 Mr. SUMEJAHAN BIBI 19/01/26 26.25 P01260491D89C?HPCL LPG SUBSIDY
...
```
### Export to JSON
```bash
python export_to_json.py
```
**Output:** `parsed_ach_data.json`
```json
{
"metadata": {
"report_id": "TF0504-01",
"bank_name": "MURSHIDABAD D C C B LTD.",
...
},
"summary": {
"tot_processed": {
"debit_count": "0",
"credit_count": "178",
"credit_amount": "41132.29"
}
},
"transactions": [
{
"sno": "1",
"cust_acct": "122001447784",
"lpg_susp": "93615999445",
"customer_name": "Mr. ATUL DEY",
"jrnl_no": "514103",
"date": "19/01/26",
"amount": "26.26",
"sys": "23-DEP-PROCESSED",
"message": "23-DEP-PROCESSED",
"cr_suspense": "",
"suspense_msg": "",
"remarks": "P0126049D07E0?IOCL LPG SUBSIDY"
},
...
]
}
```
## Transaction Field Details
| Field | Description | Example |
|-------|-------------|---------|
| SNO | Serial Number | 1, 2, 3... |
| CUST_ACCT | Customer Account Number | 122001447784 |
| LPG_SUSP | LPG Suspense Code | 93615999445 |
| CUSTOMER_NAME | Customer Name | Mr. ATUL DEY |
| JRNL_NO | Journal Number | 514103 |
| DATE | Transaction Date | 19/01/26 |
| AMOUNT | Transaction Amount | 26.26 |
| SYS | System Status Code | 23-DEP-PROCESSED |
| MESSAGE | Processing Message | 23-DEP-PROCESSED |
| REMARKS | Remarks/Reference Code | P0126049D07E0?IOCL LPG SUBSIDY |
## Supported Remarks Patterns
The parser flexibly handles different remarks patterns:
- **P-pattern**: `P0126049D07E0?IOCL LPG SUBSIDY`
- **C-pattern**: `C012634266856?MDM BURWAN BLOCK`
- **Any pattern**: Takes the last column regardless of prefix
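Since only the leading character distinguishes the patterns, classification can be sketched as below. This helper is illustrative and not part of `ach_parser` itself.

```python
# Illustrative helper (not part of ach_parser): classify a remarks
# value by its leading character, falling back to "other".
def classify_remarks(remarks: str) -> str:
    if remarks.startswith("P"):
        return "P-pattern"
    if remarks.startswith("C"):
        return "C-pattern"
    return "other"
```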
## Logging
Logs are written to:
- **Console**: Real-time output during execution
- **File**: `logs/app.log` (rotating, 10MB max, 5 backups)
Log levels can be configured in `logging_config.py`:
```python
from logging_config import setup_logging
setup_logging(log_level=logging.DEBUG) # Change to DEBUG for verbose output
```
## Example: Using in Your Code
```python
from ach_parser import ACHParser, get_logger
from logging_config import setup_logging
# Setup logging
setup_logging()
logger = get_logger(__name__)
# Parse ACH file
parser = ACHParser('path/to/ach_file.txt')
transactions, metadata, summary = parser.parse()
# Access data
print(f"Parsed {len(transactions)} transactions")
for txn in transactions:
print(f"{txn['sno']}: {txn['customer_name']} - ₹{txn['amount']}")
# Export to JSON
from export_to_json import export_to_json
export_to_json(transactions, metadata, summary, 'output.json')
```
## Testing
To test with sample data:
```bash
python ach_parser.py
```
The parser includes debug logging for troubleshooting:
```python
logger = get_logger(__name__)
logger.debug(f"Parsing transaction: {line}")
```
## Known Limitations
- Assumes fixed-width format with `-` delimiters between main fields
- Remarks must be the last column (no fields after remarks)
- Form feeds (`\f`) are used to separate pages
## Future Enhancements
- [ ] CSV export support
- [ ] Excel export support
- [ ] Database storage integration
- [ ] Validation and error correction
- [ ] Support for different ACH report formats
- [ ] Batch processing multiple files
- [ ] Web API for file upload and parsing
## Dependencies
- **python-dotenv**: Environment variable management
- **pytest**: Testing framework
- **black**: Code formatting
- **flake8**: Code linting
See `requirements.txt` for exact versions.
## License
Internal use only.
## Support
For issues or questions, check the logs in `logs/app.log` for detailed error information.
SETUP.md
# ACH File Processing Pipeline - Setup Guide
## Prerequisites
- Python 3.8+
- Oracle Database (or access to Oracle instance)
- SFTP Server (or Docker for local testing)
- Linux/Unix environment (for systemd integration)
## Step 1: Install Python Dependencies
The project requires several new packages. Install them using:
```bash
cd /home/asif/projects/ach_ui_dbtl_file_based
source venv/bin/activate
pip install -r requirements.txt
```
This will install:
- `oracledb==2.0.0` - Oracle database driver (Thin mode, no Instant Client needed)
- `paramiko==3.4.0` - SFTP client
- `schedule==1.2.0` - Job scheduling
- `python-decouple==3.8` - Configuration management
- `cryptography==41.0.7` - For paramiko SSH
- `pytz==2023.3` - Timezone support
- Existing packages: `python-dotenv`, `pytest`, `black`, `flake8`
## Step 2: Oracle Client Setup (Optional)
The application uses **oracledb**, which includes two modes:
### Option A: Thin Mode (Recommended - No Installation Needed)
oracledb Thin mode connects directly to Oracle Database without any Oracle Instant Client:
```bash
# No installation needed - Thin mode works out of the box!
python -c "import oracledb; print('oracledb ready')"
```
This is the default mode and requires no additional setup.
### Option B: Thick Mode (Requires Oracle Instant Client)
If you prefer Thick mode or have an existing Oracle Instant Client installation:
**On Linux (Ubuntu/Debian):**
```bash
# Download Oracle Instant Client (version 21.12 or later)
cd /tmp
wget https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
# Unzip and move to system location
unzip instantclient-basic-linux.x64-21.12.0.0.0dbru.zip
sudo mkdir -p /opt/oracle
sudo mv instantclient_21_12 /opt/oracle/
# Setup library path
echo '/opt/oracle/instantclient_21_12' | sudo tee /etc/ld.so.conf.d/oracle.conf
sudo ldconfig
```
**On macOS:**
```bash
# Using Homebrew
brew install instantclient-basic
```
### Set Environment Variable (Thick Mode Only):
Add to your shell profile (`~/.bashrc` or `~/.zshrc`):
```bash
export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12:$LD_LIBRARY_PATH
```
Then reload:
```bash
source ~/.bashrc
```
### Summary:
| Mode | Installation | Best For |
|------|-------------|----------|
| **Thin** | None needed ✓ | Default, simplest |
| **Thick** | Oracle Instant Client | Legacy apps, specific features |
## Step 3: Database Schema Setup
Login to your Oracle database and create the required tables:
```sql
-- Login to database
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB
-- Create ACH transaction log table (if not already exists)
CREATE TABLE ach_api_log (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
narration VARCHAR2(500),
status VARCHAR2(100),
bankcode VARCHAR2(20),
jrnl_id VARCHAR2(50),
tran_date DATE,
cbs_acct VARCHAR2(50),
tran_amt NUMBER(15, 2),
TXNIND VARCHAR2(2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create indexes for performance
CREATE INDEX idx_ach_jrnl_id ON ach_api_log(jrnl_id);
CREATE INDEX idx_ach_bankcode ON ach_api_log(bankcode);
-- Verify table was created
DESC ach_api_log;
-- Exit
EXIT;
```
**Note**: The `ach_processed_files` table will be created automatically by the application on first run.
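The application writes rows into this table in batches (`BATCH_SIZE` from `.env`). The insert shape can be sketched against an in-memory SQLite copy, runnable without Oracle; with oracledb the `?` placeholders would instead be positional binds (`:1` … `:8`):

```python
import sqlite3

# Runnable sketch of the batched insert into ach_api_log, using SQLite
# so no Oracle is needed; BATCH_SIZE mirrors the .env setting.
BATCH_SIZE = 100
rows = [
    ("Test Remark", "23-DEP-PROCESSED", "HDFC", str(i), "2026-01-19", "1001", 26.26, "CR")
    for i in range(250)
]
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE ach_api_log (
        narration TEXT, status TEXT, bankcode TEXT, jrnl_id TEXT,
        tran_date TEXT, cbs_acct TEXT, tran_amt REAL, txnind TEXT
    )
""")
for start in range(0, len(rows), BATCH_SIZE):
    conn.executemany(
        "INSERT INTO ach_api_log VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        rows[start:start + BATCH_SIZE],
    )
conn.commit()
```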
## Step 4: Environment Configuration
### Create .env File:
```bash
cp .env.example .env
```
### Edit .env for Your Environment:
```bash
# Database Configuration
DB_USER=pacs_db
DB_PASSWORD=pacs_db
DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com
DB_PORT=1521
DB_SERVICE_NAME=IPKSDB
DB_POOL_MIN=2
DB_POOL_MAX=10
# SFTP Configuration (update with your SFTP credentials)
SFTP_HOST=192.168.1.100
SFTP_PORT=22
SFTP_USERNAME=ipks_user
SFTP_PASSWORD=your_secure_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS
# Processing Configuration
POLL_INTERVAL_MINUTES=30
BATCH_SIZE=100
BANK_CODES=HDFC,ICICI,SBI,AXIS,PNB
# Logging
LOG_LEVEL=INFO
```
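The connector assembles its DSN from the `DB_*` settings above. A stdlib-only sketch of that assembly (the project itself reads settings via python-decouple, and the function name here is illustrative):

```python
import os

# Illustrative only: build the host:port/service DSN string from the
# DB_* environment settings, with the defaults shown in .env.
def build_dsn(env=os.environ):
    host = env.get("DB_HOST", "localhost")
    port = env.get("DB_PORT", "1521")
    service = env.get("DB_SERVICE_NAME", "IPKSDB")
    return f"{host}:{port}/{service}"
```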
**For Testing with Mock SFTP**, see Step 5 below.
## Step 5: Testing with Mock SFTP (Optional)
If you don't have a real SFTP server, you can use Docker to run a mock SFTP server locally.
### Requirements:
- Docker and Docker Compose installed
### Setup:
```bash
# Create SFTP directory structure
mkdir -p sftp_data/HDFC/NACH
mkdir -p sftp_data/ICICI/NACH
mkdir -p sftp_data/SBI/NACH
mkdir -p sftp_data/AXIS/NACH
mkdir -p sftp_data/PNB/NACH
# Copy sample ACH file to test directory
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/
# Also copy to other bank directories if needed
cp ACH_99944_19012026103217_001.txt sftp_data/ICICI/NACH/
# Start SFTP server
docker-compose up -d
# Verify it's running
docker ps | grep sftp
# Test SFTP connection
sftp -P 2222 ipks@127.0.0.1
# When prompted for password, enter: ipks_password
# Commands to try:
# ls
# cd /home/ipks/IPKS_FILES/REPORTS/HDFC/NACH
# ls
# exit
```
### Update .env for Mock SFTP:
```bash
# For Docker SFTP testing
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USERNAME=ipks
SFTP_PASSWORD=ipks_password
SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS
# Shorter poll interval for testing
POLL_INTERVAL_MINUTES=1
```
## Step 6: Verify Installation
Before running the application, verify all components are working:
### Test Database Connection:
```bash
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB
-- In SQL*Plus:
SELECT COUNT(*) FROM ach_api_log;
EXIT;
```
### Test SFTP Connection:
```bash
sftp -P 22 your_sftp_user@your_sftp_host
# Or for mock Docker SFTP:
sftp -P 2222 ipks@127.0.0.1
```
### Test Python Import:
```bash
source venv/bin/activate
python -c "from config import get_config; cfg = get_config(); print('Config OK'); cfg.validate()"
```
Expected output:
```
Config OK
Configuration validated. Bank codes: HDFC, ICICI, SBI, AXIS, PNB
```
## Step 7: Run the Application
### Development Mode (Foreground):
```bash
source venv/bin/activate
python main.py
```
Expected output:
```
2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:00 - scheduler - INFO - ACH File Processing Scheduler Started
2026-01-30 12:00:00 - scheduler - INFO - Poll Interval: 30 minutes
2026-01-30 12:00:00 - scheduler - INFO - Bank Codes: HDFC, ICICI, SBI, AXIS, PNB
2026-01-30 12:00:00 - scheduler - INFO - ================================================================================
2026-01-30 12:00:01 - db.oracle_connector - INFO - Oracle connection pool initialized
2026-01-30 12:00:01 - db.oracle_connector - INFO - Database connection test successful
2026-01-30 12:00:01 - db.repository - INFO - Created ach_processed_files table
2026-01-30 12:00:01 - scheduler - INFO - === Starting processing cycle 1 ===
...
```
To stop, press `CTRL+C` for graceful shutdown.
### Production Mode (Background Service):
```bash
# Create systemd service file
sudo nano /etc/systemd/system/ach_processor.service
```
Paste the following content:
```ini
[Unit]
Description=ACH File Processor
After=network.target
[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/ach_processor
Environment="PATH=/opt/ach_processor/venv/bin"
# systemd does not expand $VARS in Environment=; only needed for cx_Oracle with Instant Client
Environment="LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12"
ExecStart=/opt/ach_processor/venv/bin/python main.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
```
Then start the service:
```bash
# Reload systemd configuration
sudo systemctl daemon-reload
# Enable service to start on boot
sudo systemctl enable ach_processor
# Start the service
sudo systemctl start ach_processor
# Check status
sudo systemctl status ach_processor
# View logs
journalctl -u ach_processor -f
```
## Step 8: Running Tests
### Unit Tests:
```bash
source venv/bin/activate
# Run all tests
pytest tests/ -v
# Run specific test file
pytest tests/test_data_mapper.py -v
# Run with coverage report
pytest tests/ --cov=processors --cov=db --cov=sftp -v
```
### Integration Tests:
```bash
# With mock SFTP running (see Step 5)
source venv/bin/activate
# Create test file
cp ACH_99944_19012026103217_001.txt sftp_data/HDFC/NACH/ACH_99944_01010101010101_001.txt
# Run application for one cycle
python main.py
# Verify file was processed by checking logs
tail -f logs/app.log
# Verify data in database
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB
SELECT COUNT(*) FROM ach_api_log;
SELECT * FROM ach_processed_files;
EXIT;
```
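The copied test file above uses a dummy timestamp; to generate names that match the `ACH_{branch}_{DDMMYYYYHHMMSS}_{seq}.txt` convention, a small helper like this can be used (hypothetical, for test setup only):

```python
from datetime import datetime

def make_test_filename(branch='99944', seq=1, when=None):
    """Build an ACH filename matching ACH_{branch}_{DDMMYYYYHHMMSS}_{seq}.txt."""
    when = when or datetime.now()
    return f"ACH_{branch}_{when.strftime('%d%m%Y%H%M%S')}_{seq:03d}.txt"
```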
## Directory Structure
After setup, your project structure should look like:
```
ach_ui_dbtl_file_based/
├── venv/ # Virtual environment
├── logs/ # Log files (created on first run)
├── sftp_data/ # Mock SFTP data (for testing)
│ ├── HDFC/NACH/
│ ├── ICICI/NACH/
│ └── SBI/NACH/
├── config.py # Configuration management
├── main.py # Application entry point
├── scheduler.py # Main scheduler
├── ach_parser.py # Existing parser
├── logging_config.py # Existing logging
├── db/ # Database module
├── sftp/ # SFTP module
├── processors/ # Processing module
├── tests/ # Test files
├── requirements.txt # Dependencies
├── .env # Configuration (created)
├── .env.example # Configuration template
├── docker-compose.yml # Mock SFTP config
├── SETUP.md # This file
├── IMPLEMENTATION.md # Implementation details
└── README.md # Original README
```
## Troubleshooting
### ImportError: No module named 'cx_Oracle'
**Solution**: Install Oracle Instant Client (Step 2) and ensure `LD_LIBRARY_PATH` is set.
```bash
# Check if installed
python -c "import cx_Oracle; print(cx_Oracle.version)"
# If error, check LD_LIBRARY_PATH
echo $LD_LIBRARY_PATH
# If not set, add to ~/.bashrc
export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_12:$LD_LIBRARY_PATH
source ~/.bashrc
```
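Note that the newer `oracledb` driver (which this project has migrated to) runs in Thin mode and needs no Instant Client at all, so this error disappears entirely. A small check that reports which driver is importable without crashing (illustrative sketch; `check_oracle_driver` is a hypothetical helper, not part of the application):

```python
def check_oracle_driver():
    """Report which Oracle driver is importable; oracledb needs no client libraries."""
    try:
        import oracledb
        return f"oracledb {oracledb.__version__} available (thin mode, no Instant Client needed)"
    except ImportError:
        pass
    try:
        import cx_Oracle
        return f"cx_Oracle {cx_Oracle.version} available (requires Instant Client)"
    except ImportError:
        return "no Oracle driver installed: pip install oracledb"

print(check_oracle_driver())
```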
### Database Connection Refused
**Solution**: Verify database credentials and network connectivity.
```bash
# Test with sqlplus
sqlplus pacs_db/pacs_db@testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com:1521/IPKSDB
# If network timeout, check firewall
# Database may require security group rules for your IP
```
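A quick TCP reachability probe helps distinguish a blocked port (firewall or security group) from a credential problem; a minimal sketch (`port_open` is a hypothetical helper, not part of the application):

```python
import socket

def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g. port_open('your-db-host', 1521) returning False suggests a network
# or security-group issue rather than bad credentials.
```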
### SFTP Connection Refused
**Solution**: Verify SFTP credentials and check if server is running.
```bash
# Test SFTP connection
sftp -P 22 your_user@your_host
# For Docker, ensure container is running
docker-compose up -d
docker ps | grep sftp
```
### Application Hangs or Doesn't Process Files
**Solution**: Check logs and verify database/SFTP availability.
```bash
# Watch logs
tail -f logs/app.log
# Enable debug logging: set LOG_LEVEL=DEBUG in .env, then restart the application
```
### Permission Denied on /opt/oracle
**Solution**: Check directory permissions.
```bash
# Verify Oracle client is readable
ls -la /opt/oracle/instantclient_21_12
# If needed, adjust permissions
sudo chmod -R +r /opt/oracle/instantclient_21_12
```
## Performance Tuning
### Database
- Adjust `DB_POOL_MIN/MAX` for concurrent load
- Increase `BATCH_SIZE` if database can handle it
- Monitor indexes: `idx_ach_jrnl_id`, `idx_ach_bankcode`
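The `BATCH_SIZE` bullet refers to chunking rows for batched inserts; the idea can be sketched as follows (illustrative; the real repository code may differ):

```python
def chunked(rows, batch_size):
    """Yield successive batches of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

# Each batch would then be passed to something like
# cursor.executemany(INSERT_SQL, batch) in the repository layer.
```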
### Polling
- Adjust `POLL_INTERVAL_MINUTES` based on file arrival rate
- Default 30 minutes should handle most cases
- Lower for high-volume processing
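The poll interval translates into sleep time between cycles roughly like this (illustrative sketch; the actual `scheduler.py` may compute this differently):

```python
from datetime import datetime, timedelta

def seconds_until_next_poll(last_run, interval_minutes, now=None):
    """Seconds to sleep before the next cycle; 0 if the cycle is already overdue."""
    now = now or datetime.now()
    next_run = last_run + timedelta(minutes=interval_minutes)
    return max(0.0, (next_run - now).total_seconds())
```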
### Network
- Ensure low-latency connection to SFTP and database
- Use VPN or direct network path if possible
## Next Steps
1. Verify all setup steps are complete
2. Run tests to ensure everything works
3. Deploy to production following Step 7
4. Monitor logs regularly
5. Set up log rotation (handled by `RotatingFileHandler`)
6. Consider adding alerting for failures
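Size-based rotation with `RotatingFileHandler` (point 5) looks roughly like this (illustrative sketch; the sizes, backup count, and logger name are assumptions, not the real `logging_config.py`):

```python
import logging
from logging.handlers import RotatingFileHandler

def setup_rotating_log(path='logs/app.log', max_bytes=10 * 1024 * 1024, backup_count=5):
    """Attach a size-based rotating handler: app.log plus up to 5 rotated copies."""
    handler = RotatingFileHandler(path, maxBytes=max_bytes, backupCount=backup_count)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger = logging.getLogger('ach_processor')
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger
```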
## Support
For issues:
1. Check logs: `tail -f logs/app.log`
2. Enable debug: `LOG_LEVEL=DEBUG` in `.env`
3. Review error messages and stack traces
4. Verify database and SFTP connectivity
5. Check this guide for troubleshooting section


@@ -1,13 +0,0 @@
version: '3'
services:
sftp:
image: atmoz/sftp:latest
ports:
- "2222:22"
volumes:
- ./sftp_data:/home/ipks/IPKS_FILES/REPORTS
environment:
- SFTP_USERS=ipks:ipks_password:1001
command: ipks:ipks_password:1001
restart: unless-stopped


@@ -1,37 +0,0 @@
#!/usr/bin/env python3
"""
Export parsed ACH data to JSON format.
"""
import json
from ach_parser import ACHParser
from logging_config import setup_logging, get_logger
logger = get_logger(__name__)
def export_to_json(transactions, metadata, summary, output_file):
"""Export parsed data to JSON file."""
data = {
'metadata': metadata,
'summary': summary,
'transactions': transactions
}
with open(output_file, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Data exported to {output_file}")
if __name__ == '__main__':
setup_logging()
# Parse the ACH file
parser = ACHParser('/home/asif/projects/ach_ui_dbtl_file_based/ACH_99944_19012026103217_001.txt')
transactions, metadata, summary = parser.parse()
# Export to JSON
export_to_json(transactions, metadata, summary, 'parsed_ach_data.json')
logger.info(f"Successfully exported {len(transactions)} transactions")

File diff suppressed because it is too large

@@ -1,211 +0,0 @@
#!/usr/bin/env python3
"""
Local testing script - test core processing without SFTP/Database.
Run this first to verify the application logic works.
Usage:
python test_local.py
"""
import sys
from pathlib import Path
from datetime import date, datetime
from decimal import Decimal
print("\n" + "="*80)
print("ACH PROCESSING - LOCAL TESTING")
print("="*80)
# Test 1: Data Mapper (inline implementation to avoid cx_Oracle dependency)
print("\n[TEST 1] Data Transformation Logic")
print("-" * 80)
try:
# Test date conversion
def convert_date(date_str):
try:
if not date_str or len(date_str) < 8:
raise ValueError(f"Invalid date format: {date_str}")
parsed_date = datetime.strptime(date_str, '%d/%m/%y')
return parsed_date.date()
except Exception as e:
return datetime.now().date()
d = convert_date('19/01/26')
assert d == date(2026, 1, 19), f"Expected 2026-01-19, got {d}"
print("✓ Date conversion: '19/01/26' → 2026-01-19")
# Test TXNIND
def calculate_txnind(amount_str):
try:
amount = Decimal(amount_str.strip())
return 'DR' if amount < 0 else 'CR'
except Exception:
return 'CR'
assert calculate_txnind('100.50') == 'CR'
assert calculate_txnind('-50.00') == 'DR'
print("✓ TXNIND calculation: 100.50 → CR, -50.00 → DR")
# Test amount
def convert_amount(amount_str):
try:
if not amount_str:
return Decimal('0')
amount = Decimal(amount_str.strip())
return abs(amount)
except Exception:
return Decimal('0')
amt = convert_amount('-100.50')
assert amt == Decimal('100.50')
print("✓ Amount conversion: -100.50 → 100.50 (absolute)")
except Exception as e:
print(f"✗ FAILED: {e}")
sys.exit(1)
# Test 2: ACH Parser
print("\n[TEST 2] ACH Parser")
print("-" * 80)
try:
from ach_parser import ACHParser
ach_file = 'ACH_99944_19012026103217_001.txt'
if not Path(ach_file).exists():
print(f"⚠ File {ach_file} not found (OK for basic testing)")
else:
parser = ACHParser(ach_file)
transactions, metadata, summary = parser.parse()
print(f"✓ ACH Parser: Extracted {len(transactions)} transactions")
print(f" - Bank: {metadata.get('bank_name', 'N/A')}")
print(f" - Branch: {metadata.get('branch', 'N/A')}")
print(f" - Currency: {metadata.get('currency', 'N/A')}")
except Exception as e:
print(f"⚠ Parser test skipped (requires logging setup): {type(e).__name__}")
# Test 3: Filename Parsing
print("\n[TEST 3] ACH Filename Parsing")
print("-" * 80)
try:
import re
def parse_filename(filename):
"""Parse ACH filename format: ACH_{branch}_{DDMMYYYYHHMMSS}_{seq}.txt"""
pattern = r'ACH_(\d+)_(\d{2})(\d{2})(\d{4})(\d{2})(\d{2})(\d{2})_(\d+)\.txt'
match = re.match(pattern, filename)
if not match:
return {}
branch, day, month, year, hour, minute, second, seq = match.groups()
return {
'filename': filename,
'branch': branch,
'day': day,
'month': month,
'year': year,
'timestamp': f"{day}/{month}/{year} {hour}:{minute}:{second}"
}
test_files = [
'ACH_99944_05122025102947_001.txt',
'ACH_12345_19012026103217_002.txt',
'invalid_file.txt',
]
for filename in test_files:
parsed = parse_filename(filename)
if parsed:
print(f"✓ Valid: {filename}")
print(f" Branch: {parsed['branch']}, Timestamp: {parsed['timestamp']}")
else:
print(f"✓ Rejected (correctly): {filename}")
except Exception as e:
print(f"✗ FAILED: {e}")
sys.exit(1)
# Test 4: .env Configuration
print("\n[TEST 4] Configuration File")
print("-" * 80)
try:
from pathlib import Path
env_file = Path('.env')
if not env_file.exists():
print("⚠ .env file not found")
else:
print("✓ .env file exists")
with open('.env') as f:
lines = f.readlines()
# Parse .env
config = {}
for line in lines:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
print(f"✓ Configuration loaded with {len(config)} settings:")
for key in ['BANK_CODES', 'SFTP_HOST', 'SFTP_PORT', 'DB_HOST']:
if key in config:
print(f" - {key}: {config[key]}")
except Exception as e:
print(f"✗ FAILED: {e}")
sys.exit(1)
# Test 5: Local Files
print("\n[TEST 5] ACH Sample Files")
print("-" * 80)
try:
# Look for ACH files
ach_files = list(Path('.').glob('ACH_*.txt'))
if ach_files:
print(f"✓ Found {len(ach_files)} ACH file(s):")
for f in ach_files:
size = f.stat().st_size / 1024 # KB
print(f" - {f.name} ({size:.1f} KB)")
else:
print(" No ACH files in current directory (OK for testing)")
except Exception as e:
print(f"⚠ Warning: {e}")
# Summary
print("\n" + "="*80)
print("✓ ALL TESTS PASSED")
print("="*80)
print("""
SUMMARY
-------
Core processing logic is working correctly:
✓ Data transformation (dates, amounts, indicators)
✓ ACH file parsing (if sample file exists)
✓ Transaction mapping (parser to database format)
✓ File name parsing (extract metadata)
✓ Configuration loading (.env file)
NEXT STEPS
----------
1. For basic testing:
- Run unit tests: pytest tests/ -v
2. To test SFTP without Docker:
- Start mock server: python tests/mock_sftp_server.py
- In another terminal: python main.py
3. To test with real database:
- Install Oracle Instant Client (see SETUP.md)
- Create database tables
- Update .env with real credentials
- Run: python main.py
See LOCAL_TESTING.md for detailed testing options.
""")
print("="*80 + "\n")


@@ -1 +0,0 @@
"""Tests package for ACH file processing."""


@@ -1,355 +0,0 @@
#!/usr/bin/env python3
"""
Simple mock SFTP server for local testing without Docker.
Uses paramiko to create a basic SFTP server.
"""
import os
import socket
import threading
import paramiko
import sys
from pathlib import Path
from logging_config import get_logger
logger = get_logger(__name__)
class MockSFTPServer(paramiko.ServerInterface):
"""Mock SSH server for testing."""
def __init__(self, sftp_root):
self.sftp_root = sftp_root
self.event = threading.Event()
def check_auth_password(self, username, password):
"""Allow any username/password for testing."""
if username == 'ipks' and password == 'ipks_password':
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
def check_channel_request(self, kind, chanid):
"""Allow session channels (the SFTP subsystem runs over a session)."""
if kind == 'session':
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_channel_subsystem_request(self, channel, name):
"""Allow SFTP subsystem."""
if name == 'sftp':
return True
return False
class MockSFTPHandle(paramiko.SFTPHandle):
"""Mock file handle for SFTP."""
def __init__(self, flags=0):
super().__init__(flags)
self.file_obj = None
def stat(self):
"""Get file stats."""
if self.file_obj:
return paramiko.SFTPAttributes.from_stat(os.fstat(self.file_obj.fileno()))
return paramiko.SFTPAttributes()
def chattr(self, attr):
"""Set file attributes."""
if self.file_obj:
return paramiko.SFTP_OK
return paramiko.SFTP_NO_SUCH_FILE
def close(self):
"""Close file."""
if self.file_obj:
self.file_obj.close()
self.file_obj = None
return paramiko.SFTP_OK
def read(self, offset, length):
"""Read from file."""
if not self.file_obj:
return paramiko.SFTP_NO_SUCH_FILE
try:
self.file_obj.seek(offset)
return self.file_obj.read(length)
except Exception as e:
logger.error(f"Error reading file: {e}")
return paramiko.SFTP_FAILURE
def write(self, offset, data):
"""Write to file."""
if not self.file_obj:
return paramiko.SFTP_NO_SUCH_FILE
try:
self.file_obj.seek(offset)
self.file_obj.write(data)
return paramiko.SFTP_OK
except Exception as e:
logger.error(f"Error writing file: {e}")
return paramiko.SFTP_FAILURE
class MockSFTPServerInterface(paramiko.SFTPServerInterface):
"""Mock SFTP server interface."""
def __init__(self, server, *args, **kwargs):
super().__init__(server, *args, **kwargs)
self.sftp_root = server.sftp_root
def session_started(self):
"""Session started."""
pass
def session_ended(self):
"""Session ended."""
pass
def open(self, path, flags, attr):
"""Open file."""
try:
full_path = os.path.join(self.sftp_root, path.lstrip('/'))
full_path = os.path.abspath(full_path)
# Security check: ensure path is within sftp_root
if not full_path.startswith(self.sftp_root):
return paramiko.SFTP_PERMISSION_DENIED
os.makedirs(os.path.dirname(full_path), exist_ok=True)
if flags & os.O_WRONLY:
file_obj = open(full_path, 'wb')
else:
file_obj = open(full_path, 'rb')
handle = MockSFTPHandle()
handle.file_obj = file_obj
return handle
except Exception as e:
logger.error(f"Error opening file {path}: {e}")
return paramiko.SFTP_NO_SUCH_FILE
def close(self, path):
"""Close file."""
return paramiko.SFTP_OK
def list_folder(self, path):
"""List directory."""
try:
full_path = os.path.join(self.sftp_root, path.lstrip('/'))
full_path = os.path.abspath(full_path)
# Security check
if not full_path.startswith(self.sftp_root):
return paramiko.SFTP_PERMISSION_DENIED
if not os.path.exists(full_path):
return paramiko.SFTP_NO_SUCH_FILE
entries = []
for item in os.listdir(full_path):
item_path = os.path.join(full_path, item)
attr = paramiko.SFTPAttributes.from_stat(os.stat(item_path))
attr.filename = item
entries.append(attr)
return entries
except Exception as e:
logger.error(f"Error listing directory {path}: {e}")
return paramiko.SFTP_NO_SUCH_FILE
def stat(self, path):
"""Get file stats."""
try:
full_path = os.path.join(self.sftp_root, path.lstrip('/'))
full_path = os.path.abspath(full_path)
if not full_path.startswith(self.sftp_root):
return paramiko.SFTP_PERMISSION_DENIED
if not os.path.exists(full_path):
return paramiko.SFTP_NO_SUCH_FILE
return paramiko.SFTPAttributes.from_stat(os.stat(full_path))
except Exception as e:
logger.error(f"Error getting stats for {path}: {e}")
return paramiko.SFTP_NO_SUCH_FILE
def lstat(self, path):
"""Get file stats (no follow)."""
return self.stat(path)
def remove(self, path):
"""Remove file."""
try:
full_path = os.path.join(self.sftp_root, path.lstrip('/'))
full_path = os.path.abspath(full_path)
if not full_path.startswith(self.sftp_root):
return paramiko.SFTP_PERMISSION_DENIED
if not os.path.exists(full_path):
return paramiko.SFTP_NO_SUCH_FILE
os.remove(full_path)
return paramiko.SFTP_OK
except Exception as e:
logger.error(f"Error removing {path}: {e}")
return paramiko.SFTP_FAILURE
def rename(self, oldpath, newpath):
"""Rename file."""
try:
old_full = os.path.join(self.sftp_root, oldpath.lstrip('/'))
new_full = os.path.join(self.sftp_root, newpath.lstrip('/'))
old_full = os.path.abspath(old_full)
new_full = os.path.abspath(new_full)
if not old_full.startswith(self.sftp_root) or not new_full.startswith(self.sftp_root):
return paramiko.SFTP_PERMISSION_DENIED
if not os.path.exists(old_full):
return paramiko.SFTP_NO_SUCH_FILE
os.rename(old_full, new_full)
return paramiko.SFTP_OK
except Exception as e:
logger.error(f"Error renaming {oldpath}: {e}")
return paramiko.SFTP_FAILURE
def mkdir(self, path, attr):
"""Create directory."""
try:
full_path = os.path.join(self.sftp_root, path.lstrip('/'))
full_path = os.path.abspath(full_path)
if not full_path.startswith(self.sftp_root):
return paramiko.SFTP_PERMISSION_DENIED
os.makedirs(full_path, exist_ok=True)
return paramiko.SFTP_OK
except Exception as e:
logger.error(f"Error creating directory {path}: {e}")
return paramiko.SFTP_FAILURE
def rmdir(self, path):
"""Remove directory."""
try:
full_path = os.path.join(self.sftp_root, path.lstrip('/'))
full_path = os.path.abspath(full_path)
if not full_path.startswith(self.sftp_root):
return paramiko.SFTP_PERMISSION_DENIED
if not os.path.exists(full_path):
return paramiko.SFTP_NO_SUCH_FILE
os.rmdir(full_path)
return paramiko.SFTP_OK
except Exception as e:
logger.error(f"Error removing directory {path}: {e}")
return paramiko.SFTP_FAILURE
def start_mock_sftp_server(host='127.0.0.1', port=2222, sftp_root='./sftp_data'):
"""
Start a mock SFTP server in a background thread.
Args:
host: Host to bind to (default: 127.0.0.1)
port: Port to bind to (default: 2222)
sftp_root: Root directory for SFTP (default: ./sftp_data)
Returns:
Thread object (daemon thread)
"""
# Create root directory if needed
Path(sftp_root).mkdir(parents=True, exist_ok=True)
# Generate host key
host_key = paramiko.RSAKey.generate(2048)  # 1024-bit RSA is weak; some clients reject it
def run_server():
"""Run the SFTP server."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((host, port))
sock.listen(1)
logger.info(f"Mock SFTP server listening on {host}:{port}")
logger.info(f"SFTP root: {os.path.abspath(sftp_root)}")
logger.info(f"Username: ipks, Password: ipks_password")
while True:
try:
client, addr = sock.accept()
logger.debug(f"Connection from {addr}")
transport = paramiko.Transport(client)
transport.add_server_key(host_key)
transport.set_subsystem_handler(
'sftp',
paramiko.SFTPServer,
MockSFTPServerInterface
)
server = MockSFTPServer(os.path.abspath(sftp_root))
transport.start_server(server=server)
except KeyboardInterrupt:
logger.info("Server interrupted")
break
except Exception as e:
logger.error(f"Error handling connection: {e}", exc_info=True)
except Exception as e:
logger.error(f"Error starting server: {e}", exc_info=True)
finally:
sock.close()
logger.info("Mock SFTP server stopped")
# Start in daemon thread
thread = threading.Thread(target=run_server, daemon=True)
thread.start()
return thread
if __name__ == '__main__':
from logging_config import setup_logging
import time
setup_logging()
print("\n" + "="*80)
print("Mock SFTP Server for Testing")
print("="*80)
# Create directory structure
sftp_root = './sftp_data'
for bank in ['HDFC', 'ICICI', 'SBI']:
nach_dir = f'{sftp_root}/{bank}/NACH'
Path(nach_dir).mkdir(parents=True, exist_ok=True)
print(f"✓ Created {nach_dir}")
print("\nStarting mock SFTP server...")
start_mock_sftp_server(sftp_root=sftp_root)
print("\n" + "="*80)
print("Server running. Press CTRL+C to stop.")
print("\nTo test connection:")
print(" sftp -P 2222 ipks@127.0.0.1")
print(" Password: ipks_password")
print("\nTo use with application:")
print(" SFTP_HOST=127.0.0.1")
print(" SFTP_PORT=2222")
print(" SFTP_USERNAME=ipks")
print(" SFTP_PASSWORD=ipks_password")
print("="*80 + "\n")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n\nShutting down...")


@@ -1,118 +0,0 @@
#!/usr/bin/env python3
"""
Unit tests for data mapper module.
"""
import pytest
from datetime import date
from decimal import Decimal
from processors.data_mapper import DataMapper
from db.models import TransactionRecord
class TestDataMapper:
"""Test DataMapper functionality."""
def test_convert_date_valid(self):
"""Test date conversion with valid input."""
result = DataMapper.convert_date('19/01/26')
assert result == date(2026, 1, 19)
def test_convert_date_different_month(self):
"""Test date conversion with different month."""
result = DataMapper.convert_date('05/12/25')
assert result == date(2025, 12, 5)
def test_convert_date_invalid(self):
"""Test date conversion with invalid input."""
# Should return today's date on error
result = DataMapper.convert_date('invalid')
assert isinstance(result, date)
def test_calculate_txnind_credit(self):
"""Test TXNIND calculation for credit (positive amount)."""
assert DataMapper.calculate_txnind('100.50') == 'CR'
assert DataMapper.calculate_txnind('1000') == 'CR'
assert DataMapper.calculate_txnind('0') == 'CR'
def test_calculate_txnind_debit(self):
"""Test TXNIND calculation for debit (negative amount)."""
assert DataMapper.calculate_txnind('-50.00') == 'DR'
assert DataMapper.calculate_txnind('-100') == 'DR'
def test_convert_amount(self):
"""Test amount conversion."""
assert DataMapper.convert_amount('100.50') == Decimal('100.50')
assert DataMapper.convert_amount('-50.00') == Decimal('50.00') # Absolute value
assert DataMapper.convert_amount('') == Decimal('0')
def test_map_transaction(self):
"""Test complete transaction mapping."""
parsed_txn = {
'remarks': 'Test remark',
'sys': '23-DEP-PROCESSED',
'jrnl_no': '12345',
'date': '19/01/26',
'cust_acct': '1234567890',
'amount': '1000.00'
}
result = DataMapper.map_transaction(parsed_txn, 'HDFC')
assert isinstance(result, TransactionRecord)
assert result.narration == 'Test remark'
assert result.status == '23-DEP-PROCESSED'
assert result.bankcode == 'HDFC'
assert result.jrnl_id == '12345'
assert result.tran_date == date(2026, 1, 19)
assert result.cbs_acct == '1234567890'
assert result.tran_amt == Decimal('1000.00')
assert result.txnind == 'CR'
def test_map_transaction_with_negative_amount(self):
"""Test transaction mapping with negative amount."""
parsed_txn = {
'remarks': 'Debit transaction',
'sys': '23-DEP-PROCESSED',
'jrnl_no': '54321',
'date': '05/12/25',
'cust_acct': '9876543210',
'amount': '-500.50'
}
result = DataMapper.map_transaction(parsed_txn, 'ICICI')
assert result.tran_amt == Decimal('500.50') # Absolute value
assert result.txnind == 'DR'
def test_map_transactions(self):
"""Test mapping multiple transactions."""
parsed_txns = [
{
'remarks': 'Transaction 1',
'sys': '23-DEP-PROCESSED',
'jrnl_no': '001',
'date': '19/01/26',
'cust_acct': '1001',
'amount': '100.00'
},
{
'remarks': 'Transaction 2',
'sys': '23-DEP-PROCESSED',
'jrnl_no': '002',
'date': '19/01/26',
'cust_acct': '1002',
'amount': '200.00'
}
]
results = DataMapper.map_transactions(parsed_txns, 'HDFC')
assert len(results) == 2
assert all(isinstance(r, TransactionRecord) for r in results)
assert results[0].jrnl_id == '001'
assert results[1].jrnl_id == '002'
if __name__ == '__main__':
pytest.main([__file__, '-v'])


@@ -1,63 +0,0 @@
#!/usr/bin/env python3
"""
Unit tests for file monitor module.
"""
import pytest
from sftp.file_monitor import FileMonitor
class TestFileMonitor:
"""Test FileMonitor functionality."""
def test_parse_filename_valid(self):
"""Test parsing valid ACH filename."""
filename = 'ACH_99944_05122025102947_001.txt'
result = FileMonitor.parse_filename(filename)
assert result['filename'] == 'ACH_99944_05122025102947_001.txt'
assert result['branch'] == '99944'
assert result['day'] == '05'
assert result['month'] == '12'
assert result['year'] == '2025'
assert result['hour'] == '10'
assert result['minute'] == '29'
assert result['second'] == '47'
assert result['sequence'] == '001'
def test_parse_filename_another_date(self):
"""Test parsing filename with different date."""
filename = 'ACH_12345_19012026103217_002.txt'
result = FileMonitor.parse_filename(filename)
assert result['branch'] == '12345'
assert result['day'] == '19'
assert result['month'] == '01'
assert result['year'] == '2026'
assert result['sequence'] == '002'
assert result['timestamp'] == '19/01/2026 10:32:17'
def test_parse_filename_invalid(self):
"""Test parsing invalid filename."""
filename = 'invalid_filename.txt'
result = FileMonitor.parse_filename(filename)
assert result == {}
def test_parse_filename_invalid_extension(self):
"""Test parsing filename with wrong extension."""
filename = 'ACH_99944_05122025102947_001.csv'
result = FileMonitor.parse_filename(filename)
assert result == {}
def test_parse_filename_missing_parts(self):
"""Test parsing filename with missing parts."""
filename = 'ACH_99944_05122025_001.txt' # Missing time parts
result = FileMonitor.parse_filename(filename)
assert result == {}
if __name__ == '__main__':
pytest.main([__file__, '-v'])