|
import os
import re
from urllib.parse import urlparse

import nltk
import pandas as pd
import plotly.graph_objects as go
import plotly_express as px
import streamlit as st
import whisper
from optimum.onnxruntime import ORTModelForSequenceClassification
from pytube import YouTube
from sentence_transformers import SentenceTransformer, CrossEncoder, util
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
|
|
|
# Download the NLTK 'punkt' resource before any sentence tokenisation is done.
nltk.download('punkt')

from nltk import sent_tokenize

# Streamlit page metadata for this page.
# NOTE(review): the page_icon value looks like it may be a mis-encoded emoji -- confirm.
st.set_page_config(page_title="Home", page_icon="π")

# Read from the "auth_token" environment variable; None when it is not set.
auth_token = os.getenv("auth_token")
|
|
|
@st.experimental_singleton()
def load_models():
    """Load every model the app needs and return them as a tuple.

    Returns:
        ``(asr_model, q_model, q_tokenizer, cross_encoder)``: the Whisper
        "small" speech-recognition model, the quantized FinBERT tone
        classifier with its tokenizer, and the MS MARCO MiniLM cross-encoder.
    """
    finbert_repo = "nickmuchi/quantized-optimum-finbert-tone"

    # Tuple elements are evaluated left to right, so the load order is:
    # Whisper, classifier, tokenizer, cross-encoder.
    return (
        whisper.load_model("small"),
        ORTModelForSequenceClassification.from_pretrained(finbert_repo),
        AutoTokenizer.from_pretrained(finbert_repo),
        CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2'),
    )


# Module-level handles used by the functions below.
asr_model, q_model, q_tokenizer, cross_encoder = load_models()
|
|
|
def _is_http_url(link):
    """Return True when *link* is a string that parses as an absolute http(s) URL."""
    if not isinstance(link, str):
        return False
    try:
        parts = urlparse(link)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. broken IPv6 hosts).
        return False
    return parts.scheme in ("http", "https") and bool(parts.netloc)


@st.experimental_memo(suppress_st_warning=True)
def inference(link, upload):
    """Convert a YouTube video or an audio upload to text.

    Args:
        link: URL of a YouTube video. When it is a valid http(s) URL it takes
            precedence over ``upload``.
        upload: Audio input handed directly to ``asr_model.transcribe`` when
            ``link`` is not a valid URL.

    Returns:
        A ``(results, title)`` tuple, where ``results`` is whatever
        ``asr_model.transcribe`` returns and ``title`` is the video title or
        the fixed string "Transcribed Earnings Audio" for uploads. Returns
        ``None`` when neither a valid link nor an upload is supplied.
    """
    if _is_http_url(link):
        yt = YouTube(link)
        title = yt.title
        # Download the first audio-only stream to a fixed local file name.
        audio_path = yt.streams.filter(only_audio=True)[0].download(filename="audio.mp4")
        results = asr_model.transcribe(audio_path)
        return results, title

    if upload:
        results = asr_model.transcribe(upload)
        return results, "Transcribed Earnings Audio"

    # Nothing to transcribe; kept as None for backward compatibility.
    return None
|
|
|
@st.experimental_memo(suppress_st_warning=True)
def sentiment_pipe(earnings_text):
    """Run sentence-level sentiment classification over *earnings_text*.

    The text is split into sentences and each sentence is classified by a
    "text-classification" pipeline built from the module-level ``q_model``
    and ``q_tokenizer``.

    Returns:
        The pipeline's predictions for the tokenised sentences.
    """
    classifier = pipeline(
        "text-classification",
        model=q_model,
        tokenizer=q_tokenizer,
    )
    sentences = sent_tokenize(earnings_text)
    return classifier(sentences)
|
|
|
|
|
def preprocess_plain_text(text, window_size=3):
    """Clean *text* and split it into passages for semantic search.

    Non-ASCII characters, URLs, @mentions and #hashtags are removed and the
    remaining text is flattened to a single space-separated string. That
    string is sentence-tokenised and the sentences are grouped into
    consecutive, non-overlapping windows.

    Args:
        text: Raw text to prepare.
        window_size: Maximum number of sentences per passage; must be >= 1.

    Returns:
        A list of passages, each the space-joined sentences of one window.
        The list is empty when nothing is left after cleaning.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        # A zero step makes range() fail and a negative one would silently
        # drop every sentence, so reject both up front.
        raise ValueError(f"window_size must be >= 1, got {window_size!r}")

    text = text.encode("ascii", "ignore").decode()
    # Order matters: the whitespace collapse runs last so that the gaps left
    # by the removed URLs, mentions and hashtags are squeezed as well.
    for pattern in (r"https*\S+", r"@\S+", r"#\S+", r"\s{2,}"):
        text = re.sub(pattern, " ", text)

    # Flatten the remaining lines into one space-separated string.
    # NOTE: every newline is removed here, so the whole text is handled as a
    # single paragraph (the previous blank-line paragraph split could never
    # trigger for the same reason).
    stripped_lines = (line.strip() for line in text.splitlines())
    text = " ".join(line for line in stripped_lines if line)

    sentences = sent_tokenize(text) if text else []
    passages = [
        " ".join(sentences[start:start + window_size])
        for start in range(0, len(sentences), window_size)
    ]

    print(f"Sentences: {len(sentences)}")
    print(f"Passages: {len(passages)}")

    return passages
|
|
|
def display_df_as_table(model, top_k, score='score', corpus=None):
    """Build a DataFrame of the top hits with their scores and passage text.

    Args:
        model: Sequence of hit mappings; each must provide the ``score`` key
            and a ``'corpus_id'`` index into the passage list.
        top_k: Number of leading hits to include.
        score: Key under which each hit stores its score.
        corpus: Passage list indexed by ``'corpus_id'``. When omitted, the
            module-level ``passages`` variable is used, as before.

    Returns:
        A DataFrame with columns ``'Score'`` (rounded to 2 decimals) and
        ``'Text'``, one row per hit.
    """
    if corpus is None:
        # Existing callers rely on a module-level `passages` list.
        corpus = passages

    rows = [(hit[score], corpus[hit['corpus_id']]) for hit in model[:top_k]]
    df = pd.DataFrame(rows, columns=['Score', 'Text'])
    if rows:
        # Skip rounding for an empty frame, whose 'Score' column is not numeric.
        df['Score'] = df['Score'].round(2)

    return df
|
|
|
def make_spans(text, results):
    """Pair each sentence of *text* with the label predicted for it.

    Args:
        text: Text to split into sentences.
        results: Per-sentence predictions, each a mapping with a ``'label'``
            key, in the same order as the sentences.

    Returns:
        A list of ``(sentence, label)`` tuples. Pairing stops at the shorter
        of the two sequences.
    """
    labels = [result['label'] for result in results]
    return list(zip(sent_tokenize(text), labels))


def fin_ext(text):
    """Classify the sentences of *text* and return ``(sentence, label)`` spans."""
    # sentiment_pipe tokenises the text and runs the classification pipeline;
    # the pipeline object itself only exists inside that function.
    results = sentiment_pipe(text)
    return make_spans(text, results)
|
|
|
|