import base64
import itertools
import math
import os
import pickle
import random
import re
import time

import en_core_web_lg
import nltk
import numpy as np
import openai
import pandas as pd
import plotly.graph_objects as go
import plotly_express as px
import streamlit as st
import torch
import validators
import whisper
import yt_dlp
from annotated_text import annotated_text
from bs4 import BeautifulSoup
from langchain.agents import OpenAIFunctionsAgent, AgentExecutor
from langchain.agents.agent_toolkits import create_retriever_tool
from langchain.agents.openai_functions_agent.agent_token_buffer_memory import (
    AgentTokenBufferMemory,
)
from langchain.callbacks import StreamlitCallbackHandler
from langchain.chains import QAGenerationChain
from langchain.chat_models import ChatOpenAI
from langchain.docstore.document import Document
from langchain.embeddings import (
    HuggingFaceEmbeddings,
    HuggingFaceBgeEmbeddings,
    HuggingFaceInstructEmbeddings,
)
from langchain.prompts import MessagesPlaceholder
from langchain.prompts import PromptTemplate
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    AIMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from langchain.schema import (
    AIMessage,
    HumanMessage,
    SystemMessage,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langsmith import Client
from nltk import sent_tokenize
from openai import OpenAI
from optimum.onnxruntime import ORTModelForSequenceClassification
from pydub import AudioSegment
from pytube import YouTube, extract
from pytube.exceptions import RegexMatchError
from sentence_transformers import SentenceTransformer, CrossEncoder, util
from transformers import (
    pipeline,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForSeq2SeqLM,
    AutoModelForTokenClassification,
)

client = Client()
openai_audio = OpenAI()

nltk.download('punkt')

OPEN_AI_KEY = os.environ.get('OPEN_AI_KEY')
time_str = time.strftime("%d%m%Y-%H%M%S")

HTML_WRAPPER = """
{}
"""

# Directory used for downloaded audio and exported audio chunks.
_OUTPUT_DIR = 'output'
# Files above this size (in MiB) are split before being sent to the Whisper
# API. Kept below the API upload limit so borderline files are chunked
# instead of rejected; matches the "24mb" wording shown to the user.
_WHISPER_API_MAX_MB = 24
# PyDub handles time in milliseconds.
_CHUNK_LENGTH_MS = 20 * 60 * 1000

_URL_RE = re.compile(r"https*\S+")
_MENTION_RE = re.compile(r"@\S+")
_HASHTAG_RE = re.compile(r"#\S+")
_MULTISPACE_RE = re.compile(r"\s{2,}")

###################### Functions #######################################################################################


# load all required models and cache
@st.cache_resource
def load_models():
    '''Load and cache all the models to be used.

    Returns a tuple ``(sent_pipe, sum_pipe, ner_pipe, cross_encoder, sbert)``.
    '''
    q_model = ORTModelForSequenceClassification.from_pretrained("nickmuchi/quantized-optimum-finbert-tone")
    ner_model = AutoModelForTokenClassification.from_pretrained("xlm-roberta-large-finetuned-conll03-english")
    q_tokenizer = AutoTokenizer.from_pretrained("nickmuchi/quantized-optimum-finbert-tone")
    ner_tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-large-finetuned-conll03-english")
    sent_pipe = pipeline("text-classification", model=q_model, tokenizer=q_tokenizer)
    sum_pipe = pipeline("summarization", model="philschmid/flan-t5-base-samsum", clean_up_tokenization_spaces=True)
    ner_pipe = pipeline("ner", model=ner_model, tokenizer=ner_tokenizer, grouped_entities=True)
    cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')
    sbert = SentenceTransformer('all-MiniLM-L6-v2')

    return sent_pipe, sum_pipe, ner_pipe, cross_encoder, sbert


# A model is a shared resource, not serialisable data, so it is cached with
# cache_resource (cache_data would pickle/copy it on every access).
@st.cache_resource
def load_asr_model(model_name):
    '''Load the open source whisper model in cases where the API is not working'''
    return whisper.load_model(model_name)


@st.cache_resource
def get_spacy():
    '''Load and cache the spaCy ``en_core_web_lg`` pipeline.'''
    return en_core_web_lg.load()


nlp = get_spacy()

sent_pipe, sum_pipe, ner_pipe, cross_encoder, sbert = load_models()


@st.cache_data
def get_yt_audio(url):
    '''Download the audio of a YouTube video with pytube.

    Returns ``(path_to_downloaded_file, video_title)``.
    '''
    yt = YouTube(url)
    title = yt.title

    # Get the first available audio stream and download it
    audio_path = yt.streams.filter(only_audio=True).first().download()

    return audio_path, title


@st.cache_data
def get_yt_audio_dl(url):
    '''Back up for when pytube is down: download the audio as mp3 with yt_dlp.

    Returns ``(path_to_mp3, video_title)``; the title is ``None`` when
    yt_dlp does not report one.
    '''
    os.makedirs(_OUTPUT_DIR, exist_ok=True)
    temp_audio_file = os.path.join(_OUTPUT_DIR, 'audio')

    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': temp_audio_file,
        'quiet': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # A single call both fetches the metadata and downloads the audio.
        info = ydl.extract_info(url, download=True)
        title = info.get('title', None)

    # The FFmpegExtractAudio post-processor appends the codec extension.
    audio_file = temp_audio_file + '.mp3'

    return audio_file, title


# Deliberately NOT cached: the cache key would be the file path only, and
# paths such as 'output/audio.mp3' are reused for different recordings, which
# would return a stale transcript. Callers are cached by link instead.
def load_whisper_api(audio):
    '''Transcribe the audio file at path ``audio`` to text using the OpenAI API'''
    with open(audio, "rb") as file:
        transcript = openai_audio.audio.transcriptions.create(
            model="whisper-1", file=file, response_format="text"
        )

    return transcript


def _chunk_tag(link):
    '''Return a filesystem-safe tag identifying ``link`` for chunk file names.'''
    try:
        return extract.video_id(link)
    except RegexMatchError:
        # yt_dlp accepts non-YouTube URLs for which pytube finds no video id.
        return re.sub(r'\W+', '_', link)


def _transcribe_audio_file(audio_path, chunk_tag=None, audio_format=None):
    '''Transcribe ``audio_path``, splitting it into chunks when it is too large.

    ``chunk_tag`` is embedded in the chunk file names so different sources
    do not overwrite each other's chunks. ``audio_format`` is forwarded to
    pydub; ``None`` lets ffmpeg detect the container itself.
    '''
    size_mb = os.path.getsize(audio_path) / (1024 * 1024)

    if size_mb <= _WHISPER_API_MAX_MB:
        return load_whisper_api(audio_path)

    st.warning('File size larger than 24mb, applying chunking and transcription', icon="⚠️")

    os.makedirs(_OUTPUT_DIR, exist_ok=True)
    song = AudioSegment.from_file(audio_path, format=audio_format)

    transcriptions = []
    for i, chunk in enumerate(song[::_CHUNK_LENGTH_MS]):
        chunk_name = f'chunk_{i}_{chunk_tag}.mp3' if chunk_tag else f'chunk_{i}.mp3'
        chunk_path = os.path.join(_OUTPUT_DIR, chunk_name)
        # Export and read back the SAME path (and a matching extension/codec).
        chunk.export(chunk_path, format='mp3')
        transcriptions.append(load_whisper_api(chunk_path))

    return ','.join(transcriptions)


@st.cache_data
def transcribe_yt_video(link, py_tube=True):
    '''Transcribe YouTube video.

    Downloads the audio with pytube (``py_tube=True``) or yt_dlp
    (``py_tube=False``), stores the audio path in
    ``st.session_state['audio']`` and returns ``(transcript, title)``.
    '''
    if py_tube:
        audio_file, title = get_yt_audio(link)
        # pytube keeps the stream's own container, so let ffmpeg detect it.
        audio_format = None
    else:
        audio_file, title = get_yt_audio_dl(link)
        audio_format = 'mp3'

    st.session_state['audio'] = audio_file

    st.info("`Transcribing YT audio...`")
    results = _transcribe_audio_file(
        audio_file, chunk_tag=_chunk_tag(link), audio_format=audio_format
    )
    st.info("`YT Video transcription process complete...`")

    return results, title


@st.cache_data
def inference(link, upload):
    '''Convert Youtube video or Audio upload to text.

    ``link`` takes precedence when it is a valid URL; otherwise ``upload``
    (presumably a path to an audio file - it is passed to
    ``os.path.getsize``) is transcribed.

    Returns ``(transcript, title)``.
    Raises ``ValueError`` when neither a valid link nor an upload is given.
    '''
    if link and validators.url(link):
        st.info("`Downloading YT audio...`")
        try:
            return transcribe_yt_video(link)
        except Exception as e:
            # pytube breaks frequently; fall back to yt_dlp for the same link.
            st.error(f'''PyTube Error: {e}, Using yt_dlp module, might take longer than expected''', icon="🚨")
            return transcribe_yt_video(link, py_tube=False)

    if upload:
        st.info("`Transcribing uploaded audio...`")
        results = _transcribe_audio_file(upload)
        st.info("`Uploaded audio transcription process complete...`")

        return results, "Transcribed Earnings Audio"

    raise ValueError("Provide either a valid video URL or an uploaded audio file")


# Not cached: caching a side-effecting call would silently skip any feedback
# that repeats an earlier (run_id, score) pair.
def send_feedback(run_id, score):
    '''Record a user score for the given LangSmith run.'''
    client.create_feedback(run_id, "user_score", score=score)


@st.cache_data
def clean_text(text):
    '''Clean all text after inference'''
    text = text.encode("ascii", "ignore").decode()  # drop non-ascii characters
    text = _URL_RE.sub(" ", text)  # urls
    text = _MENTION_RE.sub(" ", text)  # mentions
    text = _HASHTAG_RE.sub(" ", text)  # hashtags
    text = _MULTISPACE_RE.sub(" ", text)  # collapse runs of whitespace

    return text


@st.cache_data
def chunk_long_text(text, threshold, window_size=3, stride=2):
    '''Preprocess text and chunk for sentiment analysis.

    Sentences with ``threshold`` or more words are split into pieces of at
    most ``threshold`` words; the pieces are then joined in windows of
    ``window_size`` taken every ``stride`` pieces.
    '''
    pieces = []
    for sentence in sent_tokenize(text):
        words = sentence.split()
        if len(words) < threshold:
            pieces.append(sentence)
        else:
            # Stepping over len(words) (not num*threshold+1) avoids emitting
            # an empty piece when the length is an exact multiple.
            pieces.extend(
                ' '.join(words[i:i + threshold])
                for i in range(0, len(words), threshold)
            )

    # Combine pieces into windows of size window_size
    return [
        " ".join(pieces[start:start + window_size])
        for start in range(0, len(pieces), stride)
    ]


@st.cache_data
def sentiment_pipe(earnings_text):
    '''Determine the sentiment of the text.

    Returns ``(sentiment_results, sentences)``.
    '''
    earnings_sentences = chunk_long_text(earnings_text, 150, 1, 1)
    earnings_sentiment = sent_pipe(earnings_sentences)

    return earnings_sentiment, earnings_sentences


@st.cache_data
def chunk_and_preprocess_text(text, model_name='philschmid/flan-t5-base-samsum'):
    '''Chunk and preprocess text for summarization.

    Sentences are packed greedily into chunks whose token count stays within
    the tokenizer's single-sequence limit. A sentence that alone exceeds the
    limit becomes its own chunk.
    '''
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    max_tokens = tokenizer.max_len_single_sentence

    chunks = []
    chunk = ""
    length = 0

    for sentence in sent_tokenize(text):
        n_tokens = len(tokenizer.tokenize(sentence))

        # Close the current chunk when this sentence would overflow it.
        if chunk and length + n_tokens > max_tokens:
            chunks.append(chunk)
            chunk = ""
            length = 0

        chunk += sentence + " "
        length += n_tokens

    # Save whatever is left, including a trailing overflow sentence.
    if chunk:
        chunks.append(chunk)

    return chunks


@st.cache_data
def summarize_text(text_to_summarize, max_len, min_len):
    '''Summarize text with HF model'''
    outputs = sum_pipe(
        text_to_summarize,
        max_length=max_len,
        min_length=min_len,
        do_sample=False,
        early_stopping=True,
        num_beams=4,
    )

    return ' '.join(output['summary_text'] for output in outputs)


@st.cache_data
def get_all_entities_per_sentence(text):
    '''Return, for each sentence of ``text``, the entities found by spaCy and the XLM NER pipeline.'''
    doc = nlp(''.join(text))

    entities_all_sentences = []
    for sentence in doc.sents:
        # SPACY ENTITIES
        entities_this_sentence = [str(entity) for entity in sentence.ents]

        # XLM ENTITIES
        entities_this_sentence.extend(
            str(entity["word"]) for entity in ner_pipe(str(sentence))
        )

        entities_all_sentences.append(entities_this_sentence)

    return entities_all_sentences


@st.cache_data
def get_all_entities(text):
    '''Return a flat list of every entity found in ``text``.'''
    all_entities_per_sentence = get_all_entities_per_sentence(text)
    return list(itertools.chain.from_iterable(all_entities_per_sentence))


def _drop_contained_entities(entities):
    '''Remove entities that are a (case-insensitive) substring of a different entity.'''
    lowered = [entity.lower() for entity in entities]
    return [
        entity
        for entity, low in zip(entities, lowered)
        # Case variants of the same text must not eliminate each other.
        if not any(low != other and low in other for other in lowered)
    ]


@st.cache_data
def get_and_compare_entities(article_content, summary_output):
    '''Split the summary's entities into those supported by the article and those that are not.

    A summary entity is "matched" when it is a case-insensitive substring of
    an article entity, or when its sentence embedding has an inner product
    above 0.9 with any article entity's embedding.

    Returns ``(matched_entities, unmatched_entities)``, both de-duplicated.
    '''
    entities_article = list(
        itertools.chain.from_iterable(get_all_entities_per_sentence(article_content))
    )
    entities_summary = list(
        itertools.chain.from_iterable(get_all_entities_per_sentence(summary_output))
    )

    article_lower = [entity.lower() for entity in entities_article]
    article_embeddings = None  # encoded lazily, once, on first use

    matched_entities = []
    unmatched_entities = []

    for entity in dict.fromkeys(entities_summary):
        needle = entity.lower()

        if any(needle in candidate for candidate in article_lower):
            matched_entities.append(entity)
            continue

        is_similar = False
        if entities_article:
            if article_embeddings is None:
                article_embeddings = sbert.encode(entities_article, show_progress_bar=False)
            scores = np.inner(sbert.encode(entity, show_progress_bar=False), article_embeddings)
            is_similar = bool(np.any(scores > 0.9))

        if is_similar:
            matched_entities.append(entity)
        else:
            unmatched_entities.append(entity)

    matched_entities = _drop_contained_entities(matched_entities)
    unmatched_entities = _drop_contained_entities(unmatched_entities)

    return matched_entities, unmatched_entities


def _wrap_entity(text, entity, start_tag, end_tag):
    '''Wrap every occurrence of ``entity`` in ``text`` with the given tags.'''
    # re.escape: entities may contain regex metacharacters ("$5", "AT&T (US)").
    # The look-ahead skips text inside an already inserted "rgb(...)" style.
    pattern = rf'({re.escape(entity)})(?![^rgb\(]*\))'
    # A function replacement avoids backslash interpretation in the entity.
    return re.sub(pattern, lambda _match: start_tag + entity + end_tag, text)


@st.cache_data
def highlight_entities(article_content, summary_output):
    '''Return the summary as HTML with matched entities in green and unmatched ones in red.'''
    # NOTE(review): these markers were empty strings, which made the
    # highlighting a no-op even though the pattern guards against "rgb(...)".
    # The <mark> tags and colours are a reconstruction - confirm.
    markdown_start_red = "<mark class=\"entity\" style=\"background: rgb(238, 135, 135);\">"
    markdown_start_green = "<mark class=\"entity\" style=\"background: rgb(121, 236, 121);\">"
    markdown_end = "</mark>"

    matched_entities, unmatched_entities = get_and_compare_entities(article_content, summary_output)

    for entity in matched_entities:
        summary_output = _wrap_entity(summary_output, entity, markdown_start_green, markdown_end)

    for entity in unmatched_entities:
        summary_output = _wrap_entity(summary_output, entity, markdown_start_red, markdown_end)

    soup = BeautifulSoup(summary_output, features="html.parser")

    return HTML_WRAPPER.format(soup)


def summary_downloader(raw_text):
    '''Render a link that downloads ``raw_text`` as a text file.'''
    b64 = base64.b64encode(raw_text.encode()).decode()
    new_filename = "new_text_file_{}_.txt".format(time_str)
    st.markdown("#### Download Summary as a File ###")
    # The payload is embedded as a data URI so no server-side file is needed.
    href = f'<a href="data:file/txt;base64,{b64}" download="{new_filename}">Click to Download!!</a>'
    st.markdown(href, unsafe_allow_html=True)


@st.cache_data
def generate_eval(raw_text, N, chunk):
    '''Generate sample question/answer pairs from random slices of the text.

    IN: text, N questions, chunk size (in characters) to draw each question from
    OUT: eval set as a flat list of the items produced by the QA chain
    '''
    n = len(raw_text)
    if n == 0 or N <= 0:
        return []

    # A chunk longer than the text would make randint's range empty.
    chunk = min(chunk, n)

    update = st.empty()
    ques_update = st.empty()
    update.info("`Generating sample questions ...`")

    starting_indices = [random.randint(0, n - chunk) for _ in range(N)]
    sub_sequences = [raw_text[i:i + chunk] for i in starting_indices]
    chain = QAGenerationChain.from_llm(ChatOpenAI(temperature=0))

    eval_set = []
    for i, passage in enumerate(sub_sequences):
        try:
            qa = chain.run(passage)
        except Exception as e:
            # Best effort: one failed question must not abort the whole set.
            print(e)
            st.warning(f'Error in generating Question: {i+1}...', icon="⚠️")
            continue
        eval_set.append(qa)
        ques_update.info(f"Creating Question: {i+1}")

    eval_set_full = list(itertools.chain.from_iterable(eval_set))

    update.empty()
    ques_update.empty()

    return eval_set_full


@st.cache_resource
def create_prompt_and_llm():
    '''Create the agent prompt and the chat model. Returns ``(prompt, llm)``.'''
    llm = ChatOpenAI(temperature=0, streaming=True, model="gpt-4o")

    message = SystemMessage(
        content=(
            "You are a helpful chatbot who is tasked with answering questions accurately about the earnings call transcript provided. "
            "Unless otherwise explicitly stated, it is probably fair to assume that questions are about the earnings call transcript. "
            "If there is any ambiguity, you probably assume they are about that. "
            "Do not use any information not provided in the earnings context and remember you are to speak like a finance expert. "
            "If you don't know the answer, just say 'There is no relevant answer in the given earnings call transcript'; "
            "don't try to make up an answer."
        )
    )

    prompt = OpenAIFunctionsAgent.create_prompt(
        system_message=message,
        extra_prompt_messages=[MessagesPlaceholder(variable_name="history")],
    )

    return prompt, llm


@st.cache_resource
def gen_embeddings(embedding_model):
    '''Generate embeddings for given model.

    Instructor ('hkunlp') and BGE models get their dedicated wrappers; any
    other model name is loaded with the generic HuggingFace wrapper.
    '''
    if 'hkunlp' in embedding_model:
        return HuggingFaceInstructEmbeddings(
            model_name=embedding_model,
            query_instruction='Represent the Financial question for retrieving supporting paragraphs: ',
            embed_instruction='Represent the Financial paragraph for retrieval: ',
        )

    if 'mpnet' in embedding_model:
        return HuggingFaceEmbeddings(model_name=embedding_model)

    if 'FlagEmbedding' in embedding_model or 'bge' in embedding_model.lower():
        encode_kwargs = {'normalize_embeddings': True}
        return HuggingFaceBgeEmbeddings(model_name=embedding_model, encode_kwargs=encode_kwargs)

    # Previously an unrecognised name left the result unbound.
    return HuggingFaceEmbeddings(model_name=embedding_model)


# A vector store is a live, unserialisable resource, hence cache_resource.
@st.cache_resource
def create_vectorstore(corpus, title, embedding_model, chunk_size=1000, overlap=50):
    '''Process text for Semantic Search.

    Splits ``corpus`` into overlapping chunks and indexes them in FAISS.
    ``title`` is not used by the body; it is still part of the cache key.
    '''
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap)
    texts = text_splitter.split_text(corpus)

    embeddings = gen_embeddings(embedding_model)

    vectorstore = FAISS.from_texts(
        texts, embeddings, metadatas=[{"source": i} for i in range(len(texts))]
    )

    return vectorstore


# NOTE(review): the only argument is underscore-prefixed and therefore not
# part of the cache key, so every call returns the agent built for the FIRST
# vector store. Confirm this is intended before analysing a second transcript.
@st.cache_resource
def create_memory_and_agent(_docsearch):
    '''Build the retrieval agent and its memory. Returns ``(memory, agent_executor)``.'''
    # create retriever over the vectorstore
    retriever = _docsearch.as_retriever(search_kwargs={"k": 4})

    # create retriever tool
    tool = create_retriever_tool(
        retriever,
        "earnings_call_search",
        "Searches and returns documents using the earnings context provided as a source, relevant to the user input question.",
    )

    tools = [tool]

    prompt, llm = create_prompt_and_llm()

    agent = OpenAIFunctionsAgent(llm=llm, tools=tools, prompt=prompt)

    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        return_intermediate_steps=True,
    )

    memory = AgentTokenBufferMemory(llm=llm)

    return memory, agent_executor


@st.cache_data
def gen_sentiment(text):
    '''Generate sentiment of given text'''
    return sent_pipe(text)[0]['label']


@st.cache_data
def gen_annotated_text(df):
    '''Generate annotated text.

    Returns a list of ``(text, label, colour)`` tuples built from the first
    two columns of ``df`` (text, then label).
    '''
    colours = {'Positive': '#8fce00', 'Negative': '#f44336'}

    tag_list = []
    for row in df.itertuples():
        # row[0] is the index; the data columns start at position 1.
        text, label = row[1], row[2]
        tag_list.append((text, label, colours.get(label, '#000000')))

    return tag_list


def display_df_as_table(model, top_k, score='score'):
    '''Display the df with text and scores as a table'''
    # `passages` is a module-level name that is not defined in this part of the file.
    rows = [(hit[score], passages[hit['corpus_id']]) for hit in model[0:top_k]]
    df = pd.DataFrame(rows, columns=['Score', 'Text'])
    df['Score'] = round(df['Score'], 2)

    return df


def make_spans(text, results):
    '''Pair each sentence of ``text`` with the label of the matching entry in ``results``.'''
    labels = [result['label'] for result in results]
    # Uses the nltk tokenizer imported above (`sent_tokenizer` is not defined here).
    return list(zip(sent_tokenize(text), labels))


##Fiscal Sentiment by Sentence
def fin_ext(text):
    '''Classify each sentence of ``text`` and return ``(sentence, label)`` pairs.'''
    # NOTE(review): `remote_clx` is not defined in this part of the file - confirm it exists.
    results = remote_clx(sent_tokenize(text))
    return make_spans(text, results)


## Knowledge Graphs code
def get_article(url):
    '''Download and parse the article at ``url``.'''
    # NOTE(review): `Article` is not imported in this part of the file
    # (presumably newspaper's Article) - confirm.
    article = Article(url)
    article.download()
    article.parse()
    return article