Files
ach_ui_dbtl_file_based/tests/mock_sftp_server.py
2026-02-02 13:06:07 +05:30

356 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Simple mock SFTP server for local testing without Docker.
Uses paramiko to create a basic SFTP server.
"""
import os
import socket
import threading
import paramiko
import sys
from pathlib import Path
from logging_config import get_logger
logger = get_logger(__name__)
class MockSFTPServer(paramiko.ServerInterface):
    """SSH server policy for the mock SFTP server.

    Accepts password logins for one fixed test account only, permits
    ``session`` channels and allows only the ``sftp`` subsystem.
    """

    def __init__(self, sftp_root):
        """Store the served directory and create the instance event.

        Args:
            sftp_root: Directory exposed to clients; the SFTP interface reads
                it from this object.
        """
        self.event = threading.Event()
        self.sftp_root = sftp_root

    def check_auth_password(self, username, password):
        """Authenticate the fixed test account; reject everything else."""
        credentials_ok = username == 'ipks' and password == 'ipks_password'
        return paramiko.AUTH_SUCCESSFUL if credentials_ok else paramiko.AUTH_FAILED

    def check_channel_request(self, kind, chanid):
        """Permit ``session`` channels and refuse every other channel kind."""
        if kind != 'session':
            return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
        return paramiko.OPEN_SUCCEEDED

    def check_channel_subsystem_request(self, channel, name):
        """Return True only when the requested subsystem is ``sftp``."""
        return name == 'sftp'
class MockSFTPHandle(paramiko.SFTPHandle):
    """SFTP file handle that wraps a local binary file object.

    The file object is attached by whoever creates the handle through the
    ``file_obj`` attribute; while it is unset the handle reports "no such
    file" for reads, writes and attribute changes.
    """

    def __init__(self, flags=0):
        """Create a handle with no backing file attached yet."""
        super().__init__(flags)
        self.file_obj = None

    def stat(self):
        """Return attributes of the open file, or empty attributes if none."""
        if not self.file_obj:
            return paramiko.SFTPAttributes()
        descriptor = self.file_obj.fileno()
        return paramiko.SFTPAttributes.from_stat(os.fstat(descriptor))

    def chattr(self, attr):
        """Accept attribute changes without applying them to the file."""
        return paramiko.SFTP_OK if self.file_obj else paramiko.SFTP_NO_SUCH_FILE

    def close(self):
        """Close and detach the backing file, if any; always reports success."""
        if self.file_obj:
            self.file_obj.close()
            self.file_obj = None
        return paramiko.SFTP_OK

    def read(self, offset, length):
        """Return up to ``length`` bytes starting at ``offset``.

        Returns an SFTP status code instead when no file is attached or the
        read fails.
        """
        stream = self.file_obj
        if not stream:
            return paramiko.SFTP_NO_SUCH_FILE
        try:
            stream.seek(offset)
            chunk = stream.read(length)
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            return paramiko.SFTP_FAILURE
        else:
            return chunk

    def write(self, offset, data):
        """Write ``data`` at ``offset`` and return an SFTP status code."""
        stream = self.file_obj
        if not stream:
            return paramiko.SFTP_NO_SUCH_FILE
        try:
            stream.seek(offset)
            stream.write(data)
        except Exception as e:
            logger.error(f"Error writing file: {e}")
            return paramiko.SFTP_FAILURE
        else:
            return paramiko.SFTP_OK
class MockSFTPServerInterface(paramiko.SFTPServerInterface):
    """SFTP operations backed by a local directory tree.

    Every client-supplied path is interpreted relative to ``sftp_root``.
    Paths that normalise to a location outside that directory are refused
    with ``SFTP_PERMISSION_DENIED``. Failures are logged and reported to the
    client as SFTP status codes instead of being raised.

    Note: paths are normalised with ``os.path.abspath``, which does not
    resolve symbolic links.
    """

    def __init__(self, server, *args, **kwargs):
        """Bind the interface to the root directory held by ``server``.

        Args:
            server: Server object exposing an ``sftp_root`` attribute.
        """
        super().__init__(server, *args, **kwargs)
        # Normalise once so containment checks still work when the server
        # was configured with a relative root.
        self.sftp_root = os.path.abspath(server.sftp_root)

    def _resolve(self, path):
        """Translate a client path into an absolute local path.

        Args:
            path: Path as sent by the client; leading slashes are dropped so
                it is always joined beneath the root.

        Returns:
            The absolute local path, or None when it falls outside the root.
        """
        root = self.sftp_root
        candidate = os.path.abspath(os.path.join(root, path.lstrip('/')))
        root_key = os.path.normcase(root)
        candidate_key = os.path.normcase(candidate)
        # Compare against "root + separator": a bare prefix test would also
        # accept sibling directories such as "<root>_other".
        if candidate_key == root_key or candidate_key.startswith(
                os.path.join(root_key, '')):
            return candidate
        return None

    def _stat_path(self, path, follow_symlinks):
        """Shared implementation of ``stat`` and ``lstat``."""
        try:
            full_path = self._resolve(path)
            if full_path is None:
                return paramiko.SFTP_PERMISSION_DENIED
            if follow_symlinks:
                exists, stat_fn = os.path.exists, os.stat
            else:
                exists, stat_fn = os.path.lexists, os.lstat
            if not exists(full_path):
                return paramiko.SFTP_NO_SUCH_FILE
            return paramiko.SFTPAttributes.from_stat(stat_fn(full_path))
        except Exception as e:
            logger.error(f"Error getting stats for {path}: {e}")
            return paramiko.SFTP_NO_SUCH_FILE

    def session_started(self):
        """Hook called when an SFTP session begins; nothing to set up."""
        pass

    def session_ended(self):
        """Hook called when an SFTP session ends; nothing to tear down."""
        pass

    def open(self, path, flags, attr):
        """Open a file and return a handle for it.

        Args:
            path: Client path of the file.
            flags: ``os.O_*`` flags describing the requested access.
            attr: Requested attributes for a new file (not applied).

        Returns:
            A ``MockSFTPHandle`` on success, otherwise an SFTP status code.
        """
        try:
            full_path = self._resolve(path)
            if full_path is None:
                return paramiko.SFTP_PERMISSION_DENIED

            # Pick the Python mode matching the requested access so that
            # read/write and append opens are honoured, not just write-only.
            if flags & os.O_WRONLY:
                mode = 'ab' if flags & os.O_APPEND else 'wb'
            elif flags & os.O_RDWR:
                mode = 'a+b' if flags & os.O_APPEND else 'r+b'
            else:
                mode = 'rb'

            # Convenience for uploads: create missing parent directories,
            # but never as a side effect of a read-only open.
            if mode != 'rb':
                os.makedirs(os.path.dirname(full_path), exist_ok=True)

            # os.open applies the client's create/truncate/exclusive flags;
            # the built-in open() modes cannot express all combinations.
            fd = os.open(full_path, flags | getattr(os, 'O_BINARY', 0), 0o666)
            try:
                file_obj = os.fdopen(fd, mode)
            except Exception:
                os.close(fd)
                raise

            handle = MockSFTPHandle(flags)
            handle.file_obj = file_obj
            return handle
        except Exception as e:
            logger.error(f"Error opening file {path}: {e}")
            return paramiko.SFTP_NO_SUCH_FILE

    def close(self, path):
        """Report success; file handles close themselves."""
        return paramiko.SFTP_OK

    def list_folder(self, path):
        """Return a list of ``SFTPAttributes`` for the entries of a directory.

        Returns an SFTP status code when the path is outside the root, does
        not exist, or cannot be listed.
        """
        try:
            full_path = self._resolve(path)
            if full_path is None:
                return paramiko.SFTP_PERMISSION_DENIED
            if not os.path.exists(full_path):
                return paramiko.SFTP_NO_SUCH_FILE
            entries = []
            for item in os.listdir(full_path):
                item_path = os.path.join(full_path, item)
                attr = paramiko.SFTPAttributes.from_stat(os.stat(item_path))
                attr.filename = item
                entries.append(attr)
            return entries
        except Exception as e:
            logger.error(f"Error listing directory {path}: {e}")
            return paramiko.SFTP_NO_SUCH_FILE

    def stat(self, path):
        """Return attributes of ``path``, following symbolic links."""
        return self._stat_path(path, follow_symlinks=True)

    def lstat(self, path):
        """Return attributes of ``path`` itself, without following links."""
        return self._stat_path(path, follow_symlinks=False)

    def remove(self, path):
        """Delete a file and return an SFTP status code."""
        try:
            full_path = self._resolve(path)
            if full_path is None:
                return paramiko.SFTP_PERMISSION_DENIED
            if not os.path.exists(full_path):
                return paramiko.SFTP_NO_SUCH_FILE
            os.remove(full_path)
            return paramiko.SFTP_OK
        except Exception as e:
            logger.error(f"Error removing {path}: {e}")
            return paramiko.SFTP_FAILURE

    def rename(self, oldpath, newpath):
        """Rename ``oldpath`` to ``newpath``; both must lie inside the root."""
        try:
            old_full = self._resolve(oldpath)
            new_full = self._resolve(newpath)
            if old_full is None or new_full is None:
                return paramiko.SFTP_PERMISSION_DENIED
            if not os.path.exists(old_full):
                return paramiko.SFTP_NO_SUCH_FILE
            os.rename(old_full, new_full)
            return paramiko.SFTP_OK
        except Exception as e:
            logger.error(f"Error renaming {oldpath}: {e}")
            return paramiko.SFTP_FAILURE

    def mkdir(self, path, attr):
        """Create a directory (and missing parents); existing ones are fine."""
        try:
            full_path = self._resolve(path)
            if full_path is None:
                return paramiko.SFTP_PERMISSION_DENIED
            os.makedirs(full_path, exist_ok=True)
            return paramiko.SFTP_OK
        except Exception as e:
            logger.error(f"Error creating directory {path}: {e}")
            return paramiko.SFTP_FAILURE

    def rmdir(self, path):
        """Remove a directory and return an SFTP status code."""
        try:
            full_path = self._resolve(path)
            if full_path is None:
                return paramiko.SFTP_PERMISSION_DENIED
            if not os.path.exists(full_path):
                return paramiko.SFTP_NO_SUCH_FILE
            os.rmdir(full_path)
            return paramiko.SFTP_OK
        except Exception as e:
            logger.error(f"Error removing directory {path}: {e}")
            return paramiko.SFTP_FAILURE
def start_mock_sftp_server(host='127.0.0.1', port=2222, sftp_root='./sftp_data'):
    """
    Start a mock SFTP server in a background thread.

    The call returns once the listening socket is bound or the server thread
    has given up. The wait is bounded to 5 seconds, so a caller may connect
    as soon as this function returns. Start-up errors are logged by the
    server thread rather than raised here.

    Args:
        host: Host to bind to (default: 127.0.0.1)
        port: Port to bind to (default: 2222)
        sftp_root: Root directory for SFTP (default: ./sftp_data)

    Returns:
        Thread object (daemon thread)
    """
    # Create root directory if needed
    Path(sftp_root).mkdir(parents=True, exist_ok=True)
    # Resolve once, up front, so a later change of working directory cannot
    # move the served root between connections.
    root = os.path.abspath(sftp_root)

    # Generate host key (2048 bits: 1024-bit RSA is refused by some clients)
    host_key = paramiko.RSAKey.generate(2048)

    # Set when the socket is listening, or when the server thread exits.
    ready = threading.Event()

    def run_server():
        """Accept connections and hand each one to a paramiko transport."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind((host, port))
            sock.listen(1)
            logger.info(f"Mock SFTP server listening on {host}:{port}")
            logger.info(f"SFTP root: {root}")
            logger.info(f"Username: ipks, Password: ipks_password")
            ready.set()

            while True:
                client = None
                transport = None
                try:
                    client, addr = sock.accept()
                    logger.debug(f"Connection from {addr}")
                    transport = paramiko.Transport(client)
                    transport.add_server_key(host_key)
                    transport.set_subsystem_handler(
                        'sftp',
                        paramiko.SFTPServer,
                        MockSFTPServerInterface
                    )
                    server = MockSFTPServer(root)
                    transport.start_server(server=server)
                except KeyboardInterrupt:
                    logger.info("Server interrupted")
                    break
                except Exception as e:
                    logger.error(f"Error handling connection: {e}", exc_info=True)
                    # Release the failed connection instead of leaking it.
                    if transport is not None:
                        transport.close()
                    elif client is not None:
                        client.close()
        except Exception as e:
            logger.error(f"Error starting server: {e}", exc_info=True)
        finally:
            # Unblock the caller even when binding failed.
            ready.set()
            sock.close()
            logger.info("Mock SFTP server stopped")

    # Start in daemon thread
    thread = threading.Thread(target=run_server, daemon=True)
    thread.start()
    # Bounded wait so a thread that dies before signalling cannot hang us.
    ready.wait(timeout=5)
    return thread
if __name__ == '__main__':
    # Script entry point: prepare sample directories, start the mock server
    # and keep the process alive until interrupted.
    from logging_config import setup_logging
    import time

    setup_logging()

    rule = "=" * 80
    for banner_line in ("\n" + rule, "Mock SFTP Server for Testing", rule):
        print(banner_line)

    # One NACH directory per bank under the SFTP root.
    sftp_root = './sftp_data'
    for bank in ('HDFC', 'ICICI', 'SBI'):
        nach_dir = f'{sftp_root}/{bank}/NACH'
        Path(nach_dir).mkdir(parents=True, exist_ok=True)
        print(f"✓ Created {nach_dir}")

    print("\nStarting mock SFTP server...")
    start_mock_sftp_server(sftp_root=sftp_root)

    usage_lines = (
        "\n" + rule,
        "Server running. Press CTRL+C to stop.",
        "\nTo test connection:",
        " sftp -P 2222 ipks@127.0.0.1",
        " Password: ipks_password",
        "\nTo use with application:",
        " SFTP_HOST=127.0.0.1",
        " SFTP_PORT=2222",
        " SFTP_USERNAME=ipks",
        " SFTP_PASSWORD=ipks_password",
        rule + "\n",
    )
    for usage_line in usage_lines:
        print(usage_line)

    # The server runs in a daemon thread, so the main thread must stay alive.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n\nShutting down...")