import os
import hashlib
import json
import shutil
from pathlib import Path
from collections import defaultdict
from datetime import datetime
# ============================================================
# CONFIGURATION
# ============================================================
REPORT_FILE = "duplicate_report.json"  # output path used by save_report()
CHUNK_SIZE = 8192 # bytes read per chunk for hashing
# ============================================================
# HASH A FILE (MD5)
# ============================================================
def get_file_hash(filepath, algorithm="md5", chunk_size=None):
    """
    Compute the hash of a file, reading it in fixed-size chunks so
    arbitrarily large files never need to fit in memory.

    filepath:   path to the file (str or Path).
    algorithm:  any name accepted by hashlib.new() ("md5", "sha1",
                "sha256", ...).
    chunk_size: bytes per read; defaults to the module-level CHUNK_SIZE.

    Returns the hex digest string, or None if the file cannot be read.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    try:
        h = hashlib.new(algorithm)
        with open(filepath, "rb") as f:
            while chunk := f.read(chunk_size):
                h.update(chunk)
        return h.hexdigest()
    # PermissionError is a subclass of OSError, so the original
    # (PermissionError, OSError) tuple was redundant.
    except OSError:
        return None
# ============================================================
# QUICK PRE-FILTER: Group by File Size First
# ============================================================
def group_by_size(folder, recursive=True, extensions=None):
    """
    Bucket the files under *folder* by their size in bytes.

    Only same-size files can possibly be duplicates, so this cheap
    stat()-based pass shrinks the set of files that need hashing.

    recursive:  descend into subdirectories when True.
    extensions: optional list of lowercase suffixes (e.g. ['.jpg']);
                files with other suffixes are skipped.

    Returns {size: [Path, ...]} containing only sizes with 2+ files.
    """
    base = Path(folder)
    entries = base.rglob("*") if recursive else base.glob("*")
    buckets = defaultdict(list)
    for entry in entries:
        if not entry.is_file():
            continue
        if extensions and entry.suffix.lower() not in extensions:
            continue
        try:
            n_bytes = entry.stat().st_size
        except OSError:
            continue
        # Empty files are skipped: hashing them is pointless.
        if n_bytes > 0:
            buckets[n_bytes].append(entry)
    # Only sizes shared by at least two files are duplicate candidates.
    return {size: paths for size, paths in buckets.items() if len(paths) > 1}
# ============================================================
# SECOND PASS: Hash Files Within Same-Size Groups
# ============================================================
def find_duplicates(folder, recursive=True, extensions=None, progress=True):
    """
    Detect duplicate files in two passes.

    1. Group files by size (cheap pre-filter via group_by_size).
    2. Hash only the files that share a size with another file.

    Returns {hash: [Path, ...]} where every list has 2+ entries.
    """
    print(f"\n Scanning: {folder}")
    print(f" Mode : {'Recursive' if recursive else 'Top-level only'}")
    if extensions:
        print(f" Filter : {', '.join(extensions)}")

    # Pass 1: size buckets eliminate files that cannot have a twin.
    print("\n Step 1/2: Grouping files by size...")
    size_groups = group_by_size(folder, recursive, extensions)
    candidate_count = sum(len(group) for group in size_groups.values())
    print(f" Found {candidate_count} candidate file(s) in {len(size_groups)} size group(s)")
    if not size_groups:
        print("\n No duplicate candidates found.")
        return {}

    # Pass 2: hash every remaining candidate and bucket by digest.
    print("\n Step 2/2: Computing file hashes...")
    hash_map = defaultdict(list)
    done = 0
    for candidates in size_groups.values():
        for path in candidates:
            digest = get_file_hash(path)
            if digest:
                hash_map[digest].append(path)
            done += 1
            if progress and done % 50 == 0:
                print(f" Processed {done}/{candidate_count} files...", end="\r")
    print(f" Processed {done}/{candidate_count} files. ")

    # A digest shared by 2+ files marks a real duplicate group.
    return {digest: paths for digest, paths in hash_map.items() if len(paths) > 1}
# ============================================================
# DISPLAY RESULTS
# ============================================================
def display_results(duplicates):
    """
    Pretty-print duplicate groups and the total wasted space.

    duplicates: {hash: [Path, ...]} as returned by find_duplicates.
    Prints a summary header, one section per group (each file with its
    modification time), and a wasted-space footer.
    """
    if not duplicates:
        print("\n No duplicates found!")
        return
    total_groups = len(duplicates)
    total_files = sum(len(v) for v in duplicates.values())
    total_wasted = 0
    print("\n" + "="*60)
    print(" DUPLICATE FILES FOUND")
    print("="*60)
    print(f" Duplicate groups : {total_groups}")
    print(f" Total duplicates : {total_files}")
    print("="*60)
    for i, (file_hash, files) in enumerate(duplicates.items(), 1):
        # All files in a group share a size; stat the first as representative.
        size_bytes = files[0].stat().st_size
        size_str = _format_size(size_bytes)
        # Keeping one copy is "useful"; every extra copy is wasted space.
        wasted = size_bytes * (len(files) - 1)
        total_wasted += wasted
        print(f"\n Group {i} | {len(files)} files | {size_str} each | "
              f"Wasted: {_format_size(wasted)}")
        print(f" Hash: {file_hash[:16]}...")
        print(" " + "-"*54)
        for j, f in enumerate(files):
            try:
                mtime = datetime.fromtimestamp(f.stat().st_mtime).strftime("%d-%m-%Y %H:%M")
            # Was a bare `except:`, which also swallowed KeyboardInterrupt
            # and SystemExit; only stat()/filesystem failures belong here.
            except OSError:
                mtime = "unknown"
            marker = " [ORIGINAL?]" if j == 0 else " [DUPLICATE]"
            print(f" {marker} {f}")
            print(f" Modified: {mtime}")
    print("\n" + "="*60)
    print(f" Total wasted space: {_format_size(total_wasted)}")
    print("="*60)
# ============================================================
# SAVE REPORT
# ============================================================
def save_report(duplicates, folder):
    """
    Write the duplicate-scan results to REPORT_FILE as JSON.

    duplicates: {hash: [Path, ...]} as returned by find_duplicates.
    folder:     the folder that was scanned (recorded for context).
    """
    report = {
        "scanned_folder": str(folder),
        "scan_time": datetime.now().strftime("%d-%m-%Y %H:%M:%S"),
        "total_groups": len(duplicates),
        "total_files": sum(len(v) for v in duplicates.values()),
        "groups": []
    }
    for file_hash, files in duplicates.items():
        size_bytes = files[0].stat().st_size
        group = {
            "hash": file_hash,
            "size": _format_size(size_bytes),
            "size_bytes": size_bytes,  # raw value for machine consumers
            "files": [str(f) for f in files]
        }
        report["groups"].append(group)
    with open(REPORT_FILE, "w", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII file names human-readable
        # instead of \uXXXX escapes (the file is already written as UTF-8).
        json.dump(report, f, indent=2, ensure_ascii=False)
    print(f"\n Report saved: {REPORT_FILE}")
# ============================================================
# DELETE DUPLICATES (Keep First / Newest / Oldest)
# ============================================================
def delete_duplicates(duplicates, strategy="keep_first", move_to=None):
    """
    Remove (or relocate) duplicate files, keeping one file per group.

    strategy:
        keep_first  - keep the first file in each group, delete rest
        keep_newest - keep most recently modified file
        keep_oldest - keep oldest file
    move_to: if set, move duplicates here instead of deleting.

    Asks for interactive confirmation before touching anything.
    """
    if not duplicates:
        print(" No duplicates to remove.")
        return

    if move_to:
        Path(move_to).mkdir(parents=True, exist_ok=True)
        action = f"move to '{move_to}'"
    else:
        action = "DELETE PERMANENTLY"

    print(f"\n Strategy : {strategy}")
    print(f" Action : {action}")
    confirm = input(f"\n Confirm? This will {action} duplicates. (yes/no): ").strip().lower()
    if confirm != "yes":
        print(" Cancelled.")
        return

    removed_count = 0
    freed_bytes = 0
    for file_hash, group in duplicates.items():
        # Order the group so index 0 is the file we keep.
        if strategy == "keep_newest":
            ordered = sorted(group, key=lambda p: p.stat().st_mtime, reverse=True)
        elif strategy == "keep_oldest":
            ordered = sorted(group, key=lambda p: p.stat().st_mtime)
        else:
            # keep_first: preserve the order in which files were found.
            ordered = group
        keeper, extras = ordered[0], ordered[1:]
        print(f"\n Keeping : {keeper.name}")
        for dup in extras:
            try:
                size = dup.stat().st_size
                if move_to:
                    dest = Path(move_to) / dup.name
                    if dest.exists():
                        # Name already taken in the destination:
                        # disambiguate with a hash prefix.
                        dest = Path(move_to) / f"{dup.stem}_{file_hash[:6]}{dup.suffix}"
                    shutil.move(str(dup), str(dest))
                    print(f" Moved : {dup.name} -> {dest}")
                else:
                    dup.unlink()
                    print(f" Deleted : {dup}")
                removed_count += 1
                freed_bytes += size
            except Exception as e:
                # Best-effort: report the failure and keep processing.
                print(f" Error : {dup} -> {e}")
    print(f"\n Done! {removed_count} file(s) removed. "
          f"Freed: {_format_size(freed_bytes)}")
# ============================================================
# INTERACTIVE MODE: Pick Which to Delete Per Group
# ============================================================
def interactive_delete(duplicates):
    """
    Review each duplicate group interactively and delete user-selected
    files one group at a time.

    duplicates: {hash: [Path, ...]} as returned by find_duplicates.
    For each group, prompts for comma-separated 1-based file numbers to
    delete, or 'skip' (or empty input) to keep the whole group.
    """
    if not duplicates:
        print(" No duplicates to review.")
        return
    print("\n Interactive mode: Review each group and choose what to delete.")
    print(" Enter file numbers to DELETE (comma separated), or 'skip' to keep all.\n")
    total_freed = 0
    for i, (file_hash, files) in enumerate(duplicates.items(), 1):
        size_str = _format_size(files[0].stat().st_size)
        print(f"\n Group {i}/{len(duplicates)} | {size_str} each")
        print(" " + "-"*50)
        for j, f in enumerate(files, 1):
            try:
                mtime = datetime.fromtimestamp(f.stat().st_mtime).strftime("%d-%m-%Y %H:%M")
            # Was a bare `except:` (also caught KeyboardInterrupt/SystemExit).
            except OSError:
                mtime = "unknown"
            print(f" [{j}] {f} (modified: {mtime})")
        choice = input("\n Delete file numbers (e.g. 2,3) or 'skip': ").strip().lower()
        if choice == "skip" or not choice:
            continue
        try:
            indices = [int(x.strip()) - 1 for x in choice.split(",")]
        except ValueError:
            print(" Invalid input, skipping group.")
            continue
        # dict.fromkeys dedupes while preserving order, so input like
        # "2,2" can no longer attempt to delete the same file twice
        # (the second unlink used to raise an uncaught FileNotFoundError).
        for idx in dict.fromkeys(indices):
            if 0 <= idx < len(files):
                f = files[idx]
                try:
                    size = f.stat().st_size
                    f.unlink()
                except OSError as e:
                    # A failed delete previously crashed the whole program.
                    print(f" Error deleting {f}: {e}")
                    continue
                print(f" Deleted: {f}")
                total_freed += size
            else:
                print(f" Invalid index: {idx + 1}")
    print(f"\n Interactive cleanup done. Freed: {_format_size(total_freed)}")
# ============================================================
# HELPER: Format File Size
# ============================================================
def _format_size(size_bytes):
for unit in ["B", "KB", "MB", "GB", "TB"]:
if size_bytes < 1024:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.1f} PB"
# ============================================================
# MAIN MENU
# ============================================================
def print_menu():
    """Print the numbered main-menu options to stdout."""
    divider = "-"*48
    lines = (
        "\n" + divider,
        " DUPLICATE FILE FINDER",
        divider,
        " 1. Scan folder for duplicates",
        " 2. Scan with file type filter",
        " 3. Auto-delete duplicates (keep first)",
        " 4. Auto-delete duplicates (keep newest)",
        " 5. Move duplicates to a folder",
        " 6. Interactive delete (review each group)",
        " 7. Save report to JSON",
        " 0. Exit",
        divider,
    )
    for line in lines:
        print(line)
def main():
    """Run the interactive menu loop for the duplicate file finder."""
    print("\n" + "="*55)
    print(" DUPLICATE FILE FINDER")
    print("="*55)
    print("\n Uses MD5 hashing for accurate duplicate detection.")
    print(" Pre-filters by file size for maximum speed.\n")
    duplicates = {}   # results of the most recent scan (hash -> [Path, ...])
    last_folder = ""  # remembered so option 7 can record what was scanned
    while True:
        print_menu()
        choice = input(" > ").strip()
        if choice in ["1", "2", "3", "4", "5", "6"]:
            # Options 1-2 run a fresh scan; after scanning, control falls
            # through past the 3-6 handlers (choice is "1"/"2", so none
            # match). Options 3-6 act on the results of the last scan.
            if choice in ["1", "2"]:
                folder = input("\n Enter folder path to scan: ").strip()
                if not os.path.isdir(folder):
                    print(" Invalid folder path.")
                    continue
                last_folder = folder
                recursive = input(" Include subfolders? (y/n, default y): ").strip().lower()
                # Anything except an explicit 'n' means recursive.
                recursive = recursive != "n"
                extensions = None
                if choice == "2":
                    ext_input = input(" File types (e.g. .jpg .png .pdf): ").strip()
                    # Normalize each entry to a lowercase ".ext" suffix.
                    extensions = [e.lower() if e.startswith(".") else f".{e.lower()}"
                                  for e in ext_input.split()] if ext_input else None
                duplicates = find_duplicates(folder, recursive, extensions)
                display_results(duplicates)
            elif not duplicates:
                # Options 3-6 need scan results; nothing to act on yet.
                print("\n Please scan a folder first (Option 1 or 2).")
                continue
            if choice == "3":
                delete_duplicates(duplicates, strategy="keep_first")
            elif choice == "4":
                delete_duplicates(duplicates, strategy="keep_newest")
            elif choice == "5":
                move_path = input("\n Move duplicates to folder: ").strip()
                delete_duplicates(duplicates, strategy="keep_first", move_to=move_path)
            elif choice == "6":
                interactive_delete(duplicates)
        elif choice == "7":
            if not duplicates:
                print("\n No scan results to save. Run a scan first.")
            else:
                save_report(duplicates, last_folder)
        elif choice == "0":
            print("\n Goodbye!\n")
            break
        else:
            print(" Invalid choice.")
# ============================================================
# RUN
# ============================================================
# Launch the interactive menu only when executed as a script (not on import).
if __name__ == "__main__":
    main()