import base64
import io
import time

import cv2
import gradio as gr
import openai
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.schema import StrOutputParser
from PIL import Image

from prompts import VISION_SYSTEM_PROMPT, AUDIO_SYSTEM_PROMPT, USER_PROMPT_TEMPLATE, FINAL_EVALUATION_PROMPT

global_dict = {}

######
# SETTINGS
VIDEO_FRAME_LIMIT = 2000
######


def validate_api_key(api_key):
    client = openai.OpenAI(api_key=api_key)
    response = None
    error = None
    try:
        # Make a minimal OpenAI API request to check that the key works
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "user", "content": "Hello world"},
            ],
        )
    except openai.RateLimitError as e:
        # Handle rate limit error (we recommend using exponential backoff)
        print(f"OpenAI API request exceeded rate limit: {e}")
        error = e
    except openai.APIConnectionError as e:
        # Handle connection error here
        print(f"Failed to connect to OpenAI API: {e}")
        error = e
    except openai.APIError as e:
        # Handle API error here, e.g. retry or log
        print(f"OpenAI API returned an API Error: {e}")
        error = e

    if response:
        return True
    else:
        raise gr.Error(f"OpenAI returned an API Error: {error}")


def _process_video(video_file):
    # Read the video and encode each frame as a base64 JPEG string
    video = cv2.VideoCapture(video_file.name)
    global_dict['video_file'] = video_file.name

    base64Frames = []
    while video.isOpened():
        success, frame = video.read()
        if not success:
            break
        _, buffer = cv2.imencode(".jpg", frame)
        base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
    video.release()

    if len(base64Frames) > VIDEO_FRAME_LIMIT:
        gr.Warning("Video's play time is too long. (>1m)")
    print(len(base64Frames), "frames read.")

    if not base64Frames:
        raise gr.Error("Cannot open the video.")

    return base64Frames


def _make_video_batch(video_file, batch_size, total_batch_percent):
    frames = _process_video(video_file)

    TOTAL_FRAME_COUNT = len(frames)
    BATCH_SIZE = int(batch_size)
    # Total number of frames to sample, and the stride between sampled frames
    TOTAL_BATCH_SIZE = int(TOTAL_FRAME_COUNT * total_batch_percent / 100)
    BATCH_STEP = int(TOTAL_FRAME_COUNT / TOTAL_BATCH_SIZE)

    base64FramesBatch = []
    for idx in range(0, TOTAL_FRAME_COUNT, BATCH_STEP * BATCH_SIZE):
        # print(f'## {idx}')
        temp = []
        for i in range(BATCH_SIZE):
            # print(f'# {idx + BATCH_STEP * i}')
            if (idx + BATCH_STEP * i) < TOTAL_FRAME_COUNT:
                temp.append(frames[idx + BATCH_STEP * i])
        base64FramesBatch.append(temp)

    for idx, batch in enumerate(base64FramesBatch):
        # assert len(batch) <= BATCH_SIZE
        print(f'##{idx} - batch_size: {len(batch)}')

    global_dict['batched_frames'] = base64FramesBatch

    return base64FramesBatch


def show_batches(video_file, batch_size, total_batch_percent):
    batched_frames = _make_video_batch(video_file, batch_size, total_batch_percent)

    images = []
    for i, l in enumerate(batched_frames):
        print(f"#### Batch_{i+1}")
        for j, img in enumerate(l):
            print(f'## Image_{j+1}')
            image_bytes = base64.b64decode(img.encode("utf-8"))
            # Convert the bytes to a stream (file-like object)
            image_stream = io.BytesIO(image_bytes)
            # Open the image as a PIL image
            image = Image.open(image_stream)
            images.append((image, f"batch {i+1}"))
        print("-" * 100)

    return images


def show_audio_transcript(video_file, api_key):
    previous_video_file = global_dict.get('video_file')

    # Reuse the cached transcript if the same video was already transcribed
    if global_dict.get('transcript') and previous_video_file == video_file.name:
        return global_dict['transcript']
    else:
        client = openai.OpenAI(api_key=api_key)
        with open(video_file.name, "rb") as audio_file:
            transcript = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="text",
            )
        global_dict['transcript'] = transcript
        return transcript


def change_audio_rubric(choice):
    print(choice)
    if choice == "Video only":
        return gr.Textbox(visible=False)
    else:
        return gr.Textbox(
            label="3. Audio Evaluation Rubric (if needed)",
            info="Enter your evaluation rubric here...",
            placeholder="\nHere's what the performer should *SAY* as follows:\n1. From standing, you need to shout 'Start' signal.\n2. Rock forward, you shouldn't make any noise while rolling.\n3. Standing still again, you need to shout 'Finish' signal.",
            lines=7,
            interactive=True,
            visible=True,
        )


def change_audio_eval(choice):
    print(choice)
    if choice == "Video only":
        return gr.Textbox(visible=False)
    else:
        return gr.Textbox(
            label="Audio Script Eval...",
            lines=10,
            interactive=False,
            visible=True,
        )


def call_gpt_vision(api_key, rubrics, progress=gr.Progress()) -> str:
    frames = global_dict.get('batched_frames')
    openai.api_key = api_key

    full_result_vision = []
    full_text_vision = ""
    for idx, batch in enumerate(progress.tqdm(frames)):
        VISION_PROMPT_MESSAGES = [
            {
                "role": "system",
                "content": VISION_SYSTEM_PROMPT,
            },
            {
                "role": "user",
                "content": [
                    PromptTemplate.from_template(USER_PROMPT_TEMPLATE).format(rubrics=rubrics),
                    *map(lambda x: {"image": x, "resize": 300}, batch),
                ],
            },
        ]

        params = {
            "model": "gpt-4-vision-preview",
            "messages": VISION_PROMPT_MESSAGES,
            "max_tokens": 1024,
        }

        print(f'### BATCH_{idx+1}')
        print('-' * 100)
        try:
            result = openai.chat.completions.create(**params)
            print(result.choices[0].message.content)
            full_result_vision.append(result)
            full_text_vision += f'### BATCH_{idx+1}\n' + "-" * 50 + "\n" + result.choices[0].message.content + "\n" + "-" * 50 + "\n"
        except Exception as e:
            # Record the error for this batch and keep evaluating the rest
            print(f"Error: {e}")
            full_text_vision += f'### BATCH_{idx+1}\n' + "-" * 50 + "\n" + f"Error: {e}" + "\n" + "-" * 50 + "\n"
        time.sleep(2)

    global_dict['full_result_vision'] = full_result_vision

    return full_text_vision


def call_gpt_audio(api_key, rubrics) -> str:
    transcript = global_dict.get('transcript')
    openai.api_key = api_key

    full_text_audio = ""
    print(f"RUBRIC_AUDIO: {rubrics}")

    if not rubrics:
        return full_text_audio
    else:
        PROMPT_MESSAGES = [
            {
                "role": "system",
                "content": AUDIO_SYSTEM_PROMPT,
            },
            {
                "role": "user",
                "content": PromptTemplate.from_template(USER_PROMPT_TEMPLATE).format(rubrics=rubrics) + "\n\n\n" + transcript,
            },
        ]
        params = {
            "model": "gpt-4",
            "messages": PROMPT_MESSAGES,
            "max_tokens": 1024,
        }
        try:
            result = openai.chat.completions.create(**params)
            full_text_audio = result.choices[0].message.content
            print(full_text_audio)
        except openai.OpenAIError as e:
            print(f"Failed to connect to OpenAI: {e}")

        global_dict['full_text_audio'] = full_text_audio

        return full_text_audio


def get_full_result():
    full_result_vision = global_dict.get('full_result_vision')
    full_result_audio = global_dict.get('full_text_audio')

    result_text_video = ""
    result_text_audio = ""

    for idx, res in enumerate(full_result_vision):
        result_text_video += f'