| """ | |
| BLIND ASSISTANCE MODEL - HUGGING FACE SPACES DEPLOYMENT | |
| Enhanced Video Navigation System with Audio Guidance | |
| """ | |
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| from ultralytics import YOLO | |
| from gtts import gTTS | |
| import pygame | |
| import os | |
| import time | |
| from collections import deque | |
| from PIL import Image, ImageEnhance | |
| import torch | |
| import threading | |
| from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip | |
| import tempfile | |
| import json | |
# Optional imports
try:
    import easyocr
    EASYOCR_AVAILABLE = True
except ImportError:
    EASYOCR_AVAILABLE = False
    print("⚠️ EasyOCR not available")
try:
    import segmentation_models_pytorch as smp
    SMP_AVAILABLE = True
except ImportError:
    SMP_AVAILABLE = False
    print("⚠️ segmentation_models_pytorch not available")
class AudioNavigationSystem:
    def __init__(self):
        print("Initializing Blind Assistance Model...")
        # Load YOLOv8 model
        print("Loading YOLOv8 model...")
        self.model = YOLO('yolov8n.pt')
        print("✅ Model loaded successfully!")
        # Initialize Semantic Segmentation Model
        print("Loading Semantic Segmentation Model...")
        self.segmentation_model = self.load_segmentation_model()
        print("✅ Segmentation model loaded!")
        # Define segmentation classes
        self.segmentation_classes = {
            0: 'road', 1: 'sidewalk', 2: 'building', 3: 'wall', 4: 'fence',
            5: 'pole', 6: 'traffic light', 7: 'traffic sign', 8: 'vegetation',
            9: 'terrain', 10: 'sky', 11: 'person', 12: 'rider', 13: 'car',
            14: 'truck', 15: 'bus', 16: 'train', 17: 'motorcycle', 18: 'bicycle',
            19: 'void'
        }
        # Initialize Text Detection
        print("Loading Text Detection...")
        self.reader = self.load_text_detector()
        print("✅ Text detection initialized!")
        # Audio system
        self.use_audio = True
        self.audio_files = []
        self.audio_timestamps = []
        self.video_start_time = None
        self.speaking = False
        self.audio_lock = threading.Lock()
        # Track background gTTS threads so process_video can wait for them
        self.tts_threads = []
        # Navigation classes
        self.navigation_classes = {
            'person': 'person', 'car': 'vehicle', 'truck': 'vehicle', 'bus': 'vehicle',
            'motorcycle': 'vehicle', 'bicycle': 'bicycle', 'traffic light': 'traffic light',
            'stop sign': 'stop sign', 'chair': 'chair', 'bench': 'bench'
        }
        # Priority levels
        self.object_priority = {
            'important_text': 10,
            'vehicle': 5,
            'person': 4,
            'bicycle': 4,
            'traffic light': 3,
            'stop sign': 3,
            'stairs': 4,
            'curb': 4,
            'crosswalk': 3,
            'text': 2,
            'road': 1,
            'sidewalk': 1,
            'building': 1,
            'vegetation': 1
        }
        # Important keywords for text
        self.important_keywords = [
            'exit', 'entrance', 'warning', 'danger', 'caution', 'stop',
            'stairs', 'elevator', 'escalator', 'crosswalk', 'curb',
            'emergency', 'hospital', 'police', 'fire', 'help',
            'men', 'women', 'toilet', 'restroom', 'washroom',
            'up', 'down', 'left', 'right', 'north', 'south', 'east', 'west',
            'hazard', 'attention'
        ]
        # Frame dimensions
        self.frame_width = 0
        self.frame_height = 0
        # Announcement cooldown
        self.last_announcement = time.time()
        self.announcement_cooldown = 3
        # Store detected items
        self.detected_items = set()
        self.text_size_reference = 100
        self.last_segmentation_analysis = ""
        self.segmentation_cooldown = 2
        print("✅ System initialized successfully!")
    def load_text_detector(self):
        """Load text detection model"""
        if EASYOCR_AVAILABLE:
            try:
                return easyocr.Reader(['en'])
            except Exception as e:
                print(f"⚠️ EasyOCR initialization failed: {e}")
        return None
    def load_segmentation_model(self):
        """Load segmentation model"""
        if not SMP_AVAILABLE:
            return None
        try:
            model = smp.Unet(
                encoder_name="mobilenet_v2",
                # "imagenet" is the pretrained option available for this encoder;
                # "voc" is not recognised by smp and would raise here
                encoder_weights="imagenet",
                classes=20,
                activation=None,
            )
            return model
        except Exception as e:
            print(f"⚠️ Could not load segmentation model: {e}")
            return None
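    # NOTE: self.segmentation_model is loaded in __init__ but is not used by the
    # method below, which relies on simple HSV colour heuristics instead
    # (dark pixels in the lower half -> road, blue pixels near the top -> sky).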
    def perform_semantic_segmentation(self, frame):
        """Perform semantic segmentation"""
        try:
            h, w = frame.shape[:2]
            # Default every pixel to 'void' (class 19) so that only pixels the
            # heuristics below classify as road are counted as walkable
            seg_map = np.full((h, w), 19, dtype=np.uint8)
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            # Road detection: dark pixels in the lower half of the frame
            dark_mask = cv2.inRange(hsv, (0, 0, 0), (180, 255, 100))
            seg_map[h//2:, :][dark_mask[h//2:, :] > 0] = 0
            # Sky detection: blue pixels in the upper third of the frame
            sky_mask = cv2.inRange(hsv, (100, 50, 150), (140, 255, 255))
            seg_map[:h//3, :][sky_mask[:h//3, :] > 0] = 10
            return seg_map
        except Exception:
            # On failure, report everything as 'void' rather than as clear road
            return np.full((frame.shape[0], frame.shape[1]), 19, dtype=np.uint8)
    def analyze_segmentation_map(self, seg_map):
        """Analyze segmentation map"""
        h, w = seg_map.shape
        analysis = {
            'immediate_walkable': 0,
            'immediate_obstacles': 0,
            'critical_warnings': [],
            'guidance': [],
            'environment': 'unknown'
        }
        immediate_path = seg_map[int(h*0.7):, :]
        road_pixels = np.sum(immediate_path == 0)
        total_pixels = immediate_path.size
        if total_pixels > 0:
            road_percentage = (road_pixels / total_pixels) * 100
            if road_percentage > 60:
                analysis['guidance'].append("Clear path ahead")
                analysis['environment'] = 'road'
            elif road_percentage > 30:
                analysis['guidance'].append("Moderate path clarity")
                analysis['environment'] = 'mixed'
            else:
                analysis['guidance'].append("Obstructed path ahead")
                analysis['environment'] = 'obstructed'
        return analysis
    def generate_segmentation_guidance(self, seg_analysis):
        """Generate guidance from segmentation"""
        if not seg_analysis['guidance']:
            return None
        guidance = ". ".join(seg_analysis['guidance'])
        if seg_analysis['environment'] == 'road':
            guidance += ". You appear to be on a road."
        elif seg_analysis['environment'] == 'obstructed':
            guidance += ". Path may be obstructed."
        return guidance
    def preprocess_image_for_text(self, image):
        """Preprocess image for text detection"""
        pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        enhancer = ImageEnhance.Contrast(pil_image)
        pil_image = enhancer.enhance(2.0)
        enhancer = ImageEnhance.Sharpness(pil_image)
        pil_image = enhancer.enhance(2.0)
        return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
    def detect_text_easyocr(self, frame):
        """Detect text using EasyOCR"""
        if self.reader is None:
            return []
        try:
            processed_frame = self.preprocess_image_for_text(frame)
            gray = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2GRAY)
            blurred = cv2.GaussianBlur(gray, (5, 5), 0)
            thresh = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                           cv2.THRESH_BINARY, 11, 2)
            kernel = np.ones((2, 2), np.uint8)
            morphed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
            processed_for_ocr = cv2.cvtColor(morphed, cv2.COLOR_GRAY2BGR)
            results = self.reader.readtext(processed_for_ocr,
                                           decoder='beamsearch',
                                           beamWidth=5,
                                           batch_size=1,
                                           height_ths=0.5,
                                           width_ths=0.5,
                                           min_size=20,
                                           text_threshold=0.3,
                                           link_threshold=0.3)
            detected_texts = []
            for (bbox, text, confidence) in results:
                if confidence > 0.4 and len(text.strip()) > 1:
                    clean_text = text.strip().lower()
                    if len(bbox) >= 4:
                        y_coords = [point[1] for point in bbox]
                        text_height = max(y_coords) - min(y_coords)
                        distance = self.calculate_text_distance(text_height)
                        distance_category = self.get_distance_category(distance)
                        is_important = any(keyword in clean_text for keyword in self.important_keywords)
                        detected_texts.append({
                            'type': 'text',
                            'text': clean_text,
                            'confidence': confidence,
                            'bbox': bbox,
                            'position': self.get_text_position(bbox),
                            'distance': distance,
                            'distance_category': distance_category,
                            'is_important': is_important,
                            'priority': 10 if is_important else 2
                        })
            return detected_texts
        except Exception as e:
            print(f"Text detection error: {e}")
            return []
    def get_text_position(self, bbox):
        """Determine text position"""
        if isinstance(bbox, list) and len(bbox) == 4:
            x_coords = [point[0] for point in bbox]
            x_center = sum(x_coords) / len(x_coords)
            third = self.frame_width / 3
            if x_center < third:
                return "left"
            elif x_center < 2 * third:
                return "center"
            else:
                return "right"
        return "center"
    def calculate_text_distance(self, bbox_height):
        """Estimate text distance"""
        if bbox_height <= 0:
            return 10.0
        distance = (self.text_size_reference * 2.0) / bbox_height
        return max(0.5, min(distance, 15.0))
    def get_distance_category(self, distance):
        """Convert distance to category"""
        if distance < 2:
            return "very close"
        elif distance < 4:
            return "close"
        elif distance < 7:
            return "moderate distance"
        elif distance < 10:
            return "far"
        else:
            return "very far"
    def calculate_object_distance(self, bbox_height, object_type="person"):
        """Estimate object distance"""
        reference_sizes = {
            'person': 1.7, 'vehicle': 1.5, 'bicycle': 1.0,
            'animal': 0.5, 'chair': 1.0, 'bench': 1.0,
            'pole': 2.0, 'default': 1.0
        }
        real_height = reference_sizes.get(object_type, reference_sizes['default'])
        focal_length = 500
        if bbox_height > 0:
            distance = (focal_length * real_height) / bbox_height
            return max(0.5, min(distance, 20))
        return 20
    def get_object_position(self, bbox):
        """Determine object position"""
        x_center = (bbox[0] + bbox[2]) / 2
        third = self.frame_width / 3
        if x_center < third:
            return "left"
        elif x_center < 2 * third:
            return "center"
        else:
            return "right"
    def get_comprehensive_priority(self, item):
        """Calculate comprehensive priority"""
        base_priority = self.object_priority.get(item.get('label', 'object'), 1)
        distance = item.get('distance', 10)
        distance_factor = max(0, 10 - distance) / 2
        position = item.get('position', 'right')
        position_factor = 2 if position == 'center' else 1
        if item.get('type') == 'text':
            if item.get('is_important', False):
                return 10 + distance_factor
            else:
                return 5 + distance_factor
        return base_priority * position_factor + distance_factor
    def generate_comprehensive_announcement(self, all_detections):
        """Generate balanced announcements"""
        if not all_detections:
            return "Path clear"
        messages = []
        all_detections.sort(key=self.get_comprehensive_priority, reverse=True)
        announced_count = 0
        max_announcements = 4
        for item in all_detections:
            if announced_count >= max_announcements:
                break
            item_type = item.get('type', 'object')
            if item_type == 'text':
                text = item['text']
                position = item['position']
                distance_category = item['distance_category']
                if item['is_important']:
                    messages.append(f"IMPORTANT: {text} {distance_category} on your {position}")
                else:
                    messages.append(f"Sign: {text} {distance_category} on your {position}")
                announced_count += 1
            else:
                if announced_count < max_announcements:
                    label = item['label']
                    position = item['position']
                    distance_category = item['distance_category']
                    if position == "center" and item['distance'] < 3:
                        messages.append(f"Warning! {label} directly ahead, {distance_category}")
                    else:
                        messages.append(f"{label} on your {position}, {distance_category}")
                    announced_count += 1
        center_objects = [item for item in all_detections
                          if item.get('position') == 'center' and item.get('distance', 10) < 3]
        if center_objects and len(messages) < 5:
            left_count = sum(1 for item in all_detections[:6] if item.get('position') == 'left')
            right_count = sum(1 for item in all_detections[:6] if item.get('position') == 'right')
            if left_count < right_count:
                messages.append("Consider moving left")
            elif right_count < left_count:
                messages.append("Consider moving right")
        return ". ".join(messages)
    def speak_gtts(self, text, timestamp=None):
        """Text-to-speech using gTTS (synthesis happens via an online Google
        service, so this call needs network access)."""
        if not text or self.speaking:
            return
        with self.audio_lock:
            self.speaking = True
            try:
                if timestamp is None:
                    if self.video_start_time:
                        timestamp = time.time() - self.video_start_time
                    else:
                        timestamp = 0
                minutes = int(timestamp // 60)
                seconds = int(timestamp % 60)
                timestamp_str = f"{minutes:02d}:{seconds:02d}"
                print(f"[{timestamp_str}] GUIDANCE: {text}")
                tts = gTTS(text=text, lang='en', slow=False)
                audio_filename = f"audio_{timestamp_str.replace(':', '-')}_{int(time.time() * 1000)}.mp3"
                tts.save(audio_filename)
                self.audio_files.append(audio_filename)
                self.audio_timestamps.append({
                    'filename': audio_filename,
                    'timestamp': timestamp,
                    'timestamp_str': timestamp_str,
                    'text': text
                })
            except Exception as e:
                print(f"⚠️ Speech generation error: {e}")
            finally:
                self.speaking = False
                time.sleep(0.5)
    def process_frame(self, frame, video_time=None):
        """Process a single video frame. video_time is the frame's position in
        the source video (seconds) and is used to place the spoken guidance."""
        self.frame_height, self.frame_width = frame.shape[:2]
        seg_map = self.perform_semantic_segmentation(frame)
        seg_analysis = self.analyze_segmentation_map(seg_map)
        results = self.model(frame, conf=0.4, verbose=False)
        all_detections = []
        objects_info = []
        text_info = []
        # Process YOLO detections
        for result in results:
            boxes = result.boxes
            for box in boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = float(box.conf[0])
                cls = int(box.cls[0])
                label = self.model.names[cls]
                if label.lower() in self.navigation_classes:
                    nav_label = self.navigation_classes[label.lower()]
                    bbox_height = y2 - y1
                    distance = self.calculate_object_distance(bbox_height, nav_label)
                    distance_category = self.get_distance_category(distance)
                    position = self.get_object_position([x1, y1, x2, y2])
                    object_info = {
                        'type': 'object',
                        'label': nav_label,
                        'distance': distance,
                        'distance_category': distance_category,
                        'position': position,
                        'bbox': [x1, y1, x2, y2],
                        'confidence': conf,
                        'priority': self.object_priority.get(nav_label, 1)
                    }
                    objects_info.append(object_info)
                    all_detections.append(object_info)
                    # Draw bounding box
                    if nav_label == 'vehicle':
                        color = (0, 0, 255)
                    elif nav_label == 'person':
                        color = (0, 255, 255)
                    elif nav_label == 'bicycle':
                        color = (255, 0, 0)
                    else:
                        color = (0, 255, 0)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    label_text = f"{nav_label.upper()} {distance_category}"
                    (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
                    cv2.rectangle(frame, (x1, y1-th-10), (x1+tw+10, y1), color, -1)
                    cv2.putText(frame, label_text, (x1+5, y1-5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        # Detect text (cooldowns use video time when available so announcements
        # are spaced relative to the clip, not to processing speed)
        current_time = video_time if video_time is not None else time.time()
        if (current_time - self.last_announcement) > 1.5:
            text_info = self.detect_text_easyocr(frame)
            new_texts = []
            for text_data in text_info:
                text_hash = hash(text_data['text'][:20])
                if text_hash not in self.detected_items:
                    new_texts.append(text_data)
                    self.detected_items.add(text_hash)
            text_info = new_texts
            all_detections.extend(text_info)
        # Draw text bounding boxes
        for text_data in text_info:
            bbox = text_data['bbox']
            text = text_data['text']
            is_important = text_data['is_important']
            color = (255, 0, 255) if is_important else (255, 255, 0)
            thickness = 3 if is_important else 2
            pts = np.array(bbox, np.int32)
            pts = pts.reshape((-1, 1, 2))
            cv2.polylines(frame, [pts], True, color, thickness)
            # cv2.putText cannot render emoji, so use a plain-text marker
            label_text = f"ALERT: {text}" if is_important else f"TEXT: {text}"
            x_coords = [point[0] for point in bbox]
            y_coords = [point[1] for point in bbox]
            text_x = int(min(x_coords))
            text_y = int(min(y_coords)) - 10
            if text_y < 20:
                text_y = int(max(y_coords)) + 25
            (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            cv2.rectangle(frame, (text_x, text_y-th-5), (text_x+tw+10, text_y+5), color, -1)
            cv2.putText(frame, label_text, (text_x+5, text_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        # Generate navigation message
        message = None
        if (current_time - self.last_announcement) > self.announcement_cooldown:
            seg_guidance = self.generate_segmentation_guidance(seg_analysis)
            object_message = self.generate_comprehensive_announcement(all_detections)
            if seg_guidance and "obstructed" in seg_guidance.lower():
                message = f"{seg_guidance}. {object_message}"
            elif seg_guidance and object_message == "Path clear":
                message = seg_guidance
            else:
                message = object_message
            if message and message != "Path clear":
                tts_thread = threading.Thread(target=self.speak_gtts, args=(message, video_time))
                tts_thread.start()
                self.tts_threads.append(tts_thread)
                self.last_announcement = current_time
        # Status overlay
        overlay = frame.copy()
        cv2.rectangle(overlay, (5, 5), (500, 35), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)
        status_text = f"Objects: {len(objects_info)} | Texts: {len(text_info)}"
        cv2.putText(frame, status_text, (15, 28),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        # Draw center danger zone
        center_objects = [obj for obj in objects_info if obj['position'] == 'center' and obj['distance'] < 3]
        if center_objects:
            cv2.rectangle(frame, (self.frame_width//3, self.frame_height-100),
                          (2*self.frame_width//3, self.frame_height-10), (0, 0, 255), 3)
            cv2.putText(frame, "OBSTACLE IN PATH", (self.frame_width//3 + 20, self.frame_height-50),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
        return frame, message, len(objects_info), len(text_info)
    def process_video(self, video_path, output_path='output_navigation.mp4'):
        """Process uploaded video"""
        cap = cv2.VideoCapture(video_path)
        # Fall back to 30 FPS if the container does not report a frame rate
        fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        print(f"Processing video: {total_frames} frames at {fps} FPS")
        self.audio_timestamps = []
        self.audio_files = []
        self.tts_threads = []
        self.detected_items = set()
        self.video_start_time = time.time()
        # Cooldowns are tracked in video time, so allow an announcement right away
        self.last_announcement = -self.announcement_cooldown
        frame_count = 0
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                # Position of this frame in the source video, in seconds
                video_time = frame_count / fps
                processed_frame, message, obj_count, text_count = self.process_frame(frame, video_time)
                out.write(processed_frame)
                frame_count += 1
                if frame_count % 30 == 0 and total_frames > 0:
                    progress = (frame_count / total_frames) * 100
                    print(f"Progress: {progress:.1f}%")
        finally:
            cap.release()
            out.release()
        # Wait for any background gTTS requests to finish before merging audio
        for tts_thread in self.tts_threads:
            tts_thread.join()
        print("✅ Video processing complete!")
        if self.audio_timestamps:
            final_output = 'final_with_audio.mp4'
            return self.merge_audio_into_video(output_path, final_output)
        else:
            return output_path
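    # Each narration mp3 is placed on the audio timeline at the video time it
    # was generated for (CompositeAudioClip with per-clip start offsets), then
    # written back out alongside the processed frames.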
    def merge_audio_into_video(self, video_path, output_path='final_with_audio.mp4'):
        """Merge audio into video (uses the MoviePy 1.x set_start/set_audio API)"""
        print("🎵 Merging audio into video...")
        if not self.audio_timestamps:
            return video_path
        try:
            video = VideoFileClip(video_path)
            video_duration = video.duration
            audio_clips = []
            for audio_info in self.audio_timestamps:
                if os.path.exists(audio_info['filename']):
                    try:
                        audio_clip = AudioFileClip(audio_info['filename'])
                        audio_clip = audio_clip.set_start(audio_info['timestamp'])
                        audio_clips.append(audio_clip)
                    except Exception as e:
                        print(f"⚠️ Failed to load {audio_info['filename']}: {e}")
            if not audio_clips:
                return video_path
            final_audio = CompositeAudioClip(audio_clips)
            final_audio = final_audio.set_duration(video_duration)
            final_video = video.set_audio(final_audio)
            final_video.write_videofile(
                output_path,
                codec='libx264',
                audio_codec='aac',
                fps=video.fps,
                verbose=False,
                logger=None
            )
            video.close()
            final_video.close()
            final_audio.close()
            for clip in audio_clips:
                clip.close()
            print("✅ Video with audio saved!")
            return output_path
        except Exception as e:
            print(f"❌ Error merging audio: {e}")
            return video_path
# Initialize the system
nav_system = AudioNavigationSystem()
def process_video_gradio(video_file):
    """Gradio interface function"""
    try:
        if video_file is None:
            return None, "Please upload a video file"
        # Recent Gradio versions pass gr.Video input as a file path (str);
        # fall back to writing a temporary file if raw bytes are received
        if isinstance(video_file, str):
            input_path = video_file
        else:
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_input:
                tmp_input.write(video_file)
                input_path = tmp_input.name
        # Check video duration
        cap = cv2.VideoCapture(input_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        duration = frame_count / fps if fps > 0 else 0
        cap.release()
        if duration > 15:
            return None, f"⚠️ Video is {duration:.1f} seconds long. Please upload a video shorter than 15 seconds."
        # Process video
        output_path = nav_system.process_video(input_path)
        # Generate transcript
        transcript_text = "Audio Guidance Transcript:\n\n"
        for item in nav_system.audio_timestamps:
            transcript_text += f"[{item['timestamp_str']}] {item['text']}\n\n"
        return output_path, transcript_text
    except Exception as e:
        return None, f"Error processing video: {str(e)}"
# Create Gradio interface
with gr.Blocks(title="Blind Assistance AI", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🦯 Blind Assistance AI - Video Navigation System
    Upload a video to receive audio navigation guidance with object detection, text recognition, and scene analysis.
    ⚠️ **Important:** Please upload videos **shorter than 15 seconds** for optimal processing.
    """)
    with gr.Row():
        with gr.Column():
            video_input = gr.Video(label="Upload Video (Max 15 seconds)")
            process_btn = gr.Button("Process Video", variant="primary", size="lg")
        with gr.Column():
            video_output = gr.Video(label="Processed Video with Audio Guidance")
            transcript_output = gr.Textbox(label="Audio Transcript", lines=10)
| gr.Markdown(""" | |
| ### Features: | |
| - π― **Object Detection**: Identifies people, vehicles, and obstacles | |
| - π **Text Detection & OCR**: Reads signs, labels, and important text | |
| - πΊοΈ **Scene Analysis**: Understands environment and context | |
| - π **Voice Guidance**: Real-time audio navigation instructions | |
| """) | |
    process_btn.click(
        fn=process_video_gradio,
        inputs=[video_input],
        outputs=[video_output, transcript_output]
    )
# Launch the app
if __name__ == "__main__":
    demo.launch()
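    # Processing runs YOLO plus OCR on every frame, so requests can be slow.
    # If they time out on Spaces, one option (a sketch, not part of the
    # original code) is to serve through Gradio's request queue:
    #     demo.queue().launch()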