nickmuchi's picture
Update functions.py
fdb5d4d
raw history blame
No virus
23.2 kB
import whisper
import os
import random
import openai
import yt_dlp
from pytube import YouTube, extract
import pandas as pd
import plotly_express as px
import nltk
import plotly.graph_objects as go
from optimum.onnxruntime import ORTModelForSequenceClassification
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM, AutoModelForTokenClassification
from sentence_transformers import SentenceTransformer, CrossEncoder, util
import streamlit as st
import en_core_web_lg
import validators
import re
import itertools
import numpy as np
from bs4 import BeautifulSoup
import base64, time
from annotated_text import annotated_text
import pickle, math
import wikipedia
from pyvis.network import Network
import torch
from pydub import AudioSegment
from langchain.docstore.document import Document
from langchain.embeddings import HuggingFaceEmbeddings, HuggingFaceBgeEmbeddings, HuggingFaceInstructEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chat_models import ChatOpenAI
from langchain.chains import QAGenerationChain
from langchain.callbacks import StreamlitCallbackHandler
from langchain.agents import OpenAIFunctionsAgent, AgentExecutor
from langchain.agents.agent_toolkits import create_retriever_tool
from langchain.agents.openai_functions_agent.agent_token_buffer_memory import (
AgentTokenBufferMemory,
)
from langchain.prompts import MessagesPlaceholder
from langchain.prompts.chat import (
ChatPromptTemplate,
SystemMessagePromptTemplate,
AIMessagePromptTemplate,
HumanMessagePromptTemplate,
)
from langchain.schema import (
AIMessage,
HumanMessage,
SystemMessage
)
from langchain.prompts import PromptTemplate
from langsmith import Client
client = Client()
nltk.download('punkt')
from nltk import sent_tokenize
OPEN_AI_KEY = os.environ.get('OPEN_AI_KEY')
time_str = time.strftime("%d%m%Y-%H%M%S")
HTML_WRAPPER = """<div style="overflow-x: auto; border: 1px solid #e6e9ef; border-radius: 0.25rem; padding: 1rem;
margin-bottom: 2.5rem">{}</div> """
###################### Functions #######################################################################################
#load all required models and cache
@st.cache_resource
def load_models():
'''Load and cache all the models to be used'''
q_model = ORTModelForSequenceClassification.from_pretrained("nickmuchi/quantized-optimum-finbert-tone")
ner_model = AutoModelForTokenClassification.from_pretrained("xlm-roberta-large-finetuned-conll03-english")
q_tokenizer = AutoTokenizer.from_pretrained("nickmuchi/quantized-optimum-finbert-tone")
ner_tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-large-finetuned-conll03-english")
sent_pipe = pipeline("text-classification",model=q_model, tokenizer=q_tokenizer)
sum_pipe = pipeline("summarization",model="philschmid/flan-t5-base-samsum",clean_up_tokenization_spaces=True)
ner_pipe = pipeline("ner", model=ner_model, tokenizer=ner_tokenizer, grouped_entities=True)
cross_encoder = CrossEncoder('cross-encoder/mmarco-mMiniLMv2-L12-H384-v1') #cross-encoder/ms-marco-MiniLM-L-12-v2
sbert = SentenceTransformer('all-MiniLM-L6-v2')
return sent_pipe, sum_pipe, ner_pipe, cross_encoder, sbert
@st.cache_data
def load_asr_model(model_name):
'''Load the open source whisper model in cases where the API is not working'''
model = whisper.load_model(model_name)
return model
@st.cache_resource
def get_spacy():
nlp = en_core_web_lg.load()
return nlp
nlp = get_spacy()
sent_pipe, sum_pipe, ner_pipe, cross_encoder, sbert = load_models()
@st.cache_data
def get_yt_audio(url):
'''Get YT video from given URL link'''
yt = YouTube(url)
title = yt.title
# Get the first available audio stream and download it
audio_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first().download()
return audio_stream, title
@st.cache_data
def get_yt_audio_dl(url):
'''Back up for when pytube is down'''
temp_audio_file = os.path.join('output', 'audio')
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
'outtmpl': temp_audio_file,
'quiet': True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
title = info.get('title', None)
ydl.download([url])
#with open(temp_audio_file+'.mp3', 'rb') as file:
audio_file = os.path.join('output', 'audio.mp3')
return audio_file, title
@st.cache_data
def load_whisper_api(audio):
'''Transcribe YT audio to text using Open AI API'''
file = open(audio, "rb")
transcript = openai.Audio.translate("whisper-1", file)
return transcript
@st.cache_data
def transcribe_yt_video(link, py_tube=True):
'''Transcribe YouTube video'''
if py_tube:
audio_file, title = get_yt_audio(link)
print(f'audio_file:{audio_file}')
st.session_state['audio'] = audio_file
print(f"audio_file_session_state:{st.session_state['audio'] }")
#Get size of audio file
audio_size = round(os.path.getsize(st.session_state['audio'])/(1024*1024),1)
#Check if file is > 24mb, if not then use Whisper API
if audio_size <= 25:
st.info("`Transcribing YT audio...`")
#Use whisper API
results = load_whisper_api(st.session_state['audio'])['text']
else:
st.warning('File size larger than 24mb, applying chunking and transcription',icon="⚠️")
song = AudioSegment.from_file(st.session_state['audio'], format='mp4')
# PyDub handles time in milliseconds
twenty_minutes = 20 * 60 * 1000
chunks = song[::twenty_minutes]
transcriptions = []
video_id = extract.video_id(link)
for i, chunk in enumerate(chunks):
chunk.export(f'output/chunk_{i}_{video_id}.mp4', format='mp4')
transcriptions.append(load_whisper_api(f'output/chunk_{i}_{video_id}.mp4')['text'])
results = ','.join(transcriptions)
else:
audio_file, title = get_yt_audio_dl(link)
print(f'audio_file:{audio_file}')
st.session_state['audio'] = audio_file
print(f"audio_file_session_state:{st.session_state['audio'] }")
#Get size of audio file
audio_size = round(os.path.getsize(st.session_state['audio'])/(1024*1024),1)
#Check if file is > 24mb, if not then use Whisper API
if audio_size <= 25:
st.info("`Transcribing YT audio...`")
#Use whisper API
results = load_whisper_api(st.session_state['audio'])['text']
else:
st.warning('File size larger than 24mb, applying chunking and transcription',icon="⚠️")
song = AudioSegment.from_file(st.session_state['audio'], format='mp3')
# PyDub handles time in milliseconds
twenty_minutes = 20 * 60 * 1000
chunks = song[::twenty_minutes]
transcriptions = []
video_id = extract.video_id(link)
for i, chunk in enumerate(chunks):
chunk.export(f'output/chunk_{i}_{video_id}.mp3', format='mp3')
transcriptions.append(load_whisper_api(f'output/chunk_{i}_{video_id}.mp3')['text'])
results = ','.join(transcriptions)
st.info("`YT Video transcription process complete...`")
return results, title
@st.cache_data
def inference(link, upload):
'''Convert Youtube video or Audio upload to text'''
try:
if validators.url(link):
st.info("`Downloading YT audio...`")
results, title = transcribe_yt_video(link)
return results, title
elif _upload:
#Get size of audio file
audio_size = round(os.path.getsize(_upload)/(1024*1024),1)
#Check if file is > 24mb, if not then use Whisper API
if audio_size <= 25:
st.info("`Transcribing uploaded audio...`")
#Use whisper API
results = load_whisper_api(_upload)['text']
else:
st.write('File size larger than 24mb, applying chunking and transcription')
song = AudioSegment.from_file(_upload)
# PyDub handles time in milliseconds
twenty_minutes = 20 * 60 * 1000
chunks = song[::twenty_minutes]
transcriptions = []
st.info("`Transcribing uploaded audio...`")
for i, chunk in enumerate(chunks):
chunk.export(f'output/chunk_{i}.mp4', format='mp4')
transcriptions.append(load_whisper_api(f'output/chunk_{i}.mp4')['text'])
results = ','.join(transcriptions)
st.info("`Uploaded audio transcription process complete...`")
return results, "Transcribed Earnings Audio"
except Exception as e:
st.error(f'''PyTube Error: {e},
Using yt_dlp module, might take longer than expected''',icon="🚨")
results, title = transcribe_yt_video(link, py_tube=False)
# results = _asr_model.transcribe(st.session_state['audio'], task='transcribe', language='en')
return results, title
@st.cache_resource
def send_feedback(run_id, score):
client.create_feedback(run_id, "user_score", score=score)
@st.cache_data
def clean_text(text):
'''Clean all text after inference'''
text = text.encode("ascii", "ignore").decode() # unicode
text = re.sub(r"https*\S+", " ", text) # url
text = re.sub(r"@\S+", " ", text) # mentions
text = re.sub(r"#\S+", " ", text) # hastags
text = re.sub(r"\s{2,}", " ", text) # over spaces
return text
@st.cache_data
def chunk_long_text(text,threshold,window_size=3,stride=2):
'''Preprocess text and chunk for sentiment analysis'''
#Convert cleaned text into sentences
sentences = sent_tokenize(text)
out = []
#Limit the length of each sentence to a threshold
for chunk in sentences:
if len(chunk.split()) < threshold:
out.append(chunk)
else:
words = chunk.split()
num = int(len(words)/threshold)
for i in range(0,num*threshold+1,threshold):
out.append(' '.join(words[i:threshold+i]))
passages = []
#Combine sentences into a window of size window_size
for paragraph in [out]:
for start_idx in range(0, len(paragraph), stride):
end_idx = min(start_idx+window_size, len(paragraph))
passages.append(" ".join(paragraph[start_idx:end_idx]))
return passages
@st.cache_data
def sentiment_pipe(earnings_text):
'''Determine the sentiment of the text'''
earnings_sentences = chunk_long_text(earnings_text,150,1,1)
earnings_sentiment = sent_pipe(earnings_sentences)
return earnings_sentiment, earnings_sentences
@st.cache_data
def chunk_and_preprocess_text(text, model_name= 'philschmid/flan-t5-base-samsum'):
'''Chunk and preprocess text for summarization'''
tokenizer = AutoTokenizer.from_pretrained(model_name)
sentences = sent_tokenize(text)
# initialize
length = 0
chunk = ""
chunks = []
count = -1
for sentence in sentences:
count += 1
combined_length = len(tokenizer.tokenize(sentence)) + length # add the no. of sentence tokens to the length counter
if combined_length <= tokenizer.max_len_single_sentence: # if it doesn't exceed
chunk += sentence + " " # add the sentence to the chunk
length = combined_length # update the length counter
# if it is the last sentence
if count == len(sentences) - 1:
chunks.append(chunk) # save the chunk
else:
chunks.append(chunk) # save the chunk
# reset
length = 0
chunk = ""
# take care of the overflow sentence
chunk += sentence + " "
length = len(tokenizer.tokenize(sentence))
return chunks
@st.cache_data
def summarize_text(text_to_summarize,max_len,min_len):
'''Summarize text with HF model'''
summarized_text = sum_pipe(text_to_summarize,
max_length=max_len,
min_length=min_len,
do_sample=False,
early_stopping=True,
num_beams=4)
summarized_text = ' '.join([summ['summary_text'] for summ in summarized_text])
return summarized_text
@st.cache_data
def get_all_entities_per_sentence(text):
doc = nlp(''.join(text))
sentences = list(doc.sents)
entities_all_sentences = []
for sentence in sentences:
entities_this_sentence = []
# SPACY ENTITIES
for entity in sentence.ents:
entities_this_sentence.append(str(entity))
# XLM ENTITIES
entities_xlm = [entity["word"] for entity in ner_pipe(str(sentence))]
for entity in entities_xlm:
entities_this_sentence.append(str(entity))
entities_all_sentences.append(entities_this_sentence)
return entities_all_sentences
@st.cache_data
def get_all_entities(text):
all_entities_per_sentence = get_all_entities_per_sentence(text)
return list(itertools.chain.from_iterable(all_entities_per_sentence))
@st.cache_data
def get_and_compare_entities(article_content,summary_output):
all_entities_per_sentence = get_all_entities_per_sentence(article_content)
entities_article = list(itertools.chain.from_iterable(all_entities_per_sentence))
all_entities_per_sentence = get_all_entities_per_sentence(summary_output)
entities_summary = list(itertools.chain.from_iterable(all_entities_per_sentence))
matched_entities = []
unmatched_entities = []
for entity in entities_summary:
if any(entity.lower() in substring_entity.lower() for substring_entity in entities_article):
matched_entities.append(entity)
elif any(
np.inner(sbert.encode(entity, show_progress_bar=False),
sbert.encode(art_entity, show_progress_bar=False)) > 0.9 for
art_entity in entities_article):
matched_entities.append(entity)
else:
unmatched_entities.append(entity)
matched_entities = list(dict.fromkeys(matched_entities))
unmatched_entities = list(dict.fromkeys(unmatched_entities))
matched_entities_to_remove = []
unmatched_entities_to_remove = []
for entity in matched_entities:
for substring_entity in matched_entities:
if entity != substring_entity and entity.lower() in substring_entity.lower():
matched_entities_to_remove.append(entity)
for entity in unmatched_entities:
for substring_entity in unmatched_entities:
if entity != substring_entity and entity.lower() in substring_entity.lower():
unmatched_entities_to_remove.append(entity)
matched_entities_to_remove = list(dict.fromkeys(matched_entities_to_remove))
unmatched_entities_to_remove = list(dict.fromkeys(unmatched_entities_to_remove))
for entity in matched_entities_to_remove:
matched_entities.remove(entity)
for entity in unmatched_entities_to_remove:
unmatched_entities.remove(entity)
return matched_entities, unmatched_entities
@st.cache_data
def highlight_entities(article_content,summary_output):
markdown_start_red = "<mark class=\"entity\" style=\"background: rgb(238, 135, 135);\">"
markdown_start_green = "<mark class=\"entity\" style=\"background: rgb(121, 236, 121);\">"
markdown_end = "</mark>"
matched_entities, unmatched_entities = get_and_compare_entities(article_content,summary_output)
for entity in matched_entities:
summary_output = re.sub(f'({entity})(?![^rgb\(]*\))',markdown_start_green + entity + markdown_end,summary_output)
for entity in unmatched_entities:
summary_output = re.sub(f'({entity})(?![^rgb\(]*\))',markdown_start_red + entity + markdown_end,summary_output)
print("")
print("")
soup = BeautifulSoup(summary_output, features="html.parser")
return HTML_WRAPPER.format(soup)
def summary_downloader(raw_text):
'''Download the summary generated'''
b64 = base64.b64encode(raw_text.encode()).decode()
new_filename = "new_text_file_{}_.txt".format(time_str)
st.markdown("#### Download Summary as a File ###")
href = f'<a href="data:file/txt;base64,{b64}" download="{new_filename}">Click to Download!!</a>'
st.markdown(href,unsafe_allow_html=True)
@st.cache_data
def generate_eval(raw_text, N, chunk):
# Generate N questions from context of chunk chars
# IN: text, N questions, chunk size to draw question from in the doc
# OUT: eval set as JSON list
# raw_text = ','.join(raw_text)
update = st.empty()
ques_update = st.empty()
update.info("`Generating sample questions ...`")
n = len(raw_text)
starting_indices = [random.randint(0, n-chunk) for _ in range(N)]
sub_sequences = [raw_text[i:i+chunk] for i in starting_indices]
chain = QAGenerationChain.from_llm(ChatOpenAI(temperature=0))
eval_set = []
for i, b in enumerate(sub_sequences):
try:
qa = chain.run(b)
eval_set.append(qa)
ques_update.info(f"Creating Question: {i+1}")
except Exception as e:
print(e)
st.warning(f'Error in generating Question: {i+1}...', icon="⚠️")
continue
eval_set_full = list(itertools.chain.from_iterable(eval_set))
update.empty()
ques_update.empty()
return eval_set_full
@st.cache_resource
def create_prompt_and_llm():
'''Create prompt'''
llm = ChatOpenAI(temperature=0, streaming=True, model="gpt-4")
message = SystemMessage(
content=(
"You are a helpful chatbot who is tasked with answering questions acuurately about earnings call transcript provided. "
"Unless otherwise explicitly stated, it is probably fair to assume that questions are about the earnings call transcript. "
"If there is any ambiguity, you probably assume they are about that."
"Do not use any information not provided in the earnings context and remember you are a to speak like a finance expert."
"If you don't know the answer, just say 'There is no relevant answer in the given earnings call transcript'"
"don't try to make up an answer"
)
)
prompt = OpenAIFunctionsAgent.create_prompt(
system_message=message,
extra_prompt_messages=[MessagesPlaceholder(variable_name="history")],
)
return prompt, llm
@st.cache_resource
def gen_embeddings(embedding_model):
'''Generate embeddings for given model'''
if 'hkunlp' in embedding_model:
embeddings = HuggingFaceInstructEmbeddings(model_name=embedding_model,
query_instruction='Represent the Financial question for retrieving supporting paragraphs: ',
embed_instruction='Represent the Financial paragraph for retrieval: ')
elif 'mpnet' in embedding_model:
embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
elif 'FlagEmbedding' in embedding_model:
encode_kwargs = {'normalize_embeddings': True}
embeddings = HuggingFaceBgeEmbeddings(model_name=embedding_model,
encode_kwargs = encode_kwargs
)
return embeddings
@st.cache_data
def create_vectorstore(corpus, title, embedding_model, chunk_size=1000, overlap=50):
'''Process text for Semantic Search'''
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size,chunk_overlap=overlap)
texts = text_splitter.split_text(corpus)
embeddings = gen_embeddings(embedding_model)
vectorstore = FAISS.from_texts(texts, embeddings, metadatas=[{"source": i} for i in range(len(texts))])
return vectorstore
@st.cache_data
def create_memory_and_agent(_docsearch):
'''Embed text and generate semantic search scores'''
#create vectorstore
vectorstore = _docsearch.as_retriever(search_kwargs={"k": 4})
#create retriever tool
tool = create_retriever_tool(
vectorstore,
"earnings_call_search",
"Searches and returns documents using the earnings context provided as a source, relevant to the user input question.",
)
tools = [tool]
prompt,llm = create_prompt_and_llm()
agent = OpenAIFunctionsAgent(llm=llm, tools=tools, prompt=prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
return_intermediate_steps=True,
)
memory = AgentTokenBufferMemory(llm=llm)
return memory, agent_executor
@st.cache_data
def gen_sentiment(text):
'''Generate sentiment of given text'''
return sent_pipe(text)[0]['label']
@st.cache_data
def gen_annotated_text(df):
'''Generate annotated text'''
tag_list=[]
for row in df.itertuples():
label = row[2]
text = row[1]
if label == 'Positive':
tag_list.append((text,label,'#8fce00'))
elif label == 'Negative':
tag_list.append((text,label,'#f44336'))
else:
tag_list.append((text,label,'#000000'))
return tag_list
def display_df_as_table(model,top_k,score='score'):
'''Display the df with text and scores as a table'''
df = pd.DataFrame([(hit[score],passages[hit['corpus_id']]) for hit in model[0:top_k]],columns=['Score','Text'])
df['Score'] = round(df['Score'],2)
return df
def make_spans(text,results):
results_list = []
for i in range(len(results)):
results_list.append(results[i]['label'])
facts_spans = []
facts_spans = list(zip(sent_tokenizer(text),results_list))
return facts_spans
##Fiscal Sentiment by Sentence
def fin_ext(text):
results = remote_clx(sent_tokenizer(text))
return make_spans(text,results)
## Knowledge Graphs code
def get_article(url):
article = Article(url)
article.download()
article.parse()
return article