import gradio as gr
import cv2
import time
import openai
import base64
import pytz
import uuid
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import json
import os
from gradio_client import Client, file
import subprocess
import ffmpeg
api_key = os.getenv("OPEN_AI_KEY")
user_name = os.getenv("USER_NAME")
password = os.getenv("PASSWORD")

LENGTH = 3
WEBCAM = 0
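# LENGTH is the analysis window in seconds: the stream is cut into
# LENGTH-second chunks before each GPT call. WEBCAM is the cv2 device
# index used as a fallback capture source when no stream URL is set.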
MARKDOWN = """
# Conntour
"""
AVATARS = (
    "https://uqnmqpvwlbpmdvutucia.supabase.co/storage/v1/object/public/test/square_padding.png?t=2024-12-26T10%3A36%3A46.488Z",
    "https://media.roboflow.com/spaces/openai-white-logomark.png"
)
VIDEO_PATH = "https://uqnmqpvwlbpmdvutucia.supabase.co/storage/v1/object/public/live-cameras/long_sf_junction.mp4?t=2025-01-14T10%3A09%3A14.826Z"
# Set your OpenAI API key
openai.api_key = api_key
MODEL = "gpt-4o"
client = openai.OpenAI(api_key=api_key)

# Global variable to stop the video capture loop
stop_capture = False
alerts_mode = True

base_start_time = time.time()
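# Pipeline overview: the capture loop below slices the video into
# LENGTH-second chunks, a worker samples and base64-encodes frames from
# each chunk, GPT-4o judges whether the user's alert condition is met,
# and matching chunks are clipped with ffmpeg and posted to the chatbot.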
def clip_video_segment_2(input_video_path, start_time, duration):
    os.makedirs('videos', exist_ok=True)
    output_video_path = f"videos/{uuid.uuid4()}.mp4"

    # Use ffmpeg-python to clip the video
    try:
        (
            ffmpeg
            .input(input_video_path, ss=start_time)  # Seek to start_time
            .output(output_video_path, t=duration, c='copy')  # Set the duration
            .run(overwrite_output=True)
        )
        print('input_video_path', input_video_path, output_video_path)
        return output_video_path
    except ffmpeg.Error as e:
        print(f"Error clipping video: {e}")
        return None
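# Note: c='copy' stream-copies instead of re-encoding, so clipping is
# fast but cut points snap to the nearest keyframe; clip boundaries may
# therefore land slightly off the requested start_time/duration.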
def clip_video_segment(input_video_path, start_time, duration):
    os.makedirs('videos', exist_ok=True)
    output_video_path = f"videos/{uuid.uuid4()}.mp4"

    subprocess.call([
        'ffmpeg', '-y', '-ss', str(start_time), '-i', input_video_path,
        '-t', str(duration), '-c', 'copy', output_video_path
    ])
    print('input_video_path', input_video_path, output_video_path)
    return output_video_path
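# Equivalent clipper that shells out to the ffmpeg binary directly
# (requires ffmpeg on PATH). Placing -ss before -i enables fast input
# seeking, matching the ss= keyword in the ffmpeg-python variant above.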
def encode_to_video_fast(frames, fps):
    os.makedirs('videos', exist_ok=True)
    video_clip_path = f"videos/{uuid.uuid4()}.mp4"

    # Get frame size
    height, width, layers = frames[0].shape
    size = (width, height)

    # Define the codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # You can also try 'XVID', 'MJPG', etc.
    out = cv2.VideoWriter(video_clip_path, fourcc, fps, size)
    for frame in frames:
        out.write(frame)
    out.release()
    return video_clip_path
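# Kept as an alternative to the ffmpeg clippers (see the commented-out
# calls further down). Caveat: mp4v output is often not playable in
# browsers; an H.264 encode would be needed for reliable in-chat playback.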
# Function to sample video frames and base64-encode them for the GPT-4o API
def process_frames(frames, frames_to_skip=1):
    os.makedirs('saved_frames', exist_ok=True)
    curr_frame = 0
    base64Frames = []
    while curr_frame < len(frames) - 1:
        _, buffer = cv2.imencode(".jpg", frames[curr_frame])
        base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
        curr_frame += frames_to_skip
    return base64Frames
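# Sampling math (illustrative): callers pass the chunk's fps as
# frames_to_skip, so with fps = 30 a 3-second / ~90-frame chunk is
# reduced to ~3 JPEG frames, i.e. about one frame per second sent to the model.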
# Function to check the user's condition against sampled frames using the GPT-4o API
def check_condition(prompt, base64Frames):
    start_time = time.time()
    print('checking condition for frames:', len(base64Frames))

    messages = [
        {"role": "system", "content": """You are analyzing video to check if the user's condition is met.
Please respond with a JSON object in the following format:
{"condition_met": true/false, "details": "optional details or summary. in the summary DON'T mention the words: image, images, frame, or frames. Instead, make it look like you were provided with video input and avoid referring to individual images or frames explicitly."}"""},
        {"role": "user", "content": [prompt, *map(lambda x: {"type": "image_url", "image_url": {"url": f'data:image/jpg;base64,{x}', "detail": "low"}}, base64Frames)]}
    ]

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        response_format={"type": "json_object"}
    )
    end_time = time.time()
    processing_time = end_time - start_time
    frames_count = len(base64Frames)
    api_response = response.choices[0].message.content
    try:
        jsonNew = json.loads(api_response)
        print('result', response.usage.total_tokens, jsonNew)
        return frames_count, processing_time, jsonNew
    except json.JSONDecodeError:
        print('result', response.usage.total_tokens, api_response)
        return frames_count, processing_time, api_response
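# Return contract: the third value is a dict when the model returns valid
# JSON (which response_format should guarantee) and the raw string
# otherwise, so callers must handle both shapes.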
# Function to process a video clip and update the chatbot
def process_clip(prompt, frames, chatbot, id):
    # Print current time in Israel
    israel_tz = pytz.timezone('Asia/Jerusalem')
    start_time = datetime.now(israel_tz).strftime('%H:%M:%S')
    print("[Start]:", start_time, len(frames), id)

    # Sample roughly one frame per second from the chunk
    fps = int(len(frames) / LENGTH)
    base64Frames = process_frames(frames, fps)
    frames_count, processing_time, api_response = check_condition(prompt, base64Frames)

    # check_condition may return a raw string when JSON parsing failed
    if isinstance(api_response, dict) and api_response.get("condition_met", False):
        finish_time = datetime.now(israel_tz).strftime('%H:%M:%S')
        # video_clip_path = encode_to_video_fast(frames, fps)
        video_clip_path = clip_video_segment_2(VIDEO_PATH, id * LENGTH, LENGTH)
        chatbot.append(((video_clip_path,), None))
        chatbot.append((f"ID: {id}. Time: {start_time}\nDetails: {api_response.get('details', '')}", None))

        # Save the sampled frames that triggered the alert
        frame_paths = []
        for i, base64_frame in enumerate(base64Frames):
            frame_data = base64.b64decode(base64_frame)
            frame_path = f'saved_frames/frame_{uuid.uuid4()}.jpg'
            with open(frame_path, "wb") as f:
                f.write(frame_data)
            frame_paths.append(frame_path)
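# process_clip runs on a worker thread and mutates the chatbot list in
# place; analyze_stream re-yields that same list each loop iteration so
# Gradio picks up the appended events on its next update.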
def process_clip_from_file(prompt, frames, chatbot, fps, video_path, id):
    global stop_capture
    if not stop_capture:
        israel_tz = pytz.timezone('Asia/Jerusalem')
        start_time = datetime.now(israel_tz).strftime('%H:%M:%S')
        print("[Start]:", start_time, len(frames))

        frames_to_skip = int(fps)
        base64Frames = process_frames(frames, frames_to_skip)
        frames_count, processing_time, api_response = check_condition(prompt, base64Frames)

        if isinstance(api_response, dict) and api_response.get("condition_met", False):
            # video_clip_path = encode_to_video_fast(frames, fps)
            video_clip_path = clip_video_segment_2(video_path, id * LENGTH, LENGTH)
            chatbot.append(((video_clip_path,), None))
            chatbot.append((f"Event ID: {id+1}\nDetails: {api_response.get('details', '')}", None))
    return chatbot
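# Unlike process_clip, this variant returns the chatbot instead of
# yielding it: it is submitted to a ThreadPoolExecutor, and a function
# body containing `yield` would come back as an unexecuted generator object.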
# Function to capture video frames from the stream
def analyze_stream(prompt, chatbot):
    global stop_capture
    global base_start_time
    stop_capture = False

    # Resume the demo video at a rolling offset (wraps every 180 seconds)
    # so restarts don't always begin at second 0
    video_start = int(time.time() - base_start_time) % 180

    # stream = "https://streamapi2.eu.loclx.io/video_feed/101"
    stream = VIDEO_PATH
    cap = cv2.VideoCapture(stream or WEBCAM)
    fps = cap.get(cv2.CAP_PROP_FPS)
    cap.set(cv2.CAP_PROP_POS_FRAMES, video_start * fps)
    print("Video start", video_start, fps)

    frames = []
    start_time = time.time()
    id = 0
    while not stop_capture:
        ret, frame = cap.read()
        if not ret:
            # Loop the demo video instead of appending an empty frame
            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            continue
        frames.append(frame)

        # Hand off the buffered frames every LENGTH seconds
        if time.time() - start_time >= LENGTH:
            # Start a new thread for processing the video clip
            Thread(target=process_clip, args=(prompt, frames.copy(), chatbot, id)).start()
            frames = []
            start_time = time.time()
            id = id + 1
        yield chatbot
    cap.release()
    return chatbot
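# analyze_stream is a generator, so Gradio (with queue=True) streams the
# chatbot state back to the UI on every yield; the GPT calls run on
# worker threads and never block the capture loop.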
def analyze_video_file(prompt, chatbot):
    global stop_capture
    stop_capture = False  # Reset the stop flag when analysis starts

    video_path = VIDEO_PATH
    cap = cv2.VideoCapture(video_path)

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))  # Frames per second
    frames_per_chunk = fps * LENGTH  # Number of frames per LENGTH-second chunk

    frames = []
    chunk = 0

    # Create a thread pool for concurrent processing
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        while not stop_capture:
            ret, frame = cap.read()
            if not ret:
                # End of file: fall through to process any remaining frames
                break
            frames.append(frame)

            # Split the video into chunks of frames covering LENGTH seconds each
            if len(frames) >= frames_per_chunk:
                futures.append(executor.submit(process_clip_from_file, prompt, frames.copy(), chatbot, fps, video_path, chunk))
                frames = []
                chunk += 1

        # Process any remaining frames (shorter than LENGTH seconds) as a final chunk
        if len(frames) > 0:
            futures.append(executor.submit(process_clip_from_file, prompt, frames.copy(), chatbot, fps, video_path, chunk))
            chunk += 1
        cap.release()

        # Yield results as soon as each thread completes
        for future in as_completed(futures):
            result = future.result()
            yield result
    return chatbot
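# as_completed yields futures in completion order, not submission order,
# so events can surface in the chat out of chronological order; sorting
# by chunk id afterwards would restore the timeline if that matters.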
# Function to stop video capture
def stop_capture_func():
    global stop_capture
    stop_capture = True
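# The UI below wires the Start button to analyze_stream;
# analyze_video_file is defined above as a file-based alternative but is
# not hooked up here.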
# Gradio interface
with gr.Blocks(title="Conntour", fill_height=True) as demo:
    gr.Markdown(MARKDOWN)  # render the header defined above (otherwise unused)
    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot(label="Events", bubble_full_width=False, avatar_images=AVATARS, height=700)
            prompt = gr.Textbox(label="Enter your alert prompt")
            start_btn = gr.Button("Start")
            stop_btn = gr.Button("Stop")
            start_btn.click(analyze_stream, inputs=[prompt, chatbot], outputs=[chatbot], queue=True)
            stop_btn.click(stop_capture_func)

demo.launch(favicon_path='favicon.ico', auth=(user_name, password))