# Importing necessary libraries
import sys
import os
import time

# # Importing RecursiveUrlLoader for web scraping and BeautifulSoup for HTML parsing
# from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader
# from bs4 import BeautifulSoup as Soup
# import mimetypes

# # List of URLs to scrape
# urls = ["https://langchain-doc.readthedocs.io/en/latest"]

# # Initialize an empty list to store the documents
# docs = []

# # Looping through each URL in the list - this could take some time!
# stf = time.time()  # Start time for measuring the whole download
# for url in urls:
#     try:
#         st = time.time()  # Start time for this URL
#         # Create a RecursiveUrlLoader instance with a specified URL and depth
#         # The extractor function uses BeautifulSoup to parse the HTML content and extract text
#         loader = RecursiveUrlLoader(url=url, max_depth=5, extractor=lambda x: Soup(x, "html.parser").text)
#         # Load the documents from the URL and extend the docs list
#         docs.extend(loader.load())
#         et = time.time() - st  # Calculate time taken for downloading this URL
#         print(f'Time taken for downloading documents from {url}: {et} seconds.')
#     except Exception as e:
#         # Print an error message if there is an issue with loading or parsing the URL
#         print(f"Failed to load or parse the URL {url}. Error: {e}", file=sys.stderr)
# etf = time.time() - stf  # Calculate total download time
# print(f'Total time taken for downloading {len(docs)} documents: {etf} seconds.')

# # Import necessary modules for text splitting and vectorization
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# import time
# from langchain_community.vectorstores import FAISS
# from langchain.vectorstores.utils import filter_complex_metadata
# from langchain_community.embeddings import HuggingFaceEmbeddings

# # Configure the text splitter
# text_splitter = RecursiveCharacterTextSplitter(
#     separators=["\n\n", "\n", "(?<=\. )", " ", ""],  # Define the separators for splitting text
#     chunk_size=500,        # The size of each text chunk
#     chunk_overlap=50,      # Overlap between chunks to ensure continuity
#     length_function=len,   # Function to determine the length of each chunk
# )
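# To make the chunking parameters above more concrete, here is a small, optional sketch
# of what the splitter produces for a toy document. The sample text and variable names
# are illustrative only and are not used anywhere else in this script.
# from langchain.docstore.document import Document
# sample = Document(page_content="LangChain is a framework for building LLM apps. " * 50,
#                   metadata={"source": "example"})
# sample_chunks = text_splitter.create_documents([sample.page_content], metadatas=[sample.metadata])
# print(len(sample_chunks), len(sample_chunks[0].page_content))  # several chunks of at most ~500 characters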
# try:
#     # Stage one: Splitting the documents into chunks for vectorization
#     st = time.time()  # Start time for performance measurement
#     print('Loading documents and creating chunks ...')
#     # Split each document into chunks using the configured text splitter
#     chunks = text_splitter.create_documents([doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs])
#     et = time.time() - st  # Calculate time taken for splitting
#     print(f'Created {len(chunks)} chunks')
#     print(f'Time taken for document chunking: {et} seconds.')
# except Exception as e:
#     print(f"Error during document chunking: {e}", file=sys.stderr)

# # Path for saving the FAISS index
# FAISS_INDEX_PATH = "./vectorstore/lc-faiss-multi-mpnet-500"

# try:
#     # Stage two: Vectorization of the document chunks
#     model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1"  # Model used for embedding
#     # Initialize HuggingFace embeddings with the specified model
#     embeddings = HuggingFaceEmbeddings(model_name=model_name)
#     print('Loading chunks into vector store ...')
#     st = time.time()  # Start time for performance measurement
#     # Create a FAISS vector store from the document chunks and save it locally
#     db = FAISS.from_documents(filter_complex_metadata(chunks), embeddings)
#     db.save_local(FAISS_INDEX_PATH)
#     et = time.time() - st  # Calculate time taken for vectorization
#     print(f'Time taken for vectorization and saving: {et} seconds.')
# except Exception as e:
#     print(f"Error during vectorization or FAISS index saving: {e}", file=sys.stderr)

# Alternatively, download a prepared vectorized index from S3 and load it into the vector store.

# Import necessary libraries for AWS S3 interaction, file handling, and FAISS vector stores
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import zipfile
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from dotenv import load_dotenv

# Load environment variables from a .env file
config = load_dotenv(".env")
# Retrieve the Hugging Face API token and the S3 bucket name from environment variables
HUGGINGFACEHUB_API_TOKEN = os.getenv('HUGGINGFACEHUB_API_TOKEN')
S3_LOCATION = os.getenv("S3_LOCATION")

try:
    # Initialize an S3 client with unsigned configuration for public access
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

    # Define the FAISS index path and the destination for the downloaded file
    FAISS_INDEX_PATH = './vectorstore/lc-faiss-multi-mpnet-500-markdown'
    VS_DESTINATION = FAISS_INDEX_PATH + ".zip"

    # Download the pre-prepared vectorized index from the S3 bucket
    print("Downloading the pre-prepared vectorized index from S3...")
    s3.download_file(S3_LOCATION, 'vectorstores/lc-faiss-multi-mpnet-500-markdown.zip', VS_DESTINATION)

    # Extract the downloaded zip file
    with zipfile.ZipFile(VS_DESTINATION, 'r') as zip_ref:
        zip_ref.extractall('./vectorstore/')
    print("Download and extraction completed.")
except Exception as e:
    print(f"Error during downloading or extracting from S3: {e}", file=sys.stderr)
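# Optional sanity check (illustrative only, not part of the original pipeline):
# FAISS.save_local with the default index name writes "index.faiss" and "index.pkl",
# so after extraction those two files are expected inside FAISS_INDEX_PATH.
# import pathlib
# for fname in ("index.faiss", "index.pkl"):
#     print(fname, (pathlib.Path(FAISS_INDEX_PATH) / fname).exists())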
# Define the model name for embeddings
model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1"

try:
    # Initialize HuggingFace embeddings with the specified model
    embeddings = HuggingFaceEmbeddings(model_name=model_name)
    # Load the local FAISS index with the specified embeddings
    db = FAISS.load_local(FAISS_INDEX_PATH, embeddings)
    print("FAISS index loaded successfully.")
except Exception as e:
    print(f"Error during FAISS index loading: {e}", file=sys.stderr)

# Import the HuggingFace Hub integration for the language model
from langchain_community.llms import HuggingFaceHub

# Initialize the vector store as a retriever for the RAG pipeline
retriever = db.as_retriever()

try:
    # Load the model from the Hugging Face Hub
    model_id = HuggingFaceHub(repo_id="mistralai/Mixtral-8x7B-Instruct-v0.1", model_kwargs={
        "temperature": 0.1,         # Controls randomness in response generation (lower value means less random)
        "max_new_tokens": 1024,     # Maximum number of new tokens to generate in responses
        "repetition_penalty": 1.2,  # Penalty for repeating the same words (higher value increases penalty)
        "return_full_text": False   # If False, only the newly generated text is returned; if True, the input is included as well
    })
    print("Model loaded successfully from Hugging Face Hub.")
except Exception as e:
    print(f"Error loading model from Hugging Face Hub: {e}", file=sys.stderr)

# Importing necessary modules for retrieval-based question answering and prompt handling
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory

# Declare a global variable 'qa' for the retrieval-based question answering system
global qa

# Define a prompt template for guiding the model's responses
template = """
You are the friendly documentation buddy Arti. If you don't know the answer, say 'I don't know' and don't make things up.\
Use the following context and the chat history (each delimited by ------) to answer the question:
------
{context}
------
{history}
------
{question}
Answer:
"""

# Create a PromptTemplate object from the defined template
prompt = PromptTemplate.from_template(
    # input_variables=["history", "context", "question"],  # Inferred automatically by from_template
    template=template,  # The prompt template as defined above
)
# Note: this .format call only renders the template once; its return value is not used
prompt.format(context="context", history="history", question="question")

# Create a memory buffer to manage conversation history
memory = ConversationBufferMemory(
    memory_key="history",  # Key for storing the conversation history
    input_key="question"   # Key for the input question
)

# Initialize the RetrievalQA object with the specified model, retriever, and additional configurations
qa = RetrievalQA.from_chain_type(
    llm=model_id,                  # Language model loaded from Hugging Face Hub
    retriever=retriever,           # The vector store retriever initialized earlier
    return_source_documents=True,  # Option to return source documents along with responses
    chain_type_kwargs={
        "verbose": True,   # Enables verbose output for debugging and analysis
        "memory": memory,  # Memory buffer for managing conversation history
        "prompt": prompt   # Prompt template for guiding the model's responses
    }
)

# Import Gradio for UI, along with other necessary libraries
import gradio as gr
import random
import time

# Function to add a new input to the chat history
def add_text(history, text):
    # Append the new text to the history with a placeholder for the response
    history = history + [(text, None)]
    return history, ""

# Function representing the bot's response mechanism
def bot(history):
    # Obtain the response from the 'infer' function using the latest input
    response = infer(history[-1][0], history)
    # Update the history with the bot's response
    history[-1][1] = response['result']
    return history
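# A minimal, optional smoke test of the chain outside the Gradio UI. It is commented out
# so the script does not call the Hub at import time; the question string is only an example,
# and it mirrors the same call signature used by infer() below.
# _q = "How do I split documents with a RecursiveCharacterTextSplitter?"
# _res = qa({"query": _q, "history": [], "question": _q})
# print(_res["result"])
# print([d.metadata.get("source") for d in _res["source_documents"]])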
# Function to infer the response using the RAG model
def infer(question, history):
    # Use the question and history to query the RAG model
    result = qa({"query": question, "history": history, "question": question})
    return result

# CSS styling for the Gradio interface
css = """
#col-container {max-width: 700px; margin-left: auto; margin-right: auto;}
"""

# HTML content for the Gradio interface title
title = """
<h1>Chat with your Documentation</h1>
<p>Chat with LangChain Documentation, <br />
You can ask questions about the LangChain docu ;)</p>
"""

# Building the Gradio interface
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.HTML(title)                               # Add the HTML title to the interface
        chatbot = gr.Chatbot([], elem_id="chatbot")  # Initialize the chatbot component
        clear = gr.Button("Clear")                   # Add a button to clear the chat

        # Create a row for the question input
        with gr.Row():
            question = gr.Textbox(label="Question", placeholder="Type your question and hit Enter ")

    # Define the action when the question is submitted
    question.submit(add_text, [chatbot, question], [chatbot, question], queue=False).then(
        bot, chatbot, chatbot
    )
    # Define the action for the clear button
    clear.click(lambda: None, None, chatbot, queue=False)

# Launch the Gradio demo interface
demo.launch(share=False)