# Local File Integrity Monitor

import os

import hashlib

import json

from watchdog.observers import Observer

from watchdog.events import FileSystemEventHandler

import time


HASH_DB = "file_hashes.json"



# -----------------------------------------

# Hash Function

# -----------------------------------------

def calculate_hash(file_path):
    """Return the SHA-256 hex digest of the file at *file_path*.

    The file is opened in binary mode and read in 4096-byte chunks, so the
    whole file is never held in memory at once.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hex digest string, or ``None`` when the file cannot be opened or
        read (missing, a directory, permission denied, invalid path, ...).
    """
    sha256 = hashlib.sha256()

    try:
        with open(file_path, "rb") as f:
            while chunk := f.read(4096):
                sha256.update(chunk)
    except (OSError, ValueError):
        # Only I/O and bad-path failures mean "no hash available".  A bare
        # ``except`` here would also swallow KeyboardInterrupt/SystemExit and
        # make Ctrl+C during a long scan silently skip a file instead of
        # stopping the program.
        return None

    return sha256.hexdigest()



# -----------------------------------------

# Load & Save Hash Database

# -----------------------------------------

def load_hash_db(db_path=None):
    """Load the stored ``path -> hash`` mapping from the JSON database.

    Args:
        db_path: Optional path of the JSON file to read.  When omitted, the
            module-level ``HASH_DB`` path is used.

    Returns:
        The decoded dict, or an empty dict when the file does not exist,
        does not contain valid JSON, or its top-level value is not an object.
    """
    target = HASH_DB if db_path is None else db_path

    # EAFP: open directly instead of exists()+open(), which can race with a
    # concurrent delete of the database file.
    try:
        with open(target, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; an empty or truncated file lands here.
        print(" Hash database is corrupt; ignoring it.")
        return {}

    if not isinstance(data, dict):
        # Callers index and assign by file path, so anything but a JSON
        # object is unusable as a baseline.
        print(" Hash database has an unexpected format; ignoring it.")
        return {}

    return data



def save_hash_db(db, db_path=None):
    """Write the ``path -> hash`` mapping to the JSON database.

    Args:
        db: JSON-serialisable mapping to persist.
        db_path: Optional path of the JSON file to write.  When omitted, the
            module-level ``HASH_DB`` path is used.

    Raises:
        TypeError / ValueError: If *db* cannot be serialised to JSON.  The
            existing database file is left untouched in that case.
        OSError: If the file cannot be written.
    """
    target = HASH_DB if db_path is None else db_path

    # Serialise before opening: open(..., "w") truncates immediately, so a
    # serialisation error after that point would destroy the old baseline.
    payload = json.dumps(db, indent=4)

    with open(target, "w", encoding="utf-8") as f:
        f.write(payload)



# -----------------------------------------

# Initial Scan

# -----------------------------------------

def initial_scan(folder_path):
    """Build, persist and return a fresh hash baseline for *folder_path*.

    Every file found by walking the directory tree is hashed with
    ``calculate_hash``; files for which no hash is returned are left out.
    The resulting mapping is written with ``save_hash_db`` before returning.

    Args:
        folder_path: Root directory to walk.

    Returns:
        Dict mapping each hashed file's path to its hex digest.
    """
    baseline = {}

    print(" Performing initial scan...\n")

    for current_dir, _subdirs, filenames in os.walk(folder_path):
        candidates = (os.path.join(current_dir, name) for name in filenames)
        for candidate in candidates:
            digest = calculate_hash(candidate)
            if not digest:
                # Unreadable file: no entry in the baseline.
                continue
            baseline[candidate] = digest

    save_hash_db(baseline)
    print(" Initial scan complete.")
    return baseline



# -----------------------------------------

# Event Handler

# -----------------------------------------

class IntegrityHandler(FileSystemEventHandler):
    """Watchdog handler that compares file contents against a hash baseline.

    On construction the stored baseline is loaded with ``load_hash_db``; if
    that yields nothing, a new baseline is built with ``initial_scan``.
    Modified, new and deleted files are reported with ``print`` and the
    baseline is saved after every change.
    """

    def __init__(self, folder_path):
        """Load (or create) the baseline for *folder_path*."""
        super().__init__()
        self.folder_path = folder_path
        self.hash_db = load_hash_db()

        if not self.hash_db:
            self.hash_db = initial_scan(folder_path)

    @staticmethod
    def _is_hash_db(path):
        """Return True when *path* refers to the hash database file itself."""
        return os.path.abspath(path) == os.path.abspath(HASH_DB)

    def on_modified(self, event):
        """Report a changed or previously unknown file and update the baseline."""
        if event.is_directory:
            return

        file_path = event.src_path

        # Every baseline save rewrites the database file.  If that file lives
        # inside the watched folder, reacting to its own modification would
        # trigger another save and loop forever.
        if self._is_hash_db(file_path):
            return

        new_hash = calculate_hash(file_path)
        if new_hash is None:
            # The file vanished or is unreadable (common during editor
            # save-by-rename); there is nothing to compare or record.
            return

        known = file_path in self.hash_db
        if known and self.hash_db[file_path] == new_hash:
            # Event fired but the content is unchanged.
            return

        if known:
            print(f"⚠ ALERT: File modified → {file_path}")
        else:
            print(f" New file detected → {file_path}")

        self.hash_db[file_path] = new_hash
        save_hash_db(self.hash_db)

    def on_deleted(self, event):
        """Report deletion of a tracked file and drop it from the baseline."""
        if event.src_path in self.hash_db:
            print(f" File deleted → {event.src_path}")
            del self.hash_db[event.src_path]
            save_hash_db(self.hash_db)



# -----------------------------------------

# MAIN

# -----------------------------------------

if __name__ == "__main__":
    # Script entry point: ask for a folder, then watch it recursively until
    # the user presses Ctrl+C.
    folder = input("Enter folder path to monitor: ").strip()

    if not os.path.isdir(folder):
        print(" Invalid folder path.")
        # exit() is a helper injected by the ``site`` module and is not
        # guaranteed to exist; SystemExit is always available.  A non-zero
        # status tells calling scripts that start-up failed.
        raise SystemExit(1)

    print("\n Monitoring started... (Press Ctrl+C to stop)\n")

    event_handler = IntegrityHandler(folder)
    observer = Observer()
    observer.schedule(event_handler, folder, recursive=True)
    observer.start()

    try:
        while True:
            # Events are handled on the observer's thread; the main thread
            # only has to stay alive.
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop and join the observer on any exit path, not just Ctrl+C.
        observer.stop()
        observer.join()

# No comments: