Spaces:
Sleeping
Sleeping
import html
import os
import tempfile
import time

import ffmpeg
import streamlit as st
from yt_dlp import YoutubeDL

from utils import (save_uploaded_file, extract_audio,
                   download_youtube, get_session_dir,
                   cleanup_session_files, get_session_id,
                   get_temp_dir, get_features, proc_raw_audio)
st.title('📥📄 Step 1: Upload Video & Preprocess')

# Seed every session-state key this page reads or writes. `setdefault` leaves
# values from earlier reruns untouched, so only missing keys are initialised.
defaults = {
    'uploaded_file': None,
    'video_path': None,
    'audio_path': None,
    'ocr_text': None,
    'transcript': None,
    'summary': None,
    'main_topic': None,
    'input_method': 'Upload',
    # Kept for backward compatibility; nothing on this page reads it.
    'input_title': None,
    # This is the title key the Upload and YouTube branches actually assign,
    # so it is seeded too and is never missing before a source is chosen.
    'video_input_title': None,
    'video_input_path': None,
    'video_url': None,
    # 'audio_wav': None,
    # 'audio_file': None,
}
for key, value in defaults.items():
    st.session_state.setdefault(key, value)
# --- Option to clear previous session ---
with st.sidebar:
    st.write('Current Session ID:')
    st.write(f'`{get_session_id()}`')  # shown to help debugging
    reset_requested = st.button('Start New Session')

if reset_requested:
    # Resolve the session ID and clean up its files *before* the state is
    # wiped, then drop every stored key and rerun from a blank state.
    cleanup_session_files(get_session_id())
    for state_key in tuple(st.session_state.keys()):
        del st.session_state[state_key]
    st.rerun()
# --- Main Topic ---
# Optional free-text hint. The widget is pre-filled with the stored topic and
# whatever it returns is written straight back to session state.
topic_value = st.text_input(
    '(optional) Provide video topic:',
    value=st.session_state.main_topic,
)
st.session_state.main_topic = topic_value
# --- Video source selection ---
input_method = st.radio(
    'Select Input Method:',
    ('Upload', 'YouTube'),
    key='input_method',
    horizontal=True
)

video_path = None
uploaded_file = None
video_url = None
# Bound up front so code that reads the module-level `start_from` does not hit
# a NameError when the Upload branch is taken (the widget that sets it only
# exists in the YouTube branch).
start_from = 0.0

if input_method == 'Upload':
    uploaded_file = st.file_uploader(
        'Choose a video file',
        type=['mp4', 'avi', 'mkv', 'mov']
    )
    if uploaded_file:
        col_info, col_ready = st.columns(2)
        # Display basic file info
        col_info.info('**[ File Details ]** ' +
                      f'name: `{uploaded_file.name}` | ' +
                      f'type: `{uploaded_file.type}` | ' +
                      f'size: `{uploaded_file.size / (1024 * 1024):.2f} MB`')

        # Save the upload under the shared temp location.
        temp_dir = get_temp_dir()
        # The file name is supplied by the client: keep only its last path
        # component so it cannot steer the write outside `temp_dir`.
        safe_name = os.path.basename(uploaded_file.name)
        # Session ID in the name avoids clashes between simultaneous users.
        target_path = os.path.join(temp_dir, f'upload_{get_session_id()}_{safe_name}')
        try:
            with open(target_path, 'wb') as f:
                f.write(uploaded_file.getbuffer())
        except Exception as e:
            col_ready.error(f'Error saving uploaded file: {e}')
            st.session_state.video_input_path = ''
        else:
            st.session_state.video_input_path = target_path
            st.session_state.video_input_title = uploaded_file.name
            st.session_state.video_url = ''  # clear URL if file is uploaded
            # A new input invalidates results derived from the previous one.
            st.session_state.transcript = None
            st.session_state.summary = None
            col_ready.info('Ready for processing.')
elif input_method == 'YouTube':
    # -- Obtain audio from YouTube video
    example_youtube = {
        'title': 'Общественное движение',
        'url': 'https://www.youtube.com/watch?v=c3bhkrKF6F4',
        'start': 0.0
    }
    col_url, col_start_from = st.columns([5, 2])
    video_url = col_url.text_input('Enter YouTube video URL:', example_youtube['url'])
    start_from = col_start_from.number_input(
        'Start From:',
        min_value=0.0, step=0.5, format='%f', value=example_youtube['start'],
        help='Time shift from the beginning (in seconds)'
    )
    if video_url:
        st.session_state.video_url = video_url
        st.session_state.video_input_path = ''  # clear path if URL is used
def ui_processed_sound(audio_wav, audio_np):
    """Show an audio player for `audio_wav` and run `get_features` on `audio_np`."""
    st.audio(audio_wav)
    # The result is not used here; the call is kept because `get_features`
    # may have side effects (e.g. rendering) — TODO confirm against utils.
    get_features(audio_np)
def extract_videofile(video_file, offset=None):
    """Decode the audio track of an uploaded video into WAV bytes.

    Args:
        video_file: Readable binary file-like object holding the video
            (its `.read()` result is spilled to a temporary file for ffmpeg).
        offset: Seek position in seconds passed to ffmpeg as `ss`. When
            omitted, the module-level `start_from` is used if it exists,
            otherwise 0.0.

    Returns:
        The bytes ffmpeg wrote to stdout (WAV container).

    Raises:
        RuntimeError: If ffmpeg exits with an error; the message carries
            ffmpeg's stderr output.
    """
    if offset is None:
        # Previous behaviour read the global `start_from`, which is only
        # bound in YouTube mode; fall back to the start of the file.
        offset = globals().get('start_from', 0.0)

    # ffmpeg needs a real path. Leaving the `with` block closes the file, so
    # every byte is flushed to disk before ffmpeg opens it.
    with tempfile.NamedTemporaryFile(delete=False) as tfile:
        tfile.write(video_file.read())
        temp_path = tfile.name

    try:
        # stderr is captured so a failure can be reported with ffmpeg's own
        # diagnostics; ffmpeg-python raises `ffmpeg.Error` on a bad exit code.
        audio_data, _ = (
            ffmpeg
            .input(temp_path, ss=offset)
            .output('pipe:', format='wav')
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        detail = e.stderr.decode(errors='replace') if e.stderr else str(e)
        raise RuntimeError(f'Failed to load audio: {detail}') from e
    finally:
        # Best-effort cleanup: the temp file was created with delete=False.
        try:
            os.remove(temp_path)
        except OSError:
            pass
    return audio_data
def extract_youtube(raw_url, offset=None):
    """Decode the audio of a remote media URL into WAV bytes via ffmpeg.

    Args:
        raw_url: Media URL handed directly to ffmpeg as its input.
        offset: Seek position in seconds passed to ffmpeg as `ss`. When
            omitted, the module-level `start_from` is used if it exists,
            otherwise 0.0.

    Returns:
        The bytes ffmpeg wrote to stdout (WAV container).

    Raises:
        RuntimeError: If ffmpeg exits with an error; the message carries
            ffmpeg's stderr output.
    """
    if offset is None:
        # Previous behaviour read the global `start_from` unconditionally and
        # raised NameError when it had not been bound.
        offset = globals().get('start_from', 0.0)

    try:
        # stderr is captured so a failure can be reported with ffmpeg's own
        # diagnostics; ffmpeg-python raises `ffmpeg.Error` on a bad exit code.
        audio_data, _ = (
            ffmpeg
            .input(raw_url, ss=offset)
            .output('pipe:', format='wav')
            .global_args('-nostdin', '-threads', '0')
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        detail = e.stderr.decode(errors='replace') if e.stderr else str(e)
        raise RuntimeError(f'Failed to load audio: {detail}') from e
    return audio_data
# --- Processing Button ---
# The button is enabled only once some input (saved upload path or URL) is
# recorded in session state.
_, col_button_process, _ = st.columns([2, 1, 2])
has_input = bool(st.session_state.video_input_path or st.session_state.video_url)
if col_button_process.button('Process video',
                             type='primary',
                             use_container_width=True,
                             disabled=not has_input):
    # Clear previous paths if reprocessing
    st.session_state['video_path'] = None
    st.session_state['audio_path'] = None
    # Source handed to the preview player; assigned by whichever branch runs.
    video = None

    col_info, col_complete, col_next = st.columns(3)
    with st.spinner('Processing video input..'):
        if st.session_state['input_method'] == 'Upload' and uploaded_file:
            st.session_state.uploaded_file = uploaded_file
            video = uploaded_file
            saved_path = save_uploaded_file(uploaded_file)
            if saved_path:
                st.session_state['video_path'] = saved_path
                col_info.success(f'Video saved temporarily to: {os.path.basename(saved_path)}')
            else:
                col_info.error('Failed to save uploaded file')
        elif st.session_state['input_method'] == 'YouTube' and video_url:
            try:
                with YoutubeDL({'format': 'best+bestaudio'}) as ydl:
                    info = ydl.extract_info(video_url, download=False)
            except Exception as e:
                st.error(e)
            else:
                # Duration may be absent; show a placeholder rather than
                # crashing on a missing key or None.
                duration = info.get('duration')
                duration_text = f'{duration} sec.' if duration is not None else 'unknown'
                # The title comes from the remote site and is rendered with
                # unsafe_allow_html=True, so HTML metacharacters are escaped.
                title_markup = html.escape(info['title'])
                st.write(
                    "<small><div style='float: center; text-align: center'> "
                    f"**Title:** [{title_markup}]({video_url}) "
                    f"**Duration:** {duration_text}</div></small>",
                    unsafe_allow_html=True)
                video = video_url
                st.session_state.video_input_title = info['title']

                session_dir = get_session_dir()
                os.makedirs(session_dir, exist_ok=True)
                downloaded_path = download_youtube(video_url, session_dir)
                if downloaded_path and os.path.exists(downloaded_path):
                    st.session_state['video_path'] = downloaded_path
                    col_info.success(f'YouTube video downloaded: {os.path.basename(downloaded_path)}')
                else:
                    col_info.error('Failed to download YouTube video')
        else:
            st.warning('Please upload a file or provide a YouTube URL')
            st.stop()

        # --- Basic Preprocessing: Audio Extraction ---
        if st.session_state['video_path']:
            # perf_counter is monotonic, so the reported duration cannot go
            # negative if the wall clock is adjusted mid-run.
            started = time.perf_counter()
            audio_path = extract_audio(st.session_state['video_path'], audio_format='mp3')
            elapsed = time.perf_counter() - started
            if audio_path and os.path.exists(audio_path):
                col_info.success(f'Audio extracted to: {os.path.basename(audio_path)} (took {elapsed:.2f}s)')
                st.session_state['audio_path'] = audio_path
            else:
                col_info.error('Failed to extract audio from the video')
                st.warning('Proceeding without audio. STT step will be skipped')
                st.session_state['audio_path'] = None  # explicitly set to None

    if st.session_state['video_path']:
        col_complete.info('Preprocessing complete')
        col_next.page_link('ui_transcribe.py', label='Next Step: 🎙️ **Transcribe**', icon='➡️')

        # Display video
        with st.container(height=300, border=False):
            _, col_preview_description, _ = st.columns([1, 3, 1])
            col_preview_description.subheader('Preview Video')
            _, col_video, _ = st.columns([1, 3, 1])
            col_video.video(video)