#!/usr/bin/env python3
"""
Automatic Dataset Cleaner
Usage:
python automatic_dataset_cleaner.py --file /path/to/data.csv
python automatic_dataset_cleaner.py --file /path/to/data.csv --missing-strategy mean --outliers cap --encode --scale standard
Outputs:
- /path/to/data_cleaned.csv (cleaned dataset)
- /path/to/data_cleaning_report.json (summary of cleaning actions)
"""
import argparse
import pandas as pd
import numpy as np
import json
import os
from datetime import datetime
from sklearn.preprocessing import StandardScaler, MinMaxScaler
# ---------- Helpers ----------
def read_csv(path):
df = pd.read_csv(path)
return df
def save_csv(df, path):
df.to_csv(path, index=False)
def save_json(obj, path):
with open(path, "w", encoding="utf-8") as f:
json.dump(obj, f, indent=2, default=str)
def summary_stats(df):
return {
"rows": int(df.shape[0]),
"columns": int(df.shape[1]),
"missing_per_column": df.isnull().sum().to_dict(),
"dtypes": {c: str(t) for c, t in df.dtypes.items()},
"sample_head": df.head(3).to_dict(orient="records")
}
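# Illustrative output shape of summary_stats (values are made up):
# {"rows": 1000, "columns": 8, "missing_per_column": {"age": 37, ...},
#  "dtypes": {"age": "float64", ...}, "sample_head": [...]}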
def auto_cast_columns(df):
"""Try to cast columns to numeric/datetime where appropriate."""
conversions = {}
for col in df.columns:
if df[col].dtype == object:
# try datetime
try:
parsed = pd.to_datetime(df[col], errors="coerce")
non_null = parsed.notnull().sum()
if non_null / max(1, len(parsed)) > 0.6:
df[col] = parsed
conversions[col] = "datetime"
continue
except Exception:
pass
            # try numeric: strip thousands separators and stray spaces first;
            # both replacements must go through .str, otherwise the second call
            # is Series.replace and only matches cells equal to " "
            cleaned = df[col].astype(str).str.replace(",", "", regex=False).str.replace(" ", "", regex=False)
            coerced = pd.to_numeric(cleaned, errors="coerce")
            if coerced.notnull().sum() / max(1, len(coerced)) > 0.6:
                df[col] = coerced
                conversions[col] = "numeric"
return df, conversions
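# Example: an object column like ["1,200", "3 400", "n/a"] is cleaned to
# ["1200", "3400", "n/a"] and coerced to (1200.0, 3400.0, NaN); the cast is
# kept because 2/3 > 0.6 of the values parsed successfully.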
# ---------- Missing values ----------
def handle_missing(df, strategy="mean", fill_value=None, threshold_drop_col=0.5):
"""
strategy: 'drop-row', 'drop-col', 'mean', 'median', 'mode', 'ffill', 'bfill', 'constant'
threshold_drop_col: if fraction of missing > threshold, drop column
"""
report = {"strategy": strategy, "dropped_columns": [], "details": {}}
# drop columns with too many missing values
missing_frac = df.isnull().mean()
cols_to_drop = missing_frac[missing_frac > threshold_drop_col].index.tolist()
if cols_to_drop:
df = df.drop(columns=cols_to_drop)
report["dropped_columns"] = cols_to_drop
if strategy == "drop-row":
before = len(df)
df = df.dropna(axis=0)
report["rows_dropped"] = before - len(df)
elif strategy == "drop-col":
before_cols = df.shape[1]
df = df.dropna(axis=1)
report["cols_dropped"] = before_cols - df.shape[1]
elif strategy in ("mean", "median", "mode", "ffill", "bfill", "constant"):
for col in df.columns:
if df[col].isnull().any():
if strategy == "mean" and pd.api.types.is_numeric_dtype(df[col]):
val = df[col].mean()
df[col] = df[col].fillna(val)
report["details"][col] = f"filled mean={val}"
elif strategy == "median" and pd.api.types.is_numeric_dtype(df[col]):
val = df[col].median()
df[col] = df[col].fillna(val)
report["details"][col] = f"filled median={val}"
elif strategy == "mode":
mode_val = df[col].mode()
if not mode_val.empty:
val = mode_val.iloc[0]
df[col] = df[col].fillna(val)
report["details"][col] = f"filled mode={val}"
else:
df[col] = df[col].fillna(fill_value)
report["details"][col] = f"filled mode_empty used const={fill_value}"
elif strategy == "ffill":
df[col] = df[col].fillna(method="ffill").fillna(method="bfill")
report["details"][col] = "filled forward/backward"
elif strategy == "bfill":
df[col] = df[col].fillna(method="bfill").fillna(method="ffill")
report["details"][col] = "filled backward/forward"
else: # constant
df[col] = df[col].fillna(fill_value)
report["details"][col] = f"filled const={fill_value}"
else:
raise ValueError("Unknown missing strategy")
return df, report
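# Example: handle_missing(df, strategy="median") fills numeric gaps with each
# column's median and falls back to the column mode for non-numeric columns.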
# ---------- Outlier detection/treatment ----------
def iqr_outlier_bounds(series, k=1.5):
q1 = series.quantile(0.25)
q3 = series.quantile(0.75)
iqr = q3 - q1
low = q1 - k * iqr
high = q3 + k * iqr
return low, high
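# Worked example: with q1=10, q3=30 (IQR=20) and k=1.5, the fences are
# low = 10 - 30 = -20 and high = 30 + 30 = 60; values outside them count as outliers.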
def handle_outliers(df, method="remove", k=1.5, numeric_only=True):
"""
method: 'remove' (drop rows with outlier), 'cap' (clip to bounds), 'mark' (add boolean column)
returns df, report
"""
report = {"method": method, "columns": {}}
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() if numeric_only else df.columns.tolist()
rows_before = df.shape[0]
for col in numeric_cols:
series = df[col].dropna()
if series.empty:
continue
low, high = iqr_outlier_bounds(series, k=k)
is_out = (df[col] < low) | (df[col] > high)
out_count = int(is_out.sum())
if out_count == 0:
continue
report["columns"][col] = {"outliers": out_count, "bounds": (float(low), float(high))}
if method == "remove":
df = df.loc[~is_out]
elif method == "cap":
df[col] = df[col].clip(lower=low, upper=high)
elif method == "mark":
df[f"{col}_outlier"] = is_out.astype(int)
else:
raise ValueError("Unknown outlier method")
rows_after = df.shape[0]
report["rows_before"] = int(rows_before)
report["rows_after"] = int(rows_after)
return df, report
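# Note: with method="remove", rows are dropped column by column, so the fences
# for later columns are computed on the already-filtered frame.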
# ---------- Duplicates ----------
def handle_duplicates(df, subset=None, keep="first"):
before = df.shape[0]
df2 = df.drop_duplicates(subset=subset, keep=keep)
after = df2.shape[0]
report = {"rows_before": int(before), "rows_after": int(after), "dropped": int(before-after)}
return df2, report
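# Example: handle_duplicates(df, subset=["user_id"]) keeps only the first row
# per user_id; subset=None compares entire rows.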
# ---------- Encoding ----------
def encode_categoricals(df, one_hot=False, max_unique_for_onehot=20):
report = {"encoded_columns": {}}
cat_cols = df.select_dtypes(include=["category", "object"]).columns.tolist()
if not cat_cols:
return df, report
for col in cat_cols:
nunique = df[col].nunique(dropna=False)
        if one_hot and nunique <= max_unique_for_onehot:
            # dummy_na=True already yields a <col>_nan indicator; casting to str
            # first would turn NaN into the literal "nan" and create a duplicate column
            dummies = pd.get_dummies(df[col], prefix=col, dummy_na=True)
            df = pd.concat([df.drop(columns=[col]), dummies], axis=1)
            report["encoded_columns"][col] = {"method": "one_hot", "new_cols": list(dummies.columns)}
        else:
            # label encoding (simple mapping); NaN is encoded via its string form "nan"
            mapping = {val: i for i, val in enumerate(df[col].astype(str).unique())}
            df[col] = df[col].astype(str).map(mapping)
            report["encoded_columns"][col] = {"method": "label", "mapping_sample": dict(list(mapping.items())[:10])}
return df, report
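# Example: a "color" column holding red/blue/NaN becomes color_red, color_blue
# and color_nan indicator columns under --encode --one-hot (if its cardinality
# is small enough), or integer codes under plain --encode.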
# ---------- Scaling ----------
def scale_numeric(df, method="standard"):
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
report = {"method": method, "scaled_columns": numeric_cols}
if not numeric_cols:
return df, report
    # NaNs must already be imputed at this point; sklearn scalers raise on missing values
    arr = df[numeric_cols].to_numpy(dtype=float)
if method == "standard":
scaler = StandardScaler()
elif method == "minmax":
scaler = MinMaxScaler()
else:
raise ValueError("Unknown scaling method")
scaled = scaler.fit_transform(arr)
df[numeric_cols] = scaled
return df, report
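# Standard scaling maps each column x to (x - mean(x)) / std(x); min-max
# scaling maps it to (x - min(x)) / (max(x) - min(x)), i.e. into [0, 1].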
# ---------- Main cleaning pipeline ----------
def clean_dataset(
input_path,
missing_strategy="mean",
missing_constant=None,
missing_drop_threshold=0.5,
outlier_method="remove",
outlier_k=1.5,
outlier_numeric_only=True,
dedupe_subset=None,
encode=False,
one_hot=False,
scale_method=None
):
# read
df = read_csv(input_path)
report = {"input_path": input_path, "start_time": str(datetime.now()), "initial_summary": summary_stats(df), "steps": {}}
# auto-cast columns
df, convs = auto_cast_columns(df)
report["steps"]["auto_cast"] = convs
# missing
df, missing_report = handle_missing(df, strategy=missing_strategy, fill_value=missing_constant, threshold_drop_col=missing_drop_threshold)
report["steps"]["missing"] = missing_report
# duplicates
df, dup_report = handle_duplicates(df, subset=dedupe_subset, keep="first")
report["steps"]["duplicates"] = dup_report
    # outliers (skipped when outlier_method is None, e.g. --outliers none)
    if outlier_method:
        df, out_report = handle_outliers(df, method=outlier_method, k=outlier_k, numeric_only=outlier_numeric_only)
        report["steps"]["outliers"] = out_report
# encode
if encode:
df, enc_report = encode_categoricals(df, one_hot=one_hot)
report["steps"]["encoding"] = enc_report
# scale
if scale_method:
df, scale_report = scale_numeric(df, method=scale_method)
report["steps"]["scaling"] = scale_report
report["final_summary"] = summary_stats(df)
report["end_time"] = str(datetime.now())
# output
base, ext = os.path.splitext(input_path)
cleaned_path = f"{base}_cleaned{ext}"
report_path = f"{base}_cleaning_report.json"
save_csv(df, cleaned_path)
save_json(report, report_path)
return cleaned_path, report_path, report
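# Programmatic use (a minimal sketch; the path and choices are illustrative):
# cleaned_path, report_path, report = clean_dataset(
#     "/path/to/data.csv", missing_strategy="median",
#     outlier_method="cap", encode=True, one_hot=True, scale_method="minmax")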
# ---------- CLI ----------
def parse_args():
p = argparse.ArgumentParser(description="Automatic Dataset Cleaner")
p.add_argument("--file", "-f", required=True, help="Path to CSV file")
p.add_argument("--missing-strategy", default="mean", choices=["drop-row","drop-col","mean","median","mode","ffill","bfill","constant"], help="Missing value strategy")
p.add_argument("--missing-constant", default=None, help="Constant value to fill missing when strategy=constant")
p.add_argument("--missing-drop-threshold", type=float, default=0.5, help="Drop columns with missing fraction > threshold")
p.add_argument("--outliers", default="remove", choices=["remove","cap","mark","none"], help="Outlier handling method")
p.add_argument("--outlier-k", type=float, default=1.5, help="IQR multiplier for outlier detection")
p.add_argument("--dedupe-subset", default=None, help="Comma separated columns to consider for duplicates (default=all columns)")
p.add_argument("--encode", action="store_true", help="Encode categorical columns")
p.add_argument("--one-hot", action="store_true", help="One-hot encode small cardinality categoricals (used with --encode)")
p.add_argument("--scale", default=None, choices=["standard","minmax"], help="Scale numeric columns")
return p.parse_args()
def main_cli():
args = parse_args()
    dedupe_subset = [c.strip() for c in args.dedupe_subset.split(",")] if args.dedupe_subset else None
outlier_method = args.outliers if args.outliers != "none" else None
cleaned_path, report_path, report = clean_dataset(
input_path=args.file,
missing_strategy=args.missing_strategy,
missing_constant=args.missing_constant,
missing_drop_threshold=args.missing_drop_threshold,
outlier_method=outlier_method,
outlier_k=args.outlier_k,
outlier_numeric_only=True,
dedupe_subset=dedupe_subset,
encode=args.encode,
one_hot=args.one_hot,
scale_method=args.scale
)
print("Cleaning complete.")
print("Cleaned file:", cleaned_path)
print("Report saved to:", report_path)
# Also print a short summary
print(json.dumps({
"rows_before": report["initial_summary"]["rows"],
"rows_after": report["final_summary"]["rows"],
"columns_before": report["initial_summary"]["columns"],
"columns_after": report["final_summary"]["columns"],
"missing_info": report["initial_summary"]["missing_per_column"]
}, indent=2))
if __name__ == "__main__":
main_cli()