Automatic Dataset Cleaner

#!/usr/bin/env python3

"""

Automatic Dataset Cleaner


Usage:

    python automatic_dataset_cleaner.py --file /path/to/data.csv

    python automatic_dataset_cleaner.py --file /path/to/data.csv --missing-strategy mean --outliers cap --encode --scale standard


Outputs:

    - /path/to/data_cleaned.csv           (cleaned dataset)

    - /path/to/data_cleaning_report.json  (summary of cleaning actions)

"""


import argparse

import pandas as pd

import numpy as np

import json

import os

from datetime import datetime

from sklearn.preprocessing import StandardScaler, MinMaxScaler


# ---------- Helpers ----------

def read_csv(path):

    df = pd.read_csv(path)

    return df


def save_csv(df, path):

    df.to_csv(path, index=False)


def save_json(obj, path):

    with open(path, "w", encoding="utf-8") as f:

        json.dump(obj, f, indent=2, default=str)


def summary_stats(df):

    return {

        "rows": int(df.shape[0]),

        "columns": int(df.shape[1]),

        "missing_per_column": df.isnull().sum().to_dict(),

        "dtypes": {c: str(t) for c, t in df.dtypes.items()},

        "sample_head": df.head(3).to_dict(orient="records")

    }


def auto_cast_columns(df):

    """Try to cast columns to numeric/datetime where appropriate."""

    conversions = {}

    for col in df.columns:

        if df[col].dtype == object:

            # try datetime

            try:

                parsed = pd.to_datetime(df[col], errors="coerce")

                non_null = parsed.notnull().sum()

                if non_null / max(1, len(parsed)) > 0.6:

                    df[col] = parsed

                    conversions[col] = "datetime"

                    continue

            except Exception:

                pass

            # try numeric

            # strip thousands separators and embedded spaces inside each string before coercing

            as_text = df[col].astype(str).str.replace(",", "", regex=False).str.replace(" ", "", regex=False)

            coerced = pd.to_numeric(as_text, errors="coerce")

            if coerced.notnull().sum() / max(1, len(coerced)) > 0.6:

                df[col] = coerced

                conversions[col] = "numeric"

    return df, conversions
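

# Illustrative sketch (hypothetical toy data, not from the original post): the
# heuristic only converts a column when more than 60% of its values parse cleanly.
#
#   toy = pd.DataFrame({"amount": ["1,200", "3,400", "950", "n/a"],
#                       "qty": ["10", "20", "thirty", "forty"]})
#   toy, conv = auto_cast_columns(toy)
#   # expected: "amount" is cast to numeric (3 of 4 values parse > 60%),
#   # while "qty" stays text (only 2 of 4 values parse).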


# ---------- Missing values ----------

def handle_missing(df, strategy="mean", fill_value=None, threshold_drop_col=0.5):

    """

    strategy: 'drop-row', 'drop-col', 'mean', 'median', 'mode', 'ffill', 'bfill', 'constant'

    threshold_drop_col: if fraction of missing > threshold, drop column

    """

    report = {"strategy": strategy, "dropped_columns": [], "details": {}}

    # drop columns with too many missing values

    missing_frac = df.isnull().mean()

    cols_to_drop = missing_frac[missing_frac > threshold_drop_col].index.tolist()

    if cols_to_drop:

        df = df.drop(columns=cols_to_drop)

        report["dropped_columns"] = cols_to_drop


    if strategy == "drop-row":

        before = len(df)

        df = df.dropna(axis=0)

        report["rows_dropped"] = before - len(df)

    elif strategy == "drop-col":

        before_cols = df.shape[1]

        df = df.dropna(axis=1)

        report["cols_dropped"] = before_cols - df.shape[1]

    elif strategy in ("mean", "median", "mode", "ffill", "bfill", "constant"):

        for col in df.columns:

            if df[col].isnull().any():

                if strategy == "mean" and pd.api.types.is_numeric_dtype(df[col]):

                    val = df[col].mean()

                    df[col] = df[col].fillna(val)

                    report["details"][col] = f"filled mean={val}"

                elif strategy == "median" and pd.api.types.is_numeric_dtype(df[col]):

                    val = df[col].median()

                    df[col] = df[col].fillna(val)

                    report["details"][col] = f"filled median={val}"

                elif strategy == "mode":

                    mode_val = df[col].mode()

                    if not mode_val.empty:

                        val = mode_val.iloc[0]

                        df[col] = df[col].fillna(val)

                        report["details"][col] = f"filled mode={val}"

                    else:

                        df[col] = df[col].fillna(fill_value)

                        report["details"][col] = f"filled mode_empty used const={fill_value}"

                elif strategy == "ffill":

                    df[col] = df[col].ffill().bfill()

                    report["details"][col] = "filled forward/backward"

                elif strategy == "bfill":

                    df[col] = df[col].bfill().ffill()

                    report["details"][col] = "filled backward/forward"

                else:  # constant fill (also the fallback for non-numeric columns under mean/median)

                    if fill_value is None:

                        report["details"][col] = "skipped (no fill value given)"

                        continue

                    df[col] = df[col].fillna(fill_value)

                    report["details"][col] = f"filled const={fill_value}"

    else:

        raise ValueError("Unknown missing strategy")

    return df, report
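

# Minimal usage sketch (assumed toy frame, for illustration only):
#
#   toy = pd.DataFrame({"age": [25.0, None, 40.0], "city": ["NY", None, "NY"]})
#   toy, rep = handle_missing(toy, strategy="median")
#   # "age" is filled with its median (32.5); "city" is not numeric, so under
#   # "median" it falls through to the constant branch and is left as-is unless
#   # a fill_value is supplied.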


# ---------- Outlier detection/treatment ----------

def iqr_outlier_bounds(series, k=1.5):

    q1 = series.quantile(0.25)

    q3 = series.quantile(0.75)

    iqr = q3 - q1

    low = q1 - k * iqr

    high = q3 + k * iqr

    return low, high
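

# Worked example (hypothetical numbers): for the series [1, 2, ..., 9, 100],
# pandas' default linear-interpolation quantiles give q1 = 3.25 and q3 = 7.75,
# so iqr = 4.5 and, with k = 1.5, low = -3.5 and high = 14.5; only the value
# 100 falls outside the fences.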


def handle_outliers(df, method="remove", k=1.5, numeric_only=True):

    """

    method: 'remove' (drop rows with outlier), 'cap' (clip to bounds), 'mark' (add boolean column)

    returns df, report

    """

    report = {"method": method, "columns": {}}

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() if numeric_only else df.columns.tolist()

    rows_before = df.shape[0]

    for col in numeric_cols:

        series = df[col].dropna()

        if series.empty:

            continue

        low, high = iqr_outlier_bounds(series, k=k)

        is_out = (df[col] < low) | (df[col] > high)

        out_count = int(is_out.sum())

        if out_count == 0:

            continue

        report["columns"][col] = {"outliers": out_count, "bounds": (float(low), float(high))}

        if method == "remove":

            df = df.loc[~is_out]

        elif method == "cap":

            df[col] = df[col].clip(lower=low, upper=high)

        elif method == "mark":

            df[f"{col}_outlier"] = is_out.astype(int)

        else:

            raise ValueError("Unknown outlier method")

    rows_after = df.shape[0]

    report["rows_before"] = int(rows_before)

    report["rows_after"] = int(rows_after)

    return df, report
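

# Usage sketch (assumed toy frame): "cap" keeps the row count and clips values
# to the IQR fences, while "remove" drops the offending rows.
#
#   toy = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]})
#   capped, _ = handle_outliers(toy.copy(), method="cap")      # 10 rows, max capped at 14.5
#   trimmed, _ = handle_outliers(toy.copy(), method="remove")  # 9 rows, the 100 is dropped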


# ---------- Duplicates ----------

def handle_duplicates(df, subset=None, keep="first"):
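    """Drop duplicate rows; 'subset' limits the comparison to the given columns."""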

    before = df.shape[0]

    df2 = df.drop_duplicates(subset=subset, keep=keep)

    after = df2.shape[0]

    report = {"rows_before": int(before), "rows_after": int(after), "dropped": int(before-after)}

    return df2, report


# ---------- Encoding ----------

def encode_categoricals(df, one_hot=False, max_unique_for_onehot=20):

    report = {"encoded_columns": {}}

    cat_cols = df.select_dtypes(include=["category", "object"]).columns.tolist()

    if not cat_cols:

        return df, report

    for col in cat_cols:

        nunique = df[col].nunique(dropna=False)

        if one_hot and nunique <= max_unique_for_onehot:

            dummies = pd.get_dummies(df[col].astype(str), prefix=col, dummy_na=True)

            df = pd.concat([df.drop(columns=[col]), dummies], axis=1)

            report["encoded_columns"][col] = {"method": "one_hot", "new_cols": list(dummies.columns)}

        else:

            # label encoding (simple mapping)

            mapping = {val: i for i, val in enumerate(df[col].astype(str).unique())}

            df[col] = df[col].astype(str).map(mapping)

            report["encoded_columns"][col] = {"method": "label", "mapping_sample": dict(list(mapping.items())[:10])}

    return df, report
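

# Encoding sketch (toy column, illustration only): with one_hot=True and low
# cardinality the column is expanded into indicator columns; otherwise each
# distinct value is mapped to an integer in order of first appearance.
#
#   toy = pd.DataFrame({"color": ["red", "blue", "red", "green"]})
#   labeled, rep = encode_categoricals(toy.copy(), one_hot=False)
#   # mapping: "red" -> 0, "blue" -> 1, "green" -> 2
#   # note: missing values, if present, are stringified and get their own label.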


# ---------- Scaling ----------

def scale_numeric(df, method="standard"):

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()

    report = {"method": method, "scaled_columns": numeric_cols}

    if not numeric_cols:

        return df, report

    arr = df[numeric_cols].values.astype(float)

    if method == "standard":

        scaler = StandardScaler()

    elif method == "minmax":

        scaler = MinMaxScaler()

    else:

        raise ValueError("Unknown scaling method")

    scaled = scaler.fit_transform(arr)

    df[numeric_cols] = scaled

    return df, report
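

# Scaling sketch (illustrative values): "standard" centers each numeric column
# to zero mean and unit variance, "minmax" rescales it to the [0, 1] range.
#
#   toy = pd.DataFrame({"x": [0.0, 5.0, 10.0]})
#   scaled, _ = scale_numeric(toy.copy(), method="minmax")
#   # scaled["x"].tolist() == [0.0, 0.5, 1.0]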


# ---------- Main cleaning pipeline ----------

def clean_dataset(

    input_path,

    missing_strategy="mean",

    missing_constant=None,

    missing_drop_threshold=0.5,

    outlier_method="remove",

    outlier_k=1.5,

    outlier_numeric_only=True,

    dedupe_subset=None,

    encode=False,

    one_hot=False,

    scale_method=None

):

    # read

    df = read_csv(input_path)

    report = {"input_path": input_path, "start_time": str(datetime.now()), "initial_summary": summary_stats(df), "steps": {}}


    # auto-cast columns

    df, convs = auto_cast_columns(df)

    report["steps"]["auto_cast"] = convs


    # missing

    df, missing_report = handle_missing(df, strategy=missing_strategy, fill_value=missing_constant, threshold_drop_col=missing_drop_threshold)

    report["steps"]["missing"] = missing_report


    # duplicates

    df, dup_report = handle_duplicates(df, subset=dedupe_subset, keep="first")

    report["steps"]["duplicates"] = dup_report


    # outliers (skipped when outlier_method is None, e.g. --outliers none)

    if outlier_method:

        df, out_report = handle_outliers(df, method=outlier_method, k=outlier_k, numeric_only=outlier_numeric_only)

        report["steps"]["outliers"] = out_report


    # encode

    if encode:

        df, enc_report = encode_categoricals(df, one_hot=one_hot)

        report["steps"]["encoding"] = enc_report


    # scale

    if scale_method:

        df, scale_report = scale_numeric(df, method=scale_method)

        report["steps"]["scaling"] = scale_report


    report["final_summary"] = summary_stats(df)

    report["end_time"] = str(datetime.now())

    # output

    base, ext = os.path.splitext(input_path)

    cleaned_path = f"{base}_cleaned{ext}"

    report_path = f"{base}_cleaning_report.json"

    save_csv(df, cleaned_path)

    save_json(report, report_path)

    return cleaned_path, report_path, report
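

# The pipeline can also be driven from Python instead of the CLI; the path and
# option values below are placeholders, not from the original post:
#
#   cleaned_path, report_path, report = clean_dataset(
#       "data.csv",
#       missing_strategy="median",
#       outlier_method="cap",
#       encode=True,
#       one_hot=True,
#       scale_method="minmax",
#   )
#   print(report["steps"]["missing"])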


# ---------- CLI ----------

def parse_args():

    p = argparse.ArgumentParser(description="Automatic Dataset Cleaner")

    p.add_argument("--file", "-f", required=True, help="Path to CSV file")

    p.add_argument("--missing-strategy", default="mean", choices=["drop-row","drop-col","mean","median","mode","ffill","bfill","constant"], help="Missing value strategy")

    p.add_argument("--missing-constant", default=None, help="Constant value to fill missing when strategy=constant")

    p.add_argument("--missing-drop-threshold", type=float, default=0.5, help="Drop columns with missing fraction > threshold")

    p.add_argument("--outliers", default="remove", choices=["remove","cap","mark","none"], help="Outlier handling method")

    p.add_argument("--outlier-k", type=float, default=1.5, help="IQR multiplier for outlier detection")

    p.add_argument("--dedupe-subset", default=None, help="Comma separated columns to consider for duplicates (default=all columns)")

    p.add_argument("--encode", action="store_true", help="Encode categorical columns")

    p.add_argument("--one-hot", action="store_true", help="One-hot encode small cardinality categoricals (used with --encode)")

    p.add_argument("--scale", default=None, choices=["standard","minmax"], help="Scale numeric columns")

    return p.parse_args()


def main_cli():

    args = parse_args()

    if args.missing_strategy == "constant" and args.missing_constant is None:

        raise SystemExit("--missing-constant is required when --missing-strategy=constant")

    dedupe_subset = args.dedupe_subset.split(",") if args.dedupe_subset else None

    outlier_method = args.outliers if args.outliers != "none" else None


    cleaned_path, report_path, report = clean_dataset(

        input_path=args.file,

        missing_strategy=args.missing_strategy,

        missing_constant=args.missing_constant,

        missing_drop_threshold=args.missing_drop_threshold,

        outlier_method=outlier_method,

        outlier_k=args.outlier_k,

        outlier_numeric_only=True,

        dedupe_subset=dedupe_subset,

        encode=args.encode,

        one_hot=args.one_hot,

        scale_method=args.scale

    )


    print("Cleaning complete.")

    print("Cleaned file:", cleaned_path)

    print("Report saved to:", report_path)

    # Also print a short summary

    print(json.dumps({

        "rows_before": report["initial_summary"]["rows"],

        "rows_after": report["final_summary"]["rows"],

        "columns_before": report["initial_summary"]["columns"],

        "columns_after": report["final_summary"]["columns"],

        "missing_info": report["initial_summary"]["missing_per_column"]

    }, indent=2))


if __name__ == "__main__":

    main_cli()
