import json import os import time import random import string import shutil import base64 from pathlib import Path from dotenv import load_dotenv load_dotenv() import streamlit as st from streamlit_option_menu import option_menu from streamlit_lottie import st_lottie import torch from langchain_community.embeddings import HuggingFaceInstructEmbeddings from langchain_community.document_loaders import PyPDFLoader, TextLoader, WikipediaLoader from langchain_community.vectorstores import Chroma from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.document_loaders.merge import MergedDataLoader from langchain_groq import ChatGroq from langchain.chains.combine_documents import create_stuff_documents_chain from langchain_core.prompts import ChatPromptTemplate from langchain.chains import create_retrieval_chain # langchain-openai==0.1.6 # langchain-text-splitters==0.0.1 # langdetect==1.0.9 # langsmith==0.1.53 # --------------------------------------------------------- Helper Function/Class 1 def loadLottieFile(filePath: str): with open(file=filePath, mode = "r") as f: return json.load(f) # --------------------------------------------------------- Helper Function/Class 2 def loadItOnce(container, animation, height, quality = 'high'): with container.container(): st_lottie(animation_source=animation, height=height, quality=quality) # --------------------------------------------------------- Helper Function/Class 3 def initialRampUp(llamaAnimation): with st.lottie_spinner(llamaAnimation, height = 700): if 'filesUploadedRecords' not in st.session_state: # To Maintain Active File Records st.session_state.filesUploadedRecords = None if 'vdbBuilt' not in st.session_state: # To Maintain Active Vector Database records for corresponding files st.session_state.vdbBuilt = {} if 'vectorDatabase' not in st.session_state: # To Maintain or hold vector database connection st.session_state.vectorDatabase = {} if 'collectionName' not in st.session_state: st.session_state.collectionName = str(''.join(random.choices(string.ascii_letters, k=25))) try: shutil.rmtree('Dump') except: pass os.makedirs('Dump', exist_ok=True) time.sleep(3) # --------------------------------------------------------- Helper Function/Class 4 def image_to_base64(image_path): with open(image_path, "rb") as img_file: encoded_string = base64.b64encode(img_file.read()).decode("utf-8") return encoded_string def set_background_image(base64_image, opacity): # Define custom CSS for setting background image custom_css = f""" """ # Display custom CSS using markdown st.markdown(custom_css, unsafe_allow_html=True) # --------------------------------------------------------- Helper Function/Class 5 def navigationBar(): # Use the following link to get whichever icon you'd like: # https://getbootstrap.com/ options = [ {"label": "Retrieval Augmented Generation", "icon": "bezier"}, {"label": "Fine Tuning LLMs (Coming Soon)", "icon": "gpu-card"}, {"label": "Forecasting LLMs (Coming Soon)", "icon": "graph-up-arrow"} ] selected = option_menu( menu_title= None, #"Ask Me Anything", # Menu title options=[option["label"] for option in options], icons=[option["icon"] for option in options], menu_icon="lightbulb-fill", default_index=0, orientation="horizontal", styles={ "container": { "display": "flex", "flex-direction": "column", "justify-content": "center", "padding": "20px 40px 20px 40px", # Increased top and bottom padding "background-color": "#222", # Dark background color "border-radius": "20px", "width":"100%", "box-shadow": "0px 2px 10px rgba(0, 0, 0, 0.2)", # Shadow effect "margin": "auto", # Center align the navigation bar "overflow-x": "auto", # Allow horizontal scrolling for small screens }, "menu-title": { "font-size": "36px", "font-weight": "bold", "background-color": "#222", "color": "#FFFFFF", # White text color "margin-bottom": "20px", # Spacing below the menu title }, "menu-icon": { "color": "#FFD700", # Golden yellow icon color "font-size": "36px", "margin-right": "10px", }, "icon": { "color": "#FFD700", # Golden yellow icon color "font-size": "36px", "margin-right": "10px", }, "nav-link": { "font-size": "20px", "text-align": "center", "color": "#FFFFFF", # White text color "padding": "10px 20px", "border-radius": "15px", "transition": "background-color 0.3s ease", }, "nav-link-selected": { "background-color": "#FF6347", # Tomato red when selected "color": "#FFFFFF", # White text color when selected } }, ) return selected # --------------------------------------------------------- Helper Function/Class 6 @st.cache_resource(show_spinner=False) def loadEmbeddingsModels(): DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu" # Constructing Embeddings function to pass as an argument in ChromaDB for Calculating Embeddings return HuggingFaceInstructEmbeddings( model_name = "hkunlp/instructor-base", query_instruction = "Represent the query for retrieval: ", model_kwargs = {"device": DEVICE}) # --------------------------------------------------------- Helper Function/Class 7 def removedOrAdded(files): removed = {} # In the beginning, when there are no files # In the end when there are no files if len(files) == 0: if (st.session_state.filesUploadedRecords is not None) and (len(st.session_state.filesUploadedRecords) > 0): removed = {st.session_state.filesUploadedRecords[0].file_id : st.session_state.filesUploadedRecords[0]} st.session_state.filesUploadedRecords = None return removed # Files that were just removed currentFileIds = [file_obj.file_id for file_obj in files] for file in st.session_state.filesUploadedRecords: if file.file_id not in currentFileIds: removed[file.file_id] = file # Removing the crossed off files from active files directory that we maintained for toRemove in removed.values(): st.session_state.filesUploadedRecords.remove(toRemove) return removed # --------------------------------------------------------- Helper Function/Class 8 # To parse PDF and turn to vector embeddings def buildVectorDatabase(files, query, addOrRemove): """ files: Either a list of fileUploader Objects with file details or that one filename which was just removed by the user and should be from the vector Database as well addOrRemove: if true, add else remove """ # Create Embeddings and Add to the Vector Store if addOrRemove: embeddings = loadEmbeddingsModels() collName = st.session_state.collectionName text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=100) loader_map = { '.pdf': PyPDFLoader,# Add more mappings as needed '.txt': TextLoader, 'wiki' : WikipediaLoader } docsMergedPDF = [ loader_map[Path(doc).suffix.lower()](file_path=doc) for doc in files ] loadersPDF = MergedDataLoader(loaders=docsMergedPDF) pagesPDF = loadersPDF.load_and_split(text_splitter) # Split into pages pagesWiki = [] for wikiQ in query: loader = WikipediaLoader(query=wikiQ, load_max_docs=2) try: pagesW = loader.load_and_split(text_splitter) if len(pagesW) > 0: references = list(set(list(map(lambda x: x.metadata['source'], pagesW)))) st.session_state.vdbBuilt['Keyword ; ' + wikiQ.strip()] = references pagesWiki += pagesW except Exception as e: message = str(e) + '\n' + f'Looks like we could not search for your key word {wikiQ}' st.toast(body=message, icon="⚠️") _ = st.session_state.vdbBuilt.pop('Keyword ; ' + wikiQ.strip(), None) pages = pagesPDF + pagesWiki st.session_state.vectorDatabase = Chroma.from_documents(documents= pages, embedding= embeddings, collection_name= collName, ) # Load the pages into vector database (Build ChromaDB) # Delete corresponding embeddings from the vector store else: if files is not None: st.session_state.vectorDatabase._collection.delete(where={"source": {'$eq':files}}) for src in query: st.session_state.vectorDatabase._collection.delete(where={"source": {'$eq':src}}) # --------------------------------------------------------- Helper Function/Class 9 os.environ["TOKENIZERS_PARALLELISM"] = "false" class RetrievalChainGenerator: def __init__(self, model_name, vectorStore): self.model_name = model_name self.groq_api_key = os.environ['GROQ_API_KEY']#os.getenv('GROQ_API_KEY') self.vectorStore = vectorStore self.chain = None self.generate_retrieval_chain() def generate_retrieval_chain(self): llm = ChatGroq(groq_api_key=self.groq_api_key, model_name=self.model_name) prompt = ChatPromptTemplate.from_template(""" Answer the following question based only on the provided context. Think step by step before providing a detailed answer. If using finance terms, briefly explain them for clarity. Always return your complete response in html format. I will tip you $200 if the user finds the answer helpful. {context} Question: {input}""") promptChain = create_stuff_documents_chain(llm= llm, prompt= prompt) retrieverORreferrence = self.vectorStore.as_retriever() retrievalChain = create_retrieval_chain(retrieverORreferrence, promptChain) self.chain = retrievalChain # ---------------------------------------------------------------------------------------xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx----------------------------------------------------------- # --------------------------------------------------------- Front-End Functions/Class 1 import streamlit as st def display_main_title(title_text, container): # Define custom CSS styles for animations and title box custom_css = """ """ # Embed custom CSS into the Streamlit app st.markdown(custom_css, unsafe_allow_html=True) # Display main title in a Markdown object with custom CSS class container.markdown(f'
{title_text}
', unsafe_allow_html=True) # --------------------------------------------------------- Front-End Functions/Class 2 import streamlit as st def display_alert_note(message, container): # Define custom CSS styles for the alert message custom_css = """ """ # Embed custom CSS into the Streamlit app st.markdown(custom_css, unsafe_allow_html=True) # Display the alert message in a styled text box with icons and visual effects container.markdown(f'
{message}
', unsafe_allow_html=True) # --------------------------------------------------------- Front-End Functions/Class 3 import streamlit as st def display_attention_text(text, container): # Define custom CSS styles for animations and text box custom_css = """ """ # Embed custom CSS into the Streamlit app st.markdown(custom_css, unsafe_allow_html=True) # Display text in a Markdown object with custom CSS class animated_text_html = ''.join(f'{char}' if char != ' ' else '' for i, char in enumerate(text)) text_html = f"""
{animated_text_html}
""" container.markdown(text_html, unsafe_allow_html=True) # --------------------------------------------------------- Front-End Functions/Class 4 import streamlit as st def display_custom_arrow(direction, container): # Define custom CSS styles for arrow animation translateX_value = '70px' if direction.lower() == 'right' else '70px' custom_css = f""" """ # Embed custom CSS into the Streamlit app container.markdown(custom_css, unsafe_allow_html=True) # Define arrow SVG path based on direction if direction.lower() == 'left': arrow_svg = ''' ''' elif direction.lower() == 'right': arrow_svg = ''' ''' else: pass # Display arrow container with arrow SVG container.markdown(f'
{arrow_svg}
', unsafe_allow_html=True) # --------------------------------------------------------- Front-End Functions/Class 5 import streamlit as st def display_heading_box(message, container): # Define custom CSS for the styled text box with updated gradients, height, and animations css = f""" """ # Render the Markdown with the styled container and embedded text styled_html = f"""

{message}

""" # Display the Markdown using Streamlit st.markdown(css, unsafe_allow_html=True) container.markdown(styled_html, unsafe_allow_html=True) # --------------------------------------------------------- Front-End Functions/Class 6 import streamlit as st def display_error_message(message, container): # Define custom CSS for the error message box custom_css = """ """ # Display custom error message using HTML with the defined CSS st.markdown(custom_css, unsafe_allow_html=True) container.markdown(f"""
{message}
""", unsafe_allow_html=True) # --------------------------------------------------------- Front-End Functions/Class 7 import streamlit as st def display_small_text(text, container): # Define custom CSS styles for animations and text box custom_css = """ """ # Embed custom CSS into the Streamlit app st.markdown(custom_css, unsafe_allow_html=True) # Display text in a Markdown object with custom CSS class container.markdown(f'
{text}
', unsafe_allow_html=True) # --------------------------------------------------------- Front-End Functions/Class 8 import streamlit as st def display_response_message(message, container): # Define custom CSS for the response message box custom_css = """ """ # Display custom CSS for the response message st.markdown(custom_css, unsafe_allow_html=True) # Create a response container response_container = f'
' # Create a container for the response message response_container += '
' response_container += message # Add the message directly response_container += '
' # Close the response message container response_container += '
' # Close the response container # Display the response container with the message container.markdown(response_container, unsafe_allow_html=True) #--------------------------------------------------------- Front-End Functions/Class 9 def display_question_box(container): # Define custom CSS for the question box custom_css = """ """ # Display custom CSS for the question box within the specified container container.markdown(custom_css, unsafe_allow_html=True) # Create the question box HTML structure with a question mark SVG icon question_box = f"""
? Llama3
""" # Display the question box HTML within the specified container container.markdown(question_box, unsafe_allow_html=True) # --------------------------------------------------------- Front-End Functions/Class 10 import markdown import random def display_citations(heading, contents, container): unique_id = str(random.randint(10000000, 99999999)) css_styles = f""" """ # Convert markdown contents to HTML html_contents = markdown.markdown(contents) # Define the HTML structure for the fancy card html_content = f"""

{heading}

{html_contents}
""" # Display the fancy card using Streamlit Markdown st.markdown(css_styles, unsafe_allow_html=True) container.markdown(html_content, unsafe_allow_html=True) def display_allCitations(resp, contain): for res in resp['context']: if res.metadata['source'].endswith('.pdf') or res.metadata['source'].endswith('.txt'): headingText = f"{res.metadata['source'].split("---")[-1]}    Page No.  {res.metadata['page']}" else: headingText = f"{res.metadata['source']}" contents_text = f"A










Content

{res.page_content}" display_citations(headingText, contents_text, contain) time.sleep(0.5)