Spaces:
Sleeping
Sleeping
import os | |
import re | |
import streamlit as st | |
import transformers | |
from transformers import pipeline | |
from transformers import AutoTokenizer | |
import nltk | |
from PIL import Image | |
import torch | |
import ffmpeg | |
import speech_recognition as sr | |
from pytube import YouTube | |
import string | |
import whisper | |
from moviepy.editor import AudioFileClip | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage | |
from langchain.embeddings import OpenAIEmbeddings | |
from langchain.vectorstores import FAISS | |
from langchain.chat_models import ChatOpenAI | |
from langchain.memory import ConversationBufferWindowMemory | |
from langchain.chains import ConversationalRetrievalChain | |
from langchain.callbacks import get_openai_callback | |
from langchain import PromptTemplate | |
from langchain import LLMChain | |
openai_api_key = os.getenv("openai_api_key") | |
# Criando a função que corta o texto em chunks: | |
def get_chunks(texto_transcrito): | |
"""Função que recebe uma string (um texto transcrito) e devolve | |
uma lista de strings, em que cada string ("chunk") contém um | |
tamanho específico, com overlap entre os pedaços. """ | |
text_splitter=CharacterTextSplitter(separator='.', chunk_size=1000, chunk_overlap=100, length_function=len) | |
chunks = text_splitter.split_text(texto_transcrito) | |
return chunks | |
# Criando a função de vectorstore para transformar os chunks de texto em embeddings: | |
def get_vectorstore(chunks): | |
"""Função que recebe uma lista de chunks de texto e | |
os converte em embeddings (representações numéricas) | |
usando um modelo de embeddings pré-treinado. """ | |
embeddings = OpenAIEmbeddings() | |
vectorstore = FAISS.from_texts(texts=chunks, embedding=embeddings) | |
return vectorstore | |
# Criando a função para converter o vídeo para o formato adequado: | |
def convert_mp4_to_wav(mp4_file, wav_file): | |
"""Função que converte um arquivo de vídeo no formato MP4 para um arquivo de | |
áudio no formato WAV a partir das seguintes entradas: o caminho do arquivo | |
de vídeo no formato MP4 que se deseja converter; o caminho onde o arquivo | |
de áudio WAV resultante será salvo. Nenhuma saída explícita é retornada.""" | |
video = AudioFileClip(mp4_file) | |
video.write_audiofile(wav_file) | |
# Criando a função que gera a transcrição: | |
def get_transcriptions(url): | |
""" Função que recebe um link de um vídeo no YouTube e | |
devolve um dicionário contendo o título do vídeo (key) | |
e a transcrição de seu áudio (value). """ | |
dicionario = {} | |
# Baixando o áudio: | |
youtube_content = YouTube(url) | |
audio_streams = youtube_content.streams.filter(only_audio=True) | |
audio_streams[0].download() | |
title = youtube_content.title | |
# Convertendo para Wav: | |
mp4_file = '/content/'+''.join(char for char in title if char not in string.punctuation.replace('/', '').replace('-', '').replace('!', '').replace('(', '').replace(')', ''))+'.mp4' | |
wav_file = '/content/'+''.join(char for char in title if char not in string.punctuation.replace('/', '').replace('-', '').replace('!', '').replace('(', '').replace(')', ''))+'.wav' | |
convert_mp4_to_wav(mp4_file, wav_file) | |
# Inicializando o reconhecedor de fala: | |
r = sr.Recognizer() | |
# Carregando o áudio gravado pelo Whisper em um objeto de áudio: | |
with sr.AudioFile(wav_file) as source: | |
audio = r.record(source) | |
# Transcrevendo: | |
texto_transcrito = r.recognize_whisper(audio) | |
# Adicionando ao dicionario: | |
dicionario[title] = texto_transcrito | |
return dicionario | |
# Criando a função que carrega o sumarizador extrativo (para português e inglês) via HuggingFace: | |
def load_extractive(): | |
""" Função sem valores de entrada que carrega o modelo | |
de sumarização extrativa via HuggingFace. """ | |
return pipeline("summarization", model="NotXia/longformer-bio-ext-summ", tokenizer=AutoTokenizer.from_pretrained("NotXia/longformer-bio-ext-summ"), trust_remote_code=True) | |
# Criando a função que gera a sumarização extrativa: | |
def get_summarization(_model_pipeline, full_text, ratio): | |
""" Função que recebe um texto completo a ser resumido, juntamente | |
à taxa desejada de sumarização e a pipeline do modelo que fará o | |
sumário, e devolve a versão resumida desse texto. """ | |
sentences = nltk.sent_tokenize(full_text) | |
extractive_sentences = _model_pipeline({"sentences": sentences}, strategy="ratio", strategy_args=ratio) | |
extractive_text = " ".join(extractive_sentences[0]) | |
return extractive_text | |
# Criando a função que gera a tradução do texto a partir do ChatGPT: | |
def get_translation(summarized_text): | |
""" Função que recebe um resumo a ser traduzido, | |
juntamente à pipeline do modelo que fará a tradução, | |
e devolve a versão traduzida do texto. """ | |
LLM = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.25, model_name="gpt-3.5-turbo") | |
messages = [SystemMessage(content='Você é um especialista em traduções de texto do inglês para português'), HumanMessage(content=summarized_text)] | |
translation = LLM(messages) | |
return translation.content | |
# Criando a função que corrige os erros de transcrição do texto a partir do ChatGPT: | |
def get_correction(transcription): | |
""" Função que recebe a transcrição (em PT-BR) e corrige eventuais | |
erros do whisper - por exemplo, a troca de palavras que, embora | |
semelhantes na pronúncia, são totalmente absurdas no contexto. """ | |
correction_prompt = PromptTemplate.from_template(template = 'Você é um especialista em correção de textos. Você receberá uma transcrição de um áudio e deverá substituir as palavras transcritas erroneamente, considerando apenas a pronúncia, por palavras adequadas que façam sentido no contexto do texto. Além disso, corrija questões gramáticais para facilitar a compreensão. Corrija a seguinte transcrição: {transcription}') | |
LLM = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.1, model_name="gpt-3.5-turbo") | |
corrector = LLMChain(prompt=correction_prompt, llm=LLM) | |
with get_openai_callback() as cb: | |
corrected_transcription = corrector.run({"transcription": transcription}) | |
total_t = cb.total_tokens | |
prompt_t = cb.prompt_tokens | |
completion_t = cb.completion_tokens | |
total_cost = cb.total_cost | |
return corrected_transcription | |
# Criando o chatbot | |
def alan_videos(vectorstore): | |
""" Função que inicializa e configura um LLM da OpenAI | |
e retorna um chatbot configurado pronto para uso. """ | |
memory = ConversationBufferWindowMemory(memory_key='chat_history', return_messages=True, k=3) | |
LLM = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.25, model_name="gpt-3.5-turbo") | |
retriever=vectorstore.as_retriever() | |
chatbot = ConversationalRetrievalChain.from_llm(llm=LLM, retriever=retriever, memory=memory) | |
return chatbot | |
# Criando um modelo de chat: | |
def chat(pergunta): | |
""" Função que processa uma pergunta utilizando o chatbot | |
configurado (alan_videos) e retorna sua resposta. """ | |
with get_openai_callback() as cb: | |
resposta = st.session_state.alanvideos.invoke({"question": pergunta}) | |
total_t = cb.total_tokens | |
prompt_t = cb.prompt_tokens | |
completion_t = cb.completion_tokens | |
total_cost = cb.total_cost | |
return resposta['answer'] | |
# Criando a função para customização com CSS: | |
def local_css(file_name): | |
""" Função que carrega um arquivo CSS local e aplica o estilo | |
ao Streamlit app, personalizando a interface do usuário. """ | |
with open(file_name, "r") as f: | |
st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True) | |
# Configurando a aba do site: | |
icon = Image.open("Traçado laranja #f1863d.png") | |
st.set_page_config(page_title="AlanVideos", page_icon=icon, layout="wide", initial_sidebar_state="auto") | |
# Configurando o site: | |
def main(): | |
local_css("style.css") | |
header = st.container() | |
model = st.container() | |
model_1 = st.container() | |
model_2 = st.container() | |
# Configurando a barra lateral: | |
with st.sidebar: | |
with st.form("data_collection"): | |
link = st.text_area(label="Coloque o link do seu vídeo do YouTube:", height=25, placeholder="Digite seu link...") | |
language = st.selectbox('Qual a linguagem do seu vídeo?', ('Português (pt)', 'Inglês (en)')) | |
translate = st.selectbox('Você deseja que o seu vídeo seja traduzido para Português?', ('Não', 'Sim')) | |
compression_rate = st.slider(label="Selecione o percentual de sumarização:", min_value=0.05, max_value=0.50, value=0.25, step=0.05) | |
submitted = st.form_submit_button("Enviar") | |
# Verificando se o link do vídeo é válido: | |
if link != '': | |
if re.match((r'(https?://)?(www\.)?''(youtube|youtu|youtube-nocookie)\.(com|be)/''(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'), link): | |
st.success('Dados coletados!', icon="✅") | |
else: | |
st.error('Link inválido. Por favor, insira um link do YouTube.', icon="🚨") | |
if "alanvideos" not in st.session_state: | |
st.session_state.alanvideos = None | |
# Configurando o cabeçalho: | |
with header: | |
st.title(":orange[Alan]Videos") | |
st.subheader("Olá, usuário! Este é um projeto que utiliza técnicas de inteligência artificial para simplificar e acelerar a compreensão de conteúdo audiovisual. Por favor, preencha o formulário ao lado para que possamos responder as suas dúvidas a respeito de um vídeo do YouTube! :)", divider = "orange") | |
# Configurando os modelos: | |
with model: | |
if submitted and re.match((r'(https?://)?(www\.)?''(youtube|youtu|youtube-nocookie)\.(com|be)/''(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'), link): | |
with st.spinner("Carregando modelos..."): | |
nltk.download("punkt") | |
extractive = load_extractive() | |
with st.spinner("Transcrevendo texto..."): | |
transcription = get_transcriptions(link) | |
texto_cru = ''.join(list(transcription.values())) | |
# Preparando o modelo de sumarização após o envio de um link: | |
with model_1: | |
st.header("Texto Sumarizado:") | |
with st.spinner("Carregando sumarização..."): | |
summary = get_summarization(extractive, texto_cru, compression_rate) | |
# Usando o GPT para corrigir a transcrição, caso o vídeo seja em PT-BR: | |
if language == 'Português (pt)': | |
summary = get_correction(summary) | |
# Usando o GPT para traduzir a transcrição para PT-BR, caso o usuário prefira: | |
elif language == 'Inglês (en)' and translate == 'Sim': | |
with st.spinner("Traduzindo sumarização..."): | |
summary = get_translation(summary) | |
st.session_state['summary'] = summary | |
# Preparando o AlanVideos após o envio de um link: | |
with model_2: | |
st.header("Resposta das perguntas:") | |
with st.spinner("Preparando AlanVideos..."): | |
# Separando o texto em chunks: | |
chunks = get_chunks(texto_cru) | |
# Armazenando os pedaços de texto em embeddings: | |
vectorstore = get_vectorstore(chunks) | |
# Criando o chatbot: | |
st.session_state.alanvideos = alan_videos(vectorstore) | |
# Apresentando os resultados: | |
with model: | |
# Resultado do modelo de sumarização: | |
with model_1: | |
if 'summary' in st.session_state: | |
st.write(st.session_state['summary']) | |
# Resultado do AlanVideos: | |
if 'summary' in st.session_state: | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [AIMessage(content=f"Olá, meu nome é Alan, e estou aqui para responder perguntas sobre o vídeo. Como posso ajudar?")] | |
pergunta = st.chat_input('Faça uma pergunta sobre o vídeo: ') | |
if pergunta is not None and pergunta != "": | |
resposta = chat(pergunta) | |
st.session_state.chat_history.append(HumanMessage(content=pergunta)) | |
st.session_state.chat_history.append(AIMessage(content=resposta)) | |
for message in st.session_state.chat_history: | |
if isinstance(message, AIMessage): | |
with st.chat_message("AI"): | |
st.write(message.content) | |
elif isinstance(message, HumanMessage): | |
with st.chat_message("Human"): | |
st.write(message.content) | |
main() |