# conspectum / ui_summarize.py
import os

# Disable Streamlit's file watcher before torch is imported to avoid the
# torch.classes path-inspection error at startup
os.environ['STREAMLIT_SERVER_ENABLE_FILE_WATCHER'] = 'false'

import time
from io import BytesIO

import requests
import streamlit as st
import torch
from docx import Document
from transformers import pipeline
from langchain_ollama.llms import OllamaLLM

# from utils import cleanup_session_files, get_session_id  # for cleanup button
from utils import get_secret_api, get_secret_prompt

st.session_state.secret_api = get_secret_api()
# st.session_state.secret_prompt = get_secret_prompt()
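# Google Drive file id of the externally maintained summarization prompt (downloaded in load_prompt below)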
prompt_file_id = '1s5r_DuxaEoMk-D5-53FVhTMeHGVtoeV7'
default_prompt = '''Ты - ассистент, который создает конспекты лекций на основе предоставленного текста. Этот текст состоит из двух частей:
1. Транскрибация аудиодорожки видеолекции,
2. Изображение выделенных из видео ключевых кадров, с полезной информацией.
Сделай детальный конспект по тому, что описывается в видео. Для иллюстрации сравнений и сопоставлений используй markdown-таблицы. Ответ предоставь в формате markdown.
'''
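# default_prompt is the last-resort fallback used when neither the Google Drive prompt nor ideal_prompt.txt is available (see load_prompt)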
# gluing_prompt = 'Вот упомянутый транскрибированный текст с таймкодами, суммаризируй его вместе с изображениями, а для иллюстрации сравнений и сопоставлений используй markdown-таблицы:'
gluing_prompt = 'Вот упомянутый транскрибированный текст с таймкодами, суммаризируй его вместе с изображениями, используя markdown-таблицы.'
if st.session_state.get('main_topic'):
gluing_prompt += f' Основная тема лекции: {st.session_state.main_topic}'
# st.write(image_path)
# Collect the key-frame images extracted from the video in the previous step
frames_dir = st.session_state.get('frames_dir')
frames_paths = []
if frames_dir and os.path.isdir(frames_dir):
    frames_paths = [os.path.join(frames_dir, f)
                    for f in os.listdir(frames_dir)
                    if f.endswith('.jpg')
                    and os.path.isfile(os.path.join(frames_dir, f))]
# import base64
# # Load and encode JPEG images to base64
# frames = []
# # st.success(os.listdir(st.session_state.frames_dir))
# # st.success([os.path.isfile(f) for f in os.listdir(st.session_state.frames_dir)])# if f.endswith('.jpg') and os.path.isfile(f)])
# for image_path in frames_paths:
# # st.write(image_path)
# with open(os.path.join(st.session_state.frames_dir, image_path), 'rb') as image_file:
# # Read the image and encode it to base64
# encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
# frames.append(encoded_string)
# # st.success(frames)
st.title('📝 Step 4: Lecture Summarization')
# Check if transcript and potentially OCR text are available
transcript_available = 'transcript' in st.session_state and st.session_state['transcript']
frames_available = 'frames_dir' in st.session_state and st.session_state['frames_dir']
if not transcript_available and not frames_available:
st.warning("No text content (Transcript or OCR) found. Please complete previous steps first.")
st.stop()
# st.info("This step combines the generated transcript and OCR text (if available) and creates a summary.")
# --- Combine Sources ---
st.subheader('Sources')
# combined_text = ""
source_info = []
col_source_transcript, col_source_frames = st.columns(2)
if transcript_available:
col_source_transcript.success('✅ Transcript found')
# st.success(len(st.session_state.transcript.__dict__['output']))
# st.success(st.session_state.transcript.__dict__['output'][0]['text'])
# combined_text += '--- Transcript ---\n' + st.session_state.transcript['output'][0]['text'] + '\n\n'
# st.success(st.session_state.transcript.output[0]['text'])
transcript_text = st.session_state.transcript.output['text']
transcript_segments = st.session_state.transcript_segments
# combined_text += '--- Transcript ---\n\n' + transcript_text + '\n\n'
# st.write(combined_text)
source_info.append('Transcript')
with col_source_transcript.expander('Show transcript'):
st.text_area('Transcript', transcript_text, height=200, key='sum_transcript_disp')
else:
    col_source_transcript.warning('Transcript not available.')
    transcript_text = ''
    transcript_segments = ''
if frames_available:
col_source_frames.success('✅ Extracted frames found')
# combined_text += "--- OCR results ---\n" + st.session_state['frames_dir']
source_info.append('Frames dir')
# with st.expander('Extracted frames directory'):
# st.text_area('Extracted frames directory', st.session_state['frames_dir'], height=200, key="sum_ocr_disp")
with col_source_frames.expander('Show frames'):
st.text_input('Extracted frames directory', st.session_state['frames_dir'])
else:
# st.warning('OCR Text not available.')
col_source_frames.warning('Extracted frames not available.')
# combined_text = combined_text.strip()
# if not combined_text:
# st.error("Combined text is empty. Cannot proceed.")
if not transcript_text:
st.error('Transcript text is empty. Cannot proceed.')
st.stop()
# --- Summarization Configuration ---
st.subheader('Summarization Settings')
# Consider different models/pipelines
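# Ollama-style model tags; the selected tag is forwarded to the summarization backend, commented entries are currently disabled alternatives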
summarizer_options = ['gemma3:4b',
'gemma3:12b',
'granite3.2-vision',
# 'phi4',
'mistral-small3.1',
'llama3.2-vision',
# 'YandexGPT',
# 't5-base',
# 't5-large',
# 'facebook/mbart-large-50',
# 'facebook/bart-large-cnn',
# 'google/pegasus-xsum',
]
selected_model = st.selectbox('Select Summarization Model:', summarizer_options, index=1)
# # Dynamic length based on input size (example logic)
# # input_length = len(combined_text.split())
# input_length = len(transcript_text.split()) # approx word count
# default_min = max(50, input_length // 10) # suggest min length ~10% of input
# default_max = max(150, input_length // 3) # suggest max length ~30% of input
# min_length = st.slider("Minimum Summary Length (tokens):", min_value=30, max_value=max(500, default_max + 100), value=default_min)
# max_length = st.slider("Maximum Summary Length (tokens):", min_value=50, max_value=max(1000, default_max + 200), value=default_max)
# if min_length >= max_length:
# st.warning("Minimum length should be less than maximum length.")
# # Adjust max_length automatically or prevent proceeding
# max_length = min_length + 50 # simple adjustment
# --- Generate Summary ---
def describe_video(model, frames_dir, describe_prompt):
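    """Bind every file in frames_dir as an image to the given Ollama model and invoke it with describe_prompt."""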
images = []
for file in os.listdir(frames_dir):
images.append(os.path.join(frames_dir, file))
model_with_images = model.bind(images=images)
return model_with_images.invoke(describe_prompt)
def load_prompt():
    """Fetch the summarization prompt from Google Drive, falling back to ideal_prompt.txt, then to default_prompt."""
    describe_prompt = None
    prompt_url = f'https://drive.google.com/uc?export=download&id={prompt_file_id}'
    try:
        response = requests.get(prompt_url, timeout=10)
        if response.status_code == 200 and 'Google Drive - Quota exceeded' not in response.text:
            describe_prompt = response.text
    except requests.RequestException:
        pass
    if not describe_prompt:
        try:
            with open('ideal_prompt.txt', 'r', encoding='utf-8') as file:
                describe_prompt = file.read()
        except OSError:
            describe_prompt = default_prompt
    return describe_prompt
secret_prompt = load_prompt()
with st.expander('**Prompt**', expanded=True):
# col_1, col_2 = st.columns(2)
    describe_prompt = st.text_area(label='Prompt', height=300, value=secret_prompt)
_, col_button_summary, _ = st.columns([2, 1, 2])
if col_button_summary.button('Generate Summary', type='primary', use_container_width=True):
st.session_state['summary'] = None # clear previous summary
st.session_state['edit_mode'] = False
    with st.spinner(f'Performing summarization with `{selected_model}` model...'):
        # Local alternative (no backend API): describe the frames with a local Ollama vision model
        # st.session_state.summary = describe_video(model=OllamaLLM(model=selected_model),
        #                                           frames_dir=st.session_state.frames_dir,
        #                                           describe_prompt=describe_prompt + gluing_prompt + transcript_text)
        # Remote path: send the prompt and the extracted key frames to the backend /summarize endpoint
        response = requests.post(
            f'{st.session_state.secret_api}/summarize',
            params={'model': selected_model,
                    'prompt': describe_prompt + gluing_prompt + transcript_segments},
            files=[('frames', open(path, 'rb')) for path in frames_paths]
        )
        st.write(response)
        if not response.ok:
            st.error(f'Summarization request failed with status {response.status_code}.')
            st.stop()
        response = response.json()
        st.badge(f'inference_time: {response["inference_time"]} | used model: {response["model_name"]}')
        st.session_state['summary'] = response['summary']
# if combined_text:
# with st.spinner(f"Summarizing text using {selected_model}.. Может занять некоторое время (до x2)"):
# try:
# start_time = time.time()
# # Load the pipeline - specify device if possible
# device = 0 if torch.cuda.is_available() else -1 # device=0 for first GPU, -1 for CPU
# summarizer = pipeline("summarization", model=selected_model, device=device)
# # Handle potential long input (simplistic chunking if needed, better models handle longer inputs)
# # Basic check: Transformers often have input limits (e.g., 1024 tokens for BART).
# # A more robust solution involves chunking, summarizing chunks, and combining summaries.
# # For this example, we'll try summarizing directly, but add a warning.
# max_model_input_length = getattr(summarizer.model.config, 'max_position_embeddings', 1024) # get model's max length
# if len(summarizer.tokenizer.encode(combined_text)) > max_model_input_length:
# st.warning(f'Input text might be too long for {selected_model} (max ~{max_model_input_length} tokens).' +
# f'Consider using models designed for longer text or implementing chunking.')
# # Simple Truncation (Not Ideal):
# # truncated_text = summarizer.tokenizer.decode(summarizer.tokenizer.encode(combined_text, max_length=max_model_input_length, truncation=True))
# # summary_result = summarizer(truncated_text, max_length=max_length, min_length=min_length, do_sample=False)
# # Attempt summarization (may error if too long and not handled)
# summary_result = summarizer(combined_text, max_length=max_length, min_length=min_length, do_sample=False)
# st.session_state['summary'] = summary_result[0]['summary_text']
# end_time = time.time()
# st.success(f"Summary generated in {end_time - start_time:.2f} seconds.")
# except Exception as e:
# st.error(f"Error during summarization: {e}")
# st.error("This could be due to model loading issues, insufficient memory, or input text length.")
# if 'summarizer' in locals():
# del summarizer # try to free memory
# if device == 0: torch.cuda.empty_cache()
# else:
# st.error("No text available to summarize.")
# # --- Display and Refine Summary ---
# # st.subheader('Summary')
if 'summary' in st.session_state and st.session_state['summary']:
# with st.container(height=600, border=True):
# summary_container = st.empty()
# edited_summary = st.session_state['summary']
# # summary_container.markdown(st.session_state['summary'])
# summary_container.markdown(edited_summary, unsafe_allow_html=True)
# _, col_button_render, _ = st.columns([2, 1, 2])
# # Use st.text_area for editing
# edited_summary = st.text_area(
# 'Edit the summary here (Markdown format supported):',
# value=st.session_state['summary'],
# height=400,
# key='summary_edit_area'
# )
# if col_button_render.button('Render Markdown', type='secondary', use_container_width=True):
# with st.spinner('Generating Markdown preview..'):
# # st.markdown(edited_summary, unsafe_allow_html=True)
# summary_container.markdown(edited_summary, unsafe_allow_html=True)
# # st.session_state['summary'] = edited_summary # update summary
# # else:
# # st.markdown('', unsafe_allow_html=True)
    # Initialize edit/preview state
    if 'edit_mode' not in st.session_state:
        st.session_state.edit_mode = False

    with st.container(height=500, border=True):
        summary_container = st.empty()
        edited_summary = st.session_state.summary

        # Toggle between editing and Markdown preview
        if st.session_state.edit_mode:
            # Edit mode
            edited_summary = summary_container.text_area(
                'Edit Markdown:',
                value=st.session_state.summary,
                height=500
            )
            st.session_state.summary = edited_summary
        else:
            # Preview mode
            summary_container.markdown(st.session_state.summary, unsafe_allow_html=True)

    def switch_mode():
        st.session_state.edit_mode = not st.session_state.edit_mode

    # Mode toggle button
    st.button('✏️ Edit' if not st.session_state.edit_mode else '👁️ Preview',
              on_click=switch_mode,
              use_container_width=True)
# --- Export Options ---
st.subheader('📥 Export Notes (Download)')
col_export_md, col_export_docx, col_export_pdf = st.columns(3)
st.session_state['final_notes'] = edited_summary # store edited version
# st.session_state['final_notes'] = summary_container # store edited version
final_notes_md = st.session_state.get('final_notes', '')
# st.info(final_notes_md)
# 1. Markdown (.md) export
col_export_md.download_button(
label="📥 Markdown (.md)",
data=final_notes_md,
file_name="lecture_notes.md",
mime="text/markdown",
use_container_width=True,
)
# 2. Word (.docx) export
try:
doc = Document()
doc.add_heading('Lecture Notes Summary', 0)
# Add basic Markdown conversion (very simple - assumes paragraphs)
# For full Markdown -> Docx, a library like 'pandoc' (external) or more complex parsing is needed.
paragraphs = final_notes_md.split('\n\n') # split by double newline
for para in paragraphs:
if para.strip(): # avoid empty paragraphs
# Basic handling for potential markdown emphasis (crude)
# A proper Markdown parser would be better here
cleaned_para = para.replace('*', '').replace('_', '').replace('#', '').strip()
doc.add_paragraph(cleaned_para)
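        # A fuller Markdown -> .docx route (hedged sketch, assuming pypandoc and a pandoc binary are available):
        #   import pypandoc
        #   pypandoc.convert_text(final_notes_md, 'docx', format='md', outputfile='lecture_notes.docx')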
# Save docx to a BytesIO buffer
buffer = BytesIO()
doc.save(buffer)
buffer.seek(0)
col_export_docx.download_button(
label='📥 Word (.docx)',
data=buffer,
file_name='lecture_notes.docx',
mime='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
use_container_width=True
)
except Exception as docx_e:
st.error(f'Failed to generate .docx file: {docx_e}')
    # 3. PDF (.pdf) export (disabled placeholder until a Markdown-to-PDF backend is wired in)
    try:
        col_export_pdf.download_button(
            label='📥 PDF (.pdf)',
            data=buffer,
            file_name='lecture_notes.pdf',
            use_container_width=True,
            disabled=True
        )
    except Exception as pdf_e:
        st.error(f'Failed to generate .pdf file: {pdf_e}')
# 3. PDF Export (Requires extra libraries/setup - Placeholder)
# st.markdown("---")
# st.write("**PDF Export:**")
# try:
# from mdpdf.cli import mdpdf
# pdf_buffer = BytesIO()
# # This often requires command-line execution or careful API usage
# # Simplified placeholder - actual implementation may vary:
# # mdpdf(pdf_buffer, md=final_notes_md, ...) # Fictional direct API call
# st.info("PDF generation via libraries like mdpdf/WeasyPrint requires setup.")
# except ImportError:
# st.warning("`mdpdf` library not installed. PDF export unavailable.")
# except Exception as pdf_e:
# st.error(f"Failed to generate PDF (requires setup): {pdf_e}")
else:
st.info('Summary has not been generated or is empty.')
# --- Optional: Cleanup Button ---
# st.sidebar.markdown("---")
# if st.sidebar.button("End Session & Clean Up Files"):
# session_id = get_session_id()
# cleanup_session_files(session_id)
# # Clear relevant session state keys
# keys_to_clear = ['video_path', 'audio_path', 'frames_dir', 'transcript', 'summary', 'final_notes', 'extracted_frames', 'session_id']
# for key in keys_to_clear:
# if key in st.session_state:
# del st.session_state[key]
# st.success("Temporary files cleaned and session data cleared.")
# st.info("You can now start a new session from the 'Main' page.")
# # Consider navigating back to Main page or just showing message