import openai
import pandas as pd
import streamlit_scrollable_textbox as stx
import torch
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
from transformers import (
    AutoModelForMaskedLM,
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    pipeline,
)

import pinecone
import streamlit as st

# Metadata values a search is expanded to when the caller selects "All".
_ALL_YEARS = (2020, 2019, 2018, 2017, 2016)
_ALL_QUARTERS = ("Q1", "Q2", "Q3", "Q4")


@st.experimental_singleton
def get_data():
    """Load ``earnings_calls_cleaned_metadata.csv`` into a DataFrame.

    The result is cached by Streamlit's singleton decorator.
    """
    data = pd.read_csv("earnings_calls_cleaned_metadata.csv")
    return data


# Initialize models from HuggingFace


@st.experimental_singleton
def get_t5_model():
    """Return a cached ``t5-small`` summarization pipeline."""
    return pipeline("summarization", model="t5-small", tokenizer="t5-small")


@st.experimental_singleton
def get_flan_t5_model():
    """Return a cached ``google/flan-t5-small`` summarization pipeline.

    The pipeline is created with ``max_length=512``.
    """
    return pipeline(
        "summarization",
        model="google/flan-t5-small",
        tokenizer="google/flan-t5-small",
        max_length=512,
    )


@st.experimental_singleton
def get_mpnet_embedding_model():
    """Return a cached ``all-mpnet-base-v2`` sentence-embedding model.

    The model is placed on CUDA when available (CPU otherwise) and its
    ``max_seq_length`` is set to 512.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = SentenceTransformer(
        "sentence-transformers/all-mpnet-base-v2", device=device
    )
    model.max_seq_length = 512
    return model


@st.experimental_singleton
def get_splade_sparse_embedding_model():
    """Return the cached SPLADE masked-LM model and its tokenizer.

    Returns:
        A ``(model, tokenizer)`` tuple for
        ``naver/splade-cocondenser-ensembledistil``; the model is moved to
        CUDA when available, otherwise it stays on the CPU.
    """
    model_id = "naver/splade-cocondenser-ensembledistil"
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model_sparse = AutoModelForMaskedLM.from_pretrained(model_id)
    # move to gpu if available
    model_sparse.to(device)
    return model_sparse, tokenizer


@st.experimental_singleton
def get_sgpt_embedding_model():
    """Return a cached SGPT-125M sentence-embedding model.

    The model is placed on CUDA when available (CPU otherwise) and its
    ``max_seq_length`` is set to 512.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = SentenceTransformer(
        "Muennighoff/SGPT-125M-weightedmean-nli-bitfit", device=device
    )
    model.max_seq_length = 512
    return model


@st.experimental_memo
def save_key(api_key):
    """Return ``api_key`` unchanged; the call is memoized by Streamlit."""
    return api_key


def create_dense_embeddings(query, model):
    """Encode ``query`` with ``model`` and return the result as lists.

    Args:
        query: Text to embed.
        model: Object exposing ``encode(list_of_texts)`` whose result has a
            ``tolist()`` method (e.g. a ``SentenceTransformer``).

    Returns:
        ``model.encode([query]).tolist()`` - the query is wrapped in a
        one-element list before encoding.
    """
    dense_emb = model.encode([query]).tolist()
    return dense_emb


def create_sparse_embeddings(query, model, tokenizer):
    """Build a sparse (index/value) representation of ``query``.

    The query is tokenized, run through ``model`` without gradients, and
    the logits of the first sequence are passed through
    ``log1p(relu(.))``.  The maximum over dimension 0 of that tensor is
    taken and only strictly positive entries are kept, ordered by
    descending weight.

    Args:
        query: Text to encode.
        model: Masked-LM style model whose output has a ``logits``
            attribute.  # assumes logits is (batch, seq, vocab) - TODO confirm
        tokenizer: Tokenizer matching ``model``.

    Returns:
        A dict with ``"indices"`` (positions of the positive entries) and
        ``"values"`` (their weights), both plain Python lists.
    """
    # Inputs go to CUDA whenever it is available; the model is expected to
    # have been placed the same way by its loader.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    inputs = tokenizer(query, return_tensors="pt").to(device)

    with torch.no_grad():
        logits = model(**inputs).logits

    activations = torch.log1p(torch.relu(logits[0]))
    # Max (not sum) over dimension 0 of the first sequence's activations.
    max_weights = torch.max(activations, dim=0).values
    nz_tokens = torch.where(max_weights > 0)[0]
    nz_weights = max_weights[nz_tokens]

    # Order both tensors by descending weight.
    nz_weights, order = torch.sort(nz_weights, descending=True)
    nz_tokens = nz_tokens[order]
    return {
        "indices": nz_tokens.cpu().numpy().tolist(),
        "values": nz_weights.cpu().numpy().tolist(),
    }


def hybrid_score_norm(dense, sparse, alpha: float):
    """Scale a dense and a sparse vector for a convex combination.

    The combined score is ``alpha * dense + (1 - alpha) * sparse``.

    Args:
        dense: Iterable of floats (the dense vector).
        sparse: Dict with ``indices`` and ``values`` entries.
        alpha: Weight of the dense part, between 0 and 1 inclusive.

    Returns:
        A ``(scaled_dense, scaled_sparse)`` tuple: the dense values
        multiplied by ``alpha`` and a new sparse dict whose values are
        multiplied by ``1 - alpha`` (indices are reused as-is).

    Raises:
        ValueError: If ``alpha`` is outside ``[0, 1]``.
    """
    if alpha < 0 or alpha > 1:
        raise ValueError("Alpha must be between 0 and 1")
    hs = {
        "indices": sparse["indices"],
        "values": [v * (1 - alpha) for v in sparse["values"]],
    }
    return [v * alpha for v in dense], hs


def _build_metadata_filter(year, quarter, ticker, participant_type):
    """Build the Pinecone metadata filter shared by the query helpers."""
    participant = (
        "Answer" if participant_type == "Company Speaker" else "Question"
    )
    if year == "All":
        year_filter = {"$in": list(_ALL_YEARS)}
    else:
        year_filter = int(year)
    # "All" is expanded for quarters regardless of the year selection;
    # filtering on the literal string "All" would match nothing.
    if quarter == "All":
        quarter_filter = {"$in": list(_ALL_QUARTERS)}
    else:
        quarter_filter = {"$eq": quarter}
    return {
        "Year": year_filter,
        "Quarter": quarter_filter,
        "Ticker": {"$eq": ticker},
        "QA_Flag": {"$eq": participant},
    }


def _keep_matches_above_threshold(xc, threshold):
    """Drop matches scoring below ``threshold`` from ``xc`` in place."""
    xc["matches"] = [
        match for match in xc["matches"] if match["score"] >= threshold
    ]
    return xc


def query_pinecone_sparse(
    dense_vec,
    sparse_vec,
    top_k,
    index,
    year,
    quarter,
    ticker,
    participant_type,
    threshold=0.25,
):
    """Run a hybrid (dense + sparse) query with metadata filtering.

    Args:
        dense_vec: Dense query vector passed as ``vector``.
        sparse_vec: Sparse query vector passed as ``sparse_vector``.
        top_k: Number of matches requested from the index.
        index: Object exposing a Pinecone-style ``query`` method.
        year: A year convertible with ``int`` or ``"All"``.
        quarter: A quarter label (e.g. ``"Q1"``) or ``"All"``.
        ticker: Ticker the results must belong to.
        participant_type: ``"Company Speaker"`` selects ``QA_Flag``
            ``"Answer"``; any other value selects ``"Question"``.
        threshold: Minimum score a match needs to be kept.

    Returns:
        The query response with ``"matches"`` reduced to the entries whose
        score is at least ``threshold``.
    """
    xc = index.query(
        vector=dense_vec,
        sparse_vector=sparse_vec,
        top_k=top_k,
        filter=_build_metadata_filter(
            year, quarter, ticker, participant_type
        ),
        include_metadata=True,
    )
    # filter the context passages based on the score threshold
    return _keep_matches_above_threshold(xc, threshold)


def query_pinecone(
    dense_vec,
    top_k,
    index,
    year,
    quarter,
    ticker,
    participant_type,
    threshold=0.25,
):
    """Run a dense-vector query with metadata filtering.

    Args:
        dense_vec: Dense query vector passed as ``vector``.
        top_k: Number of matches requested from the index.
        index: Object exposing a Pinecone-style ``query`` method.
        year: A year convertible with ``int`` or ``"All"``.
        quarter: A quarter label (e.g. ``"Q1"``) or ``"All"``.
        ticker: Ticker the results must belong to.
        participant_type: ``"Company Speaker"`` selects ``QA_Flag``
            ``"Answer"``; any other value selects ``"Question"``.
        threshold: Minimum score a match needs to be kept.

    Returns:
        The query response with ``"matches"`` reduced to the entries whose
        score is at least ``threshold``.
    """
    xc = index.query(
        vector=dense_vec,
        top_k=top_k,
        filter=_build_metadata_filter(
            year, quarter, ticker, participant_type
        ),
        include_metadata=True,
    )
    # filter the context passages based on the score threshold
    return _keep_matches_above_threshold(xc, threshold)


def format_query(query_results):
    """Return the ``metadata["Text"]`` of every match in ``query_results``."""
    context = [
        result["metadata"]["Text"] for result in query_results["matches"]
    ]
    return context


def sentence_id_combine(data, query_results, lag=1):
    """Expand matched sentences with their neighbours and join the text.

    Args:
        data: DataFrame with ``Sentence_id`` and ``Text`` columns.
        query_results: Query response whose matches carry
            ``metadata["Sentence_id"]``.
        lag: Number of neighbouring ids added on each side of a match.

    Returns:
        A list of strings, one per group of ids, each being the
        space-joined ``Text`` of the rows whose ``Sentence_id`` is in the
        group.
    """
    # Extract sentence IDs from query results
    ids = [
        result["metadata"]["Sentence_id"]
        for result in query_results["matches"]
    ]
    # Surround every matched id with ``lag`` neighbours on each side, then
    # remove duplicates and sort.
    new_ids = sorted(
        {
            sentence_id + offset
            for sentence_id in ids
            for offset in range(-lag, lag + 1)
        }
    )
    # Group the ids in chunks of lag*2+1.
    # NOTE(review): when the windows of two matches overlap, the chunks no
    # longer line up with the original windows - confirm this is intended.
    window = lag * 2 + 1
    lookup_ids = [
        new_ids[start : start + window]
        for start in range(0, len(new_ids), window)
    ]
    # Join the sentences that belong to each group of lookup ids.
    context_list = [
        " ".join(
            data.loc[data["Sentence_id"].isin(lookup_id), "Text"].to_list()
        )
        for lookup_id in lookup_ids
    ]
    return context_list


def text_lookup(data, sentence_ids):
    """Join the entries of ``data`` at positions ``sentence_ids`` with ". ".

    ``data`` must support ``.iloc[...]`` returning an object with
    ``to_list()`` that yields strings (presumably a pandas Series of
    sentences - verify against the caller).
    """
    context = ". ".join(data.iloc[sentence_ids].to_list())
    return context


def generate_prompt(query_text, context_list):
    """Build the "6 long detailed points" prompt for ``query_text``.

    Args:
        query_text: The user's question.
        context_list: Strings that are space-joined into the context.

    Returns:
        The prompt string.
    """
    context = " ".join(context_list)
    # NOTE(review): the prompt is a single line as found in the file; if
    # line breaks between the sections were intended, confirm and restore.
    prompt = (
        "Answer the question in 6 long detailed points as accurately as "
        "possible using the provided context. Include as many key details "
        f"as possible. Context: {context} Question: {query_text} Answer:"
    )
    return prompt


def generate_prompt_2(query_text, context_list):
    """Build the "context information is below" prompt for ``query_text``.

    Args:
        query_text: The user's question.
        context_list: Strings that are space-joined into the context.

    Returns:
        The prompt string.
    """
    context = " ".join(context_list)
    # NOTE(review): the prompt is a single line as found in the file; if
    # line breaks between the sections were intended, confirm and restore.
    prompt = (
        " Context information is below: --------------------- "
        f"{context} --------------------- "
        "Given the context information and prior knowledge, "
        f"answer this question: {query_text} "
        "Try to include as many key details as possible "
        "and format the answer in points."
    )
    return prompt


def gpt_model(prompt):
    """Send ``prompt`` to ``text-davinci-003`` and return the completion text.

    Uses ``openai.Completion.create`` and returns the ``text`` of the
    first choice.
    """
    response = openai.Completion.create(
        model="text-davinci-003",
        prompt=prompt,
        temperature=0.1,
        max_tokens=1024,
        top_p=1.0,
        frequency_penalty=0.5,
        presence_penalty=1,
    )
    return response.choices[0].text


# Transcript Retrieval


def retrieve_transcript(data, year, quarter, ticker):
    """Read the transcript file for a ticker and return its text.

    When either ``year`` or ``quarter`` is ``"All"`` the first file name
    listed for the ticker is used; otherwise the lookup is also restricted
    to the given year and quarter.

    Args:
        data: DataFrame with ``Ticker``, ``Year``, ``Quarter`` and
            ``File_Name`` columns.
        year: A year convertible with ``int`` or ``"All"``.
        quarter: A quarter label or ``"All"``.
        ticker: Ticker whose transcript is wanted; also the sub-directory
            of ``Transcripts/`` that is read from.

    Returns:
        The full content of ``Transcripts/{ticker}/{file_name}``.

    Raises:
        IndexError: If no row of ``data`` matches the selection.
        OSError: If the transcript file cannot be opened.
    """
    selection = data.Ticker == ticker
    if year != "All" and quarter != "All":
        selection = (
            (data.Year == int(year))
            & (data.Quarter == quarter)
            & selection
        )
    file_name = (
        data.loc[selection, ["File_Name"]].drop_duplicates().iloc[0, 0]
    )
    # The context manager closes the handle even if reading fails.
    with open(f"Transcripts/{ticker}/{file_name}", "r") as transcript_file:
        return transcript_file.read()