import whisper
import os
import random
import openai
from openai import OpenAI
import yt_dlp
from pytube import YouTube, extract
import pandas as pd
import plotly_express as px
import nltk
import plotly.graph_objects as go
from optimum.onnxruntime import ORTModelForSequenceClassification
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM, AutoModelForTokenClassification
from sentence_transformers import SentenceTransformer, CrossEncoder, util
import streamlit as st
import en_core_web_lg
import validators
import re
import itertools
import numpy as np
from bs4 import BeautifulSoup
import base64, time
from annotated_text import annotated_text
import pickle, math
import torch
from pydub import AudioSegment
from langchain.docstore.document import Document
from langchain.embeddings import HuggingFaceEmbeddings, HuggingFaceBgeEmbeddings, HuggingFaceInstructEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chat_models import ChatOpenAI
from langchain.chains import QAGenerationChain
from langchain.callbacks import StreamlitCallbackHandler
from langchain.agents import OpenAIFunctionsAgent, AgentExecutor
from langchain.agents.agent_toolkits import create_retriever_tool
from langchain.agents.openai_functions_agent.agent_token_buffer_memory import (
AgentTokenBufferMemory,
)
from langchain.prompts import MessagesPlaceholder
from langchain.prompts.chat import (
ChatPromptTemplate,
SystemMessagePromptTemplate,
AIMessagePromptTemplate,
HumanMessagePromptTemplate,
)
from langchain.schema import (
AIMessage,
HumanMessage,
SystemMessage
)
from langchain.prompts import PromptTemplate
from langsmith import Client
# LangSmith client; send_feedback() records user feedback through it.
client = Client()
# OpenAI SDK client; load_whisper_api() sends audio to it for transcription.
openai_audio = OpenAI()
# Download the NLTK 'punkt' resource at import time (presumably the data
# sent_tokenize relies on — confirm against the installed NLTK version).
nltk.download('punkt')
from nltk import sent_tokenize
# Read from the OPEN_AI_KEY environment variable; None when it is not set.
OPEN_AI_KEY = os.environ.get('OPEN_AI_KEY')
# Timestamp (ddmmYYYY-HHMMSS) captured once, when this module is imported;
# summary_downloader() embeds it in the download file name.
time_str = time.strftime("%d%m%Y-%H%M%S")
# Template that highlight_entities() fills in with str.format().
HTML_WRAPPER = """
{}
"""
###################### Functions #######################################################################################
#load all required models and cache
@st.cache_resource
def load_models():
    """Build every shared model/pipeline once; Streamlit caches the result.

    Returns:
        A 5-tuple ``(sent_pipe, sum_pipe, ner_pipe, cross_encoder, sbert)``:
        a text-classification pipeline, a summarization pipeline, a grouped
        NER pipeline, a cross-encoder and a sentence-embedding model.
    """
    finbert_id = "nickmuchi/quantized-optimum-finbert-tone"
    ner_id = "xlm-roberta-large-finetuned-conll03-english"

    # Sentiment: ONNX sequence-classification model with its own tokenizer.
    sentiment = pipeline(
        "text-classification",
        model=ORTModelForSequenceClassification.from_pretrained(finbert_id),
        tokenizer=AutoTokenizer.from_pretrained(finbert_id),
    )

    # Summarization: the pipeline resolves model and tokenizer from the id.
    summarizer = pipeline(
        "summarization",
        model="philschmid/flan-t5-base-samsum",
        clean_up_tokenization_spaces=True,
    )

    # NER: grouped_entities=True merges word pieces into whole entities.
    entity_tagger = pipeline(
        "ner",
        model=AutoModelForTokenClassification.from_pretrained(ner_id),
        tokenizer=AutoTokenizer.from_pretrained(ner_id),
        grouped_entities=True,
    )

    reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')
    sentence_encoder = SentenceTransformer('all-MiniLM-L6-v2')

    return sentiment, summarizer, entity_tagger, reranker, sentence_encoder
@st.cache_resource
def load_asr_model(model_name):
    """Load the open-source Whisper model, for when the API is not working.

    The model is cached with ``st.cache_resource`` rather than
    ``st.cache_data``: a loaded model is a shared, non-data resource, and
    ``cache_data`` would try to serialize it and hand out a copy per call.

    Args:
        model_name: Name passed straight to ``whisper.load_model``.

    Returns:
        The model object returned by ``whisper.load_model``.
    """
    return whisper.load_model(model_name)
@st.cache_resource
def get_spacy():
    """Return the ``en_core_web_lg`` spaCy pipeline, loaded once and cached."""
    return en_core_web_lg.load()
# Instantiate the shared models at import time. Both loaders are wrapped in
# st.cache_resource, and the functions below read these module-level names.
nlp = get_spacy()
sent_pipe, sum_pipe, ner_pipe, cross_encoder, sbert = load_models()
@st.cache_data
def get_yt_audio(url):
    """Download the audio of a YouTube video with pytube.

    Args:
        url: YouTube video URL.

    Returns:
        ``(audio_path, title)`` — the value returned by the stream's
        ``download()`` call and the video title.

    Raises:
        ValueError: If the video exposes no audio-only stream.
    """
    yt = YouTube(url)
    title = yt.title
    # first() yields None when the filter matches nothing; fail with a clear
    # message instead of an AttributeError on None.
    stream = yt.streams.filter(only_audio=True).first()
    if stream is None:
        raise ValueError(f"No audio-only stream available for {url}")
    audio_path = stream.download()
    return audio_path, title
@st.cache_data
def get_yt_audio_dl(url):
    """Fallback downloader (yt_dlp) for when pytube is down.

    Args:
        url: Video URL handed to yt_dlp.

    Returns:
        ``(audio_file, title)`` where ``audio_file`` is always
        ``output/audio.mp3`` and ``title`` comes from the extracted metadata
        (``None`` when the metadata has no title).
    """
    # Extension-less output template; the audio post-processor below is
    # configured for mp3, and the returned path assumes a ".mp3" suffix.
    output_stem = os.path.join('output', 'audio')
    extract_mp3 = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    options = {
        'format': 'bestaudio/best',
        'postprocessors': [extract_mp3],
        'outtmpl': output_stem,
        'quiet': True,
    }

    with yt_dlp.YoutubeDL(options) as downloader:
        # Metadata lookup first (no download), then the actual download.
        metadata = downloader.extract_info(url, download=False)
        title = metadata.get('title', None)
        downloader.download([url])

    return os.path.join('output', 'audio.mp3'), title
@st.cache_data
def load_whisper_api(audio):
    """Transcribe an audio file to text using the OpenAI Whisper API.

    Args:
        audio: Path of the audio file to upload.

    Returns:
        The transcript returned by the API for ``response_format="text"``.
    """
    # Context manager so the file handle is closed even if the API call fails.
    with open(audio, "rb") as audio_file:
        return openai_audio.audio.transcriptions.create(
            model="whisper-1", file=audio_file, response_format="text"
        )
def _transcribe_audio_file(audio_path, link, audio_format=None):
    """Transcribe one downloaded audio file, chunking it when it is too big.

    Args:
        audio_path: Path of the downloaded audio file.
        link: Source video URL; its video id is used in chunk file names.
        audio_format: Format hint for pydub, or ``None`` to let ffmpeg
            detect the container.

    Returns:
        The transcript; chunk transcripts are joined with ``','``.
    """
    audio_size = round(os.path.getsize(audio_path) / (1024 * 1024), 1)
    if audio_size <= 25:
        st.info("`Transcribing YT audio...`")
        return load_whisper_api(audio_path)

    st.warning('File size larger than 24mb, applying chunking and transcription', icon="⚠️")
    song = AudioSegment.from_file(audio_path, format=audio_format)
    # PyDub handles time in milliseconds
    twenty_minutes = 20 * 60 * 1000
    video_id = extract.video_id(link)
    # The chunk files are written under output/, which may not exist yet.
    os.makedirs('output', exist_ok=True)
    transcriptions = []
    for i, chunk in enumerate(song[::twenty_minutes]):
        # Export and read back through the same path so the two cannot
        # drift apart (they used to differ in extension).
        chunk_path = f'output/chunk_{i}_{video_id}.mp3'
        chunk.export(chunk_path, format='mp3')
        transcriptions.append(load_whisper_api(chunk_path))
    return ','.join(transcriptions)


@st.cache_data
def transcribe_yt_video(link, py_tube=True):
    """Download and transcribe a YouTube video.

    Args:
        link: YouTube video URL.
        py_tube: Download with pytube when true, otherwise with yt_dlp.

    Returns:
        ``(results, title)`` — the transcript text and the video title.
    """
    if py_tube:
        audio_file, title = get_yt_audio(link)
        # pytube keeps the stream's own container (not necessarily mp3), so
        # do not force a decoder format.
        audio_format = None
    else:
        audio_file, title = get_yt_audio_dl(link)
        # The yt_dlp path always yields output/audio.mp3.
        audio_format = 'mp3'

    print(f'audio_file:{audio_file}')
    st.session_state['audio'] = audio_file
    print(f"audio_file_session_state:{st.session_state['audio'] }")

    results = _transcribe_audio_file(st.session_state['audio'], link, audio_format)
    st.info("`YT Video transcription process complete...`")
    return results, title
@st.cache_data
def inference(link, upload):
    """Convert a YouTube video or an uploaded audio file to text.

    Args:
        link: YouTube URL; used when it validates as a URL.
        upload: Path of an uploaded audio file, used when ``link`` is not a
            valid URL (assumed to be a filesystem path, since it is passed
            to ``os.path.getsize`` — TODO confirm against the caller).

    Returns:
        ``(results, title)`` — the transcript and either the video title or
        ``"Transcribed Earnings Audio"`` for uploads.

    Raises:
        ValueError: If neither a valid URL nor an upload is supplied.
    """
    if link and validators.url(link):
        st.info("`Downloading YT audio...`")
        # Only the YouTube path gets the yt_dlp fallback, so upload failures
        # are no longer reported as PyTube errors.
        try:
            return transcribe_yt_video(link)
        except Exception as e:
            st.error(f"PyTube Error: {e},\nUsing yt_dlp module, might take longer than expected", icon="🚨")
            return transcribe_yt_video(link, py_tube=False)

    if not upload:
        raise ValueError("Provide either a valid YouTube URL or an uploaded audio file")

    # Get size of audio file
    audio_size = round(os.path.getsize(upload) / (1024 * 1024), 1)
    if audio_size <= 25:
        st.info("`Transcribing uploaded audio...`")
        # load_whisper_api already returns the transcript itself, so there
        # is nothing to index into.
        results = load_whisper_api(upload)
    else:
        st.write('File size larger than 24mb, applying chunking and transcription')
        song = AudioSegment.from_file(upload)
        # PyDub handles time in milliseconds
        twenty_minutes = 20 * 60 * 1000
        # The chunk files are written under output/, which may not exist yet.
        os.makedirs('output', exist_ok=True)
        transcriptions = []
        st.info("`Transcribing uploaded audio...`")
        for i, chunk in enumerate(song[::twenty_minutes]):
            chunk_path = f'output/chunk_{i}.mp4'
            chunk.export(chunk_path, format='mp4')
            transcriptions.append(load_whisper_api(chunk_path))
        results = ','.join(transcriptions)

    st.info("`Uploaded audio transcription process complete...`")
    return results, "Transcribed Earnings Audio"
@st.cache_resource
def send_feedback(run_id, score):
    """Record a user score for a run through the module-level LangSmith client.

    Args:
        run_id: Identifier of the run being rated.
        score: Score stored under the ``"user_score"`` feedback key.
    """
    feedback_key = "user_score"
    client.create_feedback(run_id, feedback_key, score=score)
@st.cache_data
def clean_text(text):
    """Normalize transcribed text before further processing.

    Non-ASCII characters are dropped, then URLs, @mentions and #hashtags
    are each replaced by a space, and finally runs of whitespace are
    collapsed to a single space.
    """
    cleaned = text.encode("ascii", "ignore").decode()
    # Order matters: whitespace is collapsed last, after the removals above
    # it have introduced extra spaces.
    patterns = (
        r"https*\S+",  # url
        r"@\S+",       # mentions
        r"#\S+",       # hashtags
        r"\s{2,}",     # over spaces
    )
    for pattern in patterns:
        cleaned = re.sub(pattern, " ", cleaned)
    return cleaned
@st.cache_data
def chunk_long_text(text, threshold, window_size=3, stride=2):
    """Split text into sentence windows for sentiment analysis.

    The text is split into sentences; any sentence with ``threshold`` or
    more words is cut into pieces of at most ``threshold`` words. The
    resulting pieces are then grouped into windows.

    Args:
        text: Text to split.
        threshold: Maximum number of words per piece.
        window_size: Number of consecutive pieces joined into one passage.
        stride: Step between the starts of consecutive windows.

    Returns:
        List of passages, each a space-joined window of pieces.
    """
    pieces = []
    for sentence in sent_tokenize(text):
        words = sentence.split()
        if len(words) < threshold:
            pieces.append(sentence)
        else:
            # Step over the words that actually exist, so a sentence whose
            # length is an exact multiple of threshold does not yield a
            # trailing empty piece.
            pieces.extend(
                ' '.join(words[start:start + threshold])
                for start in range(0, len(words), threshold)
            )

    # Slicing clamps at the end of the list, so the last windows may be short.
    return [
        " ".join(pieces[start:start + window_size])
        for start in range(0, len(pieces), stride)
    ]
@st.cache_data
def sentiment_pipe(earnings_text):
    """Classify the sentiment of each sentence-sized passage of the text.

    Returns:
        ``(earnings_sentiment, earnings_sentences)`` — the classifier output
        and the passages it was run on.
    """
    # threshold=150 words, window_size=1, stride=1: one piece per passage.
    passages_to_score = chunk_long_text(earnings_text, 150, 1, 1)
    return sent_pipe(passages_to_score), passages_to_score
@st.cache_data
def chunk_and_preprocess_text(text, model_name= 'philschmid/flan-t5-base-samsum'):
    """Chunk text for summarization so each chunk fits the model's input.

    Sentences are accumulated into a chunk until adding the next one would
    exceed ``tokenizer.max_len_single_sentence`` tokens; that sentence then
    starts a new chunk. A single sentence longer than the limit becomes a
    chunk on its own.

    Args:
        text: Text to split.
        model_name: Hugging Face id of the tokenizer used to count tokens.

    Returns:
        List of chunks; each chunk is its sentences, each followed by a space.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    max_tokens = tokenizer.max_len_single_sentence

    chunks = []
    chunk = ""
    length = 0
    for sentence in sent_tokenize(text):
        # Tokenize once per sentence and reuse the count.
        sentence_length = len(tokenizer.tokenize(sentence))
        # Only flush a non-empty chunk, so an oversized first sentence does
        # not produce an empty leading chunk.
        if chunk and length + sentence_length > max_tokens:
            chunks.append(chunk)
            chunk = ""
            length = 0
        chunk += sentence + " "
        length += sentence_length

    # Always flush what is left, so the final sentence is kept even when it
    # was the one that overflowed the previous chunk.
    if chunk:
        chunks.append(chunk)
    return chunks
@st.cache_data
def summarize_text(text_to_summarize, max_len, min_len):
    """Summarize text with the cached summarization pipeline.

    Args:
        text_to_summarize: Input passed to the pipeline.
        max_len: Forwarded as ``max_length``.
        min_len: Forwarded as ``min_length``.

    Returns:
        The ``summary_text`` of every pipeline output, joined by spaces.
    """
    generation_options = {
        'max_length': max_len,
        'min_length': min_len,
        'do_sample': False,
        'early_stopping': True,
        'num_beams': 4,
    }
    outputs = sum_pipe(text_to_summarize, **generation_options)
    return ' '.join(output['summary_text'] for output in outputs)
@st.cache_data
def get_all_entities_per_sentence(text):
    """Collect named entities for every sentence of the text.

    The text is joined with ``''`` and split into sentences by spaCy; for
    each sentence the spaCy entities come first, followed by the ``word``
    of every hit from the NER pipeline.

    Returns:
        A list with one list of entity strings per sentence.
    """
    doc = nlp(''.join(text))
    per_sentence = []
    for sent in doc.sents:
        spacy_entities = [str(ent) for ent in sent.ents]
        xlm_entities = [str(hit["word"]) for hit in ner_pipe(str(sent))]
        per_sentence.append(spacy_entities + xlm_entities)
    return per_sentence
@st.cache_data
def get_all_entities(text):
    """Return every entity found in the text as one flat list."""
    return [
        entity
        for sentence_entities in get_all_entities_per_sentence(text)
        for entity in sentence_entities
    ]
def _drop_substring_entities(entities):
    """Drop entities contained (case-insensitively) in another, different entity."""
    return [
        entity
        for entity in entities
        if not any(
            entity != other and entity.lower() in other.lower()
            for other in entities
        )
    ]


@st.cache_data
def get_and_compare_entities(article_content, summary_output):
    """Split the summary's entities into ones found / not found in the article.

    A summary entity counts as matched when it is a case-insensitive
    substring of some article entity, or when its sentence embedding has an
    inner product above 0.9 with some article entity's embedding. Within
    each result list, duplicates and entities contained in another entity of
    the same list are removed.

    Args:
        article_content: Source text.
        summary_output: Summary text to check against the source.

    Returns:
        ``(matched_entities, unmatched_entities)`` in first-seen order.
    """
    entities_article = list(itertools.chain.from_iterable(
        get_all_entities_per_sentence(article_content)))
    entities_summary = list(itertools.chain.from_iterable(
        get_all_entities_per_sentence(summary_output)))

    # Duplicates cannot change an "any match" answer, so work on unique values.
    unique_article = list(dict.fromkeys(entities_article))
    article_lower = [candidate.lower() for candidate in unique_article]
    # Encoded at most once, and only if the substring test fails for some
    # entity; the previous code re-encoded every article entity for every
    # comparison.
    article_embeddings = None

    matched_entities = []
    unmatched_entities = []
    for entity in dict.fromkeys(entities_summary):
        needle = entity.lower()
        if any(needle in candidate for candidate in article_lower):
            matched_entities.append(entity)
            continue

        is_similar = False
        if unique_article:
            if article_embeddings is None:
                article_embeddings = sbert.encode(unique_article, show_progress_bar=False)
            entity_embedding = sbert.encode(entity, show_progress_bar=False)
            # One inner product per article entity, computed in a single call.
            scores = np.inner(article_embeddings, entity_embedding)
            is_similar = bool((scores > 0.9).any())

        if is_similar:
            matched_entities.append(entity)
        else:
            unmatched_entities.append(entity)

    return (
        _drop_substring_entities(matched_entities),
        _drop_substring_entities(unmatched_entities),
    )
@st.cache_data
def highlight_entities(article_content, summary_output):
    """Wrap the summary's entities in highlight markers and return it as HTML.

    Entities matched in the article get the "green" start marker, unmatched
    ones the "red" start marker; the result is parsed with BeautifulSoup and
    inserted into ``HTML_WRAPPER``.

    Args:
        article_content: Source text the summary is checked against.
        summary_output: Summary text to annotate.

    Returns:
        ``HTML_WRAPPER`` formatted with the annotated summary.
    """
    # NOTE(review): all three markers are empty strings here, so the
    # substitutions leave the text unchanged — presumably HTML highlight
    # tags were intended; confirm.
    markdown_start_red = ""
    markdown_start_green = ""
    markdown_end = ""

    matched_entities, unmatched_entities = get_and_compare_entities(article_content, summary_output)

    marked_groups = (
        (matched_entities, markdown_start_green),
        (unmatched_entities, markdown_start_red),
    )
    for entities, start_marker in marked_groups:
        for entity in entities:
            # Escape the entity so characters such as ".", "(" or "+" are
            # matched literally instead of being read as regex syntax.
            pattern = rf'({re.escape(entity)})(?![^rgb\(]*\))'
            replacement = start_marker + entity + markdown_end
            # A callable replacement keeps backslashes in the entity from
            # being interpreted as template escapes.
            summary_output = re.sub(
                pattern, lambda _match, text=replacement: text, summary_output
            )

    print("")
    print("")
    soup = BeautifulSoup(summary_output, features="html.parser")
    return HTML_WRAPPER.format(soup)
def summary_downloader(raw_text):
    """Render a link that downloads the generated summary as a text file.

    The text is embedded in the link itself as a base64 ``data:`` URI, and
    the suggested file name carries the module-level ``time_str`` timestamp.

    Args:
        raw_text: Summary text to offer for download.
    """
    b64 = base64.b64encode(raw_text.encode()).decode()
    new_filename = "new_text_file_{}_.txt".format(time_str)
    st.markdown("#### Download Summary as a File ###")
    # Build a real anchor: the encoded payload and file name were computed
    # but never used, so only plain text was rendered.
    href = f'<a href="data:file/txt;base64,{b64}" download="{new_filename}">Click to Download!!</a>'
    st.markdown(href, unsafe_allow_html=True)
@st.cache_data
def generate_eval(raw_text, N, chunk):
    """Generate sample question/answer pairs from random slices of the text.

    Args:
        raw_text: Text to draw context from.
        N: Number of slices to sample (one generation call per slice).
        chunk: Length, in characters, of each slice.

    Returns:
        Flat list of the items produced by the QA generation chain; slices
        whose generation fails are skipped with a warning.
    """
    update = st.empty()
    ques_update = st.empty()
    update.info("`Generating sample questions ...`")

    # Clamp the last valid start so a text shorter than `chunk` yields the
    # whole text instead of making randint fail on an empty range.
    last_start = max(0, len(raw_text) - chunk)
    starting_indices = [random.randint(0, last_start) for _ in range(N)]
    sub_sequences = [raw_text[start:start + chunk] for start in starting_indices]

    chain = QAGenerationChain.from_llm(ChatOpenAI(temperature=0))
    eval_set = []
    for i, context in enumerate(sub_sequences):
        # Best effort: one failed generation must not abort the whole batch.
        try:
            qa = chain.run(context)
            eval_set.append(qa)
            ques_update.info(f"Creating Question: {i+1}")
        except Exception as e:
            print(e)
            st.warning(f'Error in generating Question: {i+1}...', icon="⚠️")

    eval_set_full = list(itertools.chain.from_iterable(eval_set))
    update.empty()
    ques_update.empty()
    return eval_set_full
@st.cache_resource
def create_prompt_and_llm():
    """Build the agent prompt and the chat model it runs on.

    Returns:
        ``(prompt, llm)`` — an OpenAI-functions agent prompt carrying the
        system message plus a ``history`` placeholder, and a streaming
        ``gpt-4o`` chat model with temperature 0.
    """
    system_text = (
        "You are a helpful chatbot who is tasked with answering questions acuurately about earnings call transcript provided. "
        "Unless otherwise explicitly stated, it is probably fair to assume that questions are about the earnings call transcript. "
        "If there is any ambiguity, you probably assume they are about that."
        "Do not use any information not provided in the earnings context and remember you are a to speak like a finance expert."
        "If you don't know the answer, just say 'There is no relevant answer in the given earnings call transcript'"
        "don't try to make up an answer"
    )
    history_slot = MessagesPlaceholder(variable_name="history")

    chat_model = ChatOpenAI(temperature=0, streaming=True, model="gpt-4o")
    agent_prompt = OpenAIFunctionsAgent.create_prompt(
        system_message=SystemMessage(content=system_text),
        extra_prompt_messages=[history_slot],
    )
    return agent_prompt, chat_model
@st.cache_resource
def gen_embeddings(embedding_model):
    """Create the LangChain embeddings wrapper for the given model name.

    The wrapper class is chosen from a substring of the name: ``hkunlp``
    (instruct embeddings), ``mpnet`` (plain Hugging Face embeddings) or
    ``FlagEmbedding`` (BGE embeddings with normalized vectors).

    Args:
        embedding_model: Model name passed as ``model_name``.

    Returns:
        The embeddings object for that model.

    Raises:
        ValueError: If the name matches none of the supported families.
    """
    if 'hkunlp' in embedding_model:
        return HuggingFaceInstructEmbeddings(
            model_name=embedding_model,
            query_instruction='Represent the Financial question for retrieving supporting paragraphs: ',
            embed_instruction='Represent the Financial paragraph for retrieval: ',
        )
    if 'mpnet' in embedding_model:
        return HuggingFaceEmbeddings(model_name=embedding_model)
    if 'FlagEmbedding' in embedding_model:
        encode_kwargs = {'normalize_embeddings': True}
        return HuggingFaceBgeEmbeddings(
            model_name=embedding_model,
            encode_kwargs=encode_kwargs,
        )
    # Fail with a clear message; falling through used to raise an
    # UnboundLocalError on the return statement.
    raise ValueError(f"Unsupported embedding model: {embedding_model!r}")
@st.cache_data
def create_vectorstore(corpus, title, embedding_model, chunk_size=1000, overlap=50):
    """Split the corpus and index the pieces in a FAISS vector store.

    Args:
        corpus: Text to index.
        title: Not read by the body (presumably kept so the cache
            distinguishes documents — confirm).
        embedding_model: Model name passed to ``gen_embeddings``.
        chunk_size: Splitter ``chunk_size``.
        overlap: Splitter ``chunk_overlap``.

    Returns:
        The FAISS store; each piece's metadata is ``{"source": <index>}``.
    """
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap)
    pieces = splitter.split_text(corpus)
    embedder = gen_embeddings(embedding_model)
    source_tags = [{"source": position} for position, _ in enumerate(pieces)]
    return FAISS.from_texts(pieces, embedder, metadatas=source_tags)
@st.cache_resource
def create_memory_and_agent(_docsearch):
    """Wire a retrieval tool, an OpenAI-functions agent and its memory.

    NOTE(review): ``_docsearch`` starts with an underscore, which Streamlit
    leaves out of the cache key; with no other arguments the first result is
    presumably reused for every later call — confirm this is intended.

    Args:
        _docsearch: Vector store; queried through ``as_retriever`` with k=4.

    Returns:
        ``(memory, agent_executor)``.
    """
    retriever = _docsearch.as_retriever(search_kwargs={"k": 4})
    search_tool = create_retriever_tool(
        retriever,
        "earnings_call_search",
        "Searches and returns documents using the earnings context provided as a source, relevant to the user input question.",
    )
    toolbox = [search_tool]

    prompt, llm = create_prompt_and_llm()
    executor = AgentExecutor(
        agent=OpenAIFunctionsAgent(llm=llm, tools=toolbox, prompt=prompt),
        tools=toolbox,
        verbose=True,
        return_intermediate_steps=True,
    )
    return AgentTokenBufferMemory(llm=llm), executor
@st.cache_data
def gen_sentiment(text):
    """Return the label of the first sentiment prediction for the text."""
    first_prediction = sent_pipe(text)[0]
    return first_prediction['label']
@st.cache_data
def gen_annotated_text(df):
    """Turn a dataframe into ``(text, label, color)`` tuples.

    The first column is read as the text and the second as the label.
    ``'Positive'`` maps to ``'#8fce00'``, ``'Negative'`` to ``'#f44336'``
    and anything else to ``'#000000'``.
    """
    def color_for(label):
        if label == 'Positive':
            return '#8fce00'
        if label == 'Negative':
            return '#f44336'
        return '#000000'

    # itertuples() puts the index at position 0, so columns start at 1.
    return [(row[1], row[2], color_for(row[2])) for row in df.itertuples()]
def display_df_as_table(model,top_k,score='score'):
    """Build a Score/Text dataframe from the first ``top_k`` hits.

    Args:
        model: Sequence of hit mappings, each with ``corpus_id`` and the
            key named by ``score``.
        top_k: Number of leading hits to keep.
        score: Key holding each hit's score.

    Returns:
        DataFrame with ``Score`` (rounded to 2 decimals) and ``Text``.

    Note:
        Text is looked up in a module-level ``passages`` that is not defined
        in the visible part of the file — TODO confirm where it is set.
    """
    top_hits = model[:top_k]
    rows = [(hit[score], passages[hit['corpus_id']]) for hit in top_hits]
    table = pd.DataFrame(rows, columns=['Score', 'Text'])
    table['Score'] = round(table['Score'], 2)
    return table
def make_spans(text,results):
    """Pair each sentence of the text with the label of its result.

    Note:
        ``sent_tokenizer`` is not defined in the visible part of the file
        (only ``sent_tokenize`` is imported) — TODO confirm.

    Returns:
        List of ``(sentence, label)`` tuples; ``zip`` stops at the shorter
        of the two sequences.
    """
    labels = [results[position]['label'] for position in range(len(results))]
    return list(zip(sent_tokenizer(text), labels))
##Fiscal Sentiment by Sentence
def fin_ext(text):
    """Classify each sentence of the text and return (sentence, label) spans.

    Note:
        ``remote_clx`` and ``sent_tokenizer`` are not defined in the visible
        part of the file — TODO confirm.
    """
    sentences = sent_tokenizer(text)
    predictions = remote_clx(sentences)
    return make_spans(text, predictions)
## Knowledge Graphs code
def get_article(url):
    """Download and parse the article at ``url`` and return the article object.

    Note:
        ``Article`` is not imported in the visible part of the file —
        TODO confirm where it comes from.
    """
    fetched = Article(url)
    # download() must run before parse().
    fetched.download()
    fetched.parse()
    return fetched