import os
import hashlib
import json
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
HASH_DB = "file_hashes.json"
# -----------------------------------------
# Hash Function
# -----------------------------------------
def calculate_hash(file_path):
    """Return the SHA-256 hex digest of the file at *file_path*.

    The file is opened in binary mode and consumed in 4096-byte chunks, so
    the whole file is never held in memory at once.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hex digest string, or ``None`` when the file cannot be opened
        or read (missing, permission denied, a directory, invalid path).
    """
    sha256 = hashlib.sha256()
    try:
        with open(file_path, "rb") as f:
            while chunk := f.read(4096):
                sha256.update(chunk)
    except (OSError, ValueError):
        # Best-effort: an unreadable file simply has no hash. Only I/O and
        # invalid-path errors are caught so that KeyboardInterrupt and
        # SystemExit still propagate (a bare ``except`` swallowed Ctrl+C).
        return None
    return sha256.hexdigest()
# -----------------------------------------
# Load & Save Hash Database
# -----------------------------------------
def load_hash_db():
    """Load the stored path -> hash mapping from the ``HASH_DB`` JSON file.

    Returns:
        The dict stored in the file, or an empty dict when the file does
        not exist, does not contain valid JSON (for example after an
        interrupted save), or holds something other than a JSON object.
    """
    try:
        # EAFP: open directly instead of checking os.path.exists() first,
        # which leaves a window for the file to vanish before the open.
        with open(HASH_DB, "r") as f:
            db = json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; a damaged database is treated like a missing one
        # rather than crashing at startup.
        print(" Hash database is unreadable; starting with an empty one.")
        return {}
    # Callers index the result by file path, so anything but a dict is
    # unusable as a database.
    return db if isinstance(db, dict) else {}
def save_hash_db(db):
    """Write *db* to the ``HASH_DB`` file as JSON indented by 4 spaces.

    Args:
        db: JSON-serializable mapping to persist; any previous file
            content is overwritten.
    """
    with open(HASH_DB, mode="w") as db_file:
        json.dump(obj=db, fp=db_file, indent=4)
# -----------------------------------------
# Initial Scan
# -----------------------------------------
def initial_scan(folder_path):
    """Hash every file below *folder_path* and persist the result.

    Walks the directory tree, records the hash of each file for which
    ``calculate_hash`` returns a value, saves the mapping with
    ``save_hash_db`` and returns it.

    Args:
        folder_path: Root directory to walk.

    Returns:
        Dict mapping each hashed file's path to its hex digest.
    """
    print(" Performing initial scan...\n")
    hashes = {}
    for current_dir, _subdirs, filenames in os.walk(folder_path):
        for filename in filenames:
            full_path = os.path.join(current_dir, filename)
            digest = calculate_hash(full_path)
            if not digest:
                # No digest means the file could not be hashed; leave it out.
                continue
            hashes[full_path] = digest
    save_hash_db(hashes)
    print(" Initial scan complete.")
    return hashes
# -----------------------------------------
# Event Handler
# -----------------------------------------
class IntegrityHandler(FileSystemEventHandler):
    """Watchdog event handler that reports changes to monitored files.

    Keeps ``hash_db``, a dict mapping file path to SHA-256 hex digest, and
    compares it against freshly computed hashes whenever a file event
    arrives. Every change to the mapping is written back with
    ``save_hash_db``.
    """

    def __init__(self, folder_path):
        """Load the stored baseline, scanning *folder_path* if there is none.

        Args:
            folder_path: Directory being monitored.
        """
        super().__init__()
        self.folder_path = folder_path
        self.hash_db = load_hash_db()
        if not self.hash_db:
            # An empty or missing database means there is no baseline yet.
            self.hash_db = initial_scan(folder_path)

    @staticmethod
    def _is_hash_db(path):
        """Return True when *path* refers to the hash database file itself."""
        return (
            os.path.normcase(os.path.abspath(path))
            == os.path.normcase(os.path.abspath(HASH_DB))
        )

    def on_modified(self, event):
        """Report a changed or previously unknown file and record its hash.

        Args:
            event: Watchdog event; ``is_directory`` and ``src_path`` are read.
        """
        if event.is_directory:
            return
        file_path = event.src_path
        if self._is_hash_db(file_path):
            # Saving the database modifies HASH_DB; when that file lives in
            # the watched folder, reacting to it would trigger another save
            # and therefore an endless alert/save loop.
            return
        new_hash = calculate_hash(file_path)
        if new_hash is None:
            # The file could not be read (e.g. already removed or locked);
            # do not record None as its hash or raise a false alert.
            return
        if file_path in self.hash_db:
            if self.hash_db[file_path] == new_hash:
                return
            print(f"⚠ ALERT: File modified → {file_path}")
        else:
            print(f" New file detected → {file_path}")
        self.hash_db[file_path] = new_hash
        save_hash_db(self.hash_db)

    def on_deleted(self, event):
        """Report the deletion of a tracked file and drop it from the database.

        Args:
            event: Watchdog event; only ``src_path`` is read.
        """
        if event.src_path in self.hash_db:
            print(f" File deleted → {event.src_path}")
            del self.hash_db[event.src_path]
            save_hash_db(self.hash_db)
# -----------------------------------------
# MAIN
# -----------------------------------------
# Script entry point: ask for a folder, then watch it recursively until
# the user presses Ctrl+C.
if __name__ == "__main__":
    folder = input("Enter folder path to monitor: ").strip()
    if not os.path.isdir(folder):
        print(" Invalid folder path.")
        # exit() is a helper injected by the site module for interactive
        # use and may be absent (python -S, frozen builds); SystemExit is
        # always available. A non-zero status signals the failure.
        raise SystemExit(1)
    print("\n Monitoring started... (Press Ctrl+C to stop)\n")
    event_handler = IntegrityHandler(folder)
    observer = Observer()
    observer.schedule(event_handler, folder, recursive=True)
    observer.start()
    try:
        # The observer runs in its own thread; keep the main thread alive.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Always shut the observer thread down, whatever ended the loop.
        observer.stop()
        observer.join()
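# NOTE: the two lines below are blog-page footer residue, not program code;
# they are commented out so the file parses as Python.
# No comments:
# Post a Comment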