Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import time | |
| from yt_dlp import YoutubeDL | |
| import ffmpeg | |
| import tempfile | |
| from utils import (save_uploaded_file, extract_audio, | |
| download_youtube, get_session_dir, | |
| cleanup_session_files, get_session_id, | |
| get_temp_dir, get_features, proc_raw_audio) | |
| st.title('π₯π Step 1: Upload Video & Preprocess') | |
| # Initialize session state defaults | |
| defaults = { | |
| 'uploaded_file': None, | |
| 'video_path': None, | |
| 'audio_path': None, | |
| 'ocr_text': None, | |
| 'transcript': None, | |
| 'summary': None, | |
| 'main_topic': None, | |
| 'input_method': 'Upload', | |
| 'input_title': None, | |
| 'video_input_path': None, | |
| 'video_url': None, | |
| # 'audio_wav': None, | |
| # 'audio_file': None, | |
| } | |
| for key, value in defaults.items(): | |
| st.session_state.setdefault(key, value) | |
| # --- Option to clear previous session --- | |
| st.sidebar.write('Current Session ID:') | |
| st.sidebar.write(f'`{get_session_id()}`') # session ID for debugging | |
| if st.sidebar.button('Start New Session'): | |
| session_id = get_session_id() # get current ID before clearing | |
| cleanup_session_files(session_id) | |
| for key in list(st.session_state.keys()): | |
| del st.session_state[key] # clear all session state | |
| st.rerun() # rerun the script to reflect cleared state | |
| # --- Main Topic --- | |
| st.session_state.main_topic = st.text_input('(optional) Provide video topic:', st.session_state.main_topic) | |
| # st.session_state.main_topic = m | |
| # col_url, col_start_from = st.columns([5, 2]) | |
| # video_url = col_url.text_input('Enter YouTube video URL:', example_youtube['url']) | |
| # start_from = col_start_from.number_input( | |
| # 'Start From:', | |
| # min_value=0.0, step=0.5, format='%f', value=example_youtube['start'], | |
| # help='Time shift from the beginning (in seconds)' | |
| # ) | |
| # if video_url: | |
| # st.session_state.video_url = video_url | |
| # st.session_state.video_input_path = '' # clear path if URL is used | |
| # --- Video source selection --- | |
| input_method = st.radio( | |
| 'Select Input Method:', | |
| ('Upload', 'YouTube'), | |
| key='input_method', | |
| horizontal=True | |
| ) | |
| video_path = None | |
| uploaded_file = None | |
| video_url = None | |
| if input_method == 'Upload': | |
| uploaded_file = st.file_uploader( | |
| 'Choose a video file', | |
| type=['mp4', 'avi', 'mkv', 'mov'] | |
| ) | |
| if uploaded_file: | |
| col_info, col_ready = st.columns(2) | |
| # Display basic file info | |
| col_info.info('**[ File Details ]** ' + | |
| f'name: `{uploaded_file.name}` | ' + | |
| f'type: `{uploaded_file.type}` | ' + | |
| f'size: `{uploaded_file.size / (1024 * 1024):.2f} MB`') | |
| # Save uploaded file temporarily for the Prefect flow | |
| temp_dir = get_temp_dir() # use a shared temp location | |
| # Use a unique name to avoid conflicts if multiple users run simultaneously | |
| target_path = os.path.join(temp_dir, f'upload_{get_session_id()}_{uploaded_file.name}') | |
| try: | |
| with open(target_path, 'wb') as f: | |
| f.write(uploaded_file.getbuffer()) | |
| st.session_state.video_input_path = target_path | |
| st.session_state.video_input_title = uploaded_file.name | |
| st.session_state.video_url = '' # clear URL if file is uploaded | |
| st.session_state.transcript = None | |
| st.session_state.summary = None | |
| col_ready.info('Ready for processing.') | |
| except Exception as e: | |
| col_ready.error(f'Error saving uploaded file: {e}') | |
| st.session_state.video_input_path = '' | |
| elif input_method == 'YouTube': | |
| #-- Obtain audio from YouTube video | |
| example_youtube = { | |
| 'title': 'ΠΠ±ΡΠ΅ΡΡΠ²Π΅Π½Π½ΠΎΠ΅ Π΄Π²ΠΈΠΆΠ΅Π½ΠΈΠ΅', | |
| 'url': 'https://www.youtube.com/watch?v=c3bhkrKF6F4', | |
| 'start': 0.0 | |
| } | |
| col_url, col_start_from = st.columns([5, 2]) | |
| video_url = col_url.text_input('Enter YouTube video URL:', example_youtube['url']) | |
| start_from = col_start_from.number_input( | |
| 'Start From:', | |
| min_value=0.0, step=0.5, format='%f', value=example_youtube['start'], | |
| help='Time shift from the beginning (in seconds)' | |
| ) | |
| if video_url: | |
| st.session_state.video_url = video_url | |
| st.session_state.video_input_path = '' # clear path if URL is used | |
| def ui_processed_sound(audio_wav, audio_np): | |
| '''UI to show sound processing results''' | |
| st.audio(audio_wav) | |
| features = get_features(audio_np) | |
| def extract_videofile(video_file): | |
| # video_buffer = BytesIO(video_file.read()) | |
| # audio_data = VideoFileClip(video_buffer.name).audio | |
| # raw_source = StringIO(video_file.getvalue().decode('utf-8')) | |
| # raw_source = video_file.getvalue().decode('utf-8') | |
| # raw_source = video_file.read() | |
| # raw_source = BytesIO(video_file.getvalue()) | |
| #-- Get video | |
| # out, err = ( | |
| # ffmpeg | |
| # .input(video_file, ss=start_from) | |
| # .output('temp.mp4', vcodec='copy') | |
| # .overwrite_output() | |
| # .run() | |
| # ) | |
| # st.video('temp.mp4') | |
| # video = VideoFileClip(video_file) | |
| # audio = video.audio | |
| # audio.write_audiofile('output_audio.mp3') | |
| tfile = tempfile.NamedTemporaryFile(delete=False) | |
| tfile.write(video_file.read()) | |
| #-- Get audio | |
| # SAMPLE_RATE = 16000 | |
| audio_data, err = ( | |
| ffmpeg | |
| .input(tfile.name, ss=start_from) | |
| .output('pipe:', format='wav')#, acodec='pcm_s16le') | |
| # .output('pipe:', format='s16le', ac=1, acodec='pcm_s16le', ar=SAMPLE_RATE) | |
| # .global_args('-nostdin', '-threads', '0') | |
| .run(capture_stdout=True) | |
| ) | |
| if err: | |
| raise RuntimeError(f'Failed to load audio: {err.decode()}') | |
| return audio_data | |
| def extract_youtube(raw_url): | |
| #-- Get video | |
| # out, err = ( | |
| # ffmpeg | |
| # .input(raw_url, ss=start_from) | |
| # .output('temp.mp4', vcodec='copy') | |
| # .overwrite_output() | |
| # .run() | |
| # ) | |
| # st.video('temp.mp4') | |
| #-- Get audio | |
| # SAMPLE_RATE = 16000 | |
| audio_data, err = ( | |
| ffmpeg | |
| .input(raw_url, ss=start_from) | |
| .output('pipe:', format='wav')#, acodec='pcm_s16le') | |
| # .output('pipe:', format='s16le', ac=1, acodec='pcm_s16le', ar=SAMPLE_RATE) | |
| .global_args('-nostdin', '-threads', '0') | |
| .run(capture_stdout=True) | |
| ) | |
| if err: | |
| raise RuntimeError(f'Failed to load audio: {err.decode()}') | |
| return audio_data | |
| # --- Processing Button --- | |
| _, col_button_process, _ = st.columns([2, 1, 2]) | |
| if col_button_process.button('Process video', | |
| type='primary', | |
| use_container_width=True, | |
| disabled=not (st.session_state.video_input_path or st.session_state.video_url) | |
| ): | |
| # Clear previous paths if reprocessing | |
| st.session_state['video_path'] = None | |
| st.session_state['audio_path'] = None | |
| col_info, col_complete, col_next = st.columns(3) | |
| with st.spinner('Processing video input..'): | |
| if st.session_state['input_method'] == 'Upload' and uploaded_file: | |
| st.session_state.uploaded_file = uploaded_file | |
| video = uploaded_file | |
| # audio_data = extract_videofile(uploaded_file) | |
| saved_path = save_uploaded_file(uploaded_file) | |
| if saved_path: | |
| st.session_state['video_path'] = saved_path | |
| col_info.success(f'Video saved temporarily to: {os.path.basename(saved_path)}') | |
| else: | |
| col_info.error('Failed to save uploaded file') | |
| elif st.session_state['input_method'] == 'YouTube' and video_url: | |
| try: | |
| with YoutubeDL({'format': 'best+bestaudio'}) as ydl: | |
| info = ydl.extract_info(video_url, download=False) | |
| except Exception as e: | |
| st.error(e) | |
| else: | |
| d = info['duration'] | |
| h, m, s = d // 3600, (d % 3600) // 60, d % 60 | |
| time_str = [] | |
| if h: time_str.append(f'{h}h') | |
| if m: time_str.append(f'{m}m') | |
| if s or not time_str: time_str.append(f'{s}s') | |
| time_str = ' '.join(time_str) | |
| st.write(f"<small><div style='float: center; text-align: center'>\ | |
| **Title:** [{info['title']}]({video_url})\ | |
| **Duration:** {info['duration']} sec.</div></small>", | |
| unsafe_allow_html=True) | |
| video = video_url | |
| # audio_data = extract_youtube(info['url']) | |
| st.session_state.video_input_title = info['title'] | |
| session_dir = get_session_dir() | |
| os.makedirs(session_dir, exist_ok=True) | |
| downloaded_path = download_youtube(video_url, session_dir) | |
| if downloaded_path and os.path.exists(downloaded_path): | |
| st.session_state['video_path'] = downloaded_path | |
| col_info.success(f'YouTube video downloaded: {os.path.basename(downloaded_path)}') | |
| else: | |
| col_info.error('Failed to download YouTube video') | |
| else: | |
| st.warning('Please upload a file or provide a YouTube URL') | |
| st.stop() | |
| # --- Basic Preprocessing: Audio Extraction --- | |
| if st.session_state['video_path']: | |
| # st.write('Extracting audio..') | |
| start = time.time() | |
| # Ensure utils.extract_audio uses the correct path | |
| audio_path = extract_audio(st.session_state['video_path'], audio_format='mp3') | |
| # audio_path = extract_audio(st.session_state['video_path']) | |
| end = time.time() | |
| if audio_path and os.path.exists(audio_path): | |
| col_info.success(f'Audio extracted to: {os.path.basename(audio_path)} (took {end - start:.2f}s)') | |
| st.session_state['audio_path'] = audio_path | |
| else: | |
| col_info.error('Failed to extract audio from the video') | |
| st.warning('Proceeding without audio. STT step will be skipped') | |
| st.session_state['audio_path'] = None # explicitly set to None | |
| if st.session_state['video_path']: | |
| col_complete.info('Preprocessing complete') | |
| col_next.page_link('ui_transcribe.py', label='Next Step: ποΈ **Transcribe**', icon='β‘οΈ') | |
| # Display video | |
| with st.container(height=300, border=False): | |
| _, col_preview_description, _ = st.columns([1, 3, 1]) | |
| col_preview_description.subheader('Preview Video') | |
| _, col_video, _ = st.columns([1, 3, 1]) | |
| col_video.video(video) | |
| # audio_data = audio_path | |
| # audio_wav, audio_np = proc_raw_audio(audio_data) | |
| # st.session_state.audio_wav = audio_wav | |
| # st.session_state.audio_np = audio_np | |
| # # st.session_state.video = video.read() | |
| # ui_processed_sound(audio_wav, audio_np) | |
| # # Display current status | |
| # st.subheader("Current Status:") | |
| # if st.session_state.get('video_path'): | |
| # st.success(f"β Video Loaded: {os.path.basename(st.session_state['video_path'])}") | |
| # else: | |
| # st.warning("β³ Video not yet loaded or processed.") | |
| # if st.session_state.get('audio_path'): | |
| # st.success(f"β Audio Extracted: {os.path.basename(st.session_state['audio_path'])}") | |
| # elif st.session_state.get('video_path'): # only show warning if video was loaded but audio failed | |
| # st.warning("β οΈ Audio extraction failed or video has no audio track.") | |