"""Retrieval, prompting and entity-extraction helpers for a Streamlit app
that searches earnings-call transcripts."""
import re

import openai
import pandas as pd
import pinecone
import spacy
import streamlit_scrollable_textbox as stx
import torch
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
from transformers import (
    AutoModelForMaskedLM,
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    pipeline,
)

import streamlit as st

# Values substituted for the "All" choice when building Pinecone filters.
_ALL_YEARS = (2020, 2019, 2018, 2017, 2016)
_ALL_QUARTERS = ("Q1", "Q2", "Q3", "Q4")


@st.experimental_singleton
def get_data():
    """Load the earnings-call metadata CSV into a DataFrame (cached)."""
    data = pd.read_csv("earnings_calls_cleaned_metadata.csv")
    return data


# Initialize Spacy Model


@st.experimental_singleton
def get_spacy_model():
    """Load the spaCy ``en_core_web_sm`` pipeline (cached)."""
    return spacy.load("en_core_web_sm")


# Initialize models from HuggingFace


@st.experimental_singleton
def get_t5_model():
    """Return a ``t5-small`` summarization pipeline (cached)."""
    return pipeline("summarization", model="t5-small", tokenizer="t5-small")


@st.experimental_singleton
def get_flan_t5_model():
    """Return a ``google/flan-t5-xl`` summarization pipeline (cached).

    The pipeline is created with ``max_length=512``.
    """
    return pipeline(
        "summarization",
        model="google/flan-t5-xl",
        tokenizer="google/flan-t5-xl",
        max_length=512,
    )


@st.experimental_singleton
def get_mpnet_embedding_model():
    """Return the ``all-mpnet-base-v2`` sentence encoder (cached).

    The model is placed on CUDA when available, otherwise on CPU, and its
    ``max_seq_length`` is set to 512.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = SentenceTransformer(
        "sentence-transformers/all-mpnet-base-v2", device=device
    )
    model.max_seq_length = 512
    return model


@st.experimental_singleton
def get_splade_sparse_embedding_model():
    """Return the SPLADE masked-LM model and its tokenizer (cached).

    Returns:
        A ``(model, tokenizer)`` tuple; the model is moved to CUDA when
        available, otherwise it stays on CPU.
    """
    model_name = "naver/splade-cocondenser-ensembledistil"
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model_sparse = AutoModelForMaskedLM.from_pretrained(model_name)
    model_sparse.to(device)
    return model_sparse, tokenizer


@st.experimental_singleton
def get_sgpt_embedding_model():
    """Return the SGPT-125M sentence encoder (cached).

    The model is placed on CUDA when available, otherwise on CPU, and its
    ``max_seq_length`` is set to 512.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = SentenceTransformer(
        "Muennighoff/SGPT-125M-weightedmean-nli-bitfit", device=device
    )
    model.max_seq_length = 512
    return model


@st.experimental_memo
def save_key(api_key):
    """Return ``api_key`` unchanged; the call is memoized by Streamlit."""
    return api_key


def create_dense_embeddings(query, model):
    """Encode ``query`` with ``model`` and return the result as nested lists.

    ``model.encode`` is called on a one-element list, so the returned list
    has one entry for the single query.
    """
    dense_emb = model.encode([query]).tolist()
    return dense_emb


def create_sparse_embeddings(query, model, tokenizer):
    """Build a sparse vector for ``query`` from a masked-LM's logits.

    Args:
        query: Text to encode.
        model: Model whose output exposes ``.logits``.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        A dict with ``indices`` (positions of the non-zero weights) and
        ``values`` (the weights), both sorted by descending weight.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    inputs = tokenizer(query, return_tensors="pt").to(device)
    with torch.no_grad():
        logits = model(**inputs).logits

    # log(1 + relu(x)) on the first item of the batch, then max-pool over
    # dim 0 (presumably the token positions of the query -- TODO confirm).
    inter = torch.log1p(torch.relu(logits[0]))
    token_max = torch.max(inter, dim=0)  # max (not sum) over input tokens

    nz_tokens = torch.where(token_max.values > 0)[0]
    nz_weights = token_max.values[nz_tokens]

    # torch.sort returns (values, indices); the indices reorder both arrays.
    order = torch.sort(nz_weights, descending=True)
    nz_weights = nz_weights[order[1]]
    nz_tokens = nz_tokens[order[1]]
    return {
        "indices": nz_tokens.cpu().numpy().tolist(),
        "values": nz_weights.cpu().numpy().tolist(),
    }


def hybrid_score_norm(dense, sparse, alpha: float):
    """Scale a dense/sparse vector pair for a convex-combination hybrid score.

    The combined score is ``alpha * dense + (1 - alpha) * sparse``.

    Args:
        dense: Iterable of floats holding the dense vector.
        sparse: Dict with ``indices`` and ``values`` lists.
        alpha: Weight of the dense part, between 0 and 1 inclusive.

    Returns:
        A ``(scaled_dense, scaled_sparse)`` tuple where ``scaled_dense`` is a
        list and ``scaled_sparse`` is a new dict reusing ``sparse["indices"]``.

    Raises:
        ValueError: If ``alpha`` is outside ``[0, 1]``.
    """
    if alpha < 0 or alpha > 1:
        raise ValueError("Alpha must be between 0 and 1")
    hs = {
        "indices": sparse["indices"],
        "values": [v * (1 - alpha) for v in sparse["values"]],
    }
    return [v * alpha for v in dense], hs


def _build_metadata_filter(year, quarter, ticker, participant_type):
    """Build the Pinecone metadata filter shared by both query functions."""
    if participant_type == "Company Speaker":
        participant = "Answer"
    else:
        participant = "Question"

    # "All" widens the field to every known value instead of being sent
    # literally, which would match no stored record.
    if year == "All":
        year_filter = {"$in": list(_ALL_YEARS)}
    else:
        year_filter = int(year)
    if quarter == "All":
        quarter_filter = {"$in": list(_ALL_QUARTERS)}
    else:
        quarter_filter = {"$eq": quarter}

    return {
        "Year": year_filter,
        "Quarter": quarter_filter,
        "Ticker": {"$eq": ticker},
        "QA_Flag": {"$eq": participant},
    }


def _apply_score_threshold(xc, threshold):
    """Keep only the matches in ``xc`` scoring at least ``threshold``."""
    xc["matches"] = [
        match for match in xc["matches"] if match["score"] >= threshold
    ]
    return xc


def query_pinecone_sparse(
    dense_vec,
    sparse_vec,
    top_k,
    index,
    year,
    quarter,
    ticker,
    participant_type,
    threshold=0.25,
):
    """Run a hybrid (dense + sparse) Pinecone query with metadata filters.

    Args:
        dense_vec: Dense query vector.
        sparse_vec: Sparse query vector (``indices`` / ``values`` dict).
        top_k: Number of matches to request.
        index: Object exposing a Pinecone-style ``query`` method.
        year: A year such as ``"2019"``, or ``"All"``.
        quarter: ``"Q1"``..``"Q4"``, or ``"All"``.
        ticker: Ticker symbol to match exactly.
        participant_type: ``"Company Speaker"`` selects records flagged
            ``"Answer"``; any other value selects ``"Question"``.
        threshold: Minimum score a match needs to be kept.

    Returns:
        The query response with ``matches`` reduced to those at or above
        ``threshold``.
    """
    xc = index.query(
        vector=dense_vec,
        sparse_vector=sparse_vec,
        top_k=top_k,
        filter=_build_metadata_filter(year, quarter, ticker, participant_type),
        include_metadata=True,
    )
    return _apply_score_threshold(xc, threshold)


def query_pinecone(
    dense_vec,
    top_k,
    index,
    year,
    quarter,
    ticker,
    participant_type,
    threshold=0.25,
):
    """Run a dense-only Pinecone query with metadata filters.

    Args:
        dense_vec: Dense query vector.
        top_k: Number of matches to request.
        index: Object exposing a Pinecone-style ``query`` method.
        year: A year such as ``"2019"``, or ``"All"``.
        quarter: ``"Q1"``..``"Q4"``, or ``"All"``.
        ticker: Ticker symbol to match exactly.
        participant_type: ``"Company Speaker"`` selects records flagged
            ``"Answer"``; any other value selects ``"Question"``.
        threshold: Minimum score a match needs to be kept.

    Returns:
        The query response with ``matches`` reduced to those at or above
        ``threshold``.
    """
    xc = index.query(
        vector=dense_vec,
        top_k=top_k,
        filter=_build_metadata_filter(year, quarter, ticker, participant_type),
        include_metadata=True,
    )
    return _apply_score_threshold(xc, threshold)


def format_query(query_results):
    """Return the ``Text`` metadata of every match in ``query_results``."""
    context = [
        result["metadata"]["Text"] for result in query_results["matches"]
    ]
    return context


def sentence_id_combine(data, query_results, lag=1):
    """Expand each matched sentence with its neighbours and join the text.

    Args:
        data: DataFrame with ``Sentence_id`` and ``Text`` columns.
        query_results: Query response whose matches carry a ``Sentence_id``
            metadata field.
        lag: Number of sentence ids to include on each side of a match.

    Returns:
        A list of strings, one per group of looked-up sentence ids.
    """
    ids = [
        result["metadata"]["Sentence_id"]
        for result in query_results["matches"]
    ]
    window = lag * 2 + 1

    # Every matched id plus its neighbours within ``lag``, de-duplicated
    # and sorted.
    new_ids = sorted(
        {
            sentence_id + offset
            for sentence_id in ids
            for offset in range(-lag, lag + 1)
        }
    )

    # The sorted ids are cut into consecutive chunks of ``window`` ids.
    lookup_ids = [
        new_ids[start : start + window]
        for start in range(0, len(new_ids), window)
    ]

    context_list = [
        " ".join(
            data.loc[data["Sentence_id"].isin(lookup_id), "Text"].to_list()
        )
        for lookup_id in lookup_ids
    ]
    return context_list


def text_lookup(data, sentence_ids):
    """Join the entries of ``data`` at positions ``sentence_ids`` with ". ".

    NOTE(review): ``.iloc[...].to_list()`` suggests ``data`` is a pandas
    Series of strings -- confirm against the caller.
    """
    context = ". ".join(data.iloc[sentence_ids].to_list())
    return context


def generate_gpt_prompt(query_text, context_list):
    """Build the GPT prompt asking for a six-point answer from the context.

    Args:
        query_text: The user's question.
        context_list: Context passages; they are joined with single spaces.
    """
    context = " ".join(context_list)
    prompt = (
        "Answer the question in 6 long detailed points as accurately as "
        "possible using the provided context. Include as many key details as "
        f"possible. Context: {context} Question: {query_text} Answer:"
    )
    return prompt


def generate_gpt_prompt_2(query_text, context_list):
    """Build the alternative GPT prompt that fences the context with dashes.

    Args:
        query_text: The user's question.
        context_list: Context passages; they are joined with single spaces.
    """
    context = " ".join(context_list)
    prompt = (
        " Context information is below: --------------------- "
        f"{context} --------------------- "
        "Given the context information and prior knowledge, answer this "
        f"question: {query_text} "
        "Try to include as many key details as possible and format the "
        "answer in points."
    )
    return prompt


def generate_flant5_prompt(query_text, context_list):
    """Build the Flan-T5 prompt: question first, then the fenced context.

    Passages are joined with ``" \\n"`` so that ``get_context_list_prompt``
    can split them apart again.
    """
    context = " \n".join(context_list)
    prompt = (
        "Given the context information and prior knowledge, answer this "
        f"question: {query_text} "
        "Context information is below: --------------------- "
        f"{context} ---------------------"
    )
    return prompt


def get_context_list_prompt(prompt):
    """Recover the list of context passages from a dash-fenced prompt.

    The prompt is split on the dashed separator; the second-to-last piece
    is stripped and split on ``" \\n"``. A prompt without any separator
    raises ``IndexError``.
    """
    prompt_list = prompt.split("---------------------")
    context = prompt_list[-2].strip()
    context_list = context.split(" \n")
    return context_list


def gpt_model(prompt):
    """Send ``prompt`` to OpenAI ``text-davinci-003`` and return the text
    of the first completion choice."""
    response = openai.Completion.create(
        model="text-davinci-003",
        prompt=prompt,
        temperature=0.1,
        max_tokens=1024,
        top_p=1.0,
        frequency_penalty=0.5,
        presence_penalty=1,
    )
    return response.choices[0].text


# Entity Extraction


def extract_quarter_year(string):
    """Pull a quarter (``Q<digit>``) and a four-digit year out of ``string``.

    Returns:
        ``(quarter, year)`` as strings, or ``(None, None)`` when either
        part is missing.
    """
    year_match = re.search(r"\d{4}", string)
    if not year_match:
        return None, None

    quarter_match = re.search(r"Q\d", string)
    if not quarter_match:
        return None, None

    return quarter_match.group(), year_match.group()


def extract_entities(query, model):
    """Extract company, quarter and year from ``query`` using ``model``.

    Args:
        query: User query text.
        model: Callable returning a document with ``.ents``; each entity
            exposes ``label_`` and ``text``.

    Returns:
        ``(company, quarter, year)``. ``company`` is the lower-cased ``ORG``
        entity text or ``None``; ``quarter`` and ``year`` come from the
        ``DATE`` entity and are ``None`` when it is absent or incomplete.
    """
    doc = model(query)
    # When a label occurs more than once the last entity wins.
    entities = {ent.label_: ent.text for ent in doc.ents}

    company = entities["ORG"].lower() if "ORG" in entities else None
    if "DATE" in entities:
        quarter, year = extract_quarter_year(entities["DATE"])
    else:
        quarter, year = None, None
    return company, quarter, year


def clean_entities(company, quarter, year):
    """Map extracted entities onto indices into the app's choice lists.

    Args:
        company: Lower-cased company name, or ``None``.
        quarter: Quarter string such as ``"Q1"``, or ``None``.
        year: Year string such as ``"2019"``, or ``None``.

    Returns:
        ``(ticker_index, quarter_index, year_index)``. An unknown or missing
        company maps to index 0; an unknown or missing quarter or year maps
        to the index of ``"All"`` (the last entry of its list).
    """
    company_ticker_map = {
        "apple": "AAPL",
        "amd": "AMD",
        "amazon": "AMZN",
        "cisco": "CSCO",
        "google": "GOOGL",
        "microsoft": "MSFT",
        "nvidia": "NVDA",
        "asml": "ASML",
        "intel": "INTC",
        "micron": "MU",
    }
    ticker_choice = [
        "AAPL",
        "CSCO",
        "MSFT",
        "ASML",
        "NVDA",
        "GOOGL",
        "MU",
        "INTC",
        "AMZN",
        "AMD",
    ]
    year_choice = ["2020", "2019", "2018", "2017", "2016", "All"]
    quarter_choice = ["Q1", "Q2", "Q3", "Q4", "All"]

    if company in company_ticker_map:
        ticker_index = ticker_choice.index(company_ticker_map[company])
    else:
        ticker_index = 0

    if quarter in quarter_choice:
        quarter_index = quarter_choice.index(quarter)
    else:
        quarter_index = len(quarter_choice) - 1

    if year in year_choice:
        year_index = year_choice.index(year)
    else:
        year_index = len(year_choice) - 1

    return ticker_index, quarter_index, year_index
# Transcript Retrieval


def retrieve_transcript(data, year, quarter, ticker):
    """Read the transcript file for a ticker and, optionally, a period.

    Args:
        data: DataFrame with ``Ticker``, ``Year``, ``Quarter`` and
            ``File_Name`` columns.
        year: A year such as ``"2019"``, or ``"All"``.
        quarter: A quarter such as ``"Q1"``, or ``"All"``.
        ticker: Ticker symbol; also the sub-directory under ``Transcripts/``.

    Returns:
        The full text of the first matching transcript file.

    Raises:
        IndexError: If no row of ``data`` matches the selection.
        OSError: If the transcript file cannot be opened.
    """
    selection = data.Ticker == ticker
    # When either field is "All", only the ticker narrows the lookup and the
    # first file listed for that ticker is used.
    if year != "All" and quarter != "All":
        selection = (
            (data.Year == int(year)) & (data.Quarter == quarter) & selection
        )

    file_name = (
        data.loc[selection, ["File_Name"]].drop_duplicates().iloc[0, 0]
    )

    # The context manager closes the handle even if reading fails.
    with open(f"Transcripts/{ticker}/{file_name}", "r") as transcript_file:
        file_text = transcript_file.read()
    return file_text