AlanVideos / app.py
turing-usp's picture
Update app.py
e788959 verified
raw
history blame
12.9 kB
import os
import re
import streamlit as st
import transformers
from transformers import pipeline
from transformers import AutoTokenizer
import nltk
from PIL import Image
import torch
import ffmpeg
import speech_recognition as sr
from pytube import YouTube
import string
import whisper
from moviepy.editor import AudioFileClip
from langchain.text_splitter import CharacterTextSplitter
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationalRetrievalChain
from langchain.callbacks import get_openai_callback
from langchain import PromptTemplate
from langchain import LLMChain
openai_api_key = os.getenv("openai_api_key")
# Criando a função que corta o texto em chunks:
def get_chunks(texto_transcrito):
"""Função que recebe uma string (um texto transcrito) e devolve
uma lista de strings, em que cada string ("chunk") contém um
tamanho específico, com overlap entre os pedaços. """
text_splitter=CharacterTextSplitter(separator='.', chunk_size=1000, chunk_overlap=100, length_function=len)
chunks = text_splitter.split_text(texto_transcrito)
return chunks
# Criando a função de vectorstore para transformar os chunks de texto em embeddings:
def get_vectorstore(chunks):
"""Função que recebe uma lista de chunks de texto e
os converte em embeddings (representações numéricas)
usando um modelo de embeddings pré-treinado. """
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(texts=chunks, embedding=embeddings)
return vectorstore
# Criando a função para converter o vídeo para o formato adequado:
@st.cache_data
def convert_mp4_to_wav(mp4_file, wav_file):
"""Função que converte um arquivo de vídeo no formato MP4 para um arquivo de
áudio no formato WAV a partir das seguintes entradas: o caminho do arquivo
de vídeo no formato MP4 que se deseja converter; o caminho onde o arquivo
de áudio WAV resultante será salvo. Nenhuma saída explícita é retornada."""
video = AudioFileClip(mp4_file)
video.write_audiofile(wav_file)
# Criando a função que gera a transcrição:
@st.cache_data
def get_transcriptions(url):
""" Função que recebe um link de um vídeo no YouTube e
devolve um dicionário contendo o título do vídeo (key)
e a transcrição de seu áudio (value). """
dicionario = {}
# Baixando o áudio:
youtube_content = YouTube(url)
audio_streams = youtube_content.streams.filter(only_audio=True)
audio_streams[0].download()
title = youtube_content.title
# Convertendo para Wav:
mp4_file = '/content/'+''.join(char for char in title if char not in string.punctuation.replace('/', '').replace('-', '').replace('!', '').replace('(', '').replace(')', ''))+'.mp4'
wav_file = '/content/'+''.join(char for char in title if char not in string.punctuation.replace('/', '').replace('-', '').replace('!', '').replace('(', '').replace(')', ''))+'.wav'
convert_mp4_to_wav(mp4_file, wav_file)
# Inicializando o reconhecedor de fala:
r = sr.Recognizer()
# Carregando o áudio gravado pelo Whisper em um objeto de áudio:
with sr.AudioFile(wav_file) as source:
audio = r.record(source)
# Transcrevendo:
texto_transcrito = r.recognize_whisper(audio)
# Adicionando ao dicionario:
dicionario[title] = texto_transcrito
return dicionario
# Criando a função que carrega o sumarizador extrativo (para português e inglês) via HuggingFace:
@st.cache_resource
def load_extractive():
""" Função sem valores de entrada que carrega o modelo
de sumarização extrativa via HuggingFace. """
return pipeline("summarization", model="NotXia/longformer-bio-ext-summ", tokenizer=AutoTokenizer.from_pretrained("NotXia/longformer-bio-ext-summ"), trust_remote_code=True)
# Criando a função que gera a sumarização extrativa:
@st.cache_data
def get_summarization(_model_pipeline, full_text, ratio):
""" Função que recebe um texto completo a ser resumido, juntamente
à taxa desejada de sumarização e a pipeline do modelo que fará o
sumário, e devolve a versão resumida desse texto. """
sentences = nltk.sent_tokenize(full_text)
extractive_sentences = _model_pipeline({"sentences": sentences}, strategy="ratio", strategy_args=ratio)
extractive_text = " ".join(extractive_sentences[0])
return extractive_text
# Criando a função que gera a tradução do texto a partir do ChatGPT:
@st.cache_data
def get_translation(summarized_text):
""" Função que recebe um resumo a ser traduzido,
juntamente à pipeline do modelo que fará a tradução,
e devolve a versão traduzida do texto. """
LLM = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.25, model_name="gpt-3.5-turbo")
messages = [SystemMessage(content='Você é um especialista em traduções de texto do inglês para português'), HumanMessage(content=summarized_text)]
translation = LLM(messages)
return translation.content
# Criando a função que corrige os erros de transcrição do texto a partir do ChatGPT:
@st.cache_data
def get_correction(transcription):
""" Função que recebe a transcrição (em PT-BR) e corrige eventuais
erros do whisper - por exemplo, a troca de palavras que, embora
semelhantes na pronúncia, são totalmente absurdas no contexto. """
correction_prompt = PromptTemplate.from_template(template = 'Você é um especialista em correção de textos. Você receberá uma transcrição de um áudio e deverá substituir as palavras transcritas erroneamente, considerando apenas a pronúncia, por palavras adequadas que façam sentido no contexto do texto. Além disso, corrija questões gramáticais para facilitar a compreensão. Corrija a seguinte transcrição: {transcription}')
LLM = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.1, model_name="gpt-3.5-turbo")
corrector = LLMChain(prompt=correction_prompt, llm=LLM)
with get_openai_callback() as cb:
corrected_transcription = corrector.run({"transcription": transcription})
total_t = cb.total_tokens
prompt_t = cb.prompt_tokens
completion_t = cb.completion_tokens
total_cost = cb.total_cost
return corrected_transcription
# Criando o chatbot
def alan_videos(vectorstore):
""" Função que inicializa e configura um LLM da OpenAI
e retorna um chatbot configurado pronto para uso. """
memory = ConversationBufferWindowMemory(memory_key='chat_history', return_messages=True, k=3)
LLM = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.25, model_name="gpt-3.5-turbo")
retriever=vectorstore.as_retriever()
chatbot = ConversationalRetrievalChain.from_llm(llm=LLM, retriever=retriever, memory=memory)
return chatbot
# Criando um modelo de chat:
def chat(pergunta):
""" Função que processa uma pergunta utilizando o chatbot
configurado (alan_videos) e retorna sua resposta. """
with get_openai_callback() as cb:
resposta = st.session_state.alanvideos.invoke({"question": pergunta})
total_t = cb.total_tokens
prompt_t = cb.prompt_tokens
completion_t = cb.completion_tokens
total_cost = cb.total_cost
return resposta['answer']
# Criando a função para customização com CSS:
def local_css(file_name):
""" Função que carrega um arquivo CSS local e aplica o estilo
ao Streamlit app, personalizando a interface do usuário. """
with open(file_name, "r") as f:
st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
# Configurando a aba do site:
icon = Image.open("Traçado laranja #f1863d.png")
st.set_page_config(page_title="AlanVideos", page_icon=icon, layout="wide", initial_sidebar_state="auto")
# Configurando o site:
def main():
local_css("style.css")
header = st.container()
model = st.container()
model_1 = st.container()
model_2 = st.container()
# Configurando a barra lateral:
with st.sidebar:
with st.form("data_collection"):
link = st.text_area(label="Coloque o link do seu vídeo do YouTube:", height=25, placeholder="Digite seu link...")
language = st.selectbox('Qual a linguagem do seu vídeo?', ('Português (pt)', 'Inglês (en)'))
translate = st.selectbox('Você deseja que o seu vídeo seja traduzido para Português?', ('Não', 'Sim'))
compression_rate = st.slider(label="Selecione o percentual de sumarização:", min_value=0.05, max_value=0.50, value=0.25, step=0.05)
submitted = st.form_submit_button("Enviar")
# Verificando se o link do vídeo é válido:
if link != '':
if re.match((r'(https?://)?(www\.)?''(youtube|youtu|youtube-nocookie)\.(com|be)/''(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'), link):
st.success('Dados coletados!', icon="✅")
else:
st.error('Link inválido. Por favor, insira um link do YouTube.', icon="🚨")
if "alanvideos" not in st.session_state:
st.session_state.alanvideos = None
# Configurando o cabeçalho:
with header:
st.title(":orange[Alan]Videos")
st.subheader("Olá, usuário! Este é um projeto que utiliza técnicas de inteligência artificial para simplificar e acelerar a compreensão de conteúdo audiovisual. Por favor, preencha o formulário ao lado para que possamos responder as suas dúvidas a respeito de um vídeo do YouTube! :)", divider = "orange")
# Configurando os modelos:
with model:
if submitted and re.match((r'(https?://)?(www\.)?''(youtube|youtu|youtube-nocookie)\.(com|be)/''(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'), link):
with st.spinner("Carregando modelos..."):
nltk.download("punkt")
extractive = load_extractive()
with st.spinner("Transcrevendo texto..."):
transcription = get_transcriptions(link)
texto_cru = ''.join(list(transcription.values()))
# Preparando o modelo de sumarização após o envio de um link:
with model_1:
st.header("Texto Sumarizado:")
with st.spinner("Carregando sumarização..."):
summary = get_summarization(extractive, texto_cru, compression_rate)
# Usando o GPT para corrigir a transcrição, caso o vídeo seja em PT-BR:
if language == 'Português (pt)':
summary = get_correction(summary)
# Usando o GPT para traduzir a transcrição para PT-BR, caso o usuário prefira:
elif language == 'Inglês (en)' and translate == 'Sim':
with st.spinner("Traduzindo sumarização..."):
summary = get_translation(summary)
st.session_state['summary'] = summary
# Preparando o AlanVideos após o envio de um link:
with model_2:
st.header("Resposta das perguntas:")
with st.spinner("Preparando AlanVideos..."):
# Separando o texto em chunks:
chunks = get_chunks(texto_cru)
# Armazenando os pedaços de texto em embeddings:
vectorstore = get_vectorstore(chunks)
# Criando o chatbot:
st.session_state.alanvideos = alan_videos(vectorstore)
# Apresentando os resultados:
with model:
# Resultado do modelo de sumarização:
with model_1:
if 'summary' in st.session_state:
st.write(st.session_state['summary'])
# Resultado do AlanVideos:
if 'summary' in st.session_state:
if "chat_history" not in st.session_state:
st.session_state.chat_history = [AIMessage(content=f"Olá, meu nome é Alan, e estou aqui para responder perguntas sobre o vídeo. Como posso ajudar?")]
pergunta = st.chat_input('Faça uma pergunta sobre o vídeo: ')
if pergunta is not None and pergunta != "":
resposta = chat(pergunta)
st.session_state.chat_history.append(HumanMessage(content=pergunta))
st.session_state.chat_history.append(AIMessage(content=resposta))
for message in st.session_state.chat_history:
if isinstance(message, AIMessage):
with st.chat_message("AI"):
st.write(message.content)
elif isinstance(message, HumanMessage):
with st.chat_message("Human"):
st.write(message.content)
main()