init
This commit is contained in:
200
script/filter_ben_discrepancies.py
Normal file
200
script/filter_ben_discrepancies.py
Normal file
@@ -0,0 +1,200 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import os
|
||||
from collections import Counter
|
||||
|
||||
import pandas as pd
|
||||
|
||||
|
||||
def parse_args():
    """Parse command-line arguments.

    Returns an argparse.Namespace with:
      input     -- path to the input CSV table dump (required)
      outdir    -- output directory (defaults to the current directory)
      write_sql -- whether to emit SQL suggestions for collapsed records
      cols      -- optional "logical=actual" column-name mappings
    """
    p = argparse.ArgumentParser()
    p.add_argument("--input", "-i", required=True, help="Input CSV file (table dump).")
    # Default to "." so main() never calls os.makedirs(None) when
    # --outdir is omitted.
    p.add_argument("--outdir", "-o", default=".", help="Output directory")
    p.add_argument(
        # "--write-sql" is the conventional GNU-style spelling; the original
        # single-dash "-write-sql" is kept as a compatible alias. Both map
        # to the args.write_sql attribute used by main().
        "--write-sql",
        "-write-sql",
        action="store_true",
        help="Write SQL suggestions for collapsed records",
    )
    p.add_argument(
        "--cols",
        nargs="*",
        default=[],
        help="Optional column mappings like customer_no=colname ...",
    )
    return p.parse_args()
|
||||
|
||||
|
||||
def apply_col_mappings(cols):
    """Build the logical -> actual column-name mapping.

    Starts from the default names and applies any "key=value" overrides
    supplied via --cols. Unknown keys and malformed entries (no "=") are
    silently ignored.
    """
    mapping = dict(
        customer_no="customer_no",
        beneficiary_name="beneficiary_name",
        beneficiary_acc_no="beneficiary_acc_no",
        transfer_type="transfer_type",
        beneficiary_ifsc_code="beneficiary_ifsc_code",
    )
    for spec in cols:
        if "=" not in spec:
            continue
        key, value = spec.split("=", 1)
        if key in mapping:
            mapping[key] = value
    return mapping
|
||||
|
||||
|
||||
def normalize_str(s):
    """Coerce *s* to a stripped string; NaN/None become ""."""
    return "" if pd.isna(s) else str(s).strip()
|
||||
|
||||
|
||||
def normalize_ifsc(s):
    """Normalize an IFSC code for comparison: strip, then upper-case."""
    return normalize_str(s).upper()
|
||||
|
||||
|
||||
def normalize_acc(s):
    """Normalize an account number: strip, then drop interior spaces."""
    return normalize_str(s).replace(" ", "")
|
||||
|
||||
|
||||
def normalize_name(s):
    """Normalize a beneficiary name for comparison: collapse runs of
    whitespace to single spaces and lower-case the result."""
    words = normalize_str(s).split()
    return " ".join(words).lower()
|
||||
|
||||
|
||||
def representative_value(series):
    """Pick the most frequent non-empty value in *series*.

    Ties go to the value encountered first (Counter preserves insertion
    order in most_common). Returns "" when nothing usable remains.
    """
    candidates = [v for v in series if pd.notna(v) and v != ""]
    if not candidates:
        return ""
    return Counter(candidates).most_common(1)[0][0]
|
||||
|
||||
|
||||
def main():
    """Split a beneficiary table dump into conflicting and clean groups.

    Rows are grouped by (customer_no, beneficiary_acc_no). A group is a
    discrepancy when it has more than one distinct non-empty IFSC or more
    than one distinct non-empty normalized name; those rows are written to
    discrepancies.csv for manual review. All other groups are collapsed to
    one representative row each in collapsed_clean.csv, optionally with
    INSERT suggestions in migration_suggestions.sql.
    """
    args = parse_args()
    mapping = apply_col_mappings(args.cols)
    os.makedirs(args.outdir, exist_ok=True)

    # keep_default_na=False so empty cells arrive as "" rather than NaN.
    df = pd.read_csv(args.input, dtype=str, keep_default_na=False)

    for required in mapping.values():
        if required not in df.columns:
            print(
                f"ERROR: expected columns '{required}' not found in CSV columns: {list(df.columns)}"
            )
            return

    df_proc = df.copy()
    # Add normalized columns for grouping/comparison; raw variants keep
    # the original casing for the collapsed output.
    df_proc["_cust"] = df_proc[mapping["customer_no"]].map(normalize_str)
    df_proc["_acc"] = df_proc[mapping["beneficiary_acc_no"]].map(normalize_acc)
    df_proc["_ifsc_raw"] = df_proc[mapping["beneficiary_ifsc_code"]].map(normalize_str)
    df_proc["_ifsc"] = df_proc["_ifsc_raw"].map(normalize_ifsc)
    df_proc["_name_raw"] = df_proc[mapping["beneficiary_name"]].map(normalize_str)
    df_proc["_name_norm"] = df_proc["_name_raw"].map(normalize_name)
    df_proc["_transfer_type"] = df_proc[mapping["transfer_type"]].map(normalize_str)

    # Key used for grouping is (customer_no, beneficiary_acc_no).
    df_proc["_group_key"] = df_proc["_cust"] + "||" + df_proc["_acc"]

    grouped = df_proc.groupby("_group_key", sort=False)

    conflict_rows = []
    collapsed_records = []

    for key, g in grouped:
        # Unique IFSC values and unique normalized names, ignoring empties.
        unique_ifsc = {x for x in g["_ifsc"].unique() if x != ""}
        unique_names = {x for x in g["_name_norm"].unique() if x != ""}

        # More than one distinct IFSC or name -> conflict: manual review.
        if len(unique_ifsc) > 1 or len(unique_names) > 1:
            conflict_rows.append(g)
            continue

        # Non-conflict: create one representative collapsed row.
        rep_ifsc = representative_value(
            g["_ifsc_raw"]
        )  # preserve original casing from raw
        rep_name = representative_value(g["_name_raw"])
        rep_transfer_types = sorted(set(g["_transfer_type"].tolist()))
        orig_customer_no = representative_value(g["_cust"])
        orig_acc_no = representative_value(g["_acc"])

        collapsed = {
            mapping["customer_no"]: orig_customer_no,
            mapping["beneficiary_acc_no"]: orig_acc_no,
            mapping["beneficiary_name"]: rep_name,
            mapping["beneficiary_ifsc_code"]: rep_ifsc,
            # note: new app doesn't need transfer_type; but we keep list of seen types for traceability
            "seen_transfer_types": ",".join(rep_transfer_types),
            "source_row_count": len(g),
            "source_indices": ",".join(str(i) for i in g.index.tolist()),
        }
        # Carry over any remaining original columns from the group's
        # first row so no source metadata is lost.
        first_row = g.iloc[0].to_dict()
        collapsed.update(
            {k: first_row.get(k, "") for k in df.columns if k not in collapsed}
        )
        collapsed_records.append(collapsed)

    # Write outputs
    if conflict_rows:
        conflicts_df = pd.concat(conflict_rows, axis=0)
        # restore original columns + helpful diagnostic columns
        out_conflicts = conflicts_df[
            df.columns.tolist()
            + ["_ifsc", "_name_norm", "_transfer_type", "_group_key"]
        ]
        out_conflicts.to_csv(
            os.path.join(args.outdir, "discrepancies.csv"), index=False
        )
        print(
            f"Wrote discrepancies to {os.path.join(args.outdir, 'discrepancies.csv')} ({len(conflicts_df)} rows in {len(conflict_rows)} groups)."
        )
    else:
        print("No discrepancies found.")

    collapsed_df = pd.DataFrame(collapsed_records)
    # Guard the column reorder: when every group conflicted,
    # collapsed_records is empty and collapsed_df has no columns, so
    # indexing with cols_order would raise KeyError.
    if not collapsed_df.empty:
        # keep a sensible column order
        cols_order = [
            mapping["customer_no"],
            mapping["beneficiary_acc_no"],
            mapping["beneficiary_name"],
            mapping["beneficiary_ifsc_code"],
            "seen_transfer_types",
            "source_row_count",
            "source_indices",
        ]
        for c in collapsed_df.columns:
            if c not in cols_order:
                cols_order.append(c)
        collapsed_df = collapsed_df[cols_order]
    collapsed_df.to_csv(os.path.join(args.outdir, "collapsed_clean.csv"), index=False)
    print(
        f"Wrote collapsed clean beneficiaries to {os.path.join(args.outdir, 'collapsed_clean.csv')} ({len(collapsed_df)} rows)."
    )

    if args.write_sql:
        sql_path = os.path.join(args.outdir, "migration_suggestions.sql")
        with open(sql_path, "w", encoding="utf-8") as fh:
            for _, r in collapsed_df.iterrows():
                # Double single quotes for basic SQL string escaping.
                cust = str(r[mapping["customer_no"]]).replace("'", "''")
                acc = str(r[mapping["beneficiary_acc_no"]]).replace("'", "''")
                name = str(r[mapping["beneficiary_name"]]).replace("'", "''")
                ifsc = str(r[mapping["beneficiary_ifsc_code"]]).replace("'", "''")
                # Example insert - change table/columns to your new DB schema
                fh.write(
                    f"INSERT INTO beneficiaries_new (customer_no, beneficiary_acc_no, beneficiary_name, beneficiary_ifsc_code) "
                    f"VALUES ('{cust}','{acc}','{name}','{ifsc}');\n"
                )
        print(f"Wrote SQL suggestions to {sql_path}")
|
||||
|
||||
|
||||
# Entry point when executed as a script (no side effects on import).
if __name__ == "__main__":
    main()
|
Reference in New Issue
Block a user