Spaces:
Sleeping
Sleeping
import cv2 | |
import streamlit as st | |
import tempfile | |
import base64 | |
import os | |
from dotenv import load_dotenv | |
from openai import OpenAI | |
import assemblyai as aai | |
from moviepy.editor import * | |
# Load environment variables from a local .env file (if present) so the API
# keys below can be read from the process environment.
load_dotenv()

# AssemblyAI reads its key from the module-level settings object.
aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")

# The OpenAI v1 client takes its key per instance; assigning to the class
# attribute (`OpenAI.api_key = ...`) does not configure the client. Passing
# None here lets the client fall back to the OPENAI_API_KEY environment
# variable, exactly as the bare `OpenAI()` call did.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def main():
    """Render the Streamlit page: upload a video, preview frames, run actions.

    Flow:
      1. Let the user upload a video file.
      2. Write it to a temporary file so OpenCV and AssemblyAI can read it
         from a path, extract its frames, and show every 30th frame.
      3. Offer "Description" and "Generate Transcript" buttons; each result is
         stored in ``st.session_state`` so it survives Streamlit reruns.
      4. Display whichever results are stored.

    Cached results are tied to the uploaded file (name and size); uploading a
    different file clears them so stale output is never shown for a new video.
    """
    st.title('Insightly Video Content Moderation')

    # Video upload section
    uploaded_video = st.file_uploader('Upload a video', type=["mp4", "avi", "mov"])
    if uploaded_video is None:
        return

    # Invalidate cached results when the user uploads a different file.
    video_key = (uploaded_video.name, uploaded_video.size)
    if st.session_state.get('video_key') != video_key:
        st.session_state['video_key'] = video_key
        st.session_state.pop('description', None)
        st.session_state.pop('transcript', None)

    # Save the video to a temp file, keeping the original extension so that
    # downstream readers can rely on it. getvalue() returns the full content
    # regardless of the stream's current read position.
    suffix = os.path.splitext(uploaded_video.name)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tfile:
        tfile.write(uploaded_video.getvalue())
        video_file_path = tfile.name

    try:
        # Process the video and display frames in a grid layout
        base64_frames = video_to_base64_frames(video_file_path)
        display_frame_grid(base64_frames[::30])  # Every 30th frame, 3-column grid

        st.write("Actions:")  # Header for the actions/buttons section
        # Odd-numbered columns are spacers that keep the two buttons apart.
        _, col1, _, col2, _ = st.columns([1, 2, 1, 2, 1])

        with col1:
            # Only call the model when no description is cached for this video.
            if st.button("Description") and 'description' not in st.session_state:
                st.session_state['description'] = generate_description(base64_frames)

        with col2:
            # Transcription is a remote API call: run it only on demand and
            # only when no transcript is cached for this video.
            if st.button("Generate Transcript") and 'transcript' not in st.session_state:
                transcript = aai.Transcriber().transcribe(video_file_path)
                if transcript.text:
                    st.session_state['transcript'] = transcript.text
                else:
                    # Do not cache an empty result so the user can retry.
                    st.warning("No transcript text was returned for this video.")
    finally:
        # The temp file was created with delete=False; remove it explicitly so
        # a new file is not leaked on every rerun.
        os.remove(video_file_path)

    # If any value exists in session state then display it
    if st.session_state.get('description'):
        st.subheader("Video Description")
        st.write(st.session_state['description'])

    if st.session_state.get('transcript'):
        st.subheader("Video Transcript")
        st.write(st.session_state['transcript'])
def video_to_base64_frames(video_file_path):
    """Decode every frame of a video and return them as base64 JPEG strings.

    Args:
        video_file_path: Path of the video file to open with OpenCV.

    Returns:
        A list with one base64-encoded (UTF-8 text) JPEG per frame, in playback
        order. The list is empty when the capture cannot be opened or yields
        no frames.
    """
    video = cv2.VideoCapture(video_file_path)
    base64_frames = []
    try:
        while video.isOpened():
            success, frame = video.read()
            if not success:
                # End of stream (or a read failure): stop decoding.
                break
            _, buffer = cv2.imencode('.jpg', frame)
            base64_frames.append(base64.b64encode(buffer).decode('utf-8'))
    finally:
        # Release the capture even if decoding or encoding raises.
        video.release()
    return base64_frames
######################################### | |
#Generate Video description | |
def generate_description(base64_frames, *, model="gpt-4-vision-preview",
                         frame_step=30, max_tokens=3000):
    """Ask the OpenAI chat completions API to describe a sequence of frames.

    Args:
        base64_frames: Sequence of base64-encoded frame images.
        model: Name of the chat model to call.
            NOTE(review): OpenAI has deprecated ``gpt-4-vision-preview``;
            confirm it is still served and pass a current vision-capable
            model here if it is not.
        frame_step: Stride used to sample ``base64_frames``; only every
            ``frame_step``-th frame is sent.
        max_tokens: Upper bound on the length of the generated reply.

    Returns:
        The text content of the first choice in the API response.
    """
    # Built with implicit concatenation: a backslash continuation inside the
    # literal glued "words." to "Return" (or embedded source indentation).
    prompt = (
        "1. Generate a description for this sequence of video frames in about 100 words. "
        "Return the following : 2. Frame by frame summary of what's happening in the video. "
        "3. List of objects in the video. "
        "4. Any restrictive content or sensitive content and if so which frame "
        "5. What category can this video be tagged to?"
    )
    # presumably "resize" asks the API to downscale each image — TODO confirm
    sampled_frames = [
        {"image": frame, "resize": 428}
        for frame in base64_frames[::frame_step]
    ]
    prompt_messages = [
        {
            "role": "user",
            "content": [prompt, *sampled_frames],
        },
    ]
    response = client.chat.completions.create(
        model=model,
        messages=prompt_messages,
        max_tokens=max_tokens,
    )
    return response.choices[0].message.content
######################## | |
def display_frame_grid(base64_frames, cols_per_row=3, frame_step=30):
    """Show base64-encoded frames as a grid of captioned Streamlit images.

    Args:
        base64_frames: Sequence of base64-encoded images to display, already
            sampled by the caller.
        cols_per_row: Number of columns in each grid row.
        frame_step: Stride the caller used when sampling the frames. It is
            only used to compute the caption, so that the displayed number is
            the 1-based position of the frame in the unsampled sequence.

    Raises:
        ValueError: If ``cols_per_row`` is less than 1.
    """
    if cols_per_row < 1:
        raise ValueError("cols_per_row must be at least 1")

    for row_start in range(0, len(base64_frames), cols_per_row):
        row_frames = base64_frames[row_start:row_start + cols_per_row]
        # Always create a full row of columns so a short last row stays
        # aligned with the rows above it.
        cols = st.columns(cols_per_row)
        for offset, (col, frame) in enumerate(zip(cols, row_frames)):
            frame_idx = row_start + offset
            with col:
                st.image(
                    base64.b64decode(frame),
                    caption=f'Frame {frame_idx * frame_step + 1}',
                    width=200,
                )
# Script entry point: launch the app only when this file is run directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()