"""Streamlit page: transcribe the uploaded lecture and build an editable summary.

Flow of the page (top to bottom):

1. Refresh the address of the remote transcription API from a Google Drive file.
2. Stop early if the audio extracted on the upload page is missing.
3. On button press, send the audio to the remote ``/transcribe`` endpoint.
4. Show the video player with clickable time-coded segments and the
   word-level transcript (optionally colour-coded by recognition probability).
5. Summarize the time-coded transcript with YandexGPT.
6. Let the user edit the summary and download it as Markdown or .docx.

The page reads ``audio_path``, ``video_path``, ``main_topic`` and
``secret_prompt`` from ``st.session_state``; those are expected to be filled
in by other pages of the app (not visible from this file).
"""
import html
import os
import pathlib
import time
from datetime import timedelta

import requests
import streamlit as st
from streamlit_extras.stylable_container import stylable_container

# Kept ahead of the whisper/torch imports, exactly as in the original ordering.
os.environ['STREAMLIT_SERVER_ENABLE_FILE_WATCHER'] = 'false'

import whisper  # openai-whisper
import torch  # check for GPU availability
import matplotlib.colors as mcolors

from transcriber import Transcription
from utils import load_config, get_secret_api


# Google Drive file holding the base URL of the remote transcription API.
api_file_id = '11sWWmdEPLG1hB3BAYPtFDjLgI8yqNF-k'
# Google Drive file holding the summarization prompt.
prompt_file_id = '1s5r_DuxaEoMk-D5-53FVhTMeHGVtoeV7'
# Seconds to wait for the small Google Drive downloads before giving up.
drive_timeout = 30

# Watermark phrase that is stripped from the transcript before summarization.
trash_str = 'Субтитры создавал DimaTorzok'

whisper_model_option = 'turbo'
# When True, pauses of 3 s or longer are rendered in the transcript as dots.
pauses = False

# Red -> orange -> green scale used to colour words by recognition probability.
confidence_cmap = mcolors.LinearSegmentedColormap.from_list(
    'my_colormap',
    [(0.6, 0, 0), (1, 0.7, 0), (0, 0.6, 0)],
)

# Prompt of last resort, used when neither Google Drive nor the local
# secret_prompt.txt file yields a prompt.
default_prompt = (
    'Ты - ассистент, который создает конспекты лекций на основе предоставленного текста. '
    'Этот текст состоит из двух частей: '
    '1. Транскрибация аудиодорожки алекции, '
    '2. Изображение выделенных из видео ключевых кадров, с полезной информацией. '
    'Сделай детальный конспект по тому, что описывается в видео. '
    'Для иллюстрации сравнений и сопоставлений используй markdown-таблицы. '
    'Ответ предоставь в формате markdown. '
)


def fetch_drive_text(file_id):
    """Download a small text file from Google Drive.

    Args:
        file_id: Google Drive file identifier.

    Returns:
        The response body, or ``None`` when the request fails, the status is
        not 200, or Google Drive answers with its quota-exceeded page.
    """
    url = f'https://drive.google.com/uc?export=download&id={file_id}'
    try:
        response = requests.get(url, timeout=drive_timeout)
    except requests.RequestException:
        return None
    if response.status_code == 200 and 'Google Drive - Quota exceeded' not in response.text:
        return response.text
    return None


def format_time(seconds):
    """Format a number of seconds as ``H:MM:SS`` (fractions are truncated)."""
    return str(timedelta(seconds=int(seconds)))


def seek_video(seconds):
    """Button callback: remember the position the video player should start from."""
    st.session_state['start_time'] = seconds


def load_prompt():
    """Return the summarization prompt.

    Sources are tried in order: the Google Drive file, the local
    ``secret_prompt.txt`` file, and finally ``default_prompt``.
    """
    describe_prompt = fetch_drive_text(prompt_file_id)
    if not describe_prompt:
        try:
            with open('secret_prompt.txt', 'r', encoding='utf-8') as file:
                describe_prompt = file.read()
        except (OSError, UnicodeDecodeError):
            describe_prompt = default_prompt
    return describe_prompt


def summarize_with_yandexgpt(prompt):
    """Send ``prompt`` to YandexGPT and return the generated text.

    Returns:
        The text of the first alternative, or ``None`` if the call failed
        (the error is logged to stdout and shown on the page).
    """
    from yandex_cloud_ml_sdk import YCloudML

    # SECURITY: these credentials were committed to the source. They can now be
    # overridden through the environment; the committed values remain only as a
    # backward-compatible fallback and should be rotated and removed.
    yc_folder_id = os.environ.get('YC_FOLDER_ID', 'b1gsck9ro4og9ek02u98')
    yc_token = os.environ.get('YC_TOKEN', 'AQVN0h88bXiRWETk0b3mimKS7j_309gKCa22gcvf')

    try:
        sdk = YCloudML(
            folder_id=yc_folder_id,
            auth=yc_token,
        )
        # The model name/version can be changed here.
        model = sdk.models.completions(model_name="yandexgpt", model_version="rc")
        model = model.configure(temperature=0.2, max_tokens=20000)
        result = model.run(prompt)
        return result.alternatives[0].text
    except Exception as e:
        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Ошибка при взаимодействии с YandexGPT API (ML SDK): {e}")
        st.error(f'Failed to get a summary from YandexGPT: {e}')
        return None


@st.fragment
def player_(output):
    """Render the video player next to a list of clickable time-coded segments.

    Also stores the concatenated ``[start – end] text`` lines in
    ``st.session_state['transcript_segments']`` for the summarization prompt.

    Args:
        output: transcription result; ``output['segments']`` is iterated and
            each segment's ``start``, ``end`` and ``text`` keys are read.
    """
    with st.expander('**ВИДЕО ПЛЕЕР**', expanded=True):
        col_video, col_segments = st.columns(2)
        col_video.video(st.session_state.video_path, start_time=st.session_state.start_time)

        segment_lines = []
        with col_segments.container(height=400, border=False):
            # Style buttons as links
            with stylable_container(
                key='link_buttons',
                css_styles='''
                    button {
                        background: none!important;
                        border: none;
                        padding: 0!important;
                        font-family: arial, sans-serif;
                        color: #069;
                        cursor: pointer;
                    }
                ''',
            ):
                for i, segment in enumerate(output['segments']):
                    start = format_time(segment['start'])
                    end = format_time(segment['end'])
                    text = segment['text'].strip()

                    col_timecode, col_text = st.columns([1, 5], vertical_alignment='center')
                    # The callback runs before the fragment is re-executed, so
                    # the player above already sees the new position. The
                    # explicit key keeps two segments with the same rounded
                    # time range from colliding on widget identity.
                    col_timecode.button(
                        f':violet-badge[**{start} – {end}**]',
                        key=f'segment_button_{i}',
                        use_container_width=True,
                        on_click=seek_video,
                        args=(int(segment['start']),),
                    )
                    col_text.text(f'{text}')
                    segment_lines.append(f'[**{start} – {end}**] {text}')

        # str.replace returns a new string, so the result must be stored.
        st.session_state['transcript_segments'] = ''.join(segment_lines).replace(trash_str, '')


@st.fragment
def trancr_(output):
    """Render the word-level transcript, optionally colour-coded by probability.

    Args:
        output: transcription result; for every segment its ``words`` list is
            read, and for every word the ``word``, ``start``, ``end`` and
            ``probability`` keys.
    """
    with st.expander('**ТРАНСКРИПЦИЯ**', expanded=False):
        color_coding = st.checkbox(
            'кодировать цветом',
            value=True,
            help='Цветное кодирование слов в зависимости от вероятности правильного распознавания: от зелёного (хорошо) до красного (плохо)'
        )

        html_parts = []
        prev_word_end = -1
        with st.container(height=300, border=False):
            for segment in output['segments']:
                for w in segment['words']:
                    # check for pauses in speech longer than 3s
                    if pauses and prev_word_end != -1 and w['start'] - prev_word_end >= 3:
                        pause_int = int(w['start'] - prev_word_end)
                        html_parts.append(f'{"." * pause_int}{{{pause_int}sec}}')
                    prev_word_end = w['end']

                    # The words come from a remote service and are rendered
                    # with unsafe_allow_html=True, so escape them first.
                    word = html.escape(w['word'])
                    if color_coding:
                        rgba_color = confidence_cmap(w['probability'])
                        rgb_color = tuple(round(x * 255) for x in rgba_color[:3])
                        html_parts.append(f'<span style="color: rgb{rgb_color}">{word}</span>')
                    else:
                        # No colour markup: the text inherits the theme colour.
                        html_parts.append(word)

                    # insert line break if there is a punctuation mark
                    if any(c in w['word'] for c in '!?.') and not any(c.isdigit() for c in w['word']):
                        html_parts.append('<br><br>')

            st.markdown(''.join(html_parts), unsafe_allow_html=True)


@st.fragment
def summary_editor():
    """Show the summary and let the user toggle between preview and editing."""
    if 'edit_mode' not in st.session_state:
        st.session_state.edit_mode = False
    if 'summary' not in st.session_state:
        st.session_state.summary = ""

    with st.container(height=600, border=False):
        summary_container = st.empty()
    markdown_button_container = st.container()

    # Main field
    if st.session_state.edit_mode:
        edited_summary = summary_container.text_area(
            'Редактировать Markdown:',
            value=st.session_state.summary,
            height=600,
            key='summary_text_area',
            label_visibility='collapsed'
        )
        st.session_state.summary = edited_summary
        st.session_state.edited_summary = edited_summary
    else:
        summary_container.info(st.session_state.summary)

    # Mode toggle button
    with markdown_button_container:
        label = "✏️ Редактировать" if not st.session_state.edit_mode else "👁️ Просмотр"
        if st.button(label, use_container_width=True, key='toggle_button'):
            st.session_state.edit_mode = not st.session_state.edit_mode
            st.rerun(scope='fragment')


@st.fragment
def downloader():
    """Offer the current notes for download as Markdown and .docx.

    The PDF button is a disabled placeholder: no PDF is generated.
    """
    with st.expander('**📥 СКАЧАТЬ**', expanded=True):
        col_export_md, col_export_docx, col_export_pdf = st.columns(3)

        # edited_summary stays empty until the user enters edit mode, so fall
        # back to the generated summary instead of exporting an empty file.
        final_notes_md = (
            st.session_state.get('edited_summary')
            or st.session_state.get('summary')
            or ''
        )
        st.session_state['final_notes'] = final_notes_md

        # 1. Markdown (.md) export
        col_export_md.download_button(
            label="📥 Markdown (.md)",
            data=final_notes_md,
            file_name="lecture_notes.md",
            mime="text/markdown",
            use_container_width=True,
        )

        # 2. Word (.docx) export
        docx_bytes = b''
        try:
            # Imported here so a missing python-docx only disables this export.
            from docx import Document
            from io import BytesIO

            doc = Document()
            # Very simple Markdown handling: one paragraph per blank-line
            # separated chunk, with the *, _ and # markers removed. A real
            # Markdown -> docx conversion needs a proper parser (e.g. pandoc).
            strip_markup = str.maketrans('', '', '*_#')
            for para in final_notes_md.split('\n\n'):
                if para.strip():  # avoid empty paragraphs
                    doc.add_paragraph(para.translate(strip_markup).strip())

            buffer = BytesIO()
            doc.save(buffer)
            docx_bytes = buffer.getvalue()

            col_export_docx.download_button(
                label='📥 Word (.docx)',
                data=docx_bytes,
                file_name='lecture_notes.docx',
                mime='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
                use_container_width=True
            )
        except Exception as docx_e:
            st.error(f'Failed to generate .docx file: {docx_e}')

        # 3. PDF (.pdf) export - placeholder, always disabled
        try:
            col_export_pdf.download_button(
                label='📥 PDF (.pdf)',
                data=docx_bytes,
                file_name="lecture_notes.pdf",
                use_container_width=True,
                disabled=True
            )
        except Exception as pdf_e:
            st.error(f'Failed to generate .pdf file: {pdf_e}')


# --- Refresh the remote API address ---
with st.spinner('Обновляем доступ по API..'):
    fetched_api = fetch_drive_text(api_file_id)
    if fetched_api is not None:
        # Strip surrounding whitespace so the value can be used as a URL prefix.
        st.session_state.secret_api = fetched_api.strip()

# Check if audio path exists from previous step
audio_path = st.session_state.get('audio_path')
if not audio_path or not os.path.exists(audio_path):
    st.warning('Audio file not found. Please go back to the "**📤 Upload**" page and process a video first.')
    st.stop()

if 'start_time' not in st.session_state:
    st.session_state.start_time = 0

# --- Transcription ---
_, col_button_trancribe, _ = st.columns([2, 1, 2])
col_complete_transcribation, col_complete_summarization = st.columns(2)

if col_button_trancribe.button('Сделать конспект', type='primary', use_container_width=True):
    st.session_state.transcript = None  # clear previous transcript
    st.session_state['summary'] = None  # clear previous summary

    try:
        with st.spinner('Транскрибируем аудио..'):
            api_base = st.session_state.get('secret_api')
            if not api_base:
                raise RuntimeError('the remote API address is not available')

            start = time.time()
            with open(st.session_state.audio_path, 'rb') as f:
                response = requests.post(
                    f'{api_base}/transcribe',
                    params={'model': whisper_model_option},
                    files={'file': f}
                )
            response.raise_for_status()
            payload = response.json()

            # Store the object only once it is complete, so a failure above
            # leaves the transcript cleared rather than half-initialized.
            transcript = Transcription(st.session_state.audio_path)
            transcript.output = payload['output']
            st.session_state.transcript = transcript

            transcribe_time = time.time() - start
            col_complete_transcribation.success(f'Транскрибация завершена! (заняло: {int(transcribe_time)} сек)')
    except Exception as e:
        st.error(f'An error related to the remote API! The error: {e}')

# --- Player and transcript ---
if st.session_state.get('transcript'):
    output = st.session_state.transcript.output
    player_(output)
    trancr_(output)

# --- Summarization, editing and export ---
if st.session_state.get('transcript'):
    from utils import get_secret_prompt

    if not st.session_state.get('secret_prompt'):
        st.session_state.secret_prompt = get_secret_prompt()

    if not st.session_state.get('summary'):
        st.session_state['edit_mode'] = False
        st.session_state.edited_summary = ''

        gluing_prompt = 'Вот упомянутый транскрибированный текст с таймкодами, суммаризируй его вместе с изображениями, используя markdown-таблицы.'
        main_topic = st.session_state.get('main_topic')
        if main_topic:
            gluing_prompt += f' Озаглавь конспект основной темой лекции: {main_topic}'

        prompt = load_prompt() + gluing_prompt + st.session_state.transcript_segments

        with st.spinner('Суммаризируем текст и картинки..'):
            start = time.time()
            answer = summarize_with_yandexgpt(prompt)
            if answer:
                st.session_state['summary'] = answer
                summarization_time = time.time() - start
                col_complete_summarization.success(f'Суммаризация завершена! (заняло: {int(summarization_time)} сек)')

    # --- Display, refine and export the summary ---
    if st.session_state.get('summary'):
        summary_editor()
        downloader()