Motion Detection Security Recorder

"""

Motion Detection Security Recorder

---------------------------------

Usage:

    python motion_recorder.py

    python motion_recorder.py --source 0                      # default webcam

    python motion_recorder.py --source "video.mp4"

    python motion_recorder.py --source "rtsp://...."


Outputs:

    ./recordings/YYYYMMDD_HHMMSS_motion.mp4

    ./recordings/recording_log.csv  (optional)


Notes:

- Adjust MIN_AREA and SENSITIVITY for your camera / scene.

- Pre-buffer keeps recent frames so clip contains seconds before detection.

"""


import cv2

import numpy as np

import argparse

import time

from datetime import datetime

from collections import deque

import os

import csv


# -------------------------

# Configuration (tweakable)

# -------------------------

OUTPUT_DIR = "recordings"

LOG_CSV = True            # write a CSV log of recorded clips

FPS = 20                  # expected framerate for recording (adjust to your camera)

FRAME_WIDTH = 640         # resize frames for faster processing

FRAME_HEIGHT = 480

MIN_AREA = 1200           # minimum contour area to be considered motion (tweak)

SENSITIVITY = 25          # how much difference triggers motion (lower => more sensitive)

PRE_BUFFER_SECONDS = 3    # include 3 seconds before motion started

POST_RECORD_SECONDS = 4   # record N seconds after motion stops

CODEC = "mp4v"            # codec fourcc (try 'XVID' or 'avc1' if 'mp4v' not available)


# -------------------------

# Helpers

# -------------------------

def ensure_dir(path):

    if not os.path.exists(path):

        os.makedirs(path, exist_ok=True)


def timestamp_str():

    return datetime.now().strftime("%Y%m%d_%H%M%S")


def make_output_filename():

    return f"{timestamp_str()}_motion.mp4"


def write_log(csv_path, row):

    header = ["filename","start_time","end_time","duration_s","frames","source"]

    exists = os.path.exists(csv_path)

    with open(csv_path, "a", newline="", encoding="utf-8") as f:

        w = csv.writer(f)

        if not exists:

            w.writerow(header)

        w.writerow(row)


# -------------------------

# Motion Recorder class

# -------------------------

class MotionRecorder:

    def __init__(self, source=0, output_dir=OUTPUT_DIR):

        self.source = source

        self.output_dir = output_dir

        ensure_dir(self.output_dir)

        self.log_path = os.path.join(self.output_dir, "recording_log.csv") if LOG_CSV else None


        self.cap = cv2.VideoCapture(self.source)

        if not self.cap.isOpened():

            raise RuntimeError(f"Cannot open source: {source}")


        # If camera provides FPS, override

        native_fps = self.cap.get(cv2.CAP_PROP_FPS)

        if native_fps and native_fps > 0:

            self.fps = native_fps

        else:

            self.fps = FPS


        # Use resize dims

        self.width = FRAME_WIDTH

        self.height = FRAME_HEIGHT


        # background subtractor (more robust) + simple diff fallback

        self.bg_sub = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=16, detectShadows=True)

        self.pre_buffer = deque(maxlen=int(self.fps * PRE_BUFFER_SECONDS))

        self.is_recording = False

        self.writer = None

        self.record_start_time = None

        self.frames_recorded = 0

        self.last_motion_time = None


    def release(self):

        if self.cap:

            self.cap.release()

        if self.writer:

            self.writer.release()

        cv2.destroyAllWindows()


    def start(self):

        print("Starting motion detection. Press 'q' to quit.")

        try:

            while True:

                ret, frame = self.cap.read()

                if not ret:

                    print("Stream ended or cannot fetch frame.")

                    break


                # resize for consistent processing

                frame = cv2.resize(frame, (self.width, self.height))

                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

                gray_blur = cv2.GaussianBlur(gray, (5,5), 0)


                # background subtraction mask

                fgmask = self.bg_sub.apply(gray_blur)

                # threshold to reduce noise

                _, thresh = cv2.threshold(fgmask, SENSITIVITY, 255, cv2.THRESH_BINARY)

                # morphological operations to reduce small noise

                kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5))

                clean = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=1)

                clean = cv2.morphologyEx(clean, cv2.MORPH_DILATE, kernel, iterations=2)


                # find contours

                contours, _ = cv2.findContours(clean, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

                motion_detected = False

                for cnt in contours:

                    if cv2.contourArea(cnt) >= MIN_AREA:

                        motion_detected = True

                        (x,y,w,h) = cv2.boundingRect(cnt)

                        # draw rectangle for preview

                        cv2.rectangle(frame, (x,y), (x+w, y+h), (0,255,0), 2)


                # push frame into pre-buffer (store color frames)

                self.pre_buffer.append(frame.copy())


                # Recording logic

                now = time.time()

                if motion_detected:

                    self.last_motion_time = now

                    if not self.is_recording:

                        # start new recording using pre-buffer

                        fname = make_output_filename()

                        out_path = os.path.join(self.output_dir, fname)

                        fourcc = cv2.VideoWriter_fourcc(*CODEC)

                        self.writer = cv2.VideoWriter(out_path, fourcc, self.fps, (self.width, self.height))

                        if not self.writer.isOpened():

                            print("Warning: VideoWriter failed to open. Check codec availability.")

                        # flush pre-buffer to writer

                        for bf in list(self.pre_buffer):

                            if self.writer:

                                self.writer.write(bf)

                        self.is_recording = True

                        self.record_start_time = datetime.now()

                        self.frames_recorded = len(self.pre_buffer)

                        self.current_out_path = out_path

                        print(f"[{self.record_start_time}] Motion started -> Recording to {out_path}")


                # If recording, write current frame and manage stop condition

                if self.is_recording:

                    if self.writer:

                        self.writer.write(frame)

                    self.frames_recorded += 1

                    # stop if no motion for POST_RECORD_SECONDS

                    if self.last_motion_time and (now - self.last_motion_time) > POST_RECORD_SECONDS:

                        # finalize

                        record_end = datetime.now()

                        duration = (record_end - self.record_start_time).total_seconds()

                        print(f"[{record_end}] Motion ended. Duration: {duration:.2f}s, Frames: {self.frames_recorded}")

                        # close writer

                        if self.writer:

                            self.writer.release()

                            self.writer = None

                        # write log

                        if LOG_CSV and self.log_path:

                            write_log(self.log_path, [

                                os.path.basename(self.current_out_path),

                                self.record_start_time.strftime("%Y-%m-%d %H:%M:%S"),

                                record_end.strftime("%Y-%m-%d %H:%M:%S"),

                                f"{duration:.2f}",

                                str(self.frames_recorded),

                                str(self.source)

                            ])

                        self.is_recording = False

                        self.frames_recorded = 0

                        # clear pre_buffer so next record begins clean

                        self.pre_buffer.clear()


                # Show simple preview window (optional)

                preview = frame.copy()

                status_text = f"REC" if self.is_recording else "Idle"

                cv2.putText(preview, f"Status: {status_text}", (10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,255) if self.is_recording else (0,255,0), 2)

                cv2.imshow("Motion Recorder - Preview", preview)


                # keyboard quit

                key = cv2.waitKey(1) & 0xFF

                if key == ord('q'):

                    print("Quit requested by user.")

                    break


        except KeyboardInterrupt:

            print("Interrupted by user.")

        finally:

            # cleanup

            if self.writer:

                self.writer.release()

                if LOG_CSV and self.log_path:

                    # if we were recording when interrupted, log end

                    end_time = datetime.now()

                    duration = (end_time - self.record_start_time).total_seconds() if self.record_start_time else 0

                    write_log(self.log_path, [

                        os.path.basename(self.current_out_path),

                        self.record_start_time.strftime("%Y-%m-%d %H:%M:%S") if self.record_start_time else "",

                        end_time.strftime("%Y-%m-%d %H:%M:%S"),

                        f"{duration:.2f}",

                        str(self.frames_recorded),

                        str(self.source)

                    ])

            self.release()

            print("Released resources. Exiting.")


# -------------------------

# CLI arg parsing

# -------------------------

def parse_args():

    parser = argparse.ArgumentParser(description="Motion Detection Security Recorder")

    parser.add_argument("--source", type=str, default="0", help="Video source: 0 (webcam), file path, or RTSP URL")

    parser.add_argument("--out", type=str, default=OUTPUT_DIR, help="Output recordings folder")

    parser.add_argument("--fps", type=int, default=FPS, help="Recording FPS fallback")

    parser.add_argument("--w", type=int, default=FRAME_WIDTH, help="Frame width (resize)")

    parser.add_argument("--h", type=int, default=FRAME_HEIGHT, help="Frame height (resize)")

    parser.add_argument("--min-area", type=int, default=MIN_AREA, help="Min contour area to detect motion")

    parser.add_argument("--sensitivity", type=int, default=SENSITIVITY, help="Threshold sensitivity for mask")

    args = parser.parse_args()

    return args


# -------------------------

# Entrypoint

# -------------------------

def main():

    args = parse_args()

    source = args.source

    # convert "0" -> 0 for webcam

    if source.isdigit():

        source = int(source)

    global FPS, FRAME_WIDTH, FRAME_HEIGHT, MIN_AREA, SENSITIVITY, OUTPUT_DIR

    FPS = args.fps

    FRAME_WIDTH = args.w

    FRAME_HEIGHT = args.h

    MIN_AREA = args.min_area

    SENSITIVITY = args.sensitivity

    OUTPUT_DIR = args.out


    ensure_dir(OUTPUT_DIR)

    recorder = MotionRecorder(source=source, output_dir=OUTPUT_DIR)

    recorder.start()


if __name__ == "__main__":

    main()


No comments: