From 642c523570c4f3859b9dfea3e81b2051565065e7 Mon Sep 17 00:00:00 2001 From: Bishwajeet Kumar Rajak Date: Sun, 15 Mar 2026 16:09:28 +0530 Subject: [PATCH] rtgs_inward --- .claude/settings.local.json | 10 + .env.example | 23 ++ config.py | 92 +++++ db/__init__.py | 6 + db/__pycache__/__init__.cpython-39.pyc | Bin 0 -> 332 bytes db/__pycache__/models.cpython-39.pyc | Bin 0 -> 2501 bytes .../oracle_connector.cpython-39.pyc | Bin 0 -> 3597 bytes db/__pycache__/repository.cpython-39.pyc | Bin 0 -> 9521 bytes db/models.py | 83 +++++ db/oracle_connector.py | 111 ++++++ db/repository.py | 311 ++++++++++++++++ logging_config.py | 51 +++ processors/__init__.py | 6 + .../__pycache__/__init__.cpython-39.pyc | Bin 0 -> 343 bytes .../__pycache__/data_mapper.cpython-39.pyc | Bin 0 -> 5723 bytes .../__pycache__/file_processor.cpython-39.pyc | Bin 0 -> 5218 bytes processors/data_mapper.py | 160 ++++++++ processors/file_processor.py | 181 +++++++++ requirements.txt | 23 ++ rtgs_inward.py | 34 ++ rtgs_inward_parser.py | 344 ++++++++++++++++++ scheduler.py | 171 +++++++++ sftp/__init__.py | 6 + sftp/__pycache__/__init__.cpython-39.pyc | Bin 0 -> 323 bytes sftp/__pycache__/file_monitor.cpython-39.pyc | Bin 0 -> 3337 bytes sftp/__pycache__/sftp_client.cpython-39.pyc | Bin 0 -> 4485 bytes sftp/file_monitor.py | 108 ++++++ sftp/sftp_client.py | 157 ++++++++ 28 files changed, 1877 insertions(+) create mode 100644 .claude/settings.local.json create mode 100644 .env.example create mode 100644 config.py create mode 100644 db/__init__.py create mode 100644 db/__pycache__/__init__.cpython-39.pyc create mode 100644 db/__pycache__/models.cpython-39.pyc create mode 100644 db/__pycache__/oracle_connector.cpython-39.pyc create mode 100644 db/__pycache__/repository.cpython-39.pyc create mode 100644 db/models.py create mode 100644 db/oracle_connector.py create mode 100644 db/repository.py create mode 100644 logging_config.py create mode 100644 processors/__init__.py create mode 100644 processors/__pycache__/__init__.cpython-39.pyc create mode 100644 processors/__pycache__/data_mapper.cpython-39.pyc create mode 100644 processors/__pycache__/file_processor.cpython-39.pyc create mode 100644 processors/data_mapper.py create mode 100644 processors/file_processor.py create mode 100644 requirements.txt create mode 100644 rtgs_inward.py create mode 100644 rtgs_inward_parser.py create mode 100644 scheduler.py create mode 100644 sftp/__init__.py create mode 100644 sftp/__pycache__/__init__.cpython-39.pyc create mode 100644 sftp/__pycache__/file_monitor.cpython-39.pyc create mode 100644 sftp/__pycache__/sftp_client.cpython-39.pyc create mode 100644 sftp/file_monitor.py create mode 100644 sftp/sftp_client.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..54559c9 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,10 @@ +{ + "permissions": { + "allow": [ + "Bash(python3 -m venv:*)", + "Bash(source venv/bin/activate)", + "Bash(python:*)", + "Bash(pip install:*)" + ] + } +} diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ed5239f --- /dev/null +++ b/.env.example @@ -0,0 +1,23 @@ +# Database Configuration +DB_USER=pacs_db +DB_PASSWORD=pacs_db +DB_HOST=testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com +DB_PORT=1521 +DB_SERVICE_NAME=IPKSDB +DB_POOL_MIN=2 +DB_POOL_MAX=10 + +# SFTP Configuration +SFTP_HOST=192.168.1.100 +SFTP_PORT=22 +SFTP_USERNAME=ipks +SFTP_PASSWORD=secure_password +SFTP_BASE_PATH=/home/ipks/IPKS_FILES/REPORTS + +# Processing Configuration +POLL_INTERVAL_MINUTES=30 +BATCH_SIZE=100 +BANK_CODES=HDFC,ICICI,SBI,AXIS,PNB + +# Logging Configuration +LOG_LEVEL=INFO diff --git a/config.py b/config.py new file mode 100644 index 0000000..de3f895 --- /dev/null +++ b/config.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +""" +Configuration management for ACH file processing pipeline. +Loads and validates environment variables. +""" + +import os +from pathlib import Path +from logging_config import get_logger + +logger = get_logger(__name__) + + +class Config: + """Application configuration from environment variables.""" + + def __init__(self): + """Initialize configuration from environment.""" + self._validate_env_file() + self._load_database_config() + self._load_sftp_config() + self._load_processing_config() + + def _validate_env_file(self): + """Check if .env file exists.""" + if not Path('.env').exists(): + logger.warning(".env file not found. Using environment variables or defaults.") + + def _load_database_config(self): + """Load database configuration.""" + self.db_user = os.getenv('DB_USER', 'pacs_db') + self.db_password = os.getenv('DB_PASSWORD', 'pacs_db') + self.db_host = os.getenv('DB_HOST', 'testipksdb.c7q7defafeea.ap-south-1.rds.amazonaws.com') + self.db_port = int(os.getenv('DB_PORT', '1521')) + self.db_service_name = os.getenv('DB_SERVICE_NAME', 'IPKSDB') + self.db_pool_min = int(os.getenv('DB_POOL_MIN', '2')) + self.db_pool_max = int(os.getenv('DB_POOL_MAX', '10')) + + def _load_sftp_config(self): + """Load SFTP configuration.""" + self.sftp_host = os.getenv('SFTP_HOST', '43.225.3.224') + self.sftp_port = int(os.getenv('SFTP_PORT', '4650')) + self.sftp_username = os.getenv('SFTP_USERNAME', 'ipkssftp') + self.sftp_password = os.getenv('SFTP_PASSWORD', 'CqBx@j6G9vqNW#1') + self.sftp_base_path = os.getenv('SFTP_BASE_PATH', '/home/ipks/IPKS_FILES/REPORTS') + + def _load_processing_config(self): + """Load processing configuration.""" + self.poll_interval_minutes = int(os.getenv('POLL_INTERVAL_MINUTES', '30')) + self.batch_size = int(os.getenv('BATCH_SIZE', '100')) + self.bank_codes = self._parse_bank_codes() + self.log_level = os.getenv('LOG_LEVEL', 'INFO') + + def _parse_bank_codes(self): + """Parse bank codes from comma-separated environment variable.""" + codes_str = os.getenv('BANK_CODES', '0001,0002,0003,0004,0005,0006,0007,0009,0012,0013,0014,0015,0016,0017,0018,0020,0021') + return [code.strip() for code in codes_str.split(',') if code.strip()] + + def get_db_connection_string(self): + """Generate Oracle connection string.""" + return f"{self.db_user}/{self.db_password}@{self.db_host}:{self.db_port}/{self.db_service_name}" + + def validate(self): + """Validate critical configuration.""" + if not self.db_user or not self.db_password: + raise ValueError("Database credentials not configured") + if not self.sftp_username: + logger.warning("SFTP username not configured") + if not self.bank_codes: + raise ValueError("No bank codes configured") + logger.info(f"Configuration validated. Bank codes: {', '.join(self.bank_codes)}") + + +# Global config instance +config = None + + +def get_config(): + """Get or create global config instance.""" + global config + if config is None: + config = Config() + return config + + +if __name__ == '__main__': + cfg = get_config() + cfg.validate() + print(f"Bank Codes: {cfg.bank_codes}") + print(f"SFTP Host: {cfg.sftp_host}:{cfg.sftp_port}") + print(f"Database: {cfg.db_host}:{cfg.db_port}/{cfg.db_service_name}") + print(f"Poll Interval: {cfg.poll_interval_minutes} minutes") diff --git a/db/__init__.py b/db/__init__.py new file mode 100644 index 0000000..701b306 --- /dev/null +++ b/db/__init__.py @@ -0,0 +1,6 @@ +"""Database module for ACH file processing.""" + +from .oracle_connector import OracleConnector +from .repository import Repository + +__all__ = ['OracleConnector', 'Repository'] diff --git a/db/__pycache__/__init__.cpython-39.pyc b/db/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..37a735fd6a1a6143408319eaae8a831d9b389204 GIT binary patch literal 332 zcmYjMu};G<5RIKQK~XFCgpAEni-idxP#Ks&g3ZgxiJjKSu_HUEDu0B8_@Jyz`~p+Y zMF>v1clS>BPVdBGQ6?Dt{9S)w{fy*)P$buwVTm9Uo=768AwA`k#6C4?pYaU)OwKlx z&t(3T)Wz%aR*a$%K~=qzFGf|J^VQAzuIek4yD1!D^b=;du|0j%^C=kW~yd}Pfha!Amu&wJA xYjoJ|#IsT(v#J|G+nw+dqECnqNG2NwIMyS;YS>RFmp?CEMdu+$$mpER=>Y~+S@8e> literal 0 HcmV?d00001 diff --git a/db/__pycache__/models.cpython-39.pyc b/db/__pycache__/models.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..2a2bd16777c0f98c1a78c8070765e6b8905a2ffd GIT binary patch literal 2501 zcma)8OK;mo5GE;#q+Yfh$9c3x%eHA#HHh3pk3kScqEn@@97syg^0HWSS2kTf)LlBM zjoy-dcBtVZPz)V|Fz&`z>j!RX5jX- z!5rqkG?@FsdSQlj-QuwtwA6GjsQ*$~L#uUa4!Pu9vz|>Sn21rCuxbda2u`-YE5@Qg4>}@*RV1 zu`4f)&|_C2+}Bil6zoig*>ft`ScmVlw-Q?F[{`8dj73|kGYpVIjpx{KT&-1nnA zIZh$m$|-)z;*2TxNMs3#nQ{c5kt|mpbPyeG7vE@Fs0JjFaY5plj8qF!81rXKbFJ*7 zQB>gc_8FU|T&)Y9#zg@W7%ym?pog*{&StrG_)L(c0~j8?F>$^l3tAjY)e!uM!y*xs z9Zu4Q;8f=1JA67OO9wE#|Hj1m&Sct~KAD7#aquvJMfA48hc?DRZ?ZQETlWK~OVS$*#=&F~ zdb^VcWcvN%ptC7z5O(oC0y3Ncax+8|p@p!9u#V71*g&|1u!(@V6I%#Z5UwIzL%5Fc z9>8l;*^(!-*M|5I%zwY$KFm|Ty&ucN=k$p4VjBvkAhAAH1i6bKL9uPNzr7%~?)*f# zMNTktpMeYc1%Oep%?tegKE3wtq3xEgMxC1S2+5$d2~l-IV6|gdov0=uKOWPh>~RTU zc?8WBSqQaG2+gt_P6DzeA;N{d2nWGO!2h@cAED_1U@YaA0Cmf@Jah4DmU?NQ(UzWf zK9$bE*#)W80r~_gWlrd7ngt4FUg+@}w>7l_g@RoNyP<8MQr2KipuQ&a-~_c)>+#|~ z;cN#o^e*o$`xaNv;%d>Si$;f}yoor|$(24_Knd?E4^xdHRvuD=CX<>@|C|(JtV9D^y)9D^wUZa9}?<~IuW7w=cN znqIk@J8qxP9e6?N%XEc7T!GaoItaLyxP_n($8CJ;0(5E@7$mANQdP42x%dDlzAY?W zQ$Dl}K_s5avxlMi&*|2gbS*2s(-I%UVDSmUrwBI?@Jxx%0aPWou*z*H85)Wvw}-uL64{);J+344W(SR_@K~-3a-bK rIef${ju~cAds~TyjthoFd3P2k_%3?Yaj~ngzS%UJ70YaYWtsm0S2=45 literal 0 HcmV?d00001 diff --git a/db/__pycache__/oracle_connector.cpython-39.pyc b/db/__pycache__/oracle_connector.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..b6caff02938478dd52f4a910442b04a616ca685e GIT binary patch literal 3597 zcma)9-;dkI5#A*!isIer&ViHKiF0AnG_?>{XD8{)p$Uv!&uNNA0pe@j3MdF3cXi^0 zB9$d&OT?WY>bwT%durg^w*YT+xnw;jW*c#NjHO{0d?I*lj z|A>j2u)k!&eqwYTVFV3P7tWWg>k8vOYc+~*-A`2z#?li(9vlXl^ujbrWSB>3;tkU@ z_69)`97*MkvM4$7Qq3(6+wLcNK3m|=yhl;q*HXk9B*N>LmILZTCCajdfbQbdO~2$LSGHG#lZ-z33<;j>X5o;4a?mS0IwX z)~ss?-Zh0GOjytomZ+hHeMKFuEgaE6TNkcaL+gljv4OTBnqm{JE4EiC7xKeY&+yoY4%W`blt<7y#ONk3G-RbpD?u07+*DtS3XXnJPP8dxZvS- zr)8EF`C1XqO1m=1H@vhm8TYzI}DIFiYQSSR!Nl^=g88$Djt(c=dV_P~TDMlfp5BYC&iu{Bf zvt$0XJ}!C&dWN1e(KB_=ny=QrW=|WZJhzYQCom8A9ZehL#IGzE%FWl#)`eZIL9Sk{ zL(Y1}IeWl_bDvoZJ8Vq;&e^HKS@A}`cH9(=6MXxr0r$IKVfF$00~2ef#$!XQ-=Fqw zljP=~+2mhKk{kbzy&spYz@C2nsC=2+fkx97IHdakmjA zq0%8$+?+ye>X>qGHIC31q z#l5t1uXyvfi5w4+Mj$DbLNLySDK!w-cD>e?vY~NV7l%F~SCwn%41+9tlq#WT`)QWz z&M;MZxrt@4J&Z!>C&56LPK7gZ*j>X3VH=+|uC_tPCllyB^&l9l8hy^L!oREuc{n;M zH-0(IeEXzlS<~v4js0;bhm-|drg9+Gy>f0eKg$)4QF5t6qap;)a&vynN|)=H%-SH# zFs!TG{@$9s-y3y{8{Edv_>WWHM!UtY@|QF%UR#Zdm3@CWIR1)n5p=WW2|MG@7-0G` zjn9l{3;|%CS%MRwPxu3-0bM(`Ct?mu^SF*MssL>Yivazn3eq)#bn>Cltp>C+`OA`= z{XgW~nH=C;1Dxx{^^at}h@+lL2O1dd;?=$LJ17XJ*?qoVY@b7Ny1KB>u6Lj)3wDa1 zUcg|*O#nRiC1BXlxKM9Es;p0UR5vhEuM#02s%;{#5%EB#m?$mUN4-wdlp_~_(4>3j z{3}1lYIY5Ta-L%t+{JI^@h1FUY_EEJd4nZ~Q@WpL!{1{-4ps-nHSK8a^xQZzD@Q+I z$$Q8?*5ord3yv>tV66rxBX>SEs@#T*WuEfKJaMb`HiMf?xXCJB+KbbRRY4o`=RVSP zLP&g~xT?d*qtSV!cUpE8do|6ajayojE3`SaVOxfdvrDb%YB3$%`#5g44nmeBJ2vqv zUS74{xxkX;C;`kZ_ZWlXLxc&V&^X}~E~fzh87@s7BCsK5Ow3pmNZ`Fv=(G;)VPBzuDhA z@a}l`KKt|&PmF@N`0*@XEr8g|X_IUehBC`~qxhhB{Rg>vI-l=)oqGWOPhbLE_#>62 zs#}+18IE$#NwBbA`H!Auc@BCkd?u0ki@Z)#cE}C|*RsENe54a@1a2-2`*X z(gv+A(R?zLcmG=lG zn_sT`{ujX2bfxb5A`N|CtzlQ45;aD&j_PfiyG4Yqv;~E9@}sy?WQ8kuvn>#ZTZU6} zZa7;`!`XB!y&7U+>6sgi(smAWjhJdu`5IuPO?Q*73i>xt6+lOEdKknr`0Lt&{t$!` z)--UoY1&Pd{@!J3mKzJ%HC4y>a7LvFGaVpMy@y_FO}&FgXaBNEB?HnKZH@m5id5FM si>{c}4h{8%LC06`5}`dV{j~Fo%Jd)5CRr22avZ+S*A2&P{?0Q03xzeEl>h($ literal 0 HcmV?d00001 diff --git a/db/__pycache__/repository.cpython-39.pyc b/db/__pycache__/repository.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..bb58334a8f3d63f6573a70de7d469356b498ffe1 GIT binary patch literal 9521 zcmcIqU5p#ob)G+x!&xr9`iWzyNi(+N)J)bZIcOV~Rt>Lqx%MU!*B~h=BH9jSd1rSi zk;C(M^dS#}q7NxrBtRbuv|M`j}{WG0pKPpb%#Mirt zhDmLS$xP|VeWk7R)wU|rx!TkExpof68q4+a{X)ASj`KaeUu+lqM%$1jC+93ZlG~-X zBvxShV~OdHl=dOY92lFnZ<}_f<9MFgvyU8Z?hUwEuWhx=u6xhs%-rkt9P^M5 zNTloTuNiOKF6%j-xzV_>X$}q@Zu{MV>zTN0`rLLsyF(YuzU|ukPTz6;HRGpPg;Y^O z?ONCKLw)CvBpC5Oz%got?AaFPG@Q&wezVyos;(5RD_XW$d8BLlT z!3s}Qrn4fhEIB2d8<1FHC7ds_XPvTB#JC)_tc;##*vjKv`x(fcc_Op3>>T7)*m?Fj z%yNb;W0yZ48V%=g;B~Q3%J_;G z=DFh!x!VzAXdFH|z7!?Vg0cLO?49+cV=VYX8J?~*vHUyAvHGF(0QdEQ8vI@0j+A2! z=PJd-V~p689?1_hjLMBPF)IHcH&T!DBRW@P3G#&xC0X+IkqpT^NwUHy|6N4N_f#4& z%HNW1Nh6j2A;uL+7GuQ!rld)}ar7fudVt6a)KKua-{uVC$+oJjnoe${)@@$m!SbSycGPcVaA`U!`@xQtLJb( zirwtqqQf7=(QJu3X&-heTgDCaJni}Fh zPAIDn`@4V^D2R-7n8^qCymZ3gLVa*zMpIZ!>*3_~^%H?WK0*=R#MdKF+lL?ge(8Zc zl8>birSB;3YR$-%;n@X$V6kq;hsP;vm)oj;-xXdSHOm+s8drK)UT4rhd}9^o|ADU* zd?VwvyVzbhHko5(_hs!$Z_u%O-WxbbG}yOt@Y>|YB>%VT&03>nZr58ov+jiX^3(_# z+4^>UyS}+Pv+tXY`Za5NPUqG2`nNWAHfytoHfmc|eP?zbrnENKTXRNI@A~!D?4iwC zeY4iEwzrxavjrNpjqTdag(9(z^^J|zToX^otY4pFYNK|2yVb&Eo3+;Z_O*Ey#X{Tl ztvMS4qi;5>g$89s=GxP2t+#G8=b7=%8Wz0hjtPNAZN0fupRMC+4G&3ceRH!>YtEVG zdh;!-b^D##ls{F>o9owZ)S7eLu)4?(s|y{my4Vw|i(HYWUEqtNd#*E9PxVI3eX2Vq zS{B~^lR|SH0P{VCM~V-G~u>;W)L=N;V_={i*G)Td@ zHqwqk<45ucl<$cmD4a%=k7*YqP`ez{?rMUtX}k{EoFHs!^j{FR{L%M8*zzE3ula>L zI?EB+14+xX0+BR9(Le+rg7%Fx{wI?F#wqCT-?I2AAx6)vROp ziG2A7PBaVHRfnccjUcv>+?}d<6{?XkFo$SZB$!Ihe%h zG|Q^F1=a+UU=Y98^_*0DJ@~8im~5g?vvv(nM{{H6M!og&S1VKfrn%MFxo&22lsJV# zY|ti*d1j;#?X9oSb@OxKI@xYV2;1iaryBQ3>XfRI=6`e$0PBpW#a1fMB#86Z#+ zAfW$zB?bfP3%-y6gDK_U-sN;AF>!DO`dM8p}_i zgUbJWN@HOH9aR1glQKXDmH%Y&YK#tg@b9TxMNc~<^HlRdP$3!@csT`Y-{yCt5C@K= zTfPt`QYmp`s^&Y{%1WHyMMVS(5wq|3Ha&|@$@mgP@P?wcv^VT62zvWo77&vl&E`yt zmOVRnO5%AnjoekSG1j5&AFO7R&FJ|#n^k+h?GL?GQ#_%qK1yIHX|0ZT*0TNdZY6e~ z#-_)Xr9iS8;ZJN^HIrj5GOn6fSDi4gI=-i~>^os`aPc~Ph8{0;ce7pta0;G};EUXf z&})WFj9<$W_{Gm4gn5OUFH$2A>(}T=K-Cz^o~LVBD0_u2q$t)d(wu_*@Gn!(EI2zy z_DulVA4AxyqLGXk(7hb6fb9be^HhbBALJh9L|yCq(|V%bv^MoOlM({L{%P`R3}Hs_m#K@r z<@lx`0H7bKR|-F(2d}pfnUcqB)lBNeX)Kcl-{Gf#(6zW~MJ^3>W@5=}E@eib4@x+5}@tac&6AaQf2fjWzM_4ar}0Ym0yt+_GNEa9WU z@yx#e&t?6RW`%gdM2nTnM=J= z0EjoMrWd>r!60hXyz9I-&Evd;$W8G$0f8}xvwOXn94;SmH~}Ro@Pz{SgxAi-ycxEC zjP-eMpc!Kx<#kFv|5+ZHp7Ot%j;t;($lQNXR(D>GNSyxja+b_xac{;n*mr?=L~xT2 z)nik{1I&yOPX?AK!HB!n&!b^*kiyk6D8;##0G8$FX~Wcj}?%cc8)sqRNn(uBCYHR z;6;LoiIVC;27wo*c0#EzF-}aFIp%h~qXq0P_|^udCTpH@;bdrH-pR$Ut+mfgDG9!K zwf5F_eOmkLCMwde2d_?z$n5vb#k7L2MFCGcdy3&zGiE?blME=OJfTi{Xal3{ZOn=U z4I=XMkrrV<8vZk5{#nv9eua~`&%Z!HM0NBecQaJV)k?|=NDr>X`I|N~e-p)==QF4{ zdzKUUd?MWUro#OLIe0DjBnkKcVL}9lg4RbxCJp#2BH$0>F#R(6@Ip2UQz47m%p>`I zIj+bc6!Fq54arz*KGj-?K*5c){`Cp1`U$PY`C5g2My#m~`}CuqM1b@@NR#_ES^R-~ zv=(b$^R+t#wiH84i7gYf_&O^`SLV+{p!0#u-<*UF0Vy*7_T=>#Ql1IM#f+N{@9rH% zl!iO+4!ay6K&ctMoHg-}UeY`F!GXr0v5g{7)Ik`~?}}IY$c7x=@4B{+VJH#7Q}?;u zACIaBzcW#UiApZ<8{ss*P%(|kOk?L({l;}rnOBFz+JF4yPZ81dbAqw=lCZEPve6q- zSp4|OpDhq{2UJKJx~ysjx3|_?>({Car29i4E5ReGX+E|FMcE}>#eQbtKX@(zOwoBHc2&4p!i$2JV@oH+Cj7N(R*Xp;6>yn?X#}B! znW51<@=$B9-S7Glulh|~ogjbVxyU3da(hd}1Ck@jdlSM^jsOz?2)Hx=j%9g8R=|@k z<|fXyvQmQF3K-K@6avh+2A*X~GbXCY^~zcLRRr-k%L)z4>JQiu39S{DEbCqPW871) zEH>y^7BABpD3|0^5erpReK>v~gd@+#YlhvypvTF(IQ=?;uTi5?^L1)|i<)mxQ##qu zdv0DtLut83>cv;ers%C#%KUY7hI#+!5O*;$g!G4HdL4pG^uwIJ z?mnj{K)5pCHV6h?i+-#lep=GbW06kJ6Dx@9d=1i34S bool: + """ + Validate if account number exists in dep_account table. + + Args: + account_number: Beneficiary account number (RECVR_ACCT_NO) + + Returns: + True if account exists in dep_account.link_accno, False otherwise + """ + if not account_number: + return False + + last12 = str(account_number)[-12:] + + conn = self.connector.get_connection() + try: + cursor = conn.cursor() + cursor.execute( + "SELECT COUNT(*) FROM dep_account WHERE link_accno = :accno", + {'accno': last12} + ) + count = cursor.fetchone()[0] + return count > 0 + except Exception as e: + logger.warning(f"Error validating account {account_number}: {e}") + return False + finally: + cursor.close() + conn.close() + + # --------------------------------------------------------- + # UPDATED: bulk_insert_transactions WITH VALIDATION + # --------------------------------------------------------- + def bulk_insert_transactions(self, transactions: List[RTGSInwardRecord]) -> tuple: + """ + Bulk insert NEFT transactions into inward_rtgs_api_log. + Records with invalid beneficiary account numbers are skipped. + + Args: + transactions: List of RTGSInwardRecord objects + + Returns: + (inserted_count, skipped_count) + """ + if not transactions: + logger.warning("No transactions to insert") + return 0, 0 + + valid_transactions = [] + skipped_count = 0 + + + for txn in transactions: + acct = txn.recvr_acct_no + + if self.validate_account_exists(acct): + valid_transactions.append(txn) + else: + skipped_count += 1 + + if not valid_transactions: + logger.debug(f"All {skipped_count} transactions skipped (invalid beneficiary accounts)") + return 0, skipped_count + + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + + batch_data = [txn.to_dict() for txn in valid_transactions] + logger.info(batch_data) + + insert_sql = """ + INSERT INTO inward_rtgs_api_log ( + TXNIND, + JRNL_ID, + BANKCODE, + REF_NO, + TXN_DATE, + AMOUNT, + SENDER_IFSC, + RECIEVER_IFSC, + REMMITER_ACCT, + REMMITER_NAME, + REMMITER_ADDRS, + REMITTER_INFO, + BENF_ACCT_NO, + BENF_NAME, + STATUS, + REJECT_CODE, + BENF_ADDRS, + MSG_TYP, + CREDITOR_AMT + ) VALUES ( + :TXNIND, + :JRNL_ID, + :BANKCODE, + :REF_NO, + :TXN_DATE, + :AMOUNT, + :SENDER_IFSC, + :RECIEVER_IFSC, + :REMMITER_ACCT, + :REMMITER_NAME, + :REMMITER_ADDRS, + :REMITTER_INFO, + :BENF_ACCT_NO, + :BENF_NAME, + :STATUS, + :REJECT_CODE, + :BENF_ADDRS, + :MSG_TYP + :CREDITOR_AMT + ) + """ + + cursor.executemany(insert_sql, batch_data) + conn.commit() + + inserted_count = len(valid_transactions) + logger.info(f"Inserted {inserted_count} NEFT transactions into inward_rtgs_api_log") + return inserted_count, skipped_count + + except Exception as e: + if conn: + conn.rollback() + logger.error(f"Error inserting NEFT transactions: {e}", exc_info=True) + raise + finally: + if cursor: + cursor.close() + conn.close() + + + + def is_file_processed(self, filename: str, bankcode: str) -> bool: + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + cursor.execute( + """ + SELECT COUNT(*) + FROM neft_processed_files + WHERE filename = :filename + AND bankcode = :bankcode + """, + {'filename': filename, 'bankcode': bankcode} + ) + count = cursor.fetchone()[0] + return count > 0 + except Exception as e: + logger.error(f"Error checking processed file: {e}", exc_info=True) + return False + finally: + if cursor: + cursor.close() + conn.close() + + def mark_file_processed(self, processed_file: ProcessedFile) -> bool: + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + + file_data = processed_file.to_dict() + insert_sql = """ + INSERT INTO neft_processed_files ( + filename, bankcode, file_path, transaction_count, + status, error_message, processed_at + ) VALUES ( + :filename, :bankcode, :file_path, :transaction_count, + :status, :error_message, :processed_at + ) + """ + + cursor.execute(insert_sql, file_data) + conn.commit() + + logger.info(f"Marked file as processed: {processed_file.filename}") + return True + + except Exception as e: + if conn: + conn.rollback() + logger.error(f"Error marking file as processed: {e}", exc_info=True) + return False + finally: + if cursor: + cursor.close() + conn.close() + + def get_processed_files(self, bankcode: Optional[str] = None) -> List[str]: + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + + if bankcode: + cursor.execute( + """ + SELECT filename + FROM neft_processed_files + WHERE bankcode = :bankcode + ORDER BY processed_at DESC + """, + {'bankcode': bankcode} + ) + else: + cursor.execute( + """ + SELECT filename + FROM neft_processed_files + ORDER BY processed_at DESC + """ + ) + + filenames = [row[0] for row in cursor.fetchall()] + return filenames + + except Exception as e: + logger.error(f"Error retrieving processed files: {e}", exc_info=True) + return [] + finally: + if cursor: + cursor.close() + conn.close() + + def call_rtgs_api_txn_post(self) -> bool: + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + logger.info("Calling rtgs_api_txn_post procedure to process all inserted transactions...") + + try: + cursor.callproc('rtgs_api_txn_post') + except Exception: + cursor.execute("BEGIN rtgs_api_txn_post; END;") + + conn.commit() + logger.info("rtgs_api_txn_post procedure executed successfully") + return True + except Exception as e: + logger.error(f"Error calling rtgs_api_txn_post procedure: {e}", exc_info=True) + return False + finally: + if cursor: + cursor.close() + conn.close() + + def verify_tables_exist(self): + conn = self.connector.get_connection() + cursor = None + try: + cursor = conn.cursor() + + try: + cursor.execute("SELECT COUNT(*) FROM inward_rtgs_api_log WHERE ROWNUM = 1") + logger.info("✓ inward_rtgs_api_log table exists") + except Exception as e: + logger.error(f"✗ inward_rtgs_api_log table not found: {e}") + raise SystemExit( + "FATAL: inward_rtgs_api_log table must be created manually before running this application" + ) + + try: + cursor.execute("SELECT COUNT(*) FROM neft_processed_files WHERE ROWNUM = 1") + logger.info("✓ neft_processed_files table exists") + except Exception as e: + logger.error(f"✗ neft_processed_files table not found: {e}") + raise SystemExit( + "FATAL: neft_processed_files table must be created manually before running this application" + ) + + logger.info("Database tables verified successfully") + + except SystemExit: + raise + except Exception as e: + logger.error(f"Error verifying tables: {e}", exc_info=True) + raise SystemExit(f"FATAL: Error verifying database tables: {e}") + finally: + if cursor: + cursor.close() + conn.close() \ No newline at end of file diff --git a/logging_config.py b/logging_config.py new file mode 100644 index 0000000..1b22df3 --- /dev/null +++ b/logging_config.py @@ -0,0 +1,51 @@ +import logging +import logging.handlers +import os +from pathlib import Path + +def setup_logging(log_level=logging.INFO, log_dir="logs"): + """ + Configure logging with both console and file handlers. + + Args: + log_level: logging level (default: logging.INFO) + log_dir: directory to store log files + """ + # Create logs directory if it doesn't exist + Path(log_dir).mkdir(exist_ok=True) + + # Get root logger + logger = logging.getLogger() + logger.setLevel(log_level) + + # Clear existing handlers + logger.handlers.clear() + + # Create formatter + formatter = logging.Formatter( + fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # Console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(log_level) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + + # File handler (rotating) + log_file = os.path.join(log_dir, 'app.log') + file_handler = logging.handlers.RotatingFileHandler( + log_file, + maxBytes=10 * 1024 * 1024, # 10MB + backupCount=5 + ) + file_handler.setLevel(log_level) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + return logger + +def get_logger(name): + """Get a logger instance for a specific module.""" + return logging.getLogger(name) diff --git a/processors/__init__.py b/processors/__init__.py new file mode 100644 index 0000000..256bfa1 --- /dev/null +++ b/processors/__init__.py @@ -0,0 +1,6 @@ +"""Processors module for ACH file processing.""" + +from .data_mapper import RTGSDataMapper +from .file_processor import FileProcessor + +__all__ = ['RTGSDataMapper', 'FileProcessor'] diff --git a/processors/__pycache__/__init__.cpython-39.pyc b/processors/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1d8ef1e42c385669eafe641356d9143a021420f8 GIT binary patch literal 343 zcmYjMu};G<5Vf5&MbPvcG7^SH?5F|(6$=#-OkPfI?X*UA9N9q?{1Flxf0LDoUtr># zqKcFJe9!mpoloTXF~Q*HANmvTuSos}Lvo2HoFYiViKLPlY8huV=7~vc%2Ui!IodEj zlG!6E$8Tp3-mO&#&WFM}`C?R2J6~K~-xak+((cQ;+04EvT9>q+E^il4H=+~wqHUGO zmwbWJgOQJa@d?7R%=$5abZY>9BC!SR&@)K*G){oO3C<5KQ7?=EcqcxKJ`sNA)z(>6 zt#sJ##IsUe)u_4y-Ry*yFxad_P_jChg$l5&JAhgH+8TSyt@WaU3>@yV^pQ1*dfzHBDn%cI(zw49kgPq6I;5X4kYy z?s~|zt;NbHq8x$%DSRplBmpF#DB7avrKbY@58A(ghhBOLP~_H24n@B=$-WzXfs;1%j+m$~Ce>tgX|D??LV<7Vu{>cUst~nZKTn|~q9LDI|2=&Nt z4E1e>W>j)Yk>yxX*(oboDXc_Qr>efKuog`?6HF6TF}cm0sUK;)%qt&gyt1u3bzWW5 znzjBPYAasq*^$@n3Td}G(q6rC^|rls_4ZA>B?Mor-SE1J-SuQ5cwUxDFHSr^4LWgR z&-1`flct?^RN=Mwktg{r;ddlg);O$hc!{usI1!TSEY{B2s~uS6Jc!$Nr)7J--|5Av z9rvOQAz|~r=%%)poDGr%JC#8c3Eo_+eU3TNnk=iprbvTG;Jdsc{2=nerjeN|=t*W= zj5jm$dXS_jt+hq!hMjg>NaSW{WaAz5$>8`?Lgp>}liwf_n!~UJI%kd{Oi>agu79d? zgPR|im`)3_GGrylEM8W!N}*SQtjcRjR#kd6r8mJRm25&xiYZtuxAYSl-gQ3psln@f zT9rjs>(#SgtOC^nd?lq)XVee2AfGtn82H^=)< z|M(SZ%J%co(H2trjW3PHvc1}z$c#|LSqW>S!{nK%q;! zNPV@`SM|zcA6pNbG_&$;la%Xuc>5JpF0Ntt=5sRQ@If2^yfNPRD{1tFN#7F-sfi`8^x||(oTP%Jx z1yr|L_Y`M4#xB4((B)#v27qpCrC{N0MzH0^eNA5CW~}p40bX^HJ^r zsB!sLLoux{1rJht{>>lQOU(tlu~Nvn*dXy6kd7h!qE9QG*ad&uFi(hy+lvsjyk3Yh zaD}qoDd4@=Xr^UsNfI?U`pT_LzkI8IrhsKJ2Eyg-_g1g1uBguCy|x>23UyYS<*XJY zI1y4W_Jt%`$qY0vr|~7{D0z;OV@R5&d=anA%1vZO`Z$#Y5(U3Jd4UvW{NePu>Ejr) zwg71|i$tsFti~*ThRw0r97``9#L|Jfz>|gErM={9!1NK!_pcAMWO5w`^cpdt2KW%e zff2WKFrfZWOO}uaCjvgYyZ}uTnz^I|$&VqiNK!z`*!-Y?lufM3b1><|gad6|Q~al3 z^emjbGgy@oqE?A*!k{m(xHK5M6^IaL`bgodeXT`Ql6qJTyHV^>!5j9xk6n9~PVG1e z(%_+J+GpRi8#nV^42}IZ#Dks}!otm4Z?941P80z)G|Mp0QMq{j9~%Vva$onmPH3|L_R-Ylkg>e$(@O*al9g|QDGQ2QGh6a&gS{ktJ zA!4n~4-3S8D=qI-xT#36v`2!VP}C~0YY`a&yXvi|5a@4~`;SM9bGbOX3*G<|4T;K& z`EZTBjieLyK$kc>dt#5B-T@2>?m%x%%r{2Xb2->loh{%i6z)$vSC@g%*7UWAb zEyY3?=L;?L0s5-SZ$MA+mV6msSy_s1=z+?Jz%m0Jdv@^^iy7#ue3dF6Mq%~gk(Rf& z4!xYften7}$LiN|0z0_w`)k4-uH`N}abZ^3@Z$Tl4RbOhz8yYwd=Sm5lkrn<$5YmS z57eY054dUuxpo6%mu+E5cXaiIg-1CY;a$CZo|{`Z@pg=X1)Z`gDebcEPq?X|Q&AF> zb$?b!s?g3Yl*s~W)s#*JI=?7%CQ>+mgIuc$`LA=i(yJl=yCPp6m`XMQ*}LU7dY<1#HrouAj0uN zs$}$1=}ZKz#CLJxVW&7Xm65`KD6^SdR@n65G*6G3&8<085MYvogag{@WD`-+lOk>8=|ynuuh)zTK-rg{%YQ%R%T%VOASGUa!v z_V*}xolI1Qeec`(2}x^9tf@}_u~lETZ&Ua#9{XyJR-3iVK!h&Olc_V5oT5ZIfoJK9 zVxxmZTF(qWOyp}+^leIrS0x>l^7}}f$>Gen(tG3_8Ra;SqD?%`aGnS*PHk_1C{U*^ zrdLMt3T)AA(?!OiTs@6Zx?_V2Q6agCM5~(mBxt&3%-}!EW)1vwJWR)dJ%e=krd*!a z;g_o4zpUfApV~it;9x({92oxSaF_1wt0bQi75|L0d>P#$pto_{subol1=zle2+Ee3ff}sq`)DiS?k8zlo_H zRG?Sl1~?o?J_31I9>zig9Q(~u|BKPizM)tw#C;9j1mz=o1~@7dq4^lUy|IVa7JJ_8 zR}(9s z5Ar=Rh8Uao;FX{T#q)3?`(;c<7gqYG4w(VEnj>Pcn`QY^tRMm+D93zne`ICcy&>>v zXC_gQq@x|m*YJ|pDWUB`t_t-np+G8j4CD@l_sjvjCBH@G+!?8wa6@$!mDkxkJIVS7 zqp5>+SDUk0#dTvai0fuG*Nr;7hl>F>n{-_Y9Ey^%>++6IV%my3 zhg*o(b*A)Io73`Ls-$jE$~?b&%;Q%J?2WmXs&Um%YB#IXJqqMF2;x@Imb4R{D&JVd t6bSH`qPR7(sUlwL%~%grhgEc2c_W{*H>v4_me#cBW@HXrGpSdM{{qh``6B=T literal 0 HcmV?d00001 diff --git a/processors/__pycache__/file_processor.cpython-39.pyc b/processors/__pycache__/file_processor.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..4ac7a7662d50ffe687562c4a6a3cab43a63eeeae GIT binary patch literal 5218 zcmbVQO^h5z74GW4>FN2|*|ohtNq`na!4qx92|qzB#>Cn6CKk!E@)}!=fL8BR&2F!I zx+m2&v8UA&q1YTP`G7bC2kjm~9CF}9Eijn$qo|lRnPRyY@*0=x4OEz z>eZ|FURAyKz4mG~TY;dR-Z^Bk;GSTyL!;Ya5P5Xn>3cke{&lD4_4~5o0SD>{Z{LfVv@dcCKt+cdzc;ynJoT-36|))xO61QS6I|4}qq( zn~TioHYk8B_O`F@ZU#OJ#m5BLLsK33Y{C3*gFuXTaMsz&fRESUiElu`6j%!d*P*S^ z2Gf~AwTCLzsd3L}o2(>ni<%F0TB6oHyvRF$b!q3t ztcC)PrHy#<)rsJ-gW@^4n$pPejOAN9O*ON5p>kGSd_4+LXla5_4>Gl zhrwSg<4a~^q2+Rd>D@Sjlvh<^=RwDVbo(6_Wk(qws*vsxDc|gaQ*PM$Ek}P zUEHL*s-HSoI_GxRkT~`7%&cX$dK)ks4E;KT@bQGTt*nJAc2dybL~S z4<)J{*HaDntMNZXh1#ix-0zb2iC9SWyJTdh=5gah`LQ-CjjT~QEj=Ld2XdZ@lv+w6 z7E!BI!_!CW_l=P)bzMsBTu#tweX7#~!f$-8Ru@sL2W0q*r`1cGIX;_~(9WZZl;m7m zIU%F!M9%axg&4DRUxihFd7>3q|BP5kjk|&@*=bykD+Hn7=tg*5;-n+RBbNfi+zkZ^b|Nb;H zhr!%jI=7!-E`9js)64~@(Sm5O>irv{X5pvDLCWzQ4C#On3Lr2A2Q#tZX2frGgXs1& zKD^R$w#LXH4=G(9^`y5i=J;C6k-^;=;2bzZ9_tH&ueThC+>VH*AOhe?&l3g3H(JhB z2`grair#PHcz(&~_p0P>2&A>+6Gm=z3?b9=%AFa~BH3lG%|C$XIKDENl+ zh%}RDF@`x_z?nyf6FNqv8i@+jE7E|_GG{~R_`(&=0Y`SwzBtERzq4~aS=;u2H$1|`4>vcj?CmuTeipv@IczNB;^g6Tab~dF zfw3Yp0gs45oUH5>rU_;hO3Gt!%4o8fi$1lA13=PZcRCR0yHRFtU%vX*l`a3T0707O zR094C%JM80&!Ipyh8A#pun^t>OTIMKw}(3t#l>yem1ck?Z?0x)6lc0b0QynnhkOC~ zm^moUbevtHm~7FHb9^$Ek2U`<6zX~tP^n>XZ2hh zXptMBZA*QmI1>Hk#S)589kY^*z5*@rM^Gqh4b>v6TAeHt{rf5)I-8s&b+Z1E4sG?4 zRj#WhX{ahG?#E`SO7s_1phDUPS^t}^RpBluh>A4oD$!SFsQ#s^OXQhZD&SdvWLgd2 z@q${_XK4T1nE7fwp7tcq;91?BWO1!or67-J;C_o;5xL_Ij(D|$S;OQ)NR5v~m>)nz zKpp|n9PR_ak`MKfnvyN$CqD%=rj7KGkt)Xo(OYVqXn@pIsu{|flA8Py)kDalzNKi& zMFrZ8ar+HQpj|K8%{2uOEOIbu>7j;b?VV+z9qZIWZfgqQjWX11S*wh3EatA&k`9)Y3Zej6V zEG|LO)TU=DgYzL>e{ybY6`ZjX^+067(5GME-+?hnWhO>6jZ6cb`M04XGw~ROl;b(z z&m;R%er`9g*FfRp>3kf|F#jGFm{QFgs@hm%d8p!FLTBtip;XNLynJGo;1rQnxUIb9 zf41q6JmQRHhum|HO9#FGxd@Fl|H3Z*>zzQ0qN==Py>`m0XO#PWHl#hsyjQ)1x1&*2UbWmr&LikCbj$a4hgQLUF5>_J1gQT8IDu literal 0 HcmV?d00001 diff --git a/processors/data_mapper.py b/processors/data_mapper.py new file mode 100644 index 0000000..38ea66d --- /dev/null +++ b/processors/data_mapper.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +""" +Data mapper for NEFT SFTP feed. +Maps parsed NEFT transactions (dicts) to RTGSInwardRecord for database insertion. +- No padding of account numbers (kept as-is, trimmed). +""" + +from datetime import datetime +from decimal import Decimal +from typing import Dict, Any, List + +from logging_config import get_logger +from db.models import RTGSInwardRecord + +logger = get_logger(__name__) + + +class RTGSDataMapper: + """Maps parsed NEFT transactions to RTGSInwardRecord objects.""" + + # ------------------------- + # Helpers + # ------------------------- + + @staticmethod + def convert_date(date_str: str) -> str: + """ + Convert NEFT date (YYYYMMDD) to DDMMYYYY. + + Input : '20260306' + Output: '06032026' + + On error, returns today's date in DDMMYYYY format. + """ + try: + if not date_str or len(date_str.strip()) != 8 or not date_str.isdigit(): + raise ValueError(f"Invalid NEFT date format: {date_str!r}") + dt = datetime.strptime(date_str, "%Y%m%d") + return dt.strftime("%d%m%Y") + except Exception as e: + logger.error(f"Error converting date '{date_str}': {e}") + return datetime.now().strftime("%d%m%Y") + + + + @staticmethod + def process_status(status: str) -> str: + + try: + if not status: + return '' + s = status.strip() + if s == 'PROS': + return 'PROCESSED' + if s == 'SUSP': + return 'SUSPENDED' + if s == 'FAIL': + return 'FAILED' + if s == 'RVRS': + return 'REVERSED' + return s + + @staticmethod + def convert_amount(amount_in: Any) -> Decimal: + """ + Convert amount to Decimal and return absolute value. + Use TXNIND to capture the sign semantics. + """ + try: + if isinstance(amount_in, Decimal): + val = amount_in + else: + txt = (str(amount_in) or '').replace(',', '').strip() + val = Decimal(txt) if txt else Decimal('0') + return abs(val) + except Exception as e: + logger.error(f"Error converting amount '{amount_in}': {e}") + return Decimal('0') + + # ------------------------- + # Mapping + # ------------------------- + + @classmethod + def map_transaction(cls, parsed_txn: Dict[str, Any], bankcode: str) -> NEFTInwardRecord: + """ + Map a single parsed NEFT transaction (dict) to NEFTInwardRecord. + + Args: + parsed_txn: Dict emitted by SFTPUtrParser + bankcode : Bank code for this transaction (mapped to NEFTInwardRecord.bank_code) + """ + try: + # Amount handling + amount_in = parsed_txn.get('amount', '0') + txn_amt = cls.convert_amount(amount_in) + txnind = "CR" + creditor_amt=parsed_txn.get('amount', '0') + + # Date handling + txn_date_raw = parsed_txn.get('tran_date', '') or '' + txn_date_ddmmyyyy = cls.convert_date(txn_date_raw) + + # Account numbers: NO padding, just trim + sender_acct = (parsed_txn.get('remitter_acct_no') or '').lstrip('/').strip() + recvr_acct = (parsed_txn.get('benef_acct_no') or '').strip() + + # Status normalization + status_norm = cls.process_status(parsed_txn.get('status', '')) + + # Receiver account name: best available proxy is beneficiary_details + recvr_acct_name = (parsed_txn.get('beneficiary_details') or '').strip() + + record = NEFTInwardRecord( + + bank_code=bankcode, + txnind=txnind, + jrnl_id=(parsed_txn.get('journal_no') or '').strip(), + ref_no=(parsed_txn.get('utr') or '').strip(), + txn_date=txn_date_ddmmyyyy, + txn_amt=txn_amt, + sender_ifsc=(parsed_txn.get('ifsc_sender') or '').strip(), + reciever_ifsc=(parsed_txn.get('ifsc_recvr') or '').strip(), + sender_acct_no=sender_acct, + sender_acct_name=(parsed_txn.get('sender_acct_name') or '').strip(), + remitter_detail=(parsed_txn.get('remitter_detail') or '').strip(), + remitter_info=(parsed_txn.get('remmiter_info') or '').strip(), + recvr_acct_no=recvr_acct, + recvr_acct_name=recvr_acct_name, + status=status_norm, + reject_code=(parsed_txn.get('reject_code') or '').strip(), + benef_address=(parsed_txn.get('benef_address') or '').strip(), + msg_type=(parsed_txn.get('sub_msg_type') or '').strip(), + creditor_amt=creditor_amt, + ) + + return record + + except Exception as e: + logger.error(f"Error mapping NEFT transaction: {e}", exc_info=True) + raise + + @classmethod + def map_transactions(cls, parsed_transactions: List[Dict[str, Any]], bankcode: str) -> List[NEFTInwardRecord]: + """ + Map a list of parsed NEFT transactions to RTGSInwardRecord objects. + + Args: + parsed_transactions: List of dicts from SFTPUtrParser + bankcode : Bank code to be applied to each record + """ + records: List[NEFTInwardRecord] = [] + for txn in parsed_transactions: + try: + rec = cls.map_transaction(txn, bankcode) + records.append(rec) + except Exception as e: + logger.warning(f"Skipping transaction due to error: {e}") + logger.info(f"Mapped {len(records)} NEFT transactions for bank {bankcode}") + return records diff --git a/processors/file_processor.py b/processors/file_processor.py new file mode 100644 index 0000000..f124b25 --- /dev/null +++ b/processors/file_processor.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +""" +Main file processor for end-to-end ACH file processing. +Orchestrates download, parsing, mapping, and database insertion. +""" + +import os +import tempfile +from pathlib import Path +from logging_config import get_logger +from rtgs_inward_parser import RTGS_INWARD_Parser +from db.repository import Repository +from db.models import ProcessedFile +from sftp.sftp_client import SFTPClient +from .data_mapper import RTGSDataMapper + +logger = get_logger(__name__) + + +class FileProcessor: + """Processes RTGS INWARD files end-to-end.""" + + def __init__(self, repository: Repository = None, sftp_client: SFTPClient = None): + """ + Initialize file processor. + + Args: + repository: Repository instance (optional) + sftp_client: SFTPClient instance (optional) + """ + self.repository = repository or Repository() + self.sftp_client = sftp_client or SFTPClient() + self.temp_dir = tempfile.gettempdir() + + def process_file( + self, + filename: str, + bankcode: str, + remote_path: str + ) -> bool: + """ + Process a single INWARD file end-to-end. + + Workflow: + 1. Download file from SFTP + 2. Parse using RTGS_INWARD_Parser + 3. Map to database format + 4. Insert to database + 5. Mark as processed + 6. Cleanup local file + + Args: + filename: Name of file to process + bankcode: Bank code for this file + remote_path: Full remote path on SFTP + + Returns: + True if successful, False otherwise + """ + local_path = os.path.join(self.temp_dir, filename) + + try: + logger.info(f"Starting processing: {filename} (bank: {bankcode})") + + # Step 1: Check if already processed for this bank + if self.repository.is_file_processed(filename, bankcode): + logger.info(f"File already processed for {bankcode}: {filename}") + return True + + # Step 2: Download file + if not self.sftp_client.download_file(remote_path, local_path): + raise Exception(f"Failed to download file: {remote_path}") + + # Step 3: Parse file + + + parser = RTGS_INWARD_Parser(local_path) + + + transactions, metadata, summary = parser.parse() + + if not transactions: + logger.warning(f"No transactions found in {filename}") + # Still mark as processed but with 0 transactions + processed_file = ProcessedFile( + filename=filename, + bankcode=bankcode, + file_path=remote_path, + transaction_count=0, + status='SUCCESS' + ) + self.repository.mark_file_processed(processed_file) + return True + + # Step 4: Map transactions + mapped_records = RTGSDataMapper.map_transactions(transactions, bankcode) + + # Step 5: Insert to database (with account validation) + inserted_count, skipped_count = self.repository.bulk_insert_transactions(mapped_records) + + # Step 6: Mark file as processed + processed_file = ProcessedFile( + filename=filename, + bankcode=bankcode, + file_path=remote_path, + transaction_count=inserted_count, + status='SUCCESS' + ) + self.repository.mark_file_processed(processed_file) + + logger.info(f"Successfully processed {filename}: {inserted_count} inserted, {skipped_count} skipped (non-ipks accounts)") + return True + + except Exception as e: + logger.error(f"Error processing {filename}: {e}", exc_info=True) + + # Mark file as failed + try: + processed_file = ProcessedFile( + filename=filename, + bankcode=bankcode, + file_path=remote_path, + transaction_count=0, + status='FAILED', + error_message=str(e)[:2000] + ) + self.repository.mark_file_processed(processed_file) + except Exception as mark_error: + logger.error(f"Failed to mark file as failed: {mark_error}") + + return False + + finally: + # Cleanup local file + try: + if os.path.exists(local_path): + os.remove(local_path) + logger.debug(f"Cleaned up local file: {local_path}") + except Exception as e: + logger.warning(f"Error cleaning up local file {local_path}: {e}") + + def process_files(self, files_to_process: list) -> dict: + """ + Process multiple files. + + Args: + files_to_process: List of (filename, bankcode, remote_path) tuples + + Returns: + Dictionary with processing statistics + """ + stats = { + 'total': len(files_to_process), + 'successful': 0, + 'failed': 0, + 'files': [] + } + + for filename, bankcode, remote_path in files_to_process: + success = self.process_file(filename, bankcode, remote_path) + stats['successful'] += 1 if success else 0 + stats['failed'] += 0 if success else 1 + stats['files'].append({ + 'filename': filename, + 'bankcode': bankcode, + 'success': success + }) + + logger.info(f"Processing complete: {stats['successful']}/{stats['total']} successful") + return stats + + def __enter__(self): + """Context manager entry.""" + if self.sftp_client and not self.sftp_client.sftp: + self.sftp_client.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + if self.sftp_client: + self.sftp_client.disconnect() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f55780d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,23 @@ +# Core dependencies +python-dotenv==1.0.0 + +# Database (modern Oracle driver - simpler than cx_Oracle) +oracledb==2.0.0 + +# SFTP +paramiko==3.4.0 +cryptography==41.0.7 + +# Scheduling +schedule==1.2.0 + +# Configuration +python-decouple==3.8 + +# Timezone support +pytz==2023.3 + +# Development dependencies +pytest==7.4.0 +black==23.7.0 +flake8==6.0.0 diff --git a/rtgs_inward.py b/rtgs_inward.py new file mode 100644 index 0000000..c282ee3 --- /dev/null +++ b/rtgs_inward.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +""" +Main application entry point. +Runs ACH file processing scheduler. +""" + +import logging +from logging_config import setup_logging, get_logger +from scheduler import Scheduler + +# Initialize logging +logging.getLogger("paramiko").setLevel(logging.WARNING) +logger = setup_logging(log_level=logging.INFO) +app_logger = get_logger(__name__) + + +def main(): + """Main application function.""" + app_logger.info("Application started") + + try: + # Run the scheduler + scheduler = Scheduler() + scheduler.run() + app_logger.info("Application completed successfully") + except KeyboardInterrupt: + app_logger.info("Application interrupted by user") + except Exception as e: + app_logger.error(f"An error occurred: {e}", exc_info=True) + raise + + +if __name__ == "__main__": + main() diff --git a/rtgs_inward_parser.py b/rtgs_inward_parser.py new file mode 100644 index 0000000..a2c1290 --- /dev/null +++ b/rtgs_inward_parser.py @@ -0,0 +1,344 @@ +#!/usr/bin/env python3 +""" +UTR Pipe-Delimited File Parser (SFTP feed) +- Robust parsing for files with '|' separator and inconsistent whitespace. +- Returns (transactions, file_metadata, summary_data) exactly like UIHParser style. +- TXN_DATE is left as-is from the file (no time concatenation or conversion). +""" + +import csv +import os +import re +from decimal import Decimal, InvalidOperation +from typing import Dict, List, Tuple, Optional + +from logging_config import get_logger + +logger = get_logger(__name__) + + +# ------------------------- +# Helpers & Normalization +# ------------------------- + +WS_COLLAPSE_RE = re.compile(r'[ \t\u00A0]+') + + +def normalize_text(s: Optional[str]) -> str: + """ + Normalize internal whitespace to single spaces, strip ends. + Keep None as ''. + """ + if s is None: + return '' + s = s.replace('\u00A0', ' ') + s = WS_COLLAPSE_RE.sub(' ', s) + return s.strip() + + +def to_decimal(value: str) -> Optional[Decimal]: + if value is None: + return None + v = value.replace(',', '').strip() + if v == '': + return None + try: + return Decimal(v) + except InvalidOperation: + logger.warning(f"Amount not a valid decimal: {value!r}") + return None + + +IFSC_RE = re.compile(r'^[A-Z]{4}0[A-Z0-9]{6}$', re.IGNORECASE) + + +def validate_ifsc(code: str) -> bool: + """ + Gentle IFSC validation: standard format is 11 chars (AAAA0XXXXXX). + Returns False if it doesn't match; NEVER rejects the record. + """ + if not code: + return False + return bool(IFSC_RE.match(code)) + + +# ------------------------- +# Parser +# ------------------------- + +class RTGS_INWARD_Parser: + """ + Parser for SFTP UTR pipe-delimited files. + Returns: (transactions, file_metadata, summary_data) + """ + + # Canonical order (maps to snake_case keys) as they appear in the file header + EXPECTED_HEADER = [ + "utr", "amount", "sender_acct_name", "remitter_detail", "remmiter_info", + "benef_address", "reject_code", "reject_reason", "journal_no", + "status", "sub_msg_type", "tran_date", "tran_time", "ifsc_sender", + "ifsc_recvr", "remitter_acct_no", "benef_acct_no", + "remitter_details", "beneficiary_details", + ] + + def __init__(self, file_path: str, encoding_priority: Optional[List[str]] = None): + self.file_path = file_path + self.encoding_priority = encoding_priority or ["utf-8-sig", "cp1252", "latin-1"] + self.transactions: List[Dict] = [] + self.file_metadata: Dict = {} + self.summary_data: Dict = {} + + def parse(self) -> Tuple[List[Dict], Dict, Dict]: + """ + Main parse method: returns (transactions, file_metadata, summary_data) + """ + try: + rows, header = self._read_rows_with_fallback() + header_map = self._prepare_header_map(header) + + + self.file_metadata = { + "source_file": os.path.basename(self.file_path), + "columns_detected": header, + "row_count": len(rows), + } + + for idx, raw in enumerate(rows, start=1): + rec = self._row_to_transaction(raw, header_map, row_num=idx) + if rec: + self.transactions.append(rec) + + self.summary_data = self._build_summary(self.transactions) + + logger.info( + f"Parsed {len(self.transactions)} rows from {self.file_path}" + ) + return self.transactions, self.file_metadata, self.summary_data + + except Exception as e: + logger.error(f"Error parsing SFTP UTR file: {e}", exc_info=True) + raise + + # ------------------------- + # Internals + # ------------------------- + + def _read_rows_with_fallback(self) -> Tuple[List[List[str]], List[str]]: + """ + Try multiple encodings. Return (rows, header) + """ + last_err = None + for enc in self.encoding_priority: + try: + with open(self.file_path, 'r', encoding=enc, errors='replace', newline='') as f: + reader = csv.reader(f, delimiter='|') + all_rows = list(reader) + if not all_rows: + raise ValueError("Empty file") + + header = [normalize_text(c) for c in all_rows[0]] + rows = [r for r in all_rows[1:]] + + logger.info(f"Read {len(rows)} data rows using encoding {enc}") + return rows, header + except Exception as e: + last_err = e + logger.warning(f"Failed to read with encoding={enc}: {e}") + continue + # If we fall through all encodings + raise last_err or RuntimeError("File read failed for all encodings") + + def _prepare_header_map(self, header: List[str]) -> Dict[int, str]: + """ + Map column index -> canonical snake_case key. + Unknown/extra headers become normalized snake_case as-is. + """ + def canon(name: str) -> str: + name = name.strip() + name = name.replace('/', '_').replace('-', '_').replace(' ', '_') + return name.lower() + + header_norm = [canon(h) for h in header] + + if len(header_norm) < len(self.EXPECTED_HEADER): + logger.warning( + f"Header has fewer columns ({len(header_norm)}) than expected ({len(self.EXPECTED_HEADER)}). " + f"Will pad rows defensively." + ) + + idx_to_key: Dict[int, str] = {} + for i, h in enumerate(header_norm): + idx_to_key[i] = h + + return idx_to_key + + def _row_to_transaction(self, row: List[str], header_map: Dict[int, str], row_num: int) -> Optional[Dict]: + """ + Convert raw CSV row to a normalized dict (no data-model mapping here). + """ + # Pad or trim to header length (defensive) + max_idx = max(header_map.keys()) if header_map else -1 + if len(row) - 1 < max_idx: + row = row + [''] * (max_idx + 1 - len(row)) + elif len(row) - 1 > max_idx: + logger.debug(f"Row {row_num} has extra fields; trimming to header size") + + # Build base dict with normalized text + raw = {header_map[i]: normalize_text(row[i] if i < len(row) else '') for i in range(max_idx + 1)} + + # Collect expected keys; leave as strings except amount where we coerce safely + txn: Dict[str, object] = {k: raw.get(k, '') for k in self.EXPECTED_HEADER} + + + txn['creditor_amt'] = raw.get('amount', '') + + # Amount normalization + amt = to_decimal(str(txn.get('amount', '') or '')) + txn['amount'] = amt if amt is not None else '' + + # IFSC checks (gentle logs only) + ifsc_sender = str(txn.get('ifsc_sender') or '') + ifsc_recvr = str(txn.get('ifsc_recvr') or '') + if ifsc_sender and not validate_ifsc(ifsc_sender): + logger.debug(f"Row {row_num} sender IFSC looks non-standard: {ifsc_sender}") + if ifsc_recvr and not validate_ifsc(ifsc_recvr): + logger.debug(f"Row {row_num} receiver IFSC looks non-standard: {ifsc_recvr}") + + # TXN_DATE: keep as-is from file; ignore time entirely + txn['tran_date'] = str(txn.get('tran_date') or '') + txn['tran_time'] = '' # explicitly blank to signal unused + + # Basic sanity: UTR presence + if not str(txn.get('utr') or '').strip(): + logger.debug(f"Row {row_num} skipped: missing UTR") + return None + + return txn + + def _build_summary(self, txns: List[Dict]) -> Dict: + """ + Build compact summary: + - total_count + - amount_total + - by_status: count, amount + """ + total_count = len(txns) + amount_total = Decimal('0') + by_status: Dict[str, Dict[str, object]] = {} + + for t in txns: + amt = t.get('amount') + if isinstance(amt, Decimal): + pass + elif isinstance(amt, str): + try: + amt = Decimal(amt) + except InvalidOperation: + amt = Decimal('0') + elif amt is None: + amt = Decimal('0') + + amount_total += amt + + st = (str(t.get('status') or '')).upper() + if st not in by_status: + by_status[st] = {'count': 0, 'amount': Decimal('0')} + by_status[st]['count'] += 1 + by_status[st]['amount'] = by_status[st]['amount'] + amt + + by_status_str = {k: {'count': v['count'], 'amount': f"{v['amount']:.2f}"} for k, v in by_status.items()} + + return { + 'total_count': total_count, + 'amount_total': f"{amount_total:.2f}", + 'by_status': by_status_str + } + + +# ------------------------- +# Printing Utilities +# ------------------------- + +def print_transactions(transactions: List[Dict], limit: Optional[int] = 50): + """ + Console print (raw transaction dict view similar to UIH print). + Includes all fields except time, REJECT_CODE, and REJECT_REASON. + """ + cols = [ + ('utr', 20), + ('amount', 12), + ('status', 8), + ('journal_no', 14), + ('tran_date', 10), + ('sender_acct_name', 28), + ('remitter_acct_no', 22), + ('benef_acct_no', 22), + ('ifsc_sender', 12), + ('ifsc_recvr', 12), + ('remitter_detail', 28), + ('remmiter_info', 24), + ('beneficiary_details', 30), + ('benef_address', 30), + ('sub_msg_type', 10), + ] + header = " ".join([f"{name.upper():<{w}}" for name, w in cols]) + print("\n" + "=" * len(header)) + print(header) + print("=" * len(header)) + + shown = 0 + for txn in transactions: + row = [] + for name, w in cols: + val = txn.get(name, '') + if isinstance(val, Decimal): + val = f"{val:.2f}" + row.append(f"{str(val)[:w]:<{w}}") + print(" ".join(row)) + shown += 1 + if limit and shown >= limit: + print(f"... ({len(transactions) - shown} more rows not shown)") + break + + print("=" * len(header)) + print(f"Total transactions parsed: {len(transactions)}\n") + + +def print_metadata(metadata: Dict): + """Print file metadata (UIH-like).""" + print("\n" + "=" * 80) + print("FILE METADATA") + print("=" * 80) + for key, value in metadata.items(): + print(f"{key.upper():<20}: {value}") + print("=" * 80 + "\n") + + +def print_summary(summary: Dict): + """Print summary data.""" + if summary: + print("\n" + "=" * 80) + print("SUMMARY DATA") + print("=" * 80) + for key, value in summary.items(): + print(f"{key.upper()}: {value}") + print("=" * 80 + "\n") + + +# ------------------------- +# Runner +# ------------------------- + +if __name__ == '__main__': + from logging_config import setup_logging + + setup_logging() + + parser = SFTPUtrParser('/home/bishwajeet/test_parser/06032026_14_NEFT_INWARD.TXT') + transactions, metadata, summary = parser.parse() + + print_metadata(metadata) + print_transactions(transactions, limit=80) + print_summary(summary) + + logger.info(f"Parsing complete. Extracted {len(transactions)} transactions") \ No newline at end of file diff --git a/scheduler.py b/scheduler.py new file mode 100644 index 0000000..a92a0c3 --- /dev/null +++ b/scheduler.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +""" +RTGS file processing scheduler. +Runs polling loop every 30 minutes to process new files. +""" + +import signal +import time +import sys +from datetime import datetime +from logging_config import get_logger, setup_logging +from config import get_config +from db import OracleConnector, Repository +from sftp import SFTPClient, FileMonitor +from processors import FileProcessor + +logger = get_logger(__name__) + + +class Scheduler: + """Main scheduler for RTGS file processing.""" + + def __init__(self): + """Initialize scheduler.""" + self.config = get_config() + self.config.validate() + self.running = True + self.cycle_count = 0 + + # Setup signal handlers for graceful shutdown + signal.signal(signal.SIGTERM, self._signal_handler) + signal.signal(signal.SIGINT, self._signal_handler) + + def _signal_handler(self, signum, frame): + """Handle shutdown signals gracefully.""" + logger.info(f"Received signal {signum}, shutting down gracefully...") + self.running = False + + def initialize_database(self): + """Initialize database connection and verify tables exist.""" + try: + connector = OracleConnector() + if connector.test_connection(): + logger.info("Database connection test passed") + repository = Repository() + repository.verify_tables_exist() + return True + else: + logger.error("Database connection test failed") + return False + except SystemExit as e: + logger.error(f"Database initialization failed: {e}") + raise + except Exception as e: + logger.error(f"Error initializing database: {e}", exc_info=True) + return False + + def run_processing_cycle(self): + """Run single file processing cycle.""" + self.cycle_count += 1 + logger.info(f"=== Starting processing cycle {self.cycle_count} ===") + + sftp_client = SFTPClient() + repository = Repository() + + try: + # Connect to SFTP + if not sftp_client.connect(): + logger.error("Failed to connect to SFTP server") + return + + # Scan for new files across all banks + monitor = FileMonitor(sftp_client) + new_files = [] + today_str = datetime.now().strftime("%d%m%Y") + logger.info(f'listing file for {today_str}') + + for bank_code in self.config.bank_codes: + # Get list of files already processed for this specific bank + bank_processed = repository.get_processed_files(bank_code) + remote_path = f"{self.config.sftp_base_path}/{bank_code}/RTGS" + + + pattern = f"{today_str}_*_RTGS_INWARD.TXT" + files = sftp_client.list_files(remote_path, pattern=pattern) + + + for filename in files: + if filename not in bank_processed: + full_path = f"{remote_path}/{filename}" + new_files.append((filename, bank_code, full_path)) + logger.info(f"Found new file: {filename} (bank: {bank_code})") + else: + logger.debug(f"Skipping already processed file for {bank_code}: {filename}") + + if not new_files: + logger.info("No new files to process") + return + + logger.info(f"Found {len(new_files)} new files to process") + + # Process files + processor = FileProcessor(repository, sftp_client) + stats = processor.process_files(new_files) + + # Log summary + logger.info(f"Cycle {self.cycle_count} complete:") + logger.info(f" Total files: {stats['total']}") + logger.info(f" Successful: {stats['successful']}") + logger.info(f" Failed: {stats['failed']}") + + # Call rtgs_api_txn_post procedure once per cycle to process all inserted transactions + if stats['successful'] > 0: + logger.info("Calling rtgs_api_txn_post procedure for all inserted transactions...") + if repository.call_rtgs_api_txn_post(): + logger.info("Transaction post-processing completed successfully") + else: + logger.error("Transaction post-processing failed") + + except Exception as e: + logger.error(f"Error in processing cycle: {e}", exc_info=True) + + finally: + sftp_client.disconnect() + + def run(self): + """Run scheduler main loop.""" + logger.info("="*80) + logger.info("RTGS_INWARD File Processing Scheduler Started") + logger.info(f"Poll Interval: {self.config.poll_interval_minutes} minutes") + logger.info(f"Bank Codes: {', '.join(self.config.bank_codes)}") + logger.info("="*80) + + # Initialize database + try: + if not self.initialize_database(): + logger.error("Failed to initialize database. Exiting.") + return + except SystemExit as e: + logger.error(f"Fatal error: {e}") + raise + + # Run processing loop + poll_interval_seconds = self.config.poll_interval_minutes * 60 + + while self.running: + try: + self.run_processing_cycle() + except Exception as e: + logger.error(f"Unexpected error in processing cycle: {e}", exc_info=True) + + # Wait for next cycle + if self.running: + logger.info(f"Waiting {self.config.poll_interval_minutes} minutes until next cycle...") + time.sleep(poll_interval_seconds) + + logger.info("Scheduler shutdown complete") + + +def main(): + """Main entry point.""" + # Setup logging + setup_logging() + + # Create and run scheduler + scheduler = Scheduler() + scheduler.run() + + +if __name__ == '__main__': + main() diff --git a/sftp/__init__.py b/sftp/__init__.py new file mode 100644 index 0000000..588ec13 --- /dev/null +++ b/sftp/__init__.py @@ -0,0 +1,6 @@ +"""SFTP module for ACH file processing.""" + +from .sftp_client import SFTPClient +from .file_monitor import FileMonitor + +__all__ = ['SFTPClient', 'FileMonitor'] diff --git a/sftp/__pycache__/__init__.cpython-39.pyc b/sftp/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5976c3b8dd953a9da12116d60cceb8901a2e51e3 GIT binary patch literal 323 zcmYjMu};G<5RIKQL2ddE85qdYh>ZauP^F3mgj6Okr!;n2BPWh*qY8e88Hq2-%ET`) zaTgdk$?xvT@4d6@<#HZjiSrNrhW(Ss|Kdol@%S?Y8Hq?T$qnnc;Ed+j#9bm1oD(_Q zb1{?YHY&5%)9vQ*q3Eo9HmYc?D{j`eMXQkv&Nj+>-S1al3`0vc&PlvBTJ-_hVuP|f z+v{MRqx~*MxG&Q&1CX`>7#F?`12mIkl8BtLq5E@@fz-7z0Pm5@D23qrqB_`4RXgnu v$NEXBQ1z+}p!;L(BupW@+ACSnh6?aP2Y}V^GM-%i{^p7v?Nfw==Pc(xdizyf literal 0 HcmV?d00001 diff --git a/sftp/__pycache__/file_monitor.cpython-39.pyc b/sftp/__pycache__/file_monitor.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..7f59b9e7d3d84c06ff00324a3058e1b700f0c927 GIT binary patch literal 3337 zcmai0OK%*<5$>M%&MqlRmV}50XQBiSCoxDyHVlUl0ttNB3L?f32_#u49t@{@cZZ&x zS@rZvlW$($}1P%AF@y_3W-zq~J6;wf(5BuCA)Ds^g_4pFsQf z*1uAFiID%|V6mAncmQAdH4sL+gi&T>v`gXF%#5MgHHTK$8rof(;+mB?L$~YdzMXl) zR<{Lxhq;+QYu+oARpS8~{a9D?R*wPv4 zF0o~H4SpM!T~}@N*yF}w)!MW(bq2Z%9dOlU`*5dr*^{FvX)a}e?z4t{{I=mMe*#33 zm!u{Ut?6h*8oF;X3K3&e&$NrsO^+aE(l|>it{pxLBJeP-VtXk1W#g%490Us*Z0Y!E zOdO*>KihyXm1i~AdcV#qg+ z(sJ-3KIU9* zV}av87>Od`C5Y3KF|14F2rkDWPZmv#I=A4{CjKXM`eb6%QxNqsBDRuKqW71|mO*vrMD; zz2Miehh{~r&-{Kh65eCDtpK4K6g@aV@R*$jb0!^O+PZ-(g7TYm^j7v0ZS6~eHj}i5 zn&8HdwZ=)opw}B`StR(dkUSd2au5a*vqL#o_kwOz_6FXhy8f%nsRfYa!Y;{0UftR& z#*i@Qn8VYyg31flFxzbf7c{IsIskP?I=5cA*4$XIX;lwOG;mU2l9M2+kze`x2yNBS z7j1|i<>LtvUZ_;d8dn(B`aT*D%Fn=>IxHb13b zW85@X43BzrnXb@P)d1RD+f^G#lHtd<{|V|EK1%a;+c&wboq%CaUz2umobZFynvD2lMz%>4n?*Q!&ttX zIDf1r&Or7~ZJbU1SdE-3dljBDogo6J07C6Yc&Yz`Jgl&o!Qu7rB9(*Ljp1IiI|opu z_;4goA%B1=(q1Ds=*4OF7K7F1*K1GN-|kJqg`ZC+VKf~s!tyD;4SoT@w0l|HFRPz! z7vl_IS;(pAInFQ@Yz5*5M8*XTDrakR6aGU-hm$f{e5!0H2XdgSQyvRt4T`Z)Etx`M zm+k5~u5-*(4VWGmdhhBIgjaA#Jk9 zrHizp2zGH1Mfip)v4|4@x=$f*F2MUOm#(+S1r7UiRWXrh6)CDXeTv+=qT)#^*Y_be z;Nw8~`Wf5kXKDSE$4`<-o{qS3vG+XAl%uDPF8cu90UV@3sRvP|uH?q$S3tuM(*5un zz8f&KqDc8sG%VN{o*9wqMA5TxoK08UC}KqtMa=`|8VD9BSed01!o=avaoB-ClW78I z85B&Q8N?=%Pmp|$*%W$P5Xuli;s!IRlM092&Q#3rZXefJG%NcItG5t*c@6NOQ$v~wuer~ zQQvm#4!w@2Yuw?r$9kv!iN+k}KGB%_*yt=VV^?c?c~IZozI)s6#t~1X-%Ewxi(>Al zBQ8Q2rAf9~zZE7d=9%A3lZ1Dv@>1D}MOioff{V!|A6<|Rzri~3#ax%gNpT*aFS2(IDK8XdS}GJ}~A3=WG&Pzp{Bu3Ju}!cTS2+WR@rHcZLu}Bj^9&k z1Ijs5DBWCFs>v@ZJss&q>FV0`Tcs*SBJJ`li<16kSI2he4}BYWuj9?mLP+hgHq`G?|XojQq^6l1N5j9Oc|!Fv{jm+Y+RU!YLJ1SXobw3Ny?0UqG)Jk9#jPv4kG4 zKDxZ04*BK1DBFJ!4mg*W6W)_SlspIp3&^s;UYK!qnPyNTC@sG^niO7uj*$$4)3jL! zLNh$w(eu-^n^Fy%^FFYhBe)!kDmEm^7FDeSZ2;tM;`-7albUE->)_19q5mAR@#XJ#u)=~(7(T`zaQnJ?3FPv{K>ZuzhDt-Ja8?GPqbjWe>o z)(?pB_`@zN-Ai}2+bgt;2#3-Av{T#Ny;WLSc-T_syBU}|j1;Pa{WO!oeLe|BX&iMY zo#wT%Os_F^TNMj;ww@r^6g7gE8Z!G-M{BCu>U?E!e#lM@)Jg*CB3zooDSbp?(N2rn z&4*n+BHvTkT!>V>Fa+tX(78f>DH^B|#Io<;&3p*WHH^BxY&yVk`DgsYsWtR-x?{BT zH9cQl!1nA6{~pl$m@wZ2=rplPl7L+TRGdfID9h43w3uE3GWig2v?qHrIKGWGhZ!Xt z%N~HDNl;Ac<*x#w2_Rbe#`QP_WJ+IfguQ&@L=C(*`my|-o1__4f+IRPZp+_kyQOYz zH9%Qed=F<5=SaLm!YA>4668>h)LPV)w^tG1XK2ZmAvD8)R_p3*=`B59U(n`J|F&5y z2|i3E4@L+Ag$pTxa?!#RU5at@hj3T9kH4WnTF%y_eh61d`s!=DTK@aKc3|eq$^zyT z^#|5HEx8CyTa!CFETU@|VK3^ruQ?$7J&Xp;O~gs(z@Az+wBMXNaHjU5GbI;fJU7H= zQ}fWH60P?6XPBqPY+0Y0kKsb#NDN3<+0UUKa*0}KUaa1j{1tX~;db7W`oLfw^<%ZE zOAd;H5jmeec0qe#)`9!Th5J#z%IX%Z-pK!6d4^A;3?Uhe&ir90yZdC+ndl&CgOe;RrsA6*l90<9=b{h=r5Et8=Ev)%KLn5tZn{FtgN`WEM@%Si^(-c%6Jeu?IcX3&<1e-?OX>4v_nH|Ryw_Q1xL;p=sH zxwnq<@@ST}skTzokB|d`I_gVNrw-~Y z<=<3jx-rj1Nx^*~e;6KvkNPS!Wr=&`sal)KRpw;Map~wG{E{F@T z!S>y@L2gtaY0oI26Ef2K`O5sEpz?b-=42ukJ( zr(C75hxfsY!bJLh6fTgN5;jUi|CGMo1l;C)Sp{B>ZrKJzg+A_J>u7h}lG zjA~%pDA!R@gZ4ImjzM&Q+BZUUZ6RJ5Q)7Ubp@3&k*bI2@syKs)qEbr*yedZ973kEL z0v(dgwv&Hdf$kG7{fx4Ac)FQ$CL{bG&{i$+#%yz&Fs>GcciWpto?Y^h2fY6iNnb-CHHe5xs)CCWH6ByNvXK^#LTiyC=n|T;X z-05`jE=J7=ROaMzGUA0x+4&b?TsZV0_fGB1_4XEK_Hr#u(0;hG%oc-`@=00anPceAwEP|e55KP zDD)Rpyh`E*iKAepzEmo1qk-~7Wo{2)%5z%hyrtD2tiHXx?%9Ic+YNDx#-D*Gj5Jf( z;eH(LmF}lS*8sSy0LeN2Ngya&RF@2OB^EcSs8U#(>rI;|ypPL+Um;y( List[Tuple[str, str, str]]: + """ + Scan all bank directories for new RTGS files. + + Args: + processed_filenames: List of already processed filenames to skip + + Returns: + List of (filename, bankcode, full_remote_path) tuples + """ + new_files: List[Tuple[str, str, str]] = [] + + for bank_code in self.config.bank_codes: + # Adjust subfolder name here if required (e.g., 'RTGS_INWARD' or other) + remote_path = f"{self.config.sftp_base_path}/{bank_code}/RTGS" + + # Match any RTGS inward file for any date/hour + files = self.sftp_client.list_files(remote_path, pattern='*_RTGS_INWARD.TXT') + + for filename in files: + if filename not in processed_filenames: + full_path = f"{remote_path}/{filename}" + new_files.append((filename, bank_code, full_path)) + logger.info(f"Found new RTGS file: {filename} (bank: {bank_code})") + else: + logger.debug(f"Skipping already processed RTGS file: {filename}") + + logger.info(f"RTGS scan complete: Found {len(new_files)} new files") + return new_files + + @staticmethod + def parse_filename(filename: str) -> Dict[str, str]: + """ + Parse RTGS filename to extract metadata. + + Expected format: + DDMMYYYY_HH_RTGS_INWARD.TXT + Example: + 06032026_14_RTGS_INWARD.TXT + + Args: + filename: Filename to parse + + Returns: + Dictionary with extracted metadata or empty dict if parse fails + """ + # Groups: DD, MM, YYYY, HH + pattern = r'^(\d{2})(\d{2})(\d{4})_(\d{2})_RTGS_INWARD\.TXT$' + match = re.match(pattern, filename, flags=re.IGNORECASE) + + if not match: + logger.warning(f"Could not parse RTGS filename: {filename}") + return {} + + day, month, year, hour = match.groups() + + return { + 'filename': filename, + 'day': day, + 'month': month, + 'year': year, + 'hour': hour, + 'timestamp': f"{day}/{month}/{year} {hour}:00:00" + } + + def __enter__(self): + """Context manager entry.""" + if not self.sftp_client.sftp: + self.sftp_client.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.sftp_client.disconnect() + diff --git a/sftp/sftp_client.py b/sftp/sftp_client.py new file mode 100644 index 0000000..381509a --- /dev/null +++ b/sftp/sftp_client.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +""" +SFTP client for file operations. +Handles connection, file discovery, and download operations. +""" + +import paramiko +import os +from pathlib import Path +from logging_config import get_logger +from config import get_config + +logger = get_logger(__name__) + + +class SFTPClient: + """SFTP operations for RTGS file processing.""" + + def __init__(self): + """Initialize SFTP client.""" + self.config = get_config() + self.sftp = None + self.ssh = None + + def connect(self) -> bool: + """ + Establish SFTP connection. + + Returns: + True if successful, False otherwise + """ + try: + # Create SSH client + self.ssh = paramiko.SSHClient() + self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + # Connect + self.ssh.connect( + self.config.sftp_host, + port=self.config.sftp_port, + username=self.config.sftp_username, + password=self.config.sftp_password, + timeout=10 + ) + + # Get SFTP channel + self.sftp = self.ssh.open_sftp() + logger.info(f"Connected to SFTP server: {self.config.sftp_host}:{self.config.sftp_port}") + return True + + except Exception as e: + logger.error(f"Failed to connect to SFTP server: {e}", exc_info=True) + return False + + def disconnect(self): + """Close SFTP connection.""" + try: + if self.sftp: + self.sftp.close() + if self.ssh: + self.ssh.close() + logger.info("SFTP connection closed") + except Exception as e: + logger.error(f"Error closing SFTP connection: {e}") + + def list_files(self, remote_path: str, pattern: str) -> list: + """ + List files matching pattern in remote directory. + + Args: + remote_path: Path on SFTP server + pattern: File pattern to match (e.g., 'ACH_*.txt') + + Returns: + List of matching filenames + """ + if not self.sftp: + logger.error("SFTP not connected") + return [] + + try: + files = [] + try: + items = self.sftp.listdir_attr(remote_path) + except FileNotFoundError: + logger.warning(f"Directory not found: {remote_path}") + return [] + + import fnmatch + for item in items: + if fnmatch.fnmatch(item.filename, pattern): + files.append(item.filename) + + logger.debug(f"Found {len(files)} files matching {pattern} in {remote_path}") + return sorted(files) + + except Exception as e: + logger.error(f"Error listing files in {remote_path}: {e}", exc_info=True) + return [] + + def download_file(self, remote_path: str, local_path: str) -> bool: + """ + Download file from SFTP server. + + Args: + remote_path: Full path on SFTP server + local_path: Local destination path + + Returns: + True if successful, False otherwise + """ + if not self.sftp: + logger.error("SFTP not connected") + return False + + try: + # Create local directory if needed + Path(local_path).parent.mkdir(parents=True, exist_ok=True) + + # Download file + self.sftp.get(remote_path, local_path) + logger.info(f"Downloaded file: {remote_path} -> {local_path}") + return True + + except Exception as e: + logger.error(f"Error downloading file {remote_path}: {e}", exc_info=True) + return False + + def get_file_size(self, remote_path: str) -> int: + """ + Get size of remote file. + + Args: + remote_path: Full path on SFTP server + + Returns: + File size in bytes, or -1 if error + """ + if not self.sftp: + logger.error("SFTP not connected") + return -1 + + try: + stat = self.sftp.stat(remote_path) + return stat.st_size + except Exception as e: + logger.error(f"Error getting file size {remote_path}: {e}") + return -1 + + def __enter__(self): + """Context manager entry.""" + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.disconnect()