import streamlit as st
import os
from transformers import pipeline
import time
from docx import Document
from io import BytesIO
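# Disable Streamlit's file watcher before torch is imported; presumably this is here to
# avoid the known Streamlit-watcher/torch module-inspection conflict.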
os.environ['STREAMLIT_SERVER_ENABLE_FILE_WATCHER'] = 'false'
import torch
from langchain_ollama.llms import OllamaLLM
# from utils import cleanup_session_files, get_session_id  # for cleanup button
from utils import get_secret_api, get_secret_prompt
st.session_state.secret_api = get_secret_api()
import requests
# st.session_state.secret_prompt = get_secret_prompt()
prompt_file_id = '1s5r_DuxaEoMk-D5-53FVhTMeHGVtoeV7'
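# prompt_file_id above points to a Google Drive file with the preferred prompt (see load_prompt);
# the Russian default_prompt below is the hard-coded fallback: it asks the model to produce a
# detailed markdown lecture summary from the transcript and key frames, using markdown tables
# for comparisons.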
default_prompt = '''Ты - ассистент, который создает конспекты лекций на основе предоставленного текста. Этот текст состоит из двух частей:
1. Транскрибация аудиодорожки видеолекции,
2. Изображение выделенных из видео ключевых кадров, с полезной информацией.
Сделай детальный конспект по тому, что описывается в видео. Для иллюстрации сравнений и сопоставлений используй markdown-таблицы. Ответ предоставь в формате markdown.
'''
# gluing_prompt = 'Вот упомянутый транскрибированный текст с таймкодами, суммаризируй его вместе с изображениями, а для иллюстрации сравнений и сопоставлений используй markdown-таблицы:'
gluing_prompt = 'Вот упомянутый транскрибированный текст с таймкодами, суммаризируй его вместе с изображениями, используя markdown-таблицы.'
if st.session_state.get('main_topic'):
    gluing_prompt += f' Основная тема лекции: {st.session_state.main_topic}'
# st.write(image_path)
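# Collect the key-frame images (.jpg) extracted in the previous step.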
frames_paths = []
if st.session_state.get('frames_dir'):
    frames_paths = [os.path.join(st.session_state.frames_dir, f)
                    for f in os.listdir(st.session_state.frames_dir)
                    if f.endswith('.jpg')
                    and os.path.isfile(os.path.join(st.session_state.frames_dir, f))]
# import base64
# # Load and encode JPEG images to base64
# frames = []
# # st.success(os.listdir(st.session_state.frames_dir))
# # st.success([os.path.isfile(f) for f in os.listdir(st.session_state.frames_dir)])  # if f.endswith('.jpg') and os.path.isfile(f)])
# for image_path in frames_paths:
#     # st.write(image_path)
#     with open(os.path.join(st.session_state.frames_dir, image_path), 'rb') as image_file:
#         # Read the image and encode it to base64
#         encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
#         frames.append(encoded_string)
# # st.success(frames)
st.title('📝 Step 4: Lecture Summarization')
# Check if transcript and potentially OCR text are available
transcript_available = 'transcript' in st.session_state and st.session_state['transcript']
frames_available = 'frames_dir' in st.session_state and st.session_state['frames_dir']
if not transcript_available and not frames_available:
    st.warning("No text content (Transcript or OCR) found. Please complete previous steps first.")
    st.stop()
# st.info("This step combines the generated transcript and OCR text (if available) and creates a summary.")
# --- Combine Sources ---
st.subheader('Sources')
# combined_text = ""
source_info = []
col_source_transcript, col_source_frames = st.columns(2)
transcript_text = ''       # defaults so later checks and the API request don't fail if the transcript is missing
transcript_segments = ''
if transcript_available:
    col_source_transcript.success('✅ Transcript found')
    # st.success(len(st.session_state.transcript.__dict__['output']))
    # st.success(st.session_state.transcript.__dict__['output'][0]['text'])
    # combined_text += '--- Transcript ---\n' + st.session_state.transcript['output'][0]['text'] + '\n\n'
    # st.success(st.session_state.transcript.output[0]['text'])
    transcript_text = st.session_state.transcript.output['text']
    transcript_segments = st.session_state.transcript_segments
    # combined_text += '--- Transcript ---\n\n' + transcript_text + '\n\n'
    # st.write(combined_text)
    source_info.append('Transcript')
    with col_source_transcript.expander('Show transcript'):
        st.text_area('Transcript', transcript_text, height=200, key='sum_transcript_disp')
else:
    col_source_transcript.warning('Transcript not available.')
if frames_available:
    col_source_frames.success('✅ Extracted frames found')
    # combined_text += "--- OCR results ---\n" + st.session_state['frames_dir']
    source_info.append('Frames dir')
    # with st.expander('Extracted frames directory'):
    #     st.text_area('Extracted frames directory', st.session_state['frames_dir'], height=200, key="sum_ocr_disp")
    with col_source_frames.expander('Show frames'):
        st.text_input('Extracted frames directory', st.session_state['frames_dir'])
else:
    # st.warning('OCR Text not available.')
    col_source_frames.warning('Extracted frames not available.')
# combined_text = combined_text.strip()
# if not combined_text:
#     st.error("Combined text is empty. Cannot proceed.")
if not transcript_text:
    st.error('Transcript text is empty. Cannot proceed.')
    st.stop()
# --- Summarization Configuration ---
st.subheader('Summarization Settings')
# Consider different models/pipelines
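# The active options are Ollama model tags that (presumably) accept images alongside text,
# since the request below uploads key frames with the prompt; the commented entries are
# text-only HF summarization models kept for reference.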
summarizer_options = ['gemma3:4b',
                      'gemma3:12b',
                      'granite3.2-vision',
                      # 'phi4',
                      'mistral-small3.1',
                      'llama3.2-vision',
                      # 'YandexGPT',
                      # 't5-base',
                      # 't5-large',
                      # 'facebook/mbart-large-50',
                      # 'facebook/bart-large-cnn',
                      # 'google/pegasus-xsum',
                      ]
selected_model = st.selectbox('Select Summarization Model:', summarizer_options, index=1)
# # Dynamic length based on input size (example logic)
# # input_length = len(combined_text.split())
# input_length = len(transcript_text.split())  # approx word count
# default_min = max(50, input_length // 10)  # suggest min length ~10% of input
# default_max = max(150, input_length // 3)  # suggest max length ~30% of input
# min_length = st.slider("Minimum Summary Length (tokens):", min_value=30, max_value=max(500, default_max + 100), value=default_min)
# max_length = st.slider("Maximum Summary Length (tokens):", min_value=50, max_value=max(1000, default_max + 200), value=default_max)
# if min_length >= max_length:
#     st.warning("Minimum length should be less than maximum length.")
#     # Adjust max_length automatically or prevent proceeding
#     max_length = min_length + 50  # simple adjustment
# --- Generate Summary ---
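# Legacy helper for running a local multimodal Ollama model via langchain-ollama
# (bind(images=...) attaches the frames to the request). The active path further below
# posts to the remote /summarize API instead, so this function is currently unused.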
def describe_video(model, frames_dir, describe_prompt):
    images = []
    for file in os.listdir(frames_dir):
        images.append(os.path.join(frames_dir, file))
    model_with_images = model.bind(images=images)
    return model_with_images.invoke(describe_prompt)
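# Prompt resolution order: Google Drive file (prompt_file_id) -> local ideal_prompt.txt -> default_prompt.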
def load_prompt():
    describe_prompt = None
    prompt_url = f'https://drive.google.com/uc?export=download&id={prompt_file_id}'
    try:
        response = requests.get(prompt_url, timeout=10)
        if response.status_code == 200 and 'Google Drive - Quota exceeded' not in response.text:
            describe_prompt = response.text
    except requests.RequestException:
        describe_prompt = None
    if not describe_prompt:
        try:
            with open('ideal_prompt.txt', 'r', encoding='utf-8') as file:
                describe_prompt = file.read()
        except OSError:
            describe_prompt = default_prompt
    return describe_prompt
secret_prompt = load_prompt()
# secret_prompt =
with st.expander('**Prompt**', expanded=True):
    # col_1, col_2 = st.columns(2)
    describe_prompt = st.text_area(label='Prompt', height=300, value=secret_prompt)
_, col_button_summary, _ = st.columns([2, 1, 2])
if col_button_summary.button('Generate Summary', type='primary', use_container_width=True):
    st.session_state['summary'] = None  # clear previous summary
    st.session_state['edit_mode'] = False
    with st.spinner(f'Performing summarization with `{selected_model}` model..'):
        # st.session_state.summary = describe_video(model=OllamaLLM(model=selected_model),
        #                                           frames=frames,
        #                                           # frames_dir=st.session_state.frames_dir,
        #                                           # describe_prompt=describe_prompt + gluing_prompt + transcript_text
        #                                           prompt=describe_prompt + gluing_prompt + transcript_text
        #                                           )
        # [st.write(path, 'rb') for path in frames_paths]
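        # Send the model name and the assembled prompt (prompt + gluing text + transcript)
        # as query params, and upload each key frame as a multipart 'frames' file.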
        response = requests.post(
            f'{st.session_state.secret_api}/summarize',
            # data={'frames': frames},
            params={'model': selected_model,
                    # 'frames': frames,
                    'prompt': describe_prompt + gluing_prompt + transcript_segments},
                    # 'prompt': ''},
            files=[('frames', open(path, 'rb')) for path in frames_paths]
            # files=[('files', open(f, 'rb')) for f in file_names]
        )
        st.write(response)
        response = response.json()
        st.badge(f'inference_time: {response["inference_time"]} | used model: {response["model_name"]}')
        # st.write(response['form'])
        st.session_state['summary'] = response['summary']
# if combined_text:
#     with st.spinner(f"Summarizing text using {selected_model}.. May take a while (up to 2x)"):
#         try:
#             start_time = time.time()
#             # Load the pipeline - specify device if possible
#             device = 0 if torch.cuda.is_available() else -1  # device=0 for first GPU, -1 for CPU
#             summarizer = pipeline("summarization", model=selected_model, device=device)
#             # Handle potential long input (simplistic chunking if needed, better models handle longer inputs)
#             # Basic check: Transformers often have input limits (e.g., 1024 tokens for BART).
#             # A more robust solution involves chunking, summarizing chunks, and combining summaries.
#             # For this example, we'll try summarizing directly, but add a warning.
#             max_model_input_length = getattr(summarizer.model.config, 'max_position_embeddings', 1024)  # get model's max length
#             if len(summarizer.tokenizer.encode(combined_text)) > max_model_input_length:
#                 st.warning(f'Input text might be too long for {selected_model} (max ~{max_model_input_length} tokens). ' +
#                            f'Consider using models designed for longer text or implementing chunking.')
#                 # Simple Truncation (Not Ideal):
#                 # truncated_text = summarizer.tokenizer.decode(summarizer.tokenizer.encode(combined_text, max_length=max_model_input_length, truncation=True))
#                 # summary_result = summarizer(truncated_text, max_length=max_length, min_length=min_length, do_sample=False)
#             # Attempt summarization (may error if too long and not handled)
#             summary_result = summarizer(combined_text, max_length=max_length, min_length=min_length, do_sample=False)
#             st.session_state['summary'] = summary_result[0]['summary_text']
#             end_time = time.time()
#             st.success(f"Summary generated in {end_time - start_time:.2f} seconds.")
#         except Exception as e:
#             st.error(f"Error during summarization: {e}")
#             st.error("This could be due to model loading issues, insufficient memory, or input text length.")
#             if 'summarizer' in locals():
#                 del summarizer  # try to free memory
#             if device == 0: torch.cuda.empty_cache()
# else:
#     st.error("No text available to summarize.")
# # --- Display and Refine Summary ---
# # st.subheader('Summary')
if 'summary' in st.session_state and st.session_state['summary']:
    # with st.container(height=600, border=True):
    #     summary_container = st.empty()
    #     edited_summary = st.session_state['summary']
    #     # summary_container.markdown(st.session_state['summary'])
    #     summary_container.markdown(edited_summary, unsafe_allow_html=True)
    # _, col_button_render, _ = st.columns([2, 1, 2])
    # # Use st.text_area for editing
    # edited_summary = st.text_area(
    #     'Edit the summary here (Markdown format supported):',
    #     value=st.session_state['summary'],
    #     height=400,
    #     key='summary_edit_area'
    # )
    # if col_button_render.button('Render Markdown', type='secondary', use_container_width=True):
    #     with st.spinner('Generating Markdown preview..'):
    #         # st.markdown(edited_summary, unsafe_allow_html=True)
    #         summary_container.markdown(edited_summary, unsafe_allow_html=True)
    #         # st.session_state['summary'] = edited_summary  # update summary
    # # else:
    # #     st.markdown('', unsafe_allow_html=True)
    # Initialize edit-mode state
    if 'edit_mode' not in st.session_state:
        st.session_state.edit_mode = False
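    # A single st.empty() placeholder is reused below so the editor and the rendered
    # preview occupy the same slot inside the bordered container.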
    with st.container(height=500, border=True):
        summary_container = st.empty()
        edited_summary = st.session_state.summary
        # Toggle between edit and preview rendering
        if st.session_state.edit_mode:
            # Edit mode
            edited_summary = summary_container.text_area(
                'Edit Markdown:',
                value=st.session_state.summary,
                height=500
            )
            st.session_state.summary = edited_summary
        else:
            # Preview mode
            summary_container.markdown(st.session_state.summary, unsafe_allow_html=True)

    def switch_mode():
        st.session_state.edit_mode = not st.session_state.edit_mode

    # Mode toggle button
    st.button('✏️ Edit' if not st.session_state.edit_mode else '👁️ Preview',
              on_click=switch_mode,
              use_container_width=True)
    # --- Export Options ---
    st.subheader('📥 Export Notes (Download)')
    col_export_md, col_export_docx, col_export_pdf = st.columns(3)
    st.session_state['final_notes'] = edited_summary  # store edited version
    # st.session_state['final_notes'] = summary_container  # store edited version
    final_notes_md = st.session_state.get('final_notes', '')
    # st.info(final_notes_md)
    # 1. Markdown (.md) export
    col_export_md.download_button(
        label="📥 Markdown (.md)",
        data=final_notes_md,
        file_name="lecture_notes.md",
        mime="text/markdown",
        use_container_width=True,
    )
    # 2. Word (.docx) export
    try:
        doc = Document()
        doc.add_heading('Lecture Notes Summary', 0)
        # Add basic Markdown conversion (very simple - assumes paragraphs)
        # For full Markdown -> Docx, a library like 'pandoc' (external) or more complex parsing is needed.
        paragraphs = final_notes_md.split('\n\n')  # split by double newline
        for para in paragraphs:
            if para.strip():  # avoid empty paragraphs
                # Basic handling for potential markdown emphasis (crude)
                # A proper Markdown parser would be better here
                cleaned_para = para.replace('*', '').replace('_', '').replace('#', '').strip()
                doc.add_paragraph(cleaned_para)
        # Save docx to a BytesIO buffer
        buffer = BytesIO()
        doc.save(buffer)
        buffer.seek(0)
        col_export_docx.download_button(
            label='📥 Word (.docx)',
            data=buffer,
            file_name='lecture_notes.docx',
            mime='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            use_container_width=True
        )
    except Exception as docx_e:
        st.error(f'Failed to generate .docx file: {docx_e}')
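    # Hedged alternative (not wired in): if the pypandoc package and the external pandoc
    # binary are available, the Markdown could be converted to .docx directly instead of
    # the crude emphasis-stripping above; both dependencies are assumptions here.
    # import pypandoc
    # pypandoc.convert_text(final_notes_md, 'docx', format='md',
    #                       outputfile='lecture_notes.docx')  # binary formats must be written to a file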
    # 3. PDF (.pdf) export (placeholder: the button is disabled and reuses the .docx buffer)
    try:
        col_export_pdf.download_button(
            label='📥 PDF (.pdf)',
            data=buffer,
            file_name="lecture_notes.pdf",
            use_container_width=True,
            # mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document"
            disabled=True
        )
    except Exception as pdf_e:
        st.error(f'Failed to generate .pdf file: {pdf_e}')
    # 3. PDF Export (Requires extra libraries/setup - Placeholder)
    # st.markdown("---")
    # st.write("**PDF Export:**")
    # try:
    #     from mdpdf.cli import mdpdf
    #     pdf_buffer = BytesIO()
    #     # This often requires command-line execution or careful API usage
    #     # Simplified placeholder - actual implementation may vary:
    #     # mdpdf(pdf_buffer, md=final_notes_md, ...)  # Fictional direct API call
    #     st.info("PDF generation via libraries like mdpdf/WeasyPrint requires setup.")
    # except ImportError:
    #     st.warning("`mdpdf` library not installed. PDF export unavailable.")
    # except Exception as pdf_e:
    #     st.error(f"Failed to generate PDF (requires setup): {pdf_e}")
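    # Hedged sketch of one possible PDF path (not enabled): render the Markdown to HTML with
    # the `markdown` package and print it with `weasyprint`; neither package is imported in
    # this file, so both are assumptions.
    # import markdown
    # from weasyprint import HTML
    # pdf_bytes = HTML(string=markdown.markdown(final_notes_md)).write_pdf()
    # col_export_pdf.download_button('📥 PDF (.pdf)', data=pdf_bytes,
    #                                file_name='lecture_notes.pdf', mime='application/pdf')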
else:
    st.info('Summary has not been generated or is empty.')
# --- Optional: Cleanup Button ---
# st.sidebar.markdown("---")
# if st.sidebar.button("End Session & Clean Up Files"):
#     session_id = get_session_id()
#     cleanup_session_files(session_id)
#     # Clear relevant session state keys
#     keys_to_clear = ['video_path', 'audio_path', 'frames_dir', 'transcript', 'summary', 'final_notes', 'extracted_frames', 'session_id']
#     for key in keys_to_clear:
#         if key in st.session_state:
#             del st.session_state[key]
#     st.success("Temporary files cleaned and session data cleared.")
#     st.info("You can now start a new session from the 'Main' page.")
#     # Consider navigating back to Main page or just showing message