Sabbah13's picture
Update app.py
7bbc5c5 verified
raw
history blame
5 kB
import os
import streamlit as st
import whisperx
import torch
from utils import convert_segments_object_to_text, check_password
from gigiachat_requests import get_access_token, get_completion_from_gigachat, get_number_of_tokens
from openai_requests import get_completion_from_openai
if check_password():
st.title('Audio Transcription App')
st.sidebar.title("Settings")
device = os.getenv('DEVICE')
batch_size = int(os.getenv('BATCH_SIZE'))
compute_type = os.getenv('COMPUTE_TYPE')
initial_base_prompt = os.getenv('BASE_PROMPT')
initial_processing_prompt = os.getenv('PROCCESS_PROMPT')
llm = st.sidebar.selectbox("LLM", ["GigaChat", "Chat GPT"], index=0)
base_prompt = st.sidebar.text_area("Промпт для резюмирования", value=initial_base_prompt)
max_tokens_summary = st.sidebar.number_input("Максимальное количество токенов при резюмировании", min_value=1, value=1024)
enable_summarization = st.sidebar.checkbox("Добавить обработку транскрибации", value=False)
processing_prompt = st.sidebar.text_area("Промпт для обработки транскрибации", value=initial_processing_prompt)
ACCESS_TOKEN = st.secrets["HF_TOKEN"]
uploaded_file = st.file_uploader("Загрузите аудиофайл", type=["mp4", "wav", "m4a"])
if uploaded_file is not None:
file_name = uploaded_file.name
if 'file_name' not in st.session_state or st.session_state.file_name != file_name:
st.session_state.transcript = ''
st.session_state.file_name = file_name
print(st.session_state.file_name)
print(st.session_state.transcript)
print(st.session_state.file_name)
print(st.session_state.transcript)
st.audio(uploaded_file)
file_extension = uploaded_file.name.split(".")[-1] # Получаем расширение файла
temp_file_path = f"temp_file.{file_extension}" # Создаем временное имя файла с правильным расширением
with open(temp_file_path, "wb") as f:
f.write(uploaded_file.getbuffer())
print(st.session_state.transcript)
if 'transcript' not in st.session_state or st.session_state.transcript == '':
with st.spinner('Транскрибируем...'):
# Load model
model = whisperx.load_model(os.getenv('WHISPER_MODEL_SIZE'), device, compute_type=compute_type)
# Load and transcribe audio
audio = whisperx.load_audio(temp_file_path)
result = model.transcribe(audio, batch_size=batch_size, language="ru")
print('Transcribed, now aligning')
model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)
print('Aligned, now diarizing')
diarize_model = whisperx.DiarizationPipeline(use_auth_token=st.secrets["HF_TOKEN"], device=device)
diarize_segments = diarize_model(audio)
result_diar = whisperx.assign_word_speakers(diarize_segments, result)
st.write("Результат транскрибации:")
transcript = convert_segments_object_to_text(result_diar)
st.session_state.transcript = transcript
else:
transcript = st.session_state.transcript
st.text(transcript)
access_token = get_access_token()
if (enable_summarization):
with st.spinner('Обрабатываем транскрибацию...'):
if (llm == 'GigaChat'):
number_of_tokens = get_number_of_tokens(transcript, access_token)
print('Количество токенов в транскрибации: ' + str(number_of_tokens))
transcript = get_completion_from_gigachat(processing_prompt + transcript, number_of_tokens + 500, access_token)
elif (llm == 'Chat GPT'):
transcript = get_completion_from_openai(processing_prompt + transcript)
st.write("Результат обработки:")
st.text(transcript)
with st.spinner('Резюмируем...'):
if (llm == 'GigaChat'):
summary_answer = get_completion_from_gigachat(base_prompt + transcript, max_tokens_summary, access_token)
elif (llm == 'Chat GPT'):
summary_answer = get_completion_from_openai(base_prompt + transcript, max_tokens_summary)
st.write("Результат резюмирования:")
st.text(summary_answer)