"""you need a gpu for realtime inference | 3050 or better""" """the input should be 720x480 px | other resolution works as well | takes more time though""" """important is that the videos are using acc as audio codec""" """packages""" # run: # pip install torch torchvision ultralytics pillow opencv-python numpy pyttsx3 moviepy pydub pycom import torch from ultralytics import YOLO from PIL import Image from torchvision import transforms import torchvision.transforms.functional as F import cv2 import numpy as np import time from collections import Counter import pyttsx3 import wave import threading import queue import gradio as gr import os, glob, shutil from moviepy import VideoFileClip, AudioFileClip from pathlib import Path from pydub import AudioSegment """folders and temp outputs""" #set these to match your folder structure EXAMPLES_FOLDER = Path("examples_video") # Default folder for examples TEMP_FILES_FOLDER = Path("temp_files") TEMP_FILES_FOLDER.mkdir(exist_ok=True) """check torch""" # my values / torch version import torch print(torch.__version__) # 2.8.0.dev20250422+cu128 print(torch.version.cuda) # 12.8 print(torch.backends.cudnn.enabled) # True """global variables""" audio_queue = queue.Queue(maxsize=1) # Only keep the most recent sentence audio_thread_active = False audio_is_playing = False user_device = "cuda" if torch.cuda.is_available() else "cpu" """yolo model loading""" from ultralytics import YOLO yoloV11_Small = YOLO("yolo_models_downloaded/yolo11s.pt", task="detect").to(user_device).eval() """transforms""" class PadToSquare: """ A TorchVision transform to scale a PIL Image and pad it to a square while preserving aspect ratio. Args: scale (float): Optional scaling factor applied before padding (default=1.0). fill (int or tuple): Pixel fill value for padding. 0 for black or tuple for RGB (default=0). padding_mode (str): Type of padding: 'constant', 'edge', 'reflect', or 'symmetric'. See torchvision.transforms.functional.pad for details. """ def __init__(self, scale: float = 1.0, fill=0, padding_mode: str = 'constant'): self.fill = fill self.padding_mode = padding_mode def __call__(self, img: Image.Image) -> Image.Image: # 2) Compute padding to make square w, h = img.size dim = max(w, h) pad_left = (dim - w) // 2 pad_top = (dim - h) // 2 pad_right = dim - w - pad_left pad_bottom = dim - h - pad_top # 3) Apply padding using functional API padding = (pad_left, pad_top, pad_right, pad_bottom) return F.pad(img, padding, fill=self.fill, padding_mode=self.padding_mode) testing_transforms = transforms.Compose([ PadToSquare(scale=1, fill=0, padding_mode="constant"), transforms.Resize(size=640), transforms.ToTensor() #divides the color channels by 255 yielding in 0-1 range ]) """ helper functions""" def return_attributes(result, frame): """ Returns a Dictionary with Lists: "labels", "left_right" """ # Get frame dimensions if frame is not None: try: height, width = frame.shape[:2] # Only get first two dimensions except (AttributeError, ValueError): # Fallback if frame.shape doesn't work height, width = 640, 640 else: height, width = 640, 640 # Initialize return lists centers = [] l_m_r = [] array_objects = [] try: # Process box coordinates if hasattr(result, 'boxes') and hasattr(result.boxes, 'xywh'): for box in result.boxes.xywh: # Convert to Python scalar if it's a numpy array if hasattr(box, 'item'): x = box[0].item() if hasattr(box[0], 'item') else float(box[0]) y = box[1].item() if hasattr(box[1], 'item') else float(box[1]) else: x, y = float(box[0]), float(box[1]) center = [int(x), int(y)] centers.append(center) # Determine position if x < width * 0.25: l_m_r.append("left") elif x > width * 0.75: l_m_r.append("right") else: l_m_r.append("center") # Process class labels if hasattr(result, 'boxes') and hasattr(result.boxes, 'cls'): for index in result.boxes.cls: # Convert index to integer safely if hasattr(index, 'item'): idx = int(index.item()) else: idx = int(index) label = result.names[idx] array_objects.append(str(label)) except Exception as e: print(f"Error in return_attributes: {e}") # Return empty lists if there's an error return {"labels": [], "left_right": []} return {"labels": array_objects, "left_right": l_m_r} def sort_attributes(attributes): """ Sorts the 'labels' and 'left_right' lists in the attributes dictionary based on the order: 'center' -> 'left' -> 'right'. Parameters: attributes (dict): A dictionary with keys 'labels' and 'left_right'. Returns: dict: A new dictionary with sorted 'labels' and 'left_right' lists. """ # Define the desired order position_order = {'center': 0, 'left': 1, 'right': 2} # Combine the labels and positions into pairs combined = list(zip(attributes['left_right'], attributes['labels'])) # Sort the combined list based on the defined position order sorted_combined = sorted(combined, key=lambda x: position_order.get(x[0], float('inf'))) # Unzip the sorted pairs back into separate lists sorted_positions, sorted_labels = zip(*sorted_combined) # Return the sorted attributes as a new dictionary return { 'labels': list(sorted_labels), 'left_right': list(sorted_positions) } def count_objects(labels, directions): """ Group identical labels only when in the same direction, then return sorted lists of 'labels' and 'left_right'. Args: labels (List[str]): Detected class names. directions (List[str]): Corresponding positions: 'left', 'center', or 'right'. Returns: dict: { 'labels': List[str], # e.g. ['sofa', 'chair', '2 houses'] 'left_right':List[str] # e.g. ['center', 'left', 'right'] } """ # 1. Count each (label, direction) pair pair_counts = Counter(zip(labels, directions)) # :contentReference[oaicite:0]{index=0} # 2. Define direction ranking for sorting rank = {'center': 0, 'left': 1, 'right': 2} # 3. Prepare a helper to pluralize labels when count > 1 def pluralize(word, count): if count == 1: return f"a {word}" # simple English plural rules if any(word.endswith(s) for s in ('s','x','z','sh','ch')): return f"{count} {word}es" if word.endswith('y') and word[-2] not in 'aeiou': return f"{count} {word[:-1]}ies" return f"{count} {word}s" # 4. Sort the unique (label, direction) keys by direction rank sorted_pairs = sorted(pair_counts.items(), key=lambda x: rank.get(x[0][1], float('inf'))) # # 5. Build the output lists out_labels = [] out_dirs = [] for (label, direction), cnt in sorted_pairs: out_labels.append(pluralize(label, cnt)) out_dirs.append(direction) return {'labels': out_labels, 'left_right': out_dirs} def join_items(items): """ Join a list of strings into a human-readable list: - Single item: "X" - Two items: "X and Y" - More: "X, Y and Z" """ # If only one, return it unchanged if len(items) == 1: return items[0] # If two, join with " and " if len(items) == 2: return f"{items[0]} and {items[1]}" # Otherwise comma-join all but last, then " and " before final return ", ".join(items[:-1]) + " and " + items[-1] def make_sentence(attrib_dict): """ Build a sentence from attrib_dict with keys: - labels: List[str], e.g. ['a human','a dog','2 trees','2 houses'] - left_right:List[str], e.g. ['center','center','left','right'] Returns: A string like: "There is a human and a dog in front of you. There are also 2 trees to your left and 2 houses to your right." Or when no center objects: "There is a car to your right." """ labels = attrib_dict["labels"] directions = attrib_dict["left_right"] # Group labels by direction grouped = {"center": [], "left": [], "right": []} for lbl, dr in zip(labels, directions): if dr in grouped: grouped[dr].append(lbl) parts = [] has_center_objects = bool(grouped["center"]) # Front clause (center) front = grouped["center"] if front: verb = "is" if len(front) == 1 and not front[0].startswith(('2 ', '3 ', '4 ')) else "are" parts.append(f"There {verb} {join_items(front)} in front of you") # Side clauses (left, right) side_parts = [] for side in ("left", "right"): items = grouped[side] if items: # For single items starting with "a", use "is" # For numeric items (like "2 cars"), use "are" if len(items) == 1: item = items[0] if item.startswith("a "): side_parts.append(f"There is {item} to your {side}") else: # Handle numeric items verb = "is" if item.startswith(("1 ", "one ")) else "are" side_parts.append(f"There {verb} {item} to your {side}") else: side_parts.append(f"There are {join_items(items)} to your {side}") # Determine whether to use "also" if side_parts: if has_center_objects: # Modify first side part to include "also" if side_parts: first_side = side_parts[0] # Insert "also" after "There" words = first_side.split(" ", 2) if len(words) >= 2: side_parts[0] = f"{words[0]} also {words[1]} {words[2]}" parts.extend(side_parts) # Join all parts with periods return ". ".join(parts) + "." def text_to_speech_audio(text: str, voice_id: int = 1, rate: int = 120) -> np.ndarray: """Convert text to speech using pyttsx3 and return audio array""" # Create a temporary WAV file temp_file = "temp_speech.wav" # Initialize TTS engine engine = pyttsx3.init() # Configure voice voices = engine.getProperty('voices') if voice_id < len(voices): engine.setProperty('voice', voices[voice_id].id) # Set speech rate engine.setProperty('rate', rate) engine.setProperty('volume', 0.9) # Save speech to the file engine.save_to_file(text, temp_file) engine.runAndWait() # Read the WAV file with wave.open(temp_file, 'rb') as wav_file: # Get parameters framerate = wav_file.getframerate() n_frames = wav_file.getnframes() # Read all frames audio_data = wav_file.readframes(n_frames) # Convert to numpy array audio_array = np.frombuffer(audio_data, dtype=np.int16) # Return the audio data and sample rate return audio_array, framerate def draw_detection_video(frame, result, display_labels=True, left_right_lines=True): boxes = result.boxes.xyxy.cpu().numpy() class_ids = result.boxes.cls.cpu().numpy().astype(int) class_names = result.names for index in range(len(boxes)): box = boxes[index] class_id = class_ids[index] label = str(class_names[class_id]) x1, y1, x2, y2 = map(int, box) cv2.rectangle(frame, (x1,y1), (x2,y2), (0,255,0), 2) if left_right_lines: y,x,color = frame.shape cv2.line(frame, (int(x*.25),0), (int(x*0.25),y), (0,255,0), 2) cv2.line(frame, (int(x*.75),0), (int(x*0.75),y), (0,255,0), 2) if display_labels: cv2.putText(frame, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0,255,0), 2) """process audio / video with corrected sizing""" # Audio thread state audio_queue = queue.Queue(maxsize=1) audio_thread_active = False VOICE_RATE_MULTIPLIER = 1.2 # 20% faster # Global TTS engine lock to prevent concurrent access issues tts_engine_lock = threading.Lock() VOICE_RATE_MULTIPLIER = 1.2 # 20% faster def resize_frame(frame, max_dimension=1920): """ Resize frame while maintaining aspect ratio, ensuring no dimension exceeds max_dimension. Args: frame: Input image frame max_dimension: Maximum allowed dimension for width or height Returns: Resized frame and scaling factor """ height, width = frame.shape[:2] # If both dimensions are already below max, return original if width <= max_dimension and height <= max_dimension: return frame, 1.0 # Calculate scale factor to reduce largest dimension to max_dimension scale_factor = max_dimension / max(width, height) # Calculate new dimensions new_width = int(width * scale_factor) new_height = int(height * scale_factor) # Resize the frame resized_frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA) print(f"Resized from {width}x{height} to {new_width}x{new_height}") return resized_frame, scale_factor def configure_voice_engine(engine): """Configure a pyttsx3 engine with male English voice and adjusted rate.""" try: # Get available voices voices = engine.getProperty('voices') # Look for a male voice for voice in voices: # Most voice IDs or names contain information about gender and language voice_id = voice.id.lower() if ('en' in voice_id and ('male' in voice_id or 'david' in voice_id or 'mark' in voice_id)): engine.setProperty('voice', voice.id) break else: # If no male English voice found, use the first English voice for voice in voices: if 'en' in voice.id.lower(): engine.setProperty('voice', voice.id) break # Get and adjust speaking rate default_rate = engine.getProperty('rate') engine.setProperty('rate', int(default_rate * VOICE_RATE_MULTIPLIER)) except Exception as e: print(f"Error configuring voice: {e}") def audio_thread(): """Background thread: speak any enqueued sentence via pyttsx3.""" # Initialize TTS engine - use lock to prevent conflicts with tts_engine_lock: engine = pyttsx3.init() configure_voice_engine(engine) while audio_thread_active: try: sentence = audio_queue.get(timeout=0.1) # Use lock to prevent conflicts with other TTS operations with tts_engine_lock: engine.say(sentence) engine.runAndWait() except queue.Empty: continue except Exception as e: print(f"Error in audio thread: {e}") def save_speech_to_wav(text: str, wav_path: Path): """Synchronously save TTS of `text` into `wav_path` using pyttsx3.""" if not text: print(f"Warning: Empty text passed to save_speech_to_wav") # Create silent audio for empty text silence = AudioSegment.silent(duration=500) # 0.5s silence silence.export(str(wav_path), format="wav") return try: # Use the lock to prevent concurrent TTS engine usage with tts_engine_lock: # Create a new engine instance each time engine = pyttsx3.init() configure_voice_engine(engine) # Save to file engine.save_to_file(text, str(wav_path)) engine.runAndWait() # Make sure the engine is properly shut down engine.stop() # Add a small delay to ensure file is properly written time.sleep(0.1) # Verify the file exists and has content if os.path.exists(wav_path) and os.path.getsize(wav_path) > 0: print(f"Successfully saved speech to {wav_path}") else: print(f"Warning: Generated WAV file is empty or missing: {wav_path}") # Create silent audio as fallback silence = AudioSegment.silent(duration=500) # 0.5s silence silence.export(str(wav_path), format="wav") except Exception as e: print(f"Error saving speech to WAV: {e}") # Create silent audio as fallback for error cases try: silence = AudioSegment.silent(duration=500) # 0.5s silence silence.export(str(wav_path), format="wav") print(f"Created silent fallback for failed TTS: {wav_path}") except Exception as silence_error: print(f"Failed to create silent fallback: {silence_error}") def process_image(image_path, save_output=False, output_path=None, confidence=0.5): """ Process a single image file with YOLO detection and convert to MP4 without audio. Returns the path to the processed image or video. """ try: # Read the image frame = cv2.imread(image_path) if frame is None: raise IOError(f"Could not open image: {image_path}") # YOLO processing attributes = {"labels": [], "left_right": []} results = yoloV11_Small(frame, conf=confidence, verbose=False, iou=0.5, agnostic_nms=True) # Draw detections for result in results: draw_detection_video(frame, result, display_labels=True, left_right_lines=True) attributes = return_attributes(result, frame) # We're intentionally NOT drawing the sentence at the bottom left # Generate sentence just for console output current_sentence = "" if attributes["labels"]: attrs = sort_attributes(attributes) counted = count_objects(attrs["labels"], attrs["left_right"]) current_sentence = make_sentence(counted) print(f"Detected: {current_sentence}") # Save output if requested if save_output: if output_path: cv2.imwrite(output_path, frame) print(f"Saved output to {output_path}") else: # Create default output filename file_name = os.path.basename(image_path) name, ext = os.path.splitext(file_name) output_path = f"{name}_detected{ext}" cv2.imwrite(output_path, frame) print(f"Saved output to {output_path}") # Always create a video from the image (without audio) # Create file paths base_name = Path(image_path).stem final_path = str(TEMP_FILES_FOLDER / f"{base_name}_final.mp4") # Set video parameters - fixed short duration height, width = frame.shape[:2] fps = 30 # Create a 1-second video (30 frames at 30fps) total_frames = 30 # Create video writer fourcc = cv2.VideoWriter_fourcc(*'mp4v') video_writer = cv2.VideoWriter(final_path, fourcc, fps, (width, height)) # Create frames (static image for all frames) for _ in range(total_frames): video_writer.write(frame) video_writer.release() print(f"Created video without audio: {final_path}") return final_path except Exception as e: print(f"Image processing error: {e}") return None def process_video(input_source, save_output=False, output_path=None, confidence=0.5, base_name="video"): """ Process video with YOLO detection and simultaneous male English TTS playback, then mux combined audio into the saved video. Returns path to final video file (with audio). """ global audio_thread_active # Determine output paths if save_output: temp_vid = Path(output_path) if output_path else TEMP_FILES_FOLDER / f"{base_name}_temp.mp4" else: temp_vid = None # Start live TTS thread for preview only audio_thread_active = True preview_thread = threading.Thread(target=audio_thread, daemon=True) preview_thread.start() # Capture segments to mux later segments = [] # (timestamp_ms, text) # For accurate timing frame_count = 0 total_frames = 0 # Open video cap = cv2.VideoCapture(input_source) if not cap.isOpened(): raise IOError(f"Cannot open video: {input_source}") # Get original video properties fps = cap.get(cv2.CAP_PROP_FPS) or 30 original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) print(f"Original video dimensions: {original_width}x{original_height}") # Check first frame to determine if resizing is needed ret, first_frame = cap.read() if not ret: raise IOError("Failed to read first frame from video") # Get target dimensions first_frame, scale_factor = resize_frame(first_frame) target_width, target_height = first_frame.shape[1], first_frame.shape[0] # Reset video capture to start cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # Setup writer with target dimensions if save_output: fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(str(temp_vid), fourcc, fps, (target_width, target_height)) print(f"Output video dimensions: {target_width}x{target_height}") last_sentence = None processing_start_time = time.time() # Track if AI is currently speaking is_speaking = False speaking_end_frame = 0 # Function to estimate speech duration in frames def estimate_tts_duration_frames(text, fps): """Estimate TTS duration in frames based on word count and FPS.""" words = len(text.split()) # Base duration: 400ms per word, adjusted by voice rate multiplier duration_ms = int((words * 400) / VOICE_RATE_MULTIPLIER) # Add a small buffer and convert to frames duration_ms = max(duration_ms, 500) # At least 500ms return int((duration_ms / 1000) * fps) # Convert ms to frames based on FPS # Process frames while True: ret, frame = cap.read() if not ret: break frame_count += 1 # Resize if needed (maintaining aspect ratio) if original_width > 1920 or original_height > 1920: frame, _ = resize_frame(frame) # Calculate timestamp based on frame number for more accurate timing frame_timestamp_ms = int((frame_count / fps) * 1000) # Check if AI is still speaking based on estimated duration if is_speaking and frame_count >= speaking_end_frame: is_speaking = False # YOLO detect attributes = {"labels": [], "left_right": []} results = yoloV11_Small(frame, conf=confidence, verbose=False, iou=0.5, agnostic_nms=True) for r in results: draw_detection_video(frame, r, display_labels=True, left_right_lines=True) attributes = return_attributes(r, frame) # Make sentence sentence = "" if attributes["labels"]: attrs = sort_attributes(attributes) counted = count_objects(attrs["labels"], attrs["left_right"]) sentence = make_sentence(counted) # New utterance - only if we have a new sentence AND we're not currently speaking if sentence and sentence != last_sentence and not is_speaking: # Add to segments with frame-based timestamp for final video segments.append((frame_timestamp_ms, sentence)) # Calculate estimated speech duration in frames speech_duration_frames = estimate_tts_duration_frames(sentence, fps) is_speaking = True speaking_end_frame = frame_count + speech_duration_frames # Clear queue if something is waiting try: audio_queue.get_nowait() except queue.Empty: pass # Queue new sentence for speaking - this goes to live preview audio_queue.put(sentence) last_sentence = sentence # For debugging print(f"Frame {frame_count}/{total_frames}: Speaking '{sentence}' for {speech_duration_frames} frames") # Overlay and display h, w = frame.shape[:2] # Get dimensions of potentially resized frame if sentence: cv2.putText(frame, sentence, (10, h-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,0), 2) # Add speaking indicator if currently speaking if is_speaking: cv2.putText(frame, f"Speaking... (Frame {frame_count}/{speaking_end_frame})", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,255), 2) cv2.imshow('YOLO', frame) if cv2.waitKey(1) & 0xFF == ord('q'): break if save_output: out.write(frame) # Cleanup video capture cap.release() if save_output: out.release() cv2.destroyAllWindows() # Stop the audio thread for preview audio_thread_active = False time.sleep(0.5) # Give time for thread to clean up if not save_output: return None # Make sure the preview thread is fully stopped before continuing if preview_thread.is_alive(): time.sleep(1.0) # Wait for thread to exit # Build audio track - creating a silent base track clip = VideoFileClip(str(temp_vid)) total_ms = int(clip.duration * 1000) print(f"Video duration: {total_ms}ms") combined_audio = AudioSegment.silent(duration=total_ms) # Create WAV files for each detected segment wav_files = [] print(f"Processing {len(segments)} audio segments") # Create all the WAV files in sequence to avoid pyttsx3 conflicts for idx, (ts, text) in enumerate(segments): wav_path = TEMP_FILES_FOLDER / f"{base_name}_{idx}_{ts}.wav" # Use our improved save_speech_to_wav function save_speech_to_wav(text, wav_path) if os.path.exists(wav_path) and os.path.getsize(wav_path) > 0: wav_files.append((ts, wav_path)) else: print(f"Warning: WAV file not created for segment {idx}: {text}") # Keep track of end times to prevent overlapping last_end_time = 0 # Sort segments by timestamp to ensure proper ordering wav_files.sort(key=lambda x: x[0]) # Process each audio segment and add it at the correct timestamp WITHOUT overlapping for idx, (ts, wav_path) in enumerate(wav_files): try: # Load the audio segment seg = AudioSegment.from_wav(str(wav_path)) # Get actual duration actual_duration = len(seg) # Ensure we don't start before the last segment ended start_time = max(ts, last_end_time) # Position it at its timestamp combined_audio = combined_audio.overlay(seg, position=start_time) # Update the last end time last_end_time = start_time + actual_duration print(f"Added segment {idx} at {start_time}ms (original ts: {ts}ms): duration {actual_duration}ms") except Exception as e: print(f"Error processing audio segment {idx}: {e}") # Export the combined audio audio_file = TEMP_FILES_FOLDER / f"{base_name}_audio.wav" combined_audio.export(str(audio_file), format="wav") print(f"Exported combined audio to {audio_file}") # Explicitly close and release moviepy resources to prevent file access issues clip.close() # Mux audio into video using FFmpeg directly for better reliability final_vid = TEMP_FILES_FOLDER / f"{base_name}_final.mp4" try: # Verify audio file exists and has content if not os.path.exists(audio_file) or os.path.getsize(audio_file) == 0: print(f"Warning: Audio file {audio_file} is missing or empty, generating silence") # Create a silent audio file if the original is empty silent_audio = AudioSegment.silent(duration=total_ms) silent_audio.export(str(audio_file), format="wav") # Use FFmpeg directly - this is more reliable than moviepy for consistent audio muxing import subprocess cmd = [ 'ffmpeg', '-i', str(temp_vid), # Video input '-i', str(audio_file), # Audio input '-c:v', 'libx264', # Use libx264 for video compatibility '-c:a', 'aac', # Use AAC for audio '-b:a', '192k', # Set audio bitrate '-shortest', # End when shortest input ends '-strict', 'experimental', # Allow experimental codecs '-movflags', '+faststart', # Optimize for web playback '-y', # Overwrite output if exists str(final_vid) # Output file ] # Run ffmpeg command print(f"Running FFmpeg command: {' '.join(cmd)}") result = subprocess.run(cmd, check=True, capture_output=True, text=True) print(f"Video created using ffmpeg: {final_vid}") if result.stdout: print(f"FFmpeg stdout: {result.stdout}") if result.stderr: print(f"FFmpeg stderr: {result.stderr}") # Verify the created file has audio check_cmd = [ 'ffprobe', '-v', 'error', '-select_streams', 'a:0', '-show_entries', 'stream=codec_type', '-of', 'default=noprint_wrappers=1:nokey=1', str(final_vid) ] try: probe_result = subprocess.run(check_cmd, capture_output=True, text=True) if 'audio' in probe_result.stdout: print(f"FFprobe confirms audio stream exists in {final_vid}") else: print(f"Warning: FFprobe did not detect audio in {final_vid}") # Try to fix by adding a silent audio track as fallback verify_video_has_audio(str(final_vid)) except Exception as probe_error: print(f"FFprobe check error: {probe_error}") except Exception as ffmpeg_error: print(f"FFmpeg error: {ffmpeg_error}") # If all else fails, just copy the video without audio import shutil shutil.copy(str(temp_vid), str(final_vid)) print(f"Created video without audio as last resort: {final_vid}") # Clean up temporary files with proper error handling and file access retries def safe_remove_file(file_path, max_retries=3, retry_delay=1.0): """Safely remove a file with retries for Windows file locking issues""" for attempt in range(max_retries): try: if os.path.exists(file_path): os.remove(file_path) return True except Exception as e: if attempt < max_retries - 1: print(f"Retry {attempt+1}/{max_retries}: Failed to remove {file_path}: {e}") time.sleep(retry_delay) else: print(f"Final attempt failed to remove {file_path}: {e}") return False # Clean wav files for _, wav_path in wav_files: safe_remove_file(wav_path) # Clean temp video and audio safe_remove_file(temp_vid) safe_remove_file(audio_file) # Return the path to the final video with audio if os.path.exists(final_vid): return str(final_vid) else: # If something went wrong with the final video, return the temp video if it exists return str(temp_vid) if os.path.exists(temp_vid) else None """gradio app""" # Function to load example media from a folder def load_examples_from_folder(folder_path=EXAMPLES_FOLDER): """Load all images and videos from a specified folder as examples""" if not os.path.exists(folder_path): os.makedirs(folder_path) # Create the folder if it doesn't exist print(f"Created examples folder at {folder_path}") return [] # Valid file extensions for images and videos valid_extensions = [ # Images ".jpg", ".jpeg", ".png", ".bmp", ".gif", ".webp", # Videos ".mp4", ".avi", ".mov", ".mkv", ".webm" ] # Get all files with valid extensions in the folder example_files = [] for ext in valid_extensions: example_files.extend(glob.glob(os.path.join(folder_path, f"*{ext}"))) example_files.extend(glob.glob(os.path.join(folder_path, f"*{ext.upper()}"))) print(f"Loaded {len(example_files)} example files from {folder_path}") return example_files # Load example media from the configured folder raw = load_examples_from_folder() unique = list(dict.fromkeys(raw)) # remove duplicates example_media = [(p, None) for p in unique] # tell Gallery about each media item def determine_media_type(file_path): """Determine if a file is a video or image based on extension""" if not file_path: return None file_path = str(file_path) video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.webm'] image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'] file_ext = os.path.splitext(file_path.lower())[1] if file_ext in video_extensions: return "video" elif file_ext in image_extensions: return "image" else: return None def handle_upload(file_obj): """Handle file upload and return appropriate updates""" if file_obj is None: return gr.update(visible=False), None, gr.update(visible=False) file_path = file_obj.name if hasattr(file_obj, 'name') else str(file_obj) media_type = determine_media_type(file_path) if media_type == "video": return gr.update(value=file_path, visible=True), file_path, gr.update(value=None, visible=False) elif media_type == "image": return gr.update(value=None, visible=False), file_path, gr.update(value=file_path, visible=True) else: return gr.update(visible=False), None, gr.update(visible=False) def select_from_gallery(evt: gr.SelectData): # grab the file-path string selected_path = example_media[evt.index][0] # if you wrapped as (path,caption) # call your existing upload-handler return handle_upload(selected_path) def verify_video_has_audio(video_path): """Verify that a video file has an audio stream, and if not, try to fix it.""" try: import subprocess import time # Check if FFprobe is available (to check audio streams) try: check_cmd = [ 'ffprobe', '-v', 'error', '-select_streams', 'a:0', '-show_entries', 'stream=codec_type', '-of', 'default=noprint_wrappers=1:nokey=1', str(video_path) ] probe_result = subprocess.run(check_cmd, capture_output=True, text=True) has_audio = 'audio' in probe_result.stdout if has_audio: print(f"✓ Video {video_path} has audio stream") return True else: print(f"⚠️ Video {video_path} has NO audio stream") # Try to fix by adding a silent audio track try: print(f"Adding silent audio track to {video_path}") # Get video duration without loading the entire file duration_cmd = [ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', str(video_path) ] duration_result = subprocess.run(duration_cmd, capture_output=True, text=True) try: duration_sec = float(duration_result.stdout.strip()) duration_ms = int(duration_sec * 1000) except (ValueError, TypeError): # Fallback to a default duration if we can't parse the output duration_ms = 10000 # 10 seconds # Create a temporary silence file silence = AudioSegment.silent(duration=duration_ms) silence_path = str(TEMP_FILES_FOLDER / "temp_silence.wav") silence.export(silence_path, format="wav") # Create a new output path fixed_path = str(video_path).replace(".mp4", "_with_audio.mp4") # Use FFmpeg to add the silent audio cmd = [ 'ffmpeg', '-i', str(video_path), # Video input '-i', silence_path, # Silent audio input '-c:v', 'copy', # Copy video codec (no re-encoding) '-c:a', 'aac', # Convert audio to AAC '-shortest', # Stop at shortest stream '-y', # Overwrite output fixed_path # Output path ] subprocess.run(cmd, check=True, capture_output=True) # Make sure the original file is not in use time.sleep(0.5) # Replace original file with fixed file # On Windows, we may need to delete and rename instead of os.replace try: os.remove(video_path) os.rename(fixed_path, video_path) except Exception as replace_error: print(f"Error replacing file: {replace_error}") # Return the fixed path instead if we couldn't replace return fixed_path print(f"✓ Added silent audio to {video_path}") return True except Exception as fix_error: print(f"Failed to add silent audio: {fix_error}") return False except Exception as e: print(f"FFprobe not available or error: {e}") # If FFprobe isn't available, try using moviepy instead (as fallback only) try: clip = VideoFileClip(str(video_path)) has_audio = clip.audio is not None clip.close() if not has_audio: print(f"⚠️ MoviePy reports no audio in {video_path}") # We'd need to implement a moviepy-based solution here if FFmpeg is not available return has_audio except Exception as moviepy_error: print(f"MoviePy check failed: {moviepy_error}") return False except Exception as e: print(f"Error verifying audio in video: {e}") return False def analyze_media(media_path, confidence): """ If media_path is an image: - run process_image(..., save_output=True) - if it produced a video, return that; otherwise return the annotated image path If media_path is a video: - run process_video(..., save_output=True) - return the final video path (or None on failure) """ # Clean up temp directory first to prevent conflicts from previous runs if TEMP_FILES_FOLDER.exists(): try: shutil.rmtree(TEMP_FILES_FOLDER) time.sleep(0.5) # Give OS time to complete the operation except Exception as e: print(f"Warning: Could not clean temp directory: {e}") # Try to delete files individually if rmtree fails try: for file in TEMP_FILES_FOLDER.glob("*"): try: os.remove(file) except: pass except: pass # Create temp directory TEMP_FILES_FOLDER.mkdir(parents=True, exist_ok=True) if not media_path: return None mtype = determine_media_type(media_path) base = Path(media_path).stem if mtype == "image": # where to write annotated image/video out_img = TEMP_FILES_FOLDER / f"{base}_detected.jpg" # process_image will create out_img, and—if it detects objects—a short video+audio result = process_image( image_path=media_path, save_output=True, output_path=str(out_img), confidence=confidence ) # process_image returns either the final video path or the image path if result and os.path.exists(result): # Verify the output has audio (for video outputs) if determine_media_type(result) == "video": verify_video_has_audio(result) return result elif out_img.exists(): return str(out_img) return None elif mtype == "video": out_vid = TEMP_FILES_FOLDER / f"{base}_detected.mp4" final = process_video( input_source=media_path, save_output=True, output_path=str(out_vid), confidence=confidence, base_name=base ) # Verify the output has audio if final and os.path.exists(final): verify_video_has_audio(final) return final if out_vid.exists(): verify_video_has_audio(str(out_vid)) return str(out_vid) return None else: # unsupported file type return None custom_css = """ /* 1) Global override: remove any forced sizing on .fixed-height anywhere */ .fixed-height.svelte-842rpi.svelte-842rpi { min-height: 0 !important; /* cancel the global min-height */ max-height: none !important; /* cancel the global max-height */ height: auto !important; /* allow auto height */ } /* 2) Same-query override: mirror Gradio's media query exactly */ @media (min-width: 1280px) { /* target the exact same class chain inside the breakpoint */ .fixed-height.svelte-842rpi.svelte-842rpi { min-height: 0 !important; /* zero-out the 55vh/min-height there */ max-height: none !important; /* remove the viewport-height cap */ height: auto !important; /* let content dictate height */ } } """ # ensure a clean temp_files/ on each startup if TEMP_FILES_FOLDER.exists(): shutil.rmtree(TEMP_FILES_FOLDER) TEMP_FILES_FOLDER.mkdir(parents=True, exist_ok=True) # Create the Gradio interface with gr.Blocks(title="Media Object Detection", css=custom_css) as demo: gr.Markdown("## 🔍 Media Object Detection") gr.Markdown("Upload or select an image or video, then click 'Analyze Media'") # Store current media path current_media = gr.State() with gr.Row(): with gr.Column(scale=1): # Input components video_input = gr.Video(label="Video Preview", visible=False) image_input = gr.Image(label="Image Preview", type="filepath", visible=False) # Media selection controls upload_btn = gr.UploadButton( "Upload Media (Image or Video)", file_types=["image", "video"] ) analyze_btn = gr.Button("Analyze Media 🚀", variant="primary") confidence = gr.Slider( minimum=0.1, maximum=0.9, value=0.5, step=0.025, label="Detection Confidence" ) with gr.Column(scale=1): # Output components video_output = gr.Video(label="Processed Video Output", visible=True) #image_output = gr.Image(label="Processed Image Output", visible=False) # Example gallery at bottom with gr.Row(variant="panel"): if example_media: examples_gallery = gr.Gallery( value=example_media, label=f"Example Media (Click to Select) - {len(example_media)} examples from {EXAMPLES_FOLDER}", columns=6, elem_id="my_media_gallery", allow_preview=False, elem_classes=["centered-examples"] ) else: gr.Markdown( f"No example media found in {EXAMPLES_FOLDER} folder. Add media files to see examples." ) # Connect input components if example_media: examples_gallery.select( fn=select_from_gallery, outputs=[video_input, current_media, image_input] ) upload_btn.upload( fn=handle_upload, inputs=[upload_btn], outputs=[video_input, current_media, image_input] ) # Connect analyze button analyze_btn.click( fn=analyze_media, inputs=[current_media, confidence], outputs=[video_output] #, image_output ) if __name__ == "__main__": demo.launch()