# Streamlit page — Step 3: Video Processing (Frame Extraction & OCR)
import os
import time

import pytesseract
import streamlit as st
from PIL import Image

from utils import extract_frames_interval, extract_frames_pyscenedetect
st.title('๐ผ๏ธ Step 3: Video Processing (Frame Extraction & OCR)')

# Guard clause: this page is useless without a previously processed video,
# so bail out early and point the user back to the upload step.
video_path = st.session_state.get('video_path')
if not video_path or not os.path.exists(video_path):
    st.warning('Video file not found. Please go back to the **๐ค Upload** page and process a video first.')
    st.stop()

st.write(f'Video file to process: `{os.path.basename(video_path)}`')
| # | |
| # ================================================================== | |
| # | |
| col_method, col_config = st.columns(2) | |
| # --- Method --- | |
| # with col_model.expander('**MODEL**', expanded=True): | |
| with col_method.container(border=True): | |
| # extraction_method = st.selectbox( | |
| # 'Extraction method:', | |
| # ('interval', 'video2slides', 'pyscenedetect'), | |
| # index=0 | |
| # ) | |
| extraction_method = st.radio( | |
| 'Extraction method:', | |
| ('interval', 'video2slides', 'pyscenedetect'), | |
| index=0, | |
| horizontal=True, | |
| ) | |
| # col_config_frame_interval, col_config_ocr_lang = st.columns(2) | |
| # frame_interval = col_config_frame_interval.slider('Extract frames every `X` seconds:', min_value=1, max_value=60, value=5, step=1) | |
| # ocr_lang = col_config_ocr_lang.text_input('OCR Language(s) (e.g. `rus`, `rus+eng`):', value='rus') | |
| ocr_lang = st.text_input('OCR Language(s) (e.g. `rus`, `rus+eng`):', value='rus') | |
| # --- Configuration --- | |
| with col_config.expander(f'**`{extraction_method}` METHOD CONFIG**', expanded=True): | |
| match extraction_method: | |
| case 'interval': | |
| extraction_interval = st.number_input( | |
| 'Frames extraction interval:', | |
| min_value=0, max_value=25, step=1, format='%i', value=5, | |
| help='Extract frames every `x` seconds' | |
| ) | |
| case 'video2slides': | |
| print('video2slides') | |
| case 'pyscenedetect': | |
| extraction_threshold = st.number_input( | |
| 'Frames extraction threshold:', | |
| min_value=0.1, max_value=10.0, step=0.1, format='%f', value=1.5, | |
| ) | |
| # --- Semantic Segmentation Placeholder --- | |
| # st.markdown("---") | |
| # --- Tesseract Configuration (Optional but recommended) --- | |
| # Uncomment and set the path if tesseract is not in your PATH | |
| # pytesseract.pytesseract.tesseract_cmd = r'/path/to/your/tesseract' # Example: '/usr/bin/tesseract' or 'C:\Program Files\Tesseract-OCR\tesseract.exe' | |
| # # --- Frame Extraction and OCR --- | |
| # st.subheader('OCR') | |
| _, col_button_extract, _ = st.columns([2, 1, 2]) | |
| if col_button_extract.button('Extract Frames', type='primary', use_container_width=True): | |
| # st.session_state['ocr_text'] = None # clear previous results | |
| st.session_state['frames_paths'] = [] | |
| # all_ocr_results = [] | |
| col_info, col_complete, col_next = st.columns(3) | |
| match extraction_method: | |
| case 'interval': | |
| with st.spinner(f'Extracting frames every {extraction_interval} seconds (using interval method)..'): | |
| start_time = time.time() | |
| frames_dir, frame_paths = extract_frames_interval(video_path, 'frames_pyscenedetect', interval_sec=extraction_interval) | |
| extract_time = time.time() - start_time | |
| if frames_dir and frame_paths: | |
| st.session_state['frames_dir'] = frames_dir | |
| st.session_state['frames_paths'] = frame_paths # store paths | |
| col_info.success(f'Extracted {len(frame_paths)} frames in {extract_time:.2f}s.') | |
| else: | |
| col_info.error('Failed to extract frames') | |
| st.stop() | |
| case 'video2slides': | |
| pass | |
| case 'pyscenedetect': | |
| with st.spinner(f'Extracting frames with `threshold={extraction_threshold}` (using pyscenedetect method)..'): | |
| start_time = time.time() | |
| frames_dir, frame_paths = extract_frames_pyscenedetect(video_path, 'frames_pyscenedetect', threshold=extraction_threshold) | |
| extract_time = time.time() - start_time | |
| if frames_dir and frame_paths: | |
| st.session_state['frames_dir'] = frames_dir | |
| st.session_state['frames_paths'] = frame_paths # store paths | |
| col_info.success(f'Extracted {len(frame_paths)} frames in {extract_time:.2f}s.') | |
| else: | |
| col_info.error('Failed to extract frames') | |
| st.stop() | |
# --- Preview of extracted frames ---
# Runs only when a successful extraction has populated 'frames_paths'
# (.get() keeps this safe on a fresh session where the key doesn't exist yet).
if st.session_state.get('frames_paths'):
    frame_paths_list = st.session_state['frames_paths']
    total_frames = len(frame_paths_list)
    max_display_frames = 6  # cap on preview thumbnails shown on the page
    display_cols = st.columns(min(max_display_frames, total_frames))
    display_idx = 0
    # Frames + timestamps collected for the (currently disabled) OCR step below.
    extracted_texts = []
    for frame_path in frame_paths_list:
        img = Image.open(frame_path)
        # Derive the timestamp from the filename
        # (expected format: frame_<seconds>.png — TODO confirm against utils).
        try:
            secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0])
            timestamp = time.strftime('%H:%M:%S', time.gmtime(secs))
        except (IndexError, ValueError):
            # Bug fix: `timestamp` used to be left unassigned on this path,
            # raising NameError in the caption f-string below (or silently
            # reusing the previous frame's value). Also narrowed the bare
            # `except:` to the two errors the parse can actually raise.
            timestamp = 'N/A'
        extracted_texts.append({'timestamp': timestamp, 'image': img})
        # Show at most `max_display_frames` thumbnails, one per column.
        if display_idx < len(display_cols):
            with display_cols[display_idx]:
                st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True)
            display_idx += 1
| # ocr_progress.progress(processed_count / total_frames) | |
| # # Process frames in batches or one by one | |
| # for i, frame_path in enumerate(st.session_state['frames_paths']): | |
| # try: | |
| # img = Image.open(frame_path) | |
| # # --- Potential Preprocessing/Filtering --- | |
| # # Add logic here if needed: | |
| # # - Detect if frame likely contains text (e.g., check contrast, edges) | |
| # # - If segmentation was implemented, crop to slide regions here | |
| # # --- Perform OCR --- | |
| # text = pytesseract.image_to_string(img, lang=ocr_lang) | |
| # # --- Basic Text Cleaning/Filtering --- | |
| # cleaned_text = text.strip() | |
| # if cleaned_text and len(cleaned_text) > 10: # filter very short/noisy results | |
| # # Extract timestamp from filename (assuming format frame_XXXXXX.png) | |
| # try: | |
| # secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0]) | |
| # timestamp = time.strftime('%H:%M:%S', time.gmtime(secs)) | |
| # extracted_texts.append({'timestamp': timestamp, 'text': cleaned_text}) | |
| # except: | |
| # extracted_texts.append({'timestamp': 'N/A', 'text': cleaned_text}) # fallback if filename parse fails | |
| # # Display some examples | |
| # if display_idx < max_display_frames and display_idx < len(display_cols): | |
| # with display_cols[display_idx]: | |
| # st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True) | |
| # st.text(f'OCR:\n{cleaned_text[:100]}..') # show snippet | |
| # display_idx += 1 | |
| # processed_count += 1 | |
| # ocr_progress.progress(processed_count / total_frames) | |
| # except Exception as ocr_err: | |
| # col_info.warning(f'Could not perform OCR on {os.path.basename(frame_path)}: {ocr_err}') | |
| # processed_count += 1 # still count as processed | |
| # ocr_progress.progress(processed_count / total_frames) | |
| # ocr_time = time.time() - start_ocr_time | |
| # col_complete.success(f'OCR processing finished in {ocr_time:.2f}s.') | |
| # # --- Aggregate and Deduplicate OCR Text --- | |
| # # Simple approach: Combine unique text blocks | |
| # final_ocr_text = "" | |
| # seen_texts = set() | |
| # last_text = "" | |
| # min_similarity_threshold = 0.8 # requires a library like `thefuzz` or similar for proper check | |
| # # basic check: avoid exact consecutive duplicates | |
| # for item in extracted_texts: | |
| # current_text_block = item['text'].strip() | |
| # # Basic check: Only add if significantly different from the last block | |
| # # A more robust check would involve sequence matching or fuzzy matching | |
| # is_duplicate = False | |
| # if last_text: | |
| # # Simple check: exact match or near-exact length/content start? | |
| # if (current_text_block == last_text or | |
| # (abs(len(current_text_block) - len(last_text)) < 10 and | |
| # current_text_block.startswith(last_text[:20])) | |
| # ): | |
| # is_duplicate = True # likely a duplicate from consecutive frames | |
| # if current_text_block and not is_duplicate: # only add non-empty, non-duplicate text | |
| # final_ocr_text += f"\n\n--- Text from frame around {item['timestamp']} ---\n" | |
| # final_ocr_text += current_text_block | |
| # last_text = current_text_block # update last text added | |
| # st.session_state['ocr_text'] = final_ocr_text.strip() | |
| # if st.session_state['ocr_text']: | |
| # col_complete.info('OCR processing complete.') | |
| # col_next.page_link('ui_summarize.py', label='Next Step: **๐ Summarize**', icon='โก๏ธ') | |
| # else: | |
| # col_complete.warning('No significant text found via OCR') | |
| # # --- Display OCR Results --- | |
| # st.subheader('Aggregated OCR Text') | |
| # if 'ocr_text' in st.session_state and st.session_state['ocr_text']: | |
| # st.text_area("Extracted Text from Frames", st.session_state['ocr_text'], height=400) | |
| # else: | |
| # st.info('OCR has not been run or no text was detected') | |
| # st.divider() | |
| # st.subheader('Semantic Segmentation') | |