import streamlit as st import cv2 import numpy as np import os import tempfile import time from PIL import Image import matplotlib.pyplot as plt import io import base64 # Set page configuration st.set_page_config( page_title="Motion Detection and Object Tracking", page_icon="🎥", layout="wide" ) # Custom CSS for styling st.markdown(""" """, unsafe_allow_html=True) # Utility functions from backend def f_keepLargeComponents(I, th): """Keep only large connected components in a binary image""" R = np.zeros(I.shape) < 0 unique_labels = np.unique(I.flatten()) for label in unique_labels: if label == 0: pass else: I2 = I == label if np.sum(I2) > th: R = R | I2 return np.float32(255 * R) def convert_to_image(cv2_img): """Convert OpenCV image to PIL Image for Streamlit display""" img_rgb = cv2.cvtColor(cv2_img, cv2.COLOR_BGR2RGB) return Image.fromarray(img_rgb) def get_image_base64(img): """Convert image to base64 for HTML display""" buffered = io.BytesIO() img.save(buffered, format="JPEG") img_str = base64.b64encode(buffered.getvalue()).decode() return img_str def process_frame(frame, bg_subtractor, min_area): """Process a frame with background subtraction and component filtering""" # Resize for consistent processing frame = cv2.resize(frame, dsize=(600, 400)) # Apply background subtraction fgmask = bg_subtractor.apply(frame) # Apply morphological operations K_r = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) fgmask = cv2.morphologyEx(np.float32(fgmask), cv2.MORPH_OPEN, K_r) # Connected components analysis num_labels, labels_im = cv2.connectedComponents(np.array(fgmask > 0, np.uint8)) # Keep only large components fgmask = f_keepLargeComponents(labels_im, min_area) # Create visualization mask F = np.zeros(frame.shape, np.uint8) F[:, :, 0], F[:, :, 1], F[:, :, 2] = fgmask, fgmask, fgmask # Combine original and mask combined = np.hstack((frame, F)) has_motion = np.sum(fgmask) > 0 return frame, fgmask, combined, has_motion # App title and description st.markdown("
Motion Detection and Object Tracking
", unsafe_allow_html=True) st.markdown("Track objects and detect motion in videos or image sequences. Upload your own video or use your webcam.") # Sidebar for settings st.sidebar.markdown("
Settings
", unsafe_allow_html=True) # Input source selection input_source = st.sidebar.radio("Select Input Source", ["Upload Video", "Upload Image Sequence", "Sample Video"]) # Motion detection parameters min_area = st.sidebar.slider("Minimum Component Area", 100, 5000, 1000, 100) history = st.sidebar.slider("Background History", 100, 1000, 500, 50) var_threshold = st.sidebar.slider("Variance Threshold", 5, 100, 16, 1) detect_shadows = st.sidebar.checkbox("Detect Shadows", value=True) # Output settings save_output = st.sidebar.checkbox("Save Processed Frames", value=False) min_sequence_frames = st.sidebar.number_input("Minimum Frames in Sequence", 1, 20, 5) # Object detection option use_object_detection = st.sidebar.checkbox("Enable Object Detection", value=False) if use_object_detection: st.sidebar.warning("Note: Object detection requires the cvlib package which must be installed on your Streamlit Cloud deployment.") # Create tabs for different functionalities tab1, tab2, tab3 = st.tabs(["Process Video", "Results", "About"]) with tab1: # Process input based on selection if input_source == "Upload Video": uploaded_file = st.file_uploader("Upload a video file", type=["mp4", "avi", "mov", "mkv"]) if uploaded_file is not None: # Save uploaded file to a temporary file temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') temp_file.write(uploaded_file.read()) # Create background subtractor fgModel = cv2.createBackgroundSubtractorMOG2(history=history, varThreshold=var_threshold, detectShadows=detect_shadows) # Process video if st.button("Process Video"): # Create output directory if saving results if save_output: output_dir = tempfile.mkdtemp() st.session_state['output_dir'] = output_dir st.session_state['saved_frames'] = [] # Open video capture cap = cv2.VideoCapture(temp_file.name) # Process frames frame_idx = 0 motion_sequence = [] sequence_counter = 0 progress_bar = st.progress(0) frame_display = st.empty() status_text = st.empty() # Get total frames for progress calculation total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) while cap.isOpened(): ret, frame = cap.read() if not ret: break frame_idx += 1 # Process frame original, mask, combined, has_motion = process_frame(frame, fgModel, min_area) # Update motion sequence if has_motion: motion_sequence.append(original) elif len(motion_sequence) > 0: # Save sequence if it meets minimum length if len(motion_sequence) >= min_sequence_frames and save_output: sequence_counter += 1 for i, seq_frame in enumerate(motion_sequence): frame_name = f"{sequence_counter}_{i+1}.jpg" frame_path = os.path.join(output_dir, frame_name) # Apply object detection if enabled if use_object_detection: try: import cvlib as cv from cvlib.object_detection import draw_bbox bbox, labels, conf = cv.detect_common_objects(seq_frame) seq_frame = draw_bbox(seq_frame, bbox, labels, conf) except ImportError: st.warning("cvlib not available. Skipping object detection.") cv2.imwrite(frame_path, seq_frame) st.session_state['saved_frames'].append(frame_path) # Reset sequence motion_sequence = [] # Display current frame frame_display.image(convert_to_image(combined), caption="Processing: Original | Foreground Mask", use_column_width=True) # Update progress progress = min(frame_idx / total_frames, 1.0) progress_bar.progress(progress) # Display status status_text.text(f"Processing frame {frame_idx}/{total_frames} | Motion sequences: {sequence_counter}") # Slow down processing slightly for display time.sleep(0.01) # Save final sequence if any if len(motion_sequence) >= min_sequence_frames and save_output: sequence_counter += 1 for i, seq_frame in enumerate(motion_sequence): frame_name = f"{sequence_counter}_{i+1}.jpg" frame_path = os.path.join(output_dir, frame_name) # Apply object detection if enabled if use_object_detection: try: import cvlib as cv from cvlib.object_detection import draw_bbox bbox, labels, conf = cv.detect_common_objects(seq_frame) seq_frame = draw_bbox(seq_frame, bbox, labels, conf) except ImportError: pass cv2.imwrite(frame_path, seq_frame) st.session_state['saved_frames'].append(frame_path) cap.release() # Complete progress_bar.progress(1.0) status_text.success(f"Processing complete! {sequence_counter} motion sequences detected.") # Clean up os.unlink(temp_file.name) # Set results flag if save_output and sequence_counter > 0: st.session_state['has_results'] = True st.info("Results are available in the Results tab.") elif input_source == "Upload Image Sequence": uploaded_files = st.file_uploader("Upload image sequence", type=["jpg", "jpeg", "png"], accept_multiple_files=True) if uploaded_files: # Create background subtractor fgModel = cv2.createBackgroundSubtractorMOG2(history=history, varThreshold=var_threshold, detectShadows=detect_shadows) # Process images if st.button("Process Images"): # Create output directory if saving results if save_output: output_dir = tempfile.mkdtemp() st.session_state['output_dir'] = output_dir st.session_state['saved_frames'] = [] # Process frames frame_idx = 0 motion_sequence = [] sequence_counter = 0 progress_bar = st.progress(0) frame_display = st.empty() status_text = st.empty() total_frames = len(uploaded_files) for file in uploaded_files: frame_idx += 1 # Read image file_bytes = np.asarray(bytearray(file.read()), dtype=np.uint8) frame = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR) # Reset file pointer file.seek(0) # Process frame original, mask, combined, has_motion = process_frame(frame, fgModel, min_area) # Update motion sequence if has_motion: motion_sequence.append(original) elif len(motion_sequence) > 0: # Save sequence if it meets minimum length if len(motion_sequence) >= min_sequence_frames and save_output: sequence_counter += 1 for i, seq_frame in enumerate(motion_sequence): frame_name = f"{sequence_counter}_{i+1}.jpg" frame_path = os.path.join(output_dir, frame_name) # Apply object detection if enabled if use_object_detection: try: import cvlib as cv from cvlib.object_detection import draw_bbox bbox, labels, conf = cv.detect_common_objects(seq_frame) seq_frame = draw_bbox(seq_frame, bbox, labels, conf) except ImportError: st.warning("cvlib not available. Skipping object detection.") cv2.imwrite(frame_path, seq_frame) st.session_state['saved_frames'].append(frame_path) # Reset sequence motion_sequence = [] # Display current frame frame_display.image(convert_to_image(combined), caption="Processing: Original | Foreground Mask", use_column_width=True) # Update progress progress = min(frame_idx / total_frames, 1.0) progress_bar.progress(progress) # Display status status_text.text(f"Processing frame {frame_idx}/{total_frames} | Motion sequences: {sequence_counter}") # Slow down processing slightly for display time.sleep(0.01) # Save final sequence if any if len(motion_sequence) >= min_sequence_frames and save_output: sequence_counter += 1 for i, seq_frame in enumerate(motion_sequence): frame_name = f"{sequence_counter}_{i+1}.jpg" frame_path = os.path.join(output_dir, frame_name) # Apply object detection if enabled if use_object_detection: try: import cvlib as cv from cvlib.object_detection import draw_bbox bbox, labels, conf = cv.detect_common_objects(seq_frame) seq_frame = draw_bbox(seq_frame, bbox, labels, conf) except ImportError: pass cv2.imwrite(frame_path, seq_frame) st.session_state['saved_frames'].append(frame_path) # Complete progress_bar.progress(1.0) status_text.success(f"Processing complete! {sequence_counter} motion sequences detected.") # Set results flag if save_output and sequence_counter > 0: st.session_state['has_results'] = True st.info("Results are available in the Results tab.") else: # Sample video st.info("Using a sample video for demonstration") # Create sample video data - in a real app, you'd use a sample video file sample_video = st.selectbox("Select sample video", ["Campus", "Shopping Mall", "Office Room"]) # Process sample video if st.button("Process Sample Video"): # Create background subtractor fgModel = cv2.createBackgroundSubtractorMOG2(history=history, varThreshold=var_threshold, detectShadows=detect_shadows) # Create output directory if saving results if save_output: output_dir = tempfile.mkdtemp() st.session_state['output_dir'] = output_dir st.session_state['saved_frames'] = [] # Generate some dummy frames for demonstration total_frames = 100 progress_bar = st.progress(0) frame_display = st.empty() status_text = st.empty() sequence_counter = 0 motion_sequence = [] # Simulate processing frames for i in range(total_frames): # Create a dummy frame with some motion frame = np.zeros((400, 600, 3), dtype=np.uint8) # Add some moving objects if i % 10 < 5: # Motion every other 5 frames cv2.circle(frame, (300 + i % 100, 200), 50, (0, 0, 255), -1) has_motion = True else: has_motion = False # Apply background subtraction (simplified for demo) if i == 0: fgmask = np.zeros((400, 600), dtype=np.float32) else: # Process frame - simplified for demo gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) fgmask = fgModel.apply(frame) fgmask = np.float32(fgmask) # Create visualization mask F = np.zeros(frame.shape, np.uint8) F[:, :, 0], F[:, :, 1], F[:, :, 2] = fgmask, fgmask, fgmask # Combine original and mask combined = np.hstack((frame, F)) # Update motion sequence if has_motion: motion_sequence.append(frame) elif len(motion_sequence) > 0: # Save sequence if it meets minimum length if len(motion_sequence) >= min_sequence_frames and save_output: sequence_counter += 1 for j, seq_frame in enumerate(motion_sequence): frame_name = f"{sequence_counter}_{j+1}.jpg" frame_path = os.path.join(output_dir, frame_name) cv2.imwrite(frame_path, seq_frame) st.session_state['saved_frames'].append(frame_path) # Reset sequence motion_sequence = [] # Display current frame frame_display.image(convert_to_image(combined), caption="Processing: Original | Foreground Mask", use_column_width=True) # Update progress progress = (i + 1) / total_frames progress_bar.progress(progress) # Display status status_text.text(f"Processing frame {i+1}/{total_frames} | Motion sequences: {sequence_counter}") # Slow down processing slightly for display time.sleep(0.05) # Save final sequence if any if len(motion_sequence) >= min_sequence_frames and save_output: sequence_counter += 1 for j, seq_frame in enumerate(motion_sequence): frame_name = f"{sequence_counter}_{j+1}.jpg" frame_path = os.path.join(output_dir, frame_name) cv2.imwrite(frame_path, seq_frame) st.session_state['saved_frames'].append(frame_path) # Complete progress_bar.progress(1.0) status_text.success(f"Processing complete! {sequence_counter} motion sequences detected.") # Set results flag if save_output and sequence_counter > 0: st.session_state['has_results'] = True st.info("Results are available in the Results tab.") with tab2: st.markdown("
Detected Motion Sequences
", unsafe_allow_html=True) if 'has_results' in st.session_state and st.session_state['has_results']: if 'saved_frames' in st.session_state and st.session_state['saved_frames']: # Group frames by sequence sequences = {} for frame_path in st.session_state['saved_frames']: frame_name = os.path.basename(frame_path) seq_id = frame_name.split('_')[0] if seq_id not in sequences: sequences[seq_id] = [] sequences[seq_id].append(frame_path) # Display sequences selected_sequence = st.selectbox("Select motion sequence", list(sequences.keys())) if selected_sequence: st.write(f"Sequence {selected_sequence} - {len(sequences[selected_sequence])} frames") # Display thumbnails cols = st.columns(min(5, len(sequences[selected_sequence]))) for i, (col, frame_path) in enumerate(zip(cols, sequences[selected_sequence])): img = Image.open(frame_path) col.image(img, caption=f"Frame {i+1}", use_column_width=True) # Display full sequence st.write("Full sequence:") sequence_frames = [] for frame_path in sequences[selected_sequence]: img = Image.open(frame_path) sequence_frames.append(np.array(img)) # Create animated GIF option if st.button("Create GIF from Sequence"): with st.spinner("Creating GIF..."): # Create a temporary file for the GIF with tempfile.NamedTemporaryFile(suffix='.gif', delete=False) as temp_gif: gif_path = temp_gif.name # Convert frames to GIF images = [Image.fromarray(frame) for frame in sequence_frames] images[0].save( gif_path, save_all=True, append_images=images[1:], duration=200, loop=0 ) # Display the GIF with open(gif_path, 'rb') as gif_file: gif_data = gif_file.read() st.image(gif_data, caption="Motion Sequence GIF") # Provide download link st.download_button( label="Download GIF", data=gif_data, file_name=f"sequence_{selected_sequence}.gif", mime="image/gif" ) # Clean up os.unlink(gif_path) # Display stacked view st.write("All frames in sequence:") for i, frame in enumerate(sequence_frames): st.image(frame, caption=f"Frame {i+1}", use_column_width=True) else: st.info("No results available yet. Process a video or image sequence with 'Save Processed Frames' enabled.") with tab3: st.markdown("
About this application
", unsafe_allow_html=True) st.markdown(""" This application implements motion detection and object tracking using background subtraction and connected component analysis. ### Features: - Process videos or image sequences to detect motion - Background subtraction using the MOG2 algorithm - Connected component filtering to remove noise - Optional object detection (requires cvlib) - Save and display motion sequences - Export sequences as GIFs ### How to use: 1. Select an input source (upload video, image sequence, or use sample) 2. Adjust parameters in the sidebar 3. Process the input 4. View results in the Results tab ### Requirements for deployment: ``` streamlit opencv-python-headless numpy pillow matplotlib ``` For object detection functionality, also install: ``` cvlib tensorflow ``` ### How it works: The application uses background subtraction to identify moving objects in a video sequence. It then applies connected component analysis to filter out small noise components and track larger moving objects. Optionally, it can apply object detection to identify the types of objects detected. """) # Show parameters explanation st.markdown("### Parameter Explanation:") st.markdown(""" - **Minimum Component Area**: The minimum size (in pixels) of connected components to keep. Smaller components are filtered out as noise. - **Background History**: Number of frames used to build the background model in MOG2. - **Variance Threshold**: Threshold on the squared Mahalanobis distance to decide whether a pixel is foreground or background. - **Detect Shadows**: Enable shadow detection in MOG2 algorithm. - **Minimum Frames in Sequence**: The minimum number of consecutive frames with motion to save as a sequence. """) # Add deployment instructions st.markdown("### Deployment to Streamlit Cloud:") st.markdown(""" To deploy this application to Streamlit Cloud: 1. Create a GitHub repository with this code 2. Include a `requirements.txt` file with the necessary dependencies 3. Connect your GitHub repository to Streamlit Cloud 4. Deploy the application Example `requirements.txt`: ``` streamlit==1.24.0 opencv-python-headless==4.7.0.72 numpy==1.24.3 pillow==9.5.0 matplotlib==3.7.1 ``` """)