import whisper import os import random import openai import yt_dlp from pytube import YouTube, extract import pandas as pd import plotly_express as px import nltk import plotly.graph_objects as go from optimum.onnxruntime import ORTModelForSequenceClassification from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM, AutoModelForTokenClassification from sentence_transformers import SentenceTransformer, CrossEncoder, util import streamlit as st import en_core_web_lg import validators import re import itertools import numpy as np from bs4 import BeautifulSoup import base64, time from annotated_text import annotated_text import pickle, math import wikipedia from pyvis.network import Network import torch from pydub import AudioSegment from langchain.docstore.document import Document from langchain.embeddings import HuggingFaceEmbeddings, HuggingFaceBgeEmbeddings, HuggingFaceInstructEmbeddings from langchain.vectorstores import FAISS from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.chat_models import ChatOpenAI from langchain.chains import QAGenerationChain from langchain.callbacks import StreamlitCallbackHandler from langchain.agents import OpenAIFunctionsAgent, AgentExecutor from langchain.agents.agent_toolkits import create_retriever_tool from langchain.agents.openai_functions_agent.agent_token_buffer_memory import ( AgentTokenBufferMemory, ) from langchain.prompts import MessagesPlaceholder from langchain.prompts.chat import ( ChatPromptTemplate, SystemMessagePromptTemplate, AIMessagePromptTemplate, HumanMessagePromptTemplate, ) from langchain.schema import ( AIMessage, HumanMessage, SystemMessage ) from langchain.prompts import PromptTemplate from langsmith import Client client = Client() nltk.download('punkt') from nltk import sent_tokenize OPEN_AI_KEY = os.environ.get('OPEN_AI_KEY') time_str = time.strftime("%d%m%Y-%H%M%S") HTML_WRAPPER = """
{}
""" ###################### Functions ####################################################################################### #load all required models and cache @st.cache_resource def load_models(): '''Load and cache all the models to be used''' q_model = ORTModelForSequenceClassification.from_pretrained("nickmuchi/quantized-optimum-finbert-tone") ner_model = AutoModelForTokenClassification.from_pretrained("xlm-roberta-large-finetuned-conll03-english") q_tokenizer = AutoTokenizer.from_pretrained("nickmuchi/quantized-optimum-finbert-tone") ner_tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-large-finetuned-conll03-english") sent_pipe = pipeline("text-classification",model=q_model, tokenizer=q_tokenizer) sum_pipe = pipeline("summarization",model="philschmid/flan-t5-base-samsum",clean_up_tokenization_spaces=True) ner_pipe = pipeline("ner", model=ner_model, tokenizer=ner_tokenizer, grouped_entities=True) cross_encoder = CrossEncoder('cross-encoder/mmarco-mMiniLMv2-L12-H384-v1') #cross-encoder/ms-marco-MiniLM-L-12-v2 sbert = SentenceTransformer('all-MiniLM-L6-v2') return sent_pipe, sum_pipe, ner_pipe, cross_encoder, sbert @st.cache_data def load_asr_model(model_name): '''Load the open source whisper model in cases where the API is not working''' model = whisper.load_model(model_name) return model @st.cache_resource def get_spacy(): nlp = en_core_web_lg.load() return nlp nlp = get_spacy() sent_pipe, sum_pipe, ner_pipe, cross_encoder, sbert = load_models() @st.cache_data def get_yt_audio(url): '''Get YT video from given URL link''' yt = YouTube(url) title = yt.title # Get the first available audio stream and download it audio_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first().download() return audio_stream, title @st.cache_data def get_yt_audio_dl(url): '''Back up for when pytube is down''' temp_audio_file = os.path.join('output', 'audio') ydl_opts = { 'format': 'bestaudio/best', 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192', }], 'outtmpl': temp_audio_file, 'quiet': True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) title = info.get('title', None) ydl.download([url]) #with open(temp_audio_file+'.mp3', 'rb') as file: audio_file = os.path.join('output', 'audio.mp3') return audio_file, title @st.cache_data def load_whisper_api(audio): '''Transcribe YT audio to text using Open AI API''' file = open(audio, "rb") transcript = openai.Audio.translate("whisper-1", file) return transcript @st.cache_data def transcribe_yt_video(link, py_tube=True): '''Transcribe YouTube video''' if py_tube: audio_file, title = get_yt_audio(link) print(f'audio_file:{audio_file}') st.session_state['audio'] = audio_file print(f"audio_file_session_state:{st.session_state['audio'] }") #Get size of audio file audio_size = round(os.path.getsize(st.session_state['audio'])/(1024*1024),1) #Check if file is > 24mb, if not then use Whisper API if audio_size <= 25: st.info("`Transcribing YT audio...`") #Use whisper API results = load_whisper_api(st.session_state['audio'])['text'] else: st.warning('File size larger than 24mb, applying chunking and transcription',icon="⚠️") song = AudioSegment.from_file(st.session_state['audio'], format='mp4') # PyDub handles time in milliseconds twenty_minutes = 20 * 60 * 1000 chunks = song[::twenty_minutes] transcriptions = [] video_id = extract.video_id(link) for i, chunk in enumerate(chunks): chunk.export(f'output/chunk_{i}_{video_id}.mp4', format='mp4') transcriptions.append(load_whisper_api(f'output/chunk_{i}_{video_id}.mp4')['text']) results = ','.join(transcriptions) else: audio_file, title = get_yt_audio_dl(link) print(f'audio_file:{audio_file}') st.session_state['audio'] = audio_file print(f"audio_file_session_state:{st.session_state['audio'] }") #Get size of audio file audio_size = round(os.path.getsize(st.session_state['audio'])/(1024*1024),1) #Check if file is > 24mb, if not then use Whisper API if audio_size <= 25: st.info("`Transcribing YT audio...`") #Use whisper API results = load_whisper_api(st.session_state['audio'])['text'] else: st.warning('File size larger than 24mb, applying chunking and transcription',icon="⚠️") song = AudioSegment.from_file(st.session_state['audio'], format='mp3') # PyDub handles time in milliseconds twenty_minutes = 20 * 60 * 1000 chunks = song[::twenty_minutes] transcriptions = [] video_id = extract.video_id(link) for i, chunk in enumerate(chunks): chunk.export(f'output/chunk_{i}_{video_id}.mp3', format='mp3') transcriptions.append(load_whisper_api(f'output/chunk_{i}_{video_id}.mp3')['text']) results = ','.join(transcriptions) st.info("`YT Video transcription process complete...`") return results, title @st.cache_data def inference(link, upload): '''Convert Youtube video or Audio upload to text''' try: if validators.url(link): st.info("`Downloading YT audio...`") results, title = transcribe_yt_video(link) return results, title elif _upload: #Get size of audio file audio_size = round(os.path.getsize(_upload)/(1024*1024),1) #Check if file is > 24mb, if not then use Whisper API if audio_size <= 25: st.info("`Transcribing uploaded audio...`") #Use whisper API results = load_whisper_api(_upload)['text'] else: st.write('File size larger than 24mb, applying chunking and transcription') song = AudioSegment.from_file(_upload) # PyDub handles time in milliseconds twenty_minutes = 20 * 60 * 1000 chunks = song[::twenty_minutes] transcriptions = [] st.info("`Transcribing uploaded audio...`") for i, chunk in enumerate(chunks): chunk.export(f'output/chunk_{i}.mp4', format='mp4') transcriptions.append(load_whisper_api(f'output/chunk_{i}.mp4')['text']) results = ','.join(transcriptions) st.info("`Uploaded audio transcription process complete...`") return results, "Transcribed Earnings Audio" except Exception as e: st.error(f'''PyTube Error: {e}, Using yt_dlp module, might take longer than expected''',icon="🚨") results, title = transcribe_yt_video(link, py_tube=False) # results = _asr_model.transcribe(st.session_state['audio'], task='transcribe', language='en') return results, title @st.cache_resource def send_feedback(run_id, score): client.create_feedback(run_id, "user_score", score=score) @st.cache_data def clean_text(text): '''Clean all text after inference''' text = text.encode("ascii", "ignore").decode() # unicode text = re.sub(r"https*\S+", " ", text) # url text = re.sub(r"@\S+", " ", text) # mentions text = re.sub(r"#\S+", " ", text) # hastags text = re.sub(r"\s{2,}", " ", text) # over spaces return text @st.cache_data def chunk_long_text(text,threshold,window_size=3,stride=2): '''Preprocess text and chunk for sentiment analysis''' #Convert cleaned text into sentences sentences = sent_tokenize(text) out = [] #Limit the length of each sentence to a threshold for chunk in sentences: if len(chunk.split()) < threshold: out.append(chunk) else: words = chunk.split() num = int(len(words)/threshold) for i in range(0,num*threshold+1,threshold): out.append(' '.join(words[i:threshold+i])) passages = [] #Combine sentences into a window of size window_size for paragraph in [out]: for start_idx in range(0, len(paragraph), stride): end_idx = min(start_idx+window_size, len(paragraph)) passages.append(" ".join(paragraph[start_idx:end_idx])) return passages @st.cache_data def sentiment_pipe(earnings_text): '''Determine the sentiment of the text''' earnings_sentences = chunk_long_text(earnings_text,150,1,1) earnings_sentiment = sent_pipe(earnings_sentences) return earnings_sentiment, earnings_sentences @st.cache_data def chunk_and_preprocess_text(text, model_name= 'philschmid/flan-t5-base-samsum'): '''Chunk and preprocess text for summarization''' tokenizer = AutoTokenizer.from_pretrained(model_name) sentences = sent_tokenize(text) # initialize length = 0 chunk = "" chunks = [] count = -1 for sentence in sentences: count += 1 combined_length = len(tokenizer.tokenize(sentence)) + length # add the no. of sentence tokens to the length counter if combined_length <= tokenizer.max_len_single_sentence: # if it doesn't exceed chunk += sentence + " " # add the sentence to the chunk length = combined_length # update the length counter # if it is the last sentence if count == len(sentences) - 1: chunks.append(chunk) # save the chunk else: chunks.append(chunk) # save the chunk # reset length = 0 chunk = "" # take care of the overflow sentence chunk += sentence + " " length = len(tokenizer.tokenize(sentence)) return chunks @st.cache_data def summarize_text(text_to_summarize,max_len,min_len): '''Summarize text with HF model''' summarized_text = sum_pipe(text_to_summarize, max_length=max_len, min_length=min_len, do_sample=False, early_stopping=True, num_beams=4) summarized_text = ' '.join([summ['summary_text'] for summ in summarized_text]) return summarized_text @st.cache_data def get_all_entities_per_sentence(text): doc = nlp(''.join(text)) sentences = list(doc.sents) entities_all_sentences = [] for sentence in sentences: entities_this_sentence = [] # SPACY ENTITIES for entity in sentence.ents: entities_this_sentence.append(str(entity)) # XLM ENTITIES entities_xlm = [entity["word"] for entity in ner_pipe(str(sentence))] for entity in entities_xlm: entities_this_sentence.append(str(entity)) entities_all_sentences.append(entities_this_sentence) return entities_all_sentences @st.cache_data def get_all_entities(text): all_entities_per_sentence = get_all_entities_per_sentence(text) return list(itertools.chain.from_iterable(all_entities_per_sentence)) @st.cache_data def get_and_compare_entities(article_content,summary_output): all_entities_per_sentence = get_all_entities_per_sentence(article_content) entities_article = list(itertools.chain.from_iterable(all_entities_per_sentence)) all_entities_per_sentence = get_all_entities_per_sentence(summary_output) entities_summary = list(itertools.chain.from_iterable(all_entities_per_sentence)) matched_entities = [] unmatched_entities = [] for entity in entities_summary: if any(entity.lower() in substring_entity.lower() for substring_entity in entities_article): matched_entities.append(entity) elif any( np.inner(sbert.encode(entity, show_progress_bar=False), sbert.encode(art_entity, show_progress_bar=False)) > 0.9 for art_entity in entities_article): matched_entities.append(entity) else: unmatched_entities.append(entity) matched_entities = list(dict.fromkeys(matched_entities)) unmatched_entities = list(dict.fromkeys(unmatched_entities)) matched_entities_to_remove = [] unmatched_entities_to_remove = [] for entity in matched_entities: for substring_entity in matched_entities: if entity != substring_entity and entity.lower() in substring_entity.lower(): matched_entities_to_remove.append(entity) for entity in unmatched_entities: for substring_entity in unmatched_entities: if entity != substring_entity and entity.lower() in substring_entity.lower(): unmatched_entities_to_remove.append(entity) matched_entities_to_remove = list(dict.fromkeys(matched_entities_to_remove)) unmatched_entities_to_remove = list(dict.fromkeys(unmatched_entities_to_remove)) for entity in matched_entities_to_remove: matched_entities.remove(entity) for entity in unmatched_entities_to_remove: unmatched_entities.remove(entity) return matched_entities, unmatched_entities @st.cache_data def highlight_entities(article_content,summary_output): markdown_start_red = "" markdown_start_green = "" markdown_end = "" matched_entities, unmatched_entities = get_and_compare_entities(article_content,summary_output) for entity in matched_entities: summary_output = re.sub(f'({entity})(?![^rgb\(]*\))',markdown_start_green + entity + markdown_end,summary_output) for entity in unmatched_entities: summary_output = re.sub(f'({entity})(?![^rgb\(]*\))',markdown_start_red + entity + markdown_end,summary_output) print("") print("") soup = BeautifulSoup(summary_output, features="html.parser") return HTML_WRAPPER.format(soup) def summary_downloader(raw_text): '''Download the summary generated''' b64 = base64.b64encode(raw_text.encode()).decode() new_filename = "new_text_file_{}_.txt".format(time_str) st.markdown("#### Download Summary as a File ###") href = f'Click to Download!!' st.markdown(href,unsafe_allow_html=True) @st.cache_data def generate_eval(raw_text, N, chunk): # Generate N questions from context of chunk chars # IN: text, N questions, chunk size to draw question from in the doc # OUT: eval set as JSON list # raw_text = ','.join(raw_text) update = st.empty() ques_update = st.empty() update.info("`Generating sample questions ...`") n = len(raw_text) starting_indices = [random.randint(0, n-chunk) for _ in range(N)] sub_sequences = [raw_text[i:i+chunk] for i in starting_indices] chain = QAGenerationChain.from_llm(ChatOpenAI(temperature=0)) eval_set = [] for i, b in enumerate(sub_sequences): try: qa = chain.run(b) eval_set.append(qa) ques_update.info(f"Creating Question: {i+1}") except Exception as e: print(e) st.warning(f'Error in generating Question: {i+1}...', icon="⚠️") continue eval_set_full = list(itertools.chain.from_iterable(eval_set)) update.empty() ques_update.empty() return eval_set_full @st.cache_resource def create_prompt_and_llm(): '''Create prompt''' llm = ChatOpenAI(temperature=0, streaming=True, model="gpt-4") message = SystemMessage( content=( "You are a helpful chatbot who is tasked with answering questions acuurately about earnings call transcript provided. " "Unless otherwise explicitly stated, it is probably fair to assume that questions are about the earnings call transcript. " "If there is any ambiguity, you probably assume they are about that." "Do not use any information not provided in the earnings context and remember you are a to speak like a finance expert." "If you don't know the answer, just say 'There is no relevant answer in the given earnings call transcript'" "don't try to make up an answer" ) ) prompt = OpenAIFunctionsAgent.create_prompt( system_message=message, extra_prompt_messages=[MessagesPlaceholder(variable_name="history")], ) return prompt, llm @st.cache_resource def gen_embeddings(embedding_model): '''Generate embeddings for given model''' if 'hkunlp' in embedding_model: embeddings = HuggingFaceInstructEmbeddings(model_name=embedding_model, query_instruction='Represent the Financial question for retrieving supporting paragraphs: ', embed_instruction='Represent the Financial paragraph for retrieval: ') elif 'mpnet' in embedding_model: embeddings = HuggingFaceEmbeddings(model_name=embedding_model) elif 'FlagEmbedding' in embedding_model: encode_kwargs = {'normalize_embeddings': True} embeddings = HuggingFaceBgeEmbeddings(model_name=embedding_model, encode_kwargs = encode_kwargs ) return embeddings @st.cache_data def create_vectorstore(corpus, title, embedding_model, chunk_size=1000, overlap=50): '''Process text for Semantic Search''' text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size,chunk_overlap=overlap) texts = text_splitter.split_text(corpus) embeddings = gen_embeddings(embedding_model) vectorstore = FAISS.from_texts(texts, embeddings, metadatas=[{"source": i} for i in range(len(texts))]) return vectorstore @st.cache_data def create_memory_and_agent(_docsearch): '''Embed text and generate semantic search scores''' #create vectorstore vectorstore = _docsearch.as_retriever(search_kwargs={"k": 4}) #create retriever tool tool = create_retriever_tool( vectorstore, "earnings_call_search", "Searches and returns documents using the earnings context provided as a source, relevant to the user input question.", ) tools = [tool] prompt,llm = create_prompt_and_llm() agent = OpenAIFunctionsAgent(llm=llm, tools=tools, prompt=prompt) agent_executor = AgentExecutor( agent=agent, tools=tools, verbose=True, return_intermediate_steps=True, ) memory = AgentTokenBufferMemory(llm=llm) return memory, agent_executor @st.cache_data def gen_sentiment(text): '''Generate sentiment of given text''' return sent_pipe(text)[0]['label'] @st.cache_data def gen_annotated_text(df): '''Generate annotated text''' tag_list=[] for row in df.itertuples(): label = row[2] text = row[1] if label == 'Positive': tag_list.append((text,label,'#8fce00')) elif label == 'Negative': tag_list.append((text,label,'#f44336')) else: tag_list.append((text,label,'#000000')) return tag_list def display_df_as_table(model,top_k,score='score'): '''Display the df with text and scores as a table''' df = pd.DataFrame([(hit[score],passages[hit['corpus_id']]) for hit in model[0:top_k]],columns=['Score','Text']) df['Score'] = round(df['Score'],2) return df def make_spans(text,results): results_list = [] for i in range(len(results)): results_list.append(results[i]['label']) facts_spans = [] facts_spans = list(zip(sent_tokenizer(text),results_list)) return facts_spans ##Fiscal Sentiment by Sentence def fin_ext(text): results = remote_clx(sent_tokenizer(text)) return make_spans(text,results) ## Knowledge Graphs code def get_article(url): article = Article(url) article.download() article.parse() return article