# conspectum / ui_video.py
# (HuggingFace page header residue, kept as a comment: author macsunmood,
#  commit message "update app", commit 6edd739, file size 10.7 kB)
import streamlit as st
import os
import pytesseract
from PIL import Image
import time
from utils import extract_frames_interval, extract_frames_pyscenedetect
# Page title. The leading emoji was mojibake ('๐Ÿ–ผ๏ธ' — the UTF-8 bytes of
# the picture-frame emoji decoded with a single-byte codepage); restored to 🖼️.
st.title('🖼️ Step 3: Video Processing (Frame Extraction & OCR)')

# Guard: this page needs a video processed on the Upload page first.
# Bail out early (st.stop) if the session has no path or the file is gone.
if ('video_path' not in st.session_state or
    not st.session_state['video_path'] or
    not os.path.exists(st.session_state['video_path'])
):
    st.warning('Video file not found. Please go back to the **📤 Upload** page and process a video first.')
    st.stop()

video_path = st.session_state['video_path']
st.write(f'Video file to process: `{os.path.basename(video_path)}`')
#
# ==================================================================
#
# Two side-by-side panels: extraction method (left) and method config (right).
col_method, col_config = st.columns(2)

# Closed set of supported frame-extraction strategies; 'interval' is the default.
EXTRACTION_METHODS = ('interval', 'video2slides', 'pyscenedetect')

# --- Method selection + OCR language ---
with col_method.container(border=True):
    extraction_method = st.radio(
        'Extraction method:',
        EXTRACTION_METHODS,
        index=0,
        horizontal=True,
    )
    # Tesseract language spec; '+'-joined codes enable multi-language OCR.
    ocr_lang = st.text_input('OCR Language(s) (e.g. `rus`, `rus+eng`):', value='rus')
# --- Per-method configuration panel ---
# Only the widget(s) relevant to the chosen method are rendered, so
# `extraction_interval` / `extraction_threshold` exist only for their method.
with col_config.expander(f'**`{extraction_method}` METHOD CONFIG**', expanded=True):
    if extraction_method == 'interval':
        extraction_interval = st.number_input(
            'Frames extraction interval:',
            min_value=0, max_value=25, step=1, format='%i', value=5,
            help='Extract frames every `x` seconds'
        )
    elif extraction_method == 'video2slides':
        # Placeholder — this method has no configuration yet.
        print('video2slides')
    elif extraction_method == 'pyscenedetect':
        # Scene-change sensitivity passed through to PySceneDetect.
        extraction_threshold = st.number_input(
            'Frames extraction threshold:',
            min_value=0.1, max_value=10.0, step=0.1, format='%f', value=1.5,
        )
# --- Semantic Segmentation Placeholder ---
# st.markdown("---")
# --- Tesseract Configuration (Optional but recommended) ---
# Uncomment and set the path if tesseract is not in your PATH
# pytesseract.pytesseract.tesseract_cmd = r'/path/to/your/tesseract' # Example: '/usr/bin/tesseract' or 'C:\Program Files\Tesseract-OCR\tesseract.exe'
# # --- Frame Extraction and OCR ---
# st.subheader('OCR')
# Centered "Extract Frames" button.
_, col_button_extract, _ = st.columns([2, 1, 2])

if col_button_extract.button('Extract Frames', type='primary', use_container_width=True):
    st.session_state['frames_paths'] = []  # clear previous results

    col_info, col_complete, col_next = st.columns(3)

    # --- Run the selected extraction method ---
    # Both implemented branches produce (frames_dir, frame_paths); the shared
    # store/report logic below replaces the previously duplicated copies.
    frames_dir, frame_paths, extract_time = None, [], 0.0
    if extraction_method == 'interval':
        with st.spinner(f'Extracting frames every {extraction_interval} seconds (using interval method)..'):
            start_time = time.time()
            # FIX: output dir was 'frames_pyscenedetect' (copy-paste from the
            # pyscenedetect branch); use a method-specific directory.
            frames_dir, frame_paths = extract_frames_interval(
                video_path, 'frames_interval', interval_sec=extraction_interval)
            extract_time = time.time() - start_time
    elif extraction_method == 'video2slides':
        pass  # not implemented yet — leaves frames_paths empty
    elif extraction_method == 'pyscenedetect':
        with st.spinner(f'Extracting frames with `threshold={extraction_threshold}` (using pyscenedetect method)..'):
            start_time = time.time()
            frames_dir, frame_paths = extract_frames_pyscenedetect(
                video_path, 'frames_pyscenedetect', threshold=extraction_threshold)
            extract_time = time.time() - start_time

    if extraction_method in ('interval', 'pyscenedetect'):
        if frames_dir and frame_paths:
            st.session_state['frames_dir'] = frames_dir
            st.session_state['frames_paths'] = frame_paths  # store paths
            col_info.success(f'Extracted {len(frame_paths)} frames in {extract_time:.2f}s.')
        else:
            col_info.error('Failed to extract frames')
            st.stop()

    # --- Load frames and show a few thumbnails ---
    if st.session_state['frames_paths']:
        total_frames = len(st.session_state['frames_paths'])
        start_ocr_time = time.time()
        extracted_texts = []
        processed_count = 0

        # Preview at most 6 frames, one per column.
        max_display_frames = 6
        display_cols = st.columns(min(max_display_frames, total_frames) if total_frames > 0 else 1)
        display_idx = 0

        for frame_path in st.session_state['frames_paths']:
            img = Image.open(frame_path)
            # Derive a timestamp from the filename (expected format:
            # frame_XXXXXX.png where XXXXXX is the offset in whole seconds).
            try:
                secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0])
                timestamp = time.strftime('%H:%M:%S', time.gmtime(secs))
            except (IndexError, ValueError):
                # FIX: the old bare `except` left `timestamp` unset (or stale
                # from a previous iteration) while the caption below used it.
                timestamp = 'N/A'
            extracted_texts.append({'timestamp': timestamp, 'image': img})

            # Display the first few frames as examples.
            if display_idx < max_display_frames and display_idx < len(display_cols):
                with display_cols[display_idx]:
                    st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True)
                display_idx += 1
            processed_count += 1
# # Process frames in batches or one by one
# for i, frame_path in enumerate(st.session_state['frames_paths']):
# try:
# img = Image.open(frame_path)
# # --- Potential Preprocessing/Filtering ---
# # Add logic here if needed:
# # - Detect if frame likely contains text (e.g., check contrast, edges)
# # - If segmentation was implemented, crop to slide regions here
# # --- Perform OCR ---
# text = pytesseract.image_to_string(img, lang=ocr_lang)
# # --- Basic Text Cleaning/Filtering ---
# cleaned_text = text.strip()
# if cleaned_text and len(cleaned_text) > 10: # filter very short/noisy results
# # Extract timestamp from filename (assuming format frame_XXXXXX.png)
# try:
# secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0])
# timestamp = time.strftime('%H:%M:%S', time.gmtime(secs))
# extracted_texts.append({'timestamp': timestamp, 'text': cleaned_text})
# except:
# extracted_texts.append({'timestamp': 'N/A', 'text': cleaned_text}) # fallback if filename parse fails
# # Display some examples
# if display_idx < max_display_frames and display_idx < len(display_cols):
# with display_cols[display_idx]:
# st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True)
# st.text(f'OCR:\n{cleaned_text[:100]}..') # show snippet
# display_idx += 1
# processed_count += 1
# ocr_progress.progress(processed_count / total_frames)
# except Exception as ocr_err:
# col_info.warning(f'Could not perform OCR on {os.path.basename(frame_path)}: {ocr_err}')
# processed_count += 1 # still count as processed
# ocr_progress.progress(processed_count / total_frames)
# ocr_time = time.time() - start_ocr_time
# col_complete.success(f'OCR processing finished in {ocr_time:.2f}s.')
# # --- Aggregate and Deduplicate OCR Text ---
# # Simple approach: Combine unique text blocks
# final_ocr_text = ""
# seen_texts = set()
# last_text = ""
# min_similarity_threshold = 0.8 # requires a library like `thefuzz` or similar for proper check
# # basic check: avoid exact consecutive duplicates
# for item in extracted_texts:
# current_text_block = item['text'].strip()
# # Basic check: Only add if significantly different from the last block
# # A more robust check would involve sequence matching or fuzzy matching
# is_duplicate = False
# if last_text:
# # Simple check: exact match or near-exact length/content start?
# if (current_text_block == last_text or
# (abs(len(current_text_block) - len(last_text)) < 10 and
# current_text_block.startswith(last_text[:20]))
# ):
# is_duplicate = True # likely a duplicate from consecutive frames
# if current_text_block and not is_duplicate: # only add non-empty, non-duplicate text
# final_ocr_text += f"\n\n--- Text from frame around {item['timestamp']} ---\n"
# final_ocr_text += current_text_block
# last_text = current_text_block # update last text added
# st.session_state['ocr_text'] = final_ocr_text.strip()
# if st.session_state['ocr_text']:
# col_complete.info('OCR processing complete.')
# col_next.page_link('ui_summarize.py', label='Next Step: **๐Ÿ“ Summarize**', icon='โžก๏ธ')
# else:
# col_complete.warning('No significant text found via OCR')
# # --- Display OCR Results ---
# st.subheader('Aggregated OCR Text')
# if 'ocr_text' in st.session_state and st.session_state['ocr_text']:
# st.text_area("Extracted Text from Frames", st.session_state['ocr_text'], height=400)
# else:
# st.info('OCR has not been run or no text was detected')
# st.divider()
# st.subheader('Semantic Segmentation')