"""Duplicate File Finder: scan folders for duplicate files by size and MD5 hash."""

import os

import hashlib

import json

import shutil

from pathlib import Path

from collections import defaultdict

from datetime import datetime


# ============================================================

# CONFIGURATION

# ============================================================


REPORT_FILE   = "duplicate_report.json"   # output path used by save_report()

CHUNK_SIZE    = 8192   # bytes read per chunk for hashing


# ============================================================

# HASH A FILE (MD5)

# ============================================================


def get_file_hash(filepath, algorithm="md5"):
    """
    Compute the hash of a file, reading CHUNK_SIZE bytes at a time so
    arbitrarily large files never have to fit in memory.

    Args:
        filepath:  path (str or Path) of the file to hash.
        algorithm: any name accepted by hashlib.new() (default "md5").
                   A ValueError from an unknown algorithm propagates to
                   the caller, same as before.

    Returns:
        Hex digest string, or None if the file cannot be read.
    """
    try:
        hasher = hashlib.new(algorithm)
        with open(filepath, "rb") as f:
            # Walrus loop: stop when f.read() returns b"" (falsy) at EOF.
            while chunk := f.read(CHUNK_SIZE):
                hasher.update(chunk)
        return hasher.hexdigest()
    except OSError:
        # PermissionError is a subclass of OSError, so the original
        # (PermissionError, OSError) tuple was redundant; one clause
        # covers every unreadable-file case.
        return None



# ============================================================

# QUICK PRE-FILTER: Group by File Size First

# ============================================================


def group_by_size(folder, recursive=True, extensions=None):
    """
    First pass of duplicate detection: bucket files by byte size.

    Two files can only be duplicates if they are exactly the same size,
    so grouping by size first avoids hashing files that are unique.

    Args:
        folder:     directory to scan.
        recursive:  include subdirectories when True.
        extensions: optional list of lowercase suffixes (".jpg", ...)
                    to restrict the scan to.

    Returns:
        dict {size: [Path, ...]} holding only buckets with 2+ files.
    """
    root = Path(folder)
    candidates = root.rglob("*") if recursive else root.glob("*")

    buckets = defaultdict(list)
    for path in candidates:
        if not path.is_file():
            continue
        if extensions and path.suffix.lower() not in extensions:
            continue
        try:
            nbytes = path.stat().st_size
            if nbytes > 0:   # empty files are never interesting duplicates
                buckets[nbytes].append(path)
        except OSError:
            continue

    # Only same-size groups of two or more files can contain duplicates.
    return {size: paths for size, paths in buckets.items() if len(paths) > 1}



# ============================================================

# SECOND PASS: Hash Files Within Same-Size Groups

# ============================================================


def find_duplicates(folder, recursive=True, extensions=None, progress=True):
    """
    Two-pass duplicate detection.

    Pass 1 groups files by size (cheap); pass 2 hashes only files that
    share a size with at least one other file (thorough).

    Returns:
        dict mapping hash -> list of Paths, keeping only digests that
        two or more files share.
    """
    print(f"\n  Scanning: {folder}")
    print(f"  Mode    : {'Recursive' if recursive else 'Top-level only'}")
    if extensions:
        print(f"  Filter  : {', '.join(extensions)}")

    # Pass 1: cheap size-based pre-filter.
    print("\n  Step 1/2: Grouping files by size...")
    size_groups = group_by_size(folder, recursive, extensions)
    candidate_count = sum(len(v) for v in size_groups.values())
    print(f"  Found {candidate_count} candidate file(s) in {len(size_groups)} size group(s)")

    if not size_groups:
        print("\n  No duplicate candidates found.")
        return {}

    # Pass 2: hash every candidate and bucket by digest.
    print("\n  Step 2/2: Computing file hashes...")
    by_digest = defaultdict(list)
    done      = 0

    for same_size_files in size_groups.values():
        for path in same_size_files:
            digest = get_file_hash(path)
            if digest:
                by_digest[digest].append(path)
            done += 1
            # In-place progress ticker, refreshed every 50 files.
            if progress and done % 50 == 0:
                print(f"  Processed {done}/{candidate_count} files...", end="\r")

    print(f"  Processed {done}/{candidate_count} files.         ")

    # A digest shared by 2+ files marks a real duplicate group.
    return {digest: paths for digest, paths in by_digest.items() if len(paths) > 1}



# ============================================================

# DISPLAY RESULTS

# ============================================================


def display_results(duplicates):
    """
    Pretty-print duplicate groups with per-group and total wasted space.

    Args:
        duplicates: dict mapping hash -> list of Path objects (2+ per
                    group), as returned by find_duplicates().
    """
    if not duplicates:
        print("\n  No duplicates found!")
        return

    total_groups = len(duplicates)
    total_files  = sum(len(v) for v in duplicates.values())
    total_wasted = 0

    print("\n" + "="*60)
    print("  DUPLICATE FILES FOUND")
    print("="*60)
    print(f"  Duplicate groups : {total_groups}")
    print(f"  Total duplicates : {total_files}")
    print("="*60)

    for i, (file_hash, files) in enumerate(duplicates.items(), 1):
        size_bytes = files[0].stat().st_size
        size_str   = _format_size(size_bytes)
        # Space reclaimable by deleting all but one copy in the group.
        wasted     = size_bytes * (len(files) - 1)
        total_wasted += wasted

        print(f"\n  Group {i} | {len(files)} files | {size_str} each | "
              f"Wasted: {_format_size(wasted)}")
        print(f"  Hash: {file_hash[:16]}...")
        print("  " + "-"*54)

        for j, f in enumerate(files):
            try:
                mtime = datetime.fromtimestamp(f.stat().st_mtime).strftime("%d-%m-%Y %H:%M")
            except (OSError, OverflowError, ValueError):
                # Narrowed from a bare except (which also swallowed
                # KeyboardInterrupt): only stat()/timestamp failures
                # should fall back to "unknown".
                mtime = "unknown"
            marker = "  [ORIGINAL?]" if j == 0 else "  [DUPLICATE]"
            print(f"  {marker}  {f}")
            print(f"            Modified: {mtime}")

    print("\n" + "="*60)
    print(f"  Total wasted space: {_format_size(total_wasted)}")
    print("="*60)



# ============================================================

# SAVE REPORT

# ============================================================


def save_report(duplicates, folder):
    """
    Dump the duplicate scan results to REPORT_FILE as indented JSON.

    Args:
        duplicates: dict of hash -> list of Path objects.
        folder:     the folder that was scanned (recorded in the report).
    """
    # One entry per duplicate group: digest, human-readable size, paths.
    groups = [
        {
            "hash":  digest,
            "size":  _format_size(paths[0].stat().st_size),
            "files": [str(p) for p in paths],
        }
        for digest, paths in duplicates.items()
    ]

    report = {
        "scanned_folder": str(folder),
        "scan_time":      datetime.now().strftime("%d-%m-%Y %H:%M:%S"),
        "total_groups":   len(duplicates),
        "total_files":    sum(len(paths) for paths in duplicates.values()),
        "groups":         groups,
    }

    with open(REPORT_FILE, "w", encoding="utf-8") as fh:
        json.dump(report, fh, indent=2)

    print(f"\n  Report saved: {REPORT_FILE}")



# ============================================================

# DELETE DUPLICATES (Keep First / Newest / Oldest)

# ============================================================


def delete_duplicates(duplicates, strategy="keep_first", move_to=None):
    """
    Remove (or quarantine) duplicate files, keeping one file per group.

    Args:
        duplicates: dict of hash -> list of Path objects (2+ per group).
        strategy:   which copy survives in each group:
                      keep_first  - first file as discovered
                      keep_newest - most recently modified file
                      keep_oldest - oldest file
        move_to:    optional folder; when set, duplicates are moved there
                    instead of being deleted permanently.

    Asks for interactive confirmation ("yes") before touching any file.
    """
    if not duplicates:
        print("  No duplicates to remove.")
        return

    if move_to:
        Path(move_to).mkdir(parents=True, exist_ok=True)
        action = f"move to '{move_to}'"
    else:
        action = "DELETE PERMANENTLY"

    print(f"\n  Strategy : {strategy}")
    print(f"  Action   : {action}")

    confirm = input(f"\n  Confirm? This will {action} duplicates. (yes/no): ").strip().lower()
    if confirm != "yes":
        print("  Cancelled.")
        return

    removed_count = 0
    freed_bytes   = 0

    for file_hash, files in duplicates.items():
        # Order the group so index 0 is the file we keep.
        if strategy == "keep_newest":
            files_sorted = sorted(files, key=lambda f: f.stat().st_mtime, reverse=True)
        elif strategy == "keep_oldest":
            files_sorted = sorted(files, key=lambda f: f.stat().st_mtime)
        else:
            files_sorted = files  # keep_first = as found

        keeper    = files_sorted[0]
        to_remove = files_sorted[1:]

        print(f"\n  Keeping : {keeper.name}")

        for dup in to_remove:
            try:
                size = dup.stat().st_size
                if move_to:
                    dest = Path(move_to) / dup.name
                    # Handle name collision in destination. The original
                    # single check could still collide (e.g. on a rerun a
                    # hash-suffixed name already exists) and shutil.move
                    # would silently overwrite, so keep appending a
                    # counter until the name is free.
                    if dest.exists():
                        dest = Path(move_to) / f"{dup.stem}_{file_hash[:6]}{dup.suffix}"
                        counter = 1
                        while dest.exists():
                            dest = Path(move_to) / f"{dup.stem}_{file_hash[:6]}_{counter}{dup.suffix}"
                            counter += 1
                    shutil.move(str(dup), str(dest))
                    print(f"  Moved   : {dup.name} -> {dest}")
                else:
                    dup.unlink()
                    print(f"  Deleted : {dup}")
                removed_count += 1
                freed_bytes   += size
            except Exception as e:
                # Best-effort cleanup: report the failure and keep
                # processing the remaining duplicates.
                print(f"  Error   : {dup} -> {e}")

    print(f"\n  Done! {removed_count} file(s) removed. "
          f"Freed: {_format_size(freed_bytes)}")



# ============================================================

# INTERACTIVE MODE: Pick Which to Delete Per Group

# ============================================================


def interactive_delete(duplicates):
    """
    Review each duplicate group and let the user pick files to delete.

    For every group the files are listed with modification times; the
    user enters comma-separated 1-based numbers to delete, or 'skip'
    (or an empty line) to keep the whole group.
    """
    if not duplicates:
        print("  No duplicates to review.")
        return

    print("\n  Interactive mode: Review each group and choose what to delete.")
    print("  Enter file numbers to DELETE (comma separated), or 'skip' to keep all.\n")

    total_freed = 0

    for i, (file_hash, files) in enumerate(duplicates.items(), 1):
        size_str = _format_size(files[0].stat().st_size)
        print(f"\n  Group {i}/{len(duplicates)}  |  {size_str} each")
        print("  " + "-"*50)

        for j, f in enumerate(files, 1):
            try:
                mtime = datetime.fromtimestamp(f.stat().st_mtime).strftime("%d-%m-%Y %H:%M")
            except (OSError, OverflowError, ValueError):
                # Narrowed from a bare except (which also swallowed
                # KeyboardInterrupt): only stat()/timestamp failures
                # should fall back to "unknown".
                mtime = "unknown"
            print(f"  [{j}] {f}  (modified: {mtime})")

        choice = input("\n  Delete file numbers (e.g. 2,3) or 'skip': ").strip().lower()

        if choice == "skip" or not choice:
            continue

        try:
            # Convert 1-based menu numbers to 0-based list indices.
            indices = [int(x.strip()) - 1 for x in choice.split(",")]
            for idx in indices:
                if 0 <= idx < len(files):
                    f = files[idx]
                    size = f.stat().st_size
                    f.unlink()
                    print(f"  Deleted: {f}")
                    total_freed += size
                else:
                    print(f"  Invalid index: {idx + 1}")
        except ValueError:
            print("  Invalid input, skipping group.")

    print(f"\n  Interactive cleanup done. Freed: {_format_size(total_freed)}")



# ============================================================

# HELPER: Format File Size

# ============================================================


def _format_size(size_bytes):

    for unit in ["B", "KB", "MB", "GB", "TB"]:

        if size_bytes < 1024:

            return f"{size_bytes:.1f} {unit}"

        size_bytes /= 1024

    return f"{size_bytes:.1f} PB"



# ============================================================

# MAIN MENU

# ============================================================


def print_menu():
    """Print the numbered main-menu options."""
    divider = "-"*48
    lines = (
        divider,
        "  DUPLICATE FILE FINDER",
        divider,
        "  1. Scan folder for duplicates",
        "  2. Scan with file type filter",
        "  3. Auto-delete duplicates (keep first)",
        "  4. Auto-delete duplicates (keep newest)",
        "  5. Move duplicates to a folder",
        "  6. Interactive delete (review each group)",
        "  7. Save report to JSON",
        "  0. Exit",
        divider,
    )
    # One joined print emits byte-identical output to the original
    # sequence of print() calls.
    print("\n" + "\n".join(lines))



def main():
    """Run the interactive menu loop for the duplicate finder."""
    print("\n" + "="*55)
    print("     DUPLICATE FILE FINDER")
    print("="*55)
    print("\n  Uses MD5 hashing for accurate duplicate detection.")
    print("  Pre-filters by file size for maximum speed.\n")

    # Results of the most recent scan (hash -> list of Paths) and the
    # folder it covered; options 3-7 operate on this state.
    duplicates = {}
    last_folder = ""

    while True:
        print_menu()
        choice = input("  > ").strip()

        if choice in ["1", "2", "3", "4", "5", "6"]:
            if choice in ["1", "2"]:
                folder = input("\n  Enter folder path to scan: ").strip()
                if not os.path.isdir(folder):
                    print("  Invalid folder path.")
                    continue
                last_folder = folder

                # Anything other than an explicit "n" means recursive.
                recursive = input("  Include subfolders? (y/n, default y): ").strip().lower()
                recursive = recursive != "n"

                extensions = None
                if choice == "2":
                    # Normalize each token to a lowercase ".ext" suffix
                    # so it matches Path.suffix.lower() in the scanner.
                    ext_input = input("  File types (e.g. .jpg .png .pdf): ").strip()
                    extensions = [e.lower() if e.startswith(".") else f".{e.lower()}"
                                  for e in ext_input.split()] if ext_input else None

                duplicates = find_duplicates(folder, recursive, extensions)
                display_results(duplicates)

            # Options 3-6 need scan results from a previous run.
            elif not duplicates:
                print("\n  Please scan a folder first (Option 1 or 2).")
                continue

            if choice == "3":
                delete_duplicates(duplicates, strategy="keep_first")

            elif choice == "4":
                delete_duplicates(duplicates, strategy="keep_newest")

            elif choice == "5":
                move_path = input("\n  Move duplicates to folder: ").strip()
                delete_duplicates(duplicates, strategy="keep_first", move_to=move_path)

            elif choice == "6":
                interactive_delete(duplicates)

        elif choice == "7":
            if not duplicates:
                print("\n  No scan results to save. Run a scan first.")
            else:
                save_report(duplicates, last_folder)

        elif choice == "0":
            print("\n  Goodbye!\n")
            break

        else:
            print("  Invalid choice.")



# ============================================================

# RUN

# ============================================================


# Entry point: launch the interactive menu when run as a script.
if __name__ == "__main__":
    main()
