import os import re import streamlit as st import transformers from transformers import pipeline from transformers import AutoTokenizer import nltk from PIL import Image import torch import ffmpeg import speech_recognition as sr from pytube import YouTube import string import whisper from moviepy.editor import AudioFileClip from langchain.text_splitter import CharacterTextSplitter from langchain_core.messages import AIMessage, HumanMessage, SystemMessage from langchain.embeddings import OpenAIEmbeddings from langchain.vectorstores import FAISS from langchain.chat_models import ChatOpenAI from langchain.memory import ConversationBufferWindowMemory from langchain.chains import ConversationalRetrievalChain from langchain.callbacks import get_openai_callback from langchain import PromptTemplate from langchain import LLMChain from pytube import YouTube import soundfile openai_api_key = os.getenv("openai_api_key") os.environ["OPENAI_API_KEY"] = openai_api_key # Criando a função que corta o texto em chunks: def get_chunks(texto_transcrito): """Função que recebe uma string (um texto transcrito) e devolve uma lista de strings, em que cada string ("chunk") contém um tamanho específico, com overlap entre os pedaços. """ text_splitter=CharacterTextSplitter(separator='.', chunk_size=1000, chunk_overlap=100, length_function=len) chunks = text_splitter.split_text(texto_transcrito) return chunks # Criando a função de vectorstore para transformar os chunks de texto em embeddings: def get_vectorstore(chunks): """Função que recebe uma lista de chunks de texto e os converte em embeddings (representações numéricas) usando um modelo de embeddings pré-treinado. """ embeddings = OpenAIEmbeddings() vectorstore = FAISS.from_texts(texts=chunks, embedding=embeddings) return vectorstore # Criando a função para converter o vídeo para o formato adequado: @st.cache_data def convert_mp3_to_wav(mp3_file, wav_file): """Função que converte um arquivo de vídeo no formato MP3 para um arquivo de áudio no formato WAV a partir das seguintes entradas: o caminho do arquivo de vídeo no formato MP3 que se deseja converter; o caminho onde o arquivo de áudio WAV resultante será salvo. Nenhuma saída explícita é retornada.""" video = AudioFileClip(mp3_file) video.write_audiofile(wav_file) # Criando a função que gera a transcrição: @st.cache_data def get_transcriptions(url): """ Função que recebe um link de um vídeo no YouTube e devolve um dicionário contendo o título do vídeo (key) e a transcrição de seu áudio (value). """ dicionario = {} # Baixando o áudio: youtube_content = YouTube(url) title = youtube_content.title title = re.sub('[^A-z0-9 -]', '', title).replace(" ", " ") audio_streams = youtube_content.streams.filter(only_audio=True) audio_streams[0].download(filename=f"{title}.mp3") # Convertendo para Wav: cwd = os.getcwd() mp3_file = os.path.join(cwd, f"{title}.mp3") wav_file = os.path.join(cwd, f"{title}.wav") convert_mp3_to_wav(mp3_file, wav_file) # Inicializando o reconhecedor de fala: r = sr.Recognizer() # Carregando o áudio gravado pelo Whisper em um objeto de áudio: with sr.AudioFile(wav_file) as source: audio = r.record(source) # Transcrevendo: texto_transcrito = r.recognize_whisper(audio) # Adicionando ao dicionario: dicionario[title] = texto_transcrito return dicionario # Criando a função que carrega o sumarizador extrativo (para português e inglês) via HuggingFace: @st.cache_resource def load_extractive(): """ Função sem valores de entrada que carrega o modelo de sumarização extrativa via HuggingFace. """ return pipeline("summarization", model="NotXia/longformer-bio-ext-summ", tokenizer=AutoTokenizer.from_pretrained("NotXia/longformer-bio-ext-summ"), trust_remote_code=True) # Criando a função que gera a sumarização extrativa: @st.cache_data def get_summarization(_model_pipeline, full_text, ratio): """ Função que recebe um texto completo a ser resumido, juntamente à taxa desejada de sumarização e a pipeline do modelo que fará o sumário, e devolve a versão resumida desse texto. """ sentences = nltk.sent_tokenize(full_text) extractive_sentences = _model_pipeline({"sentences": sentences}, strategy="ratio", strategy_args=ratio) extractive_text = " ".join(extractive_sentences[0]) return extractive_text # Criando a função que gera a tradução do texto a partir do ChatGPT: @st.cache_data def get_translation(summarized_text): """ Função que recebe um resumo a ser traduzido, juntamente à pipeline do modelo que fará a tradução, e devolve a versão traduzida do texto. """ LLM = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.25, model_name="gpt-3.5-turbo") messages = [SystemMessage(content='Você é um especialista em traduções de texto do inglês para português'), HumanMessage(content=summarized_text)] translation = LLM(messages) return translation.content # Criando a função que corrige os erros de transcrição do texto a partir do ChatGPT: @st.cache_data def get_correction(transcription): """ Função que recebe a transcrição (em PT-BR) e corrige eventuais erros do whisper - por exemplo, a troca de palavras que, embora semelhantes na pronúncia, são totalmente absurdas no contexto. """ correction_prompt = PromptTemplate.from_template(template = 'Você é um especialista em correção de textos. Você receberá uma transcrição de um áudio e deverá substituir as palavras transcritas erroneamente, considerando apenas a pronúncia, por palavras adequadas que façam sentido no contexto do texto. Além disso, corrija questões gramáticais para facilitar a compreensão. Corrija a seguinte transcrição: {transcription}') LLM = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.1, model_name="gpt-3.5-turbo") corrector = LLMChain(prompt=correction_prompt, llm=LLM) with get_openai_callback() as cb: corrected_transcription = corrector.run({"transcription": transcription}) total_t = cb.total_tokens prompt_t = cb.prompt_tokens completion_t = cb.completion_tokens total_cost = cb.total_cost return corrected_transcription # Criando o chatbot def alan_videos(vectorstore): """ Função que inicializa e configura um LLM da OpenAI e retorna um chatbot configurado pronto para uso. """ memory = ConversationBufferWindowMemory(memory_key='chat_history', return_messages=True, k=3) LLM = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.25, model_name="gpt-3.5-turbo") retriever=vectorstore.as_retriever() chatbot = ConversationalRetrievalChain.from_llm(llm=LLM, retriever=retriever, memory=memory) return chatbot # Criando um modelo de chat: def chat(pergunta): """ Função que processa uma pergunta utilizando o chatbot configurado (alan_videos) e retorna sua resposta. """ with get_openai_callback() as cb: resposta = st.session_state.alanvideos.invoke({"question": pergunta}) total_t = cb.total_tokens prompt_t = cb.prompt_tokens completion_t = cb.completion_tokens total_cost = cb.total_cost return resposta['answer'] # Criando a função para customização com CSS: def local_css(file_name): """ Função que carrega um arquivo CSS local e aplica o estilo ao Streamlit app, personalizando a interface do usuário. """ with open(file_name, "r") as f: st.markdown(f"", unsafe_allow_html=True) # Configurando a aba do site: icon = Image.open("Traçado laranja #f1863d.png") st.set_page_config(page_title="AlanVideos", page_icon=icon, layout="wide", initial_sidebar_state="auto") # Configurando o site: def main(): local_css("style.css") header = st.container() model = st.container() model_1 = st.container() model_2 = st.container() # Configurando a barra lateral: with st.sidebar: with st.form("data_collection"): link = st.text_area(label="Coloque o link do seu vídeo do YouTube:", height=25, placeholder="Digite seu link...") language = st.selectbox('Qual a linguagem do seu vídeo?', ('Português (pt)', 'Inglês (en)')) translate = st.selectbox('Você deseja que o seu vídeo seja traduzido para Português?', ('Não', 'Sim')) compression_rate = st.slider(label="Selecione o percentual de sumarização:", min_value=0.05, max_value=0.50, value=0.25, step=0.05) submitted = st.form_submit_button("Enviar") # Verificando se o link do vídeo é válido: if link != '': if re.match((r'(https?://)?(www\.)?''(youtube|youtu|youtube-nocookie)\.(com|be)/''(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'), link): st.success('Dados coletados!', icon="✅") else: st.error('Link inválido. Por favor, insira um link do YouTube.', icon="🚨") if "alanvideos" not in st.session_state: st.session_state.alanvideos = None # Configurando o cabeçalho: with header: st.title(":orange[Alan]Videos") st.subheader("Olá, usuário! Este é um projeto que utiliza técnicas de inteligência artificial para simplificar e acelerar a compreensão de conteúdo audiovisual. Por favor, preencha o formulário ao lado para que possamos responder as suas dúvidas a respeito de um vídeo do YouTube! :)", divider = "orange") # Configurando os modelos: with model: if submitted and re.match((r'(https?://)?(www\.)?''(youtube|youtu|youtube-nocookie)\.(com|be)/''(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'), link): with st.spinner("Carregando modelos..."): nltk.download("punkt") extractive = load_extractive() with st.spinner("Transcrevendo texto..."): transcription = get_transcriptions(link) texto_cru = ''.join(list(transcription.values())) # Preparando o modelo de sumarização após o envio de um link: with model_1: st.header("Texto Sumarizado:") with st.spinner("Carregando sumarização..."): summary = get_summarization(extractive, texto_cru, compression_rate) # Usando o GPT para corrigir a transcrição, caso o vídeo seja em PT-BR: if language == 'Português (pt)': summary = get_correction(summary) # Usando o GPT para traduzir a transcrição para PT-BR, caso o usuário prefira: elif language == 'Inglês (en)' and translate == 'Sim': with st.spinner("Traduzindo sumarização..."): summary = get_translation(summary) st.session_state['summary'] = summary # Preparando o AlanVideos após o envio de um link: with model_2: st.header("Resposta das perguntas:") with st.spinner("Preparando AlanVideos..."): # Separando o texto em chunks: chunks = get_chunks(texto_cru) # Armazenando os pedaços de texto em embeddings: vectorstore = get_vectorstore(chunks) # Criando o chatbot: st.session_state.alanvideos = alan_videos(vectorstore) # Apresentando os resultados: with model: # Resultado do modelo de sumarização: with model_1: if 'summary' in st.session_state: st.write(st.session_state['summary']) # Resultado do AlanVideos: if 'summary' in st.session_state: if "chat_history" not in st.session_state: st.session_state.chat_history = [AIMessage(content=f"Olá, meu nome é Alan, e estou aqui para responder perguntas sobre o vídeo. Como posso ajudar?")] pergunta = st.chat_input('Faça uma pergunta sobre o vídeo: ') if pergunta is not None and pergunta != "": resposta = chat(pergunta) st.session_state.chat_history.append(HumanMessage(content=pergunta)) st.session_state.chat_history.append(AIMessage(content=resposta)) for message in st.session_state.chat_history: if isinstance(message, AIMessage): with st.chat_message("AI"): st.write(message.content) elif isinstance(message, HumanMessage): with st.chat_message("Human"): st.write(message.content) main()