"""
Motion Detection Security Recorder
---------------------------------
Usage:
python motion_recorder.py
python motion_recorder.py --source 0 # default webcam
python motion_recorder.py --source "video.mp4"
python motion_recorder.py --source "rtsp://...."
Outputs:
./recordings/YYYYMMDD_HHMMSS_motion.mp4
./recordings/recording_log.csv (optional)
Notes:
- Adjust MIN_AREA and SENSITIVITY for your camera / scene.
- Pre-buffer keeps recent frames so clip contains seconds before detection.
"""
import cv2
import numpy as np
import argparse
import time
from datetime import datetime
from collections import deque
import os
import csv
# -------------------------
# Configuration (tweakable)
# -------------------------
# Folder that receives the recorded clips and the CSV log; main() overwrites
# this with the --out argument.
OUTPUT_DIR = "recordings"
LOG_CSV = True # write a CSV log of recorded clips (recording_log.csv inside the output folder)
# Used only when the capture source does not report a positive FPS of its own.
FPS = 20 # expected framerate for recording (adjust to your camera)
# Every captured frame is resized to FRAME_WIDTH x FRAME_HEIGHT before
# detection, and clips are written at that same size.
FRAME_WIDTH = 640 # resize frames for faster processing
FRAME_HEIGHT = 480
MIN_AREA = 1200 # minimum contour area (on the resized frame) to be considered motion
# Threshold applied to the background-subtractor foreground mask.
SENSITIVITY = 25 # how much difference triggers motion (lower => more sensitive)
# The pre-buffer holds int(fps * PRE_BUFFER_SECONDS) frames.
PRE_BUFFER_SECONDS = 3 # include 3 seconds before motion started
# Measured in wall-clock time since the last frame that contained motion.
POST_RECORD_SECONDS = 4 # record N seconds after motion stops
CODEC = "mp4v" # codec fourcc (try 'XVID' or 'avc1' if 'mp4v' not available)
# -------------------------
# Helpers
# -------------------------
def ensure_dir(path):
    """Create directory *path* (and any missing parents) if it does not exist.

    Calling this for a directory that already exists is a no-op.

    Raises:
        FileExistsError: if *path* exists but is not a directory.
        OSError: if the directory cannot be created.
    """
    # exist_ok=True already tolerates an existing directory, so a separate
    # os.path.exists() check would only add a check-then-create race.
    os.makedirs(path, exist_ok=True)
def timestamp_str():
    """Return the current local time formatted as ``YYYYMMDD_HHMMSS``."""
    return datetime.now().strftime("%Y%m%d_%H%M%S")
def make_output_filename():
    """Return a clip file name of the form ``YYYYMMDD_HHMMSS_motion.mp4``.

    The name has one-second resolution, so two clips started within the same
    second would receive the same name.
    """
    return f"{timestamp_str()}_motion.mp4"
def write_log(csv_path, row):
    """Append *row* to the CSV log at *csv_path*, adding a header when needed.

    Args:
        csv_path: Path of the CSV log file; created if it does not exist.
        row: Sequence of six values matching the header columns
            (filename, start_time, end_time, duration_s, frames, source).
    """
    header = ["filename", "start_time", "end_time", "duration_s", "frames", "source"]
    # Write the header for a brand-new file *and* for an existing but empty
    # one; checking existence alone would leave an empty file without header.
    needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
    with open(csv_path, "a", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        if needs_header:
            w.writerow(header)
        w.writerow(row)
# -------------------------
# Motion Recorder class
# -------------------------
class MotionRecorder:
    """Record a video clip each time motion is seen on a capture source.

    Frames are read from ``source``, resized to FRAME_WIDTH x FRAME_HEIGHT and
    passed through a MOG2 background subtractor. As soon as a contour of at
    least MIN_AREA appears, a clip is opened in ``output_dir``, seeded with the
    buffered pre-motion frames, and kept open until no motion has been seen
    for POST_RECORD_SECONDS. When LOG_CSV is enabled every finished clip is
    appended to ``recording_log.csv`` in the output folder.
    """

    def __init__(self, source=0, output_dir=OUTPUT_DIR):
        """Open the capture source and prepare detection state.

        Args:
            source: Anything accepted by ``cv2.VideoCapture`` (device index,
                file path or stream URL).
            output_dir: Folder for clips and the CSV log; created if missing.

        Raises:
            RuntimeError: if the source cannot be opened.
        """
        self.source = source
        self.output_dir = output_dir
        ensure_dir(self.output_dir)
        self.log_path = (
            os.path.join(self.output_dir, "recording_log.csv") if LOG_CSV else None
        )

        self.cap = cv2.VideoCapture(self.source)
        if not self.cap.isOpened():
            # Do not leak the capture handle when construction fails.
            self.cap.release()
            raise RuntimeError(f"Cannot open source: {source}")

        # Prefer the FPS reported by the source; fall back to the configured
        # value when the source reports nothing usable (0, negative or NaN).
        native_fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.fps = native_fps if native_fps and native_fps > 0 else FPS

        # Frames are resized to these dimensions before processing/recording.
        self.width = FRAME_WIDTH
        self.height = FRAME_HEIGHT

        # NOTE: per the OpenCV docs, MOG2 with detectShadows=True marks shadow
        # pixels with a gray value (127 by default), so any SENSITIVITY below
        # that value still lets shadows count as motion.
        self.bg_sub = cv2.createBackgroundSubtractorMOG2(
            history=500, varThreshold=16, detectShadows=True
        )
        # Structuring element for the morphological clean-up; it never
        # changes, so build it once instead of once per frame.
        self._kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))

        # Ring buffer of the most recent idle frames (the pre-roll of a clip).
        self.pre_buffer = deque(maxlen=int(self.fps * PRE_BUFFER_SECONDS))

        self.is_recording = False
        self.writer = None
        self.record_start_time = None
        self.frames_recorded = 0
        self.last_motion_time = None
        self.current_out_path = None

    def release(self):
        """Release the capture, any open writer and all preview windows."""
        if self.cap:
            self.cap.release()
        if self.writer:
            self.writer.release()
            self.writer = None
        cv2.destroyAllWindows()

    def _detect_motion(self, frame):
        """Return True if *frame* contains motion; draw boxes on it in place."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray_blur = cv2.GaussianBlur(gray, (5, 5), 0)
        # background subtraction mask
        fgmask = self.bg_sub.apply(gray_blur)
        # threshold to reduce noise
        _, thresh = cv2.threshold(fgmask, SENSITIVITY, 255, cv2.THRESH_BINARY)
        # open removes small specks, dilate merges nearby blobs
        clean = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, self._kernel, iterations=1)
        clean = cv2.morphologyEx(clean, cv2.MORPH_DILATE, self._kernel, iterations=2)
        contours, _ = cv2.findContours(
            clean, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )

        motion_detected = False
        for cnt in contours:
            if cv2.contourArea(cnt) >= MIN_AREA:
                motion_detected = True
                x, y, w, h = cv2.boundingRect(cnt)
                # The rectangle is drawn on the frame itself, so it shows up
                # both in the preview and in the recorded clip.
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
        return motion_detected

    def _start_recording(self):
        """Open a new clip and seed it with the buffered pre-motion frames."""
        out_path = os.path.join(self.output_dir, make_output_filename())
        fourcc = cv2.VideoWriter_fourcc(*CODEC)
        self.writer = cv2.VideoWriter(
            out_path, fourcc, self.fps, (self.width, self.height)
        )
        if not self.writer.isOpened():
            print("Warning: VideoWriter failed to open. Check codec availability.")

        # Flush the pre-roll. The frame that triggered the recording is not in
        # the buffer; the caller writes it exactly once afterwards.
        for buffered in self.pre_buffer:
            self.writer.write(buffered)
        self.frames_recorded = len(self.pre_buffer)
        self.pre_buffer.clear()

        self.is_recording = True
        self.record_start_time = datetime.now()
        self.current_out_path = out_path
        print(f"[{self.record_start_time}] Motion started -> Recording to {out_path}")

    def _finish_recording(self, announce=True):
        """Close the current clip, log it and reset the recording state."""
        record_end = datetime.now()
        # Wall-clock time since motion started (the pre-roll is not included).
        duration = (record_end - self.record_start_time).total_seconds()
        if announce:
            print(f"[{record_end}] Motion ended. Duration: {duration:.2f}s, Frames: {self.frames_recorded}")

        if self.writer:
            self.writer.release()
            self.writer = None

        if LOG_CSV and self.log_path:
            write_log(self.log_path, [
                os.path.basename(self.current_out_path),
                self.record_start_time.strftime("%Y-%m-%d %H:%M:%S"),
                record_end.strftime("%Y-%m-%d %H:%M:%S"),
                f"{duration:.2f}",
                str(self.frames_recorded),
                str(self.source),
            ])

        self.is_recording = False
        self.frames_recorded = 0
        # start the next clip with a clean pre-roll
        self.pre_buffer.clear()

    def start(self):
        """Run the capture/detect/record loop until the stream ends or 'q'.

        A clip that is still open when the loop exits (end of stream, 'q' or
        Ctrl+C) is closed and logged before resources are released.
        """
        print("Starting motion detection. Press 'q' to quit.")
        try:
            while True:
                ret, frame = self.cap.read()
                if not ret:
                    print("Stream ended or cannot fetch frame.")
                    break

                # resize for consistent processing
                frame = cv2.resize(frame, (self.width, self.height))
                motion_detected = self._detect_motion(frame)

                now = time.time()
                if motion_detected:
                    self.last_motion_time = now
                    if not self.is_recording:
                        self._start_recording()

                if self.is_recording:
                    self.writer.write(frame)
                    self.frames_recorded += 1
                    # stop if no motion for POST_RECORD_SECONDS
                    if (now - self.last_motion_time) > POST_RECORD_SECONDS:
                        self._finish_recording()
                else:
                    # Buffer frames only while idle: nothing reads the buffer
                    # during a recording, and buffering the triggering frame
                    # would write it to the clip twice.
                    self.pre_buffer.append(frame.copy())

                # Show simple preview window
                preview = frame.copy()
                status_text = "REC" if self.is_recording else "Idle"
                status_color = (0, 0, 255) if self.is_recording else (0, 255, 0)
                cv2.putText(
                    preview, f"Status: {status_text}", (10, 20),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, status_color, 2,
                )
                cv2.imshow("Motion Recorder - Preview", preview)

                # keyboard quit
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    print("Quit requested by user.")
                    break
        except KeyboardInterrupt:
            print("Interrupted by user.")
        finally:
            try:
                # if we were recording when the loop ended, close and log it
                if self.is_recording:
                    self._finish_recording(announce=False)
            finally:
                # release even if finalising the clip failed
                self.release()
                print("Released resources. Exiting.")
# -------------------------
# CLI arg parsing
# -------------------------
def parse_args(argv=None):
    """Parse the recorder's command-line options.

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads ``sys.argv[1:]``, which is what the script
            entry point relies on.

    Returns:
        argparse.Namespace with ``source``, ``out``, ``fps``, ``w``, ``h``,
        ``min_area`` and ``sensitivity``.
    """
    parser = argparse.ArgumentParser(description="Motion Detection Security Recorder")
    parser.add_argument("--source", type=str, default="0", help="Video source: 0 (webcam), file path, or RTSP URL")
    parser.add_argument("--out", type=str, default=OUTPUT_DIR, help="Output recordings folder")
    parser.add_argument("--fps", type=int, default=FPS, help="Recording FPS fallback")
    parser.add_argument("--w", type=int, default=FRAME_WIDTH, help="Frame width (resize)")
    parser.add_argument("--h", type=int, default=FRAME_HEIGHT, help="Frame height (resize)")
    parser.add_argument("--min-area", type=int, default=MIN_AREA, help="Min contour area to detect motion")
    parser.add_argument("--sensitivity", type=int, default=SENSITIVITY, help="Threshold sensitivity for mask")
    return parser.parse_args(argv)
# -------------------------
# Entrypoint
# -------------------------
def main():
    """Script entry point: apply CLI options to the module settings and run.

    The parsed options overwrite the module-level configuration constants,
    which MotionRecorder reads when it is constructed and while it runs.
    """
    global FPS, FRAME_WIDTH, FRAME_HEIGHT, MIN_AREA, SENSITIVITY, OUTPUT_DIR

    args = parse_args()

    # A purely numeric source such as "0" is a device index, not a path.
    # isdecimal() is used because isdigit() also accepts characters such as
    # "²" that int() cannot convert.
    source = args.source
    if source.isdecimal():
        source = int(source)

    FPS = args.fps
    FRAME_WIDTH = args.w
    FRAME_HEIGHT = args.h
    MIN_AREA = args.min_area
    SENSITIVITY = args.sensitivity
    OUTPUT_DIR = args.out

    ensure_dir(OUTPUT_DIR)
    recorder = MotionRecorder(source=source, output_dir=OUTPUT_DIR)
    recorder.start()
# Run the recorder only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
# No comments:
# Post a Comment