Spaces:
Sleeping
Sleeping
# Importing necessary libraries | |
import sys | |
import os | |
import time | |
# # Importing RecursiveUrlLoader for web scraping and BeautifulSoup for HTML parsing | |
# from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader | |
# from bs4 import BeautifulSoup as Soup | |
# import mimetypes | |
# # List of URLs to scrape | |
# urls = ["https://langchain-doc.readthedocs.io/en/latest"] | |
# # Initialize an empty list to store the documents | |
# docs = [] | |
# # Looping through each URL in the list - this could take some time! | |
# stf = time.time() # Start time for performance measurement | |
# for url in urls: | |
# try: | |
# st = time.time() # Start time for performance measurement | |
# # Create a RecursiveUrlLoader instance with a specified URL and depth | |
# # The extractor function uses BeautifulSoup to parse the HTML content and extract text | |
# loader = RecursiveUrlLoader(url=url, max_depth=5, extractor=lambda x: Soup(x, "html.parser").text) | |
# # Load the documents from the URL and extend the docs list | |
# docs.extend(loader.load()) | |
# et = time.time() - st # Calculate time taken for downloading this URL
# print(f'Time taken for downloading documents from {url}: {et} seconds.') | |
# except Exception as e: | |
# # Print an error message if there is an issue with loading or parsing the URL | |
# print(f"Failed to load or parse the URL {url}. Error: {e}", file=sys.stderr) | |
# etf = time.time() - stf # Calculate total time taken for downloading all URLs
# print(f'Total time taken for downloading {len(docs)} documents: {etf} seconds.') | |
# # Import necessary modules for text splitting and vectorization | |
# from langchain.text_splitter import RecursiveCharacterTextSplitter | |
# import time | |
# from langchain_community.vectorstores import FAISS | |
# from langchain.vectorstores.utils import filter_complex_metadata | |
# from langchain_community.embeddings import HuggingFaceEmbeddings | |
# # Configure the text splitter | |
# text_splitter = RecursiveCharacterTextSplitter( | |
# separators=["\n\n", "\n", "(?<=\. )", " ", ""], # Define the separators for splitting text | |
# chunk_size=500, # The size of each text chunk | |
# chunk_overlap=50, # Overlap between chunks to ensure continuity | |
# length_function=len, # Function to determine the length of each chunk | |
# ) | |
# try: | |
# # Stage one: Splitting the documents into chunks for vectorization | |
# st = time.time() # Start time for performance measurement | |
# print('Loading documents and creating chunks ...') | |
# # Split each document into chunks using the configured text splitter | |
# chunks = text_splitter.create_documents([doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs]) | |
# et = time.time() - st # Calculate time taken for splitting | |
# print(f"created {len(chunks)} chunks")
# print(f'Time taken for document chunking: {et} seconds.') | |
# except Exception as e: | |
# print(f"Error during document chunking: {e}", file=sys.stderr) | |
# # Path for saving the FAISS index | |
# FAISS_INDEX_PATH = "./vectorstore/lc-faiss-multi-mpnet-500" | |
# try: | |
# # Stage two: Vectorization of the document chunks | |
# model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1" # Model used for embedding | |
# # Initialize HuggingFace embeddings with the specified model | |
# embeddings = HuggingFaceEmbeddings(model_name=model_name) | |
# print(f'Loading chunks into vector store ...') | |
# st = time.time() # Start time for performance measurement | |
# # Create a FAISS vector store from the document chunks and save it locally | |
# db = FAISS.from_documents(filter_complex_metadata(chunks), embeddings) | |
# db.save_local(FAISS_INDEX_PATH) | |
# et = time.time() - st # Calculate time taken for vectorization | |
# print(f'Time taken for vectorization and saving: {et} seconds.') | |
# except Exception as e: | |
# print(f"Error during vectorization or FAISS index saving: {e}", file=sys.stderr) | |
# Alternatively, download a prepared vectorized index from S3 and load it into the vector store
# Import necessary libraries for AWS S3 interaction, file handling, and FAISS vector stores | |
import boto3 | |
from botocore import UNSIGNED | |
from botocore.client import Config | |
import zipfile | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from dotenv import load_dotenv | |
# Load environment variables from a .env file
config = load_dotenv(".env")
# Retrieve the Hugging Face API token from environment variables
HUGGINGFACEHUB_API_TOKEN = os.getenv('HUGGINGFACEHUB_API_TOKEN')
# Bucket name passed to download_file below; None when the variable is unset
S3_LOCATION = os.getenv("S3_LOCATION")

# Define the FAISS index path and the destination for the downloaded file.
# These live outside the try block so the index path is always defined for the
# loading step that follows, even when the S3 client or the download fails.
FAISS_INDEX_PATH = './vectorstore/lc-faiss-multi-mpnet-500-markdown'
VS_DESTINATION = FAISS_INDEX_PATH + ".zip"

try:
    if not S3_LOCATION:
        # Fail with a readable message instead of an opaque parameter validation error
        raise ValueError("S3_LOCATION environment variable is not set")
    # Make sure the target directory exists before anything is written into it
    os.makedirs(os.path.dirname(VS_DESTINATION), exist_ok=True)
    # Initialize an S3 client with unsigned configuration for public access
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    # Download the pre-prepared vectorized index from the S3 bucket
    print("Downloading the pre-prepared vectorized index from S3...")
    s3.download_file(S3_LOCATION, 'vectorstores/lc-faiss-multi-mpnet-500-markdown.zip', VS_DESTINATION)
    # Extract the downloaded zip file
    with zipfile.ZipFile(VS_DESTINATION, 'r') as zip_ref:
        zip_ref.extractall('./vectorstore/')
    print("Download and extraction completed.")
except Exception as e:
    # Deliberately best-effort: report the problem and carry on, since an index
    # extracted by an earlier run may still be present on disk.
    print(f"Error during downloading or extracting from S3: {e}", file=sys.stderr)
# Define the model name for embeddings
model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1"
try:
    # Initialize HuggingFace embeddings with the specified model
    embeddings = HuggingFaceEmbeddings(model_name=model_name)
    # Load the local FAISS index with the specified embeddings
    # NOTE(review): recent langchain-community releases may require
    # allow_dangerous_deserialization=True here -- confirm against the pinned version.
    db = FAISS.load_local(FAISS_INDEX_PATH, embeddings)
    print("FAISS index loaded successfully.")
except Exception as e:
    print(f"Error during FAISS index loading: {e}", file=sys.stderr)
    # `db` is required by the retriever set-up that follows; re-raise so the
    # script stops with the real cause instead of a later NameError on `db`.
    raise
# Import the Hugging Face Hub LLM wrapper from LangChain
from langchain_community.llms import HuggingFaceHub | |
# Initialize the vector store as a retriever for the RAG pipeline
retriever = db.as_retriever()
try:
    # Create the LLM wrapper for the Hugging Face Hub model; despite its name,
    # `model_id` holds the LLM object that is later passed to the QA chain.
    model_id = HuggingFaceHub(
        repo_id="mistralai/Mixtral-8x7B-Instruct-v0.1",
        model_kwargs={
            "temperature": 0.1,  # Controls randomness in response generation (lower value means less random)
            "max_new_tokens": 1024,  # Maximum number of new tokens to generate in responses
            "repetition_penalty": 1.2,  # Penalty for repeating the same words (higher value increases penalty)
            "return_full_text": False,  # If False, only the newly generated text is returned; if True, the input is included as well
        },
    )
    print("Model loaded successfully from Hugging Face Hub.")
except Exception as e:
    print(f"Error loading model from Hugging Face Hub: {e}", file=sys.stderr)
    # The QA chain cannot be built without the LLM; re-raise so the failure is
    # reported here rather than as a NameError on `model_id` further down.
    raise
# Importing necessary modules for retrieval-based question answering and prompt handling | |
from langchain.chains import RetrievalQA | |
from langchain.prompts import PromptTemplate | |
from langchain.memory import ConversationBufferMemory | |
# Prompt template for guiding the model's responses. It has three placeholders:
# {context}, {history} and {question}.
# The first sentence used to end with a backslash line continuation, which
# glued "make things up." directly onto "Use the following ..."; the two
# sentences are now separated by a regular newline.
template = """
You are the friendly documentation buddy Arti, if you don't know the answer say 'I don't know' and don't make things up.
Use the following context (delimited by <ctx></ctx>) and the chat history (delimited by <hs></hs>) to answer the question :
------
<ctx>
{context}
</ctx>
------
<hs>
{history}
</hs>
------
{question}
Answer:
"""
# Create a PromptTemplate object; the input variables are inferred from the template
prompt = PromptTemplate.from_template(
    template=template,  # The prompt template as defined above
)
# Smoke check: the result is discarded, but formatting with exactly these three
# variables fails right away if the template references any other placeholder.
prompt.format(context="context", history="history", question="question")
# Create a memory buffer to manage conversation history
memory = ConversationBufferMemory(
    memory_key="history",  # Matches the {history} placeholder in the template
    input_key="question",  # Matches the {question} placeholder in the template
)
# Retrieval-based question answering chain, used by infer() below.
# (No `global` statement is needed: this assignment already happens at module level.)
qa = RetrievalQA.from_chain_type(
    llm=model_id,  # Language model loaded from Hugging Face Hub
    retriever=retriever,  # The vector store retriever initialized earlier
    return_source_documents=True,  # Option to return source documents along with responses
    chain_type_kwargs={
        "verbose": True,  # Enables verbose output for debugging and analysis
        "memory": memory,  # Memory buffer for managing conversation history
        "prompt": prompt,  # Prompt template for guiding the model's responses
    },
)
# Import Gradio for UI, along with other necessary libraries | |
import gradio as gr | |
import random | |
import time | |
def add_text(history, text):
    """Queue a new user message in the chat history.

    Args:
        history: List of chat turns collected so far; it is not modified.
        text: The message the user just submitted.

    Returns:
        A 2-tuple: a new history list ending with ``(text, None)`` (``None``
        is the placeholder for the pending reply), and an empty string that
        is used to clear the input box.
    """
    pending_turn = (text, None)
    return history + [pending_turn], ""
def bot(history):
    """Answer the most recent user message and store the reply in the history.

    Args:
        history: List of chat turns, each a ``(user_message, bot_message)``
            pair (tuple or list). The last turn carries the pending question.

    Returns:
        The same ``history`` list with its last turn replaced by
        ``[question, answer]``. An empty history is returned unchanged.
    """
    # Nothing to answer yet; avoids an IndexError on history[-1]
    if not history:
        return history
    question = history[-1][0]
    # Obtain the response from the 'infer' function using the latest input
    response = infer(question, history)
    # Replace the whole turn instead of assigning to history[-1][1]: the turn
    # may be an immutable tuple (add_text appends tuples), and item assignment
    # on a tuple raises TypeError.
    history[-1] = [question, response['result']]
    return history
def infer(question, history):
    """Send one question through the module-level ``qa`` chain.

    Args:
        question: The user's question; supplied under both the ``"query"``
            and ``"question"`` keys.
        history: The chat history, supplied under the ``"history"`` key.

    Returns:
        Whatever the ``qa`` call returns; ``bot`` reads its ``'result'`` entry.
    """
    chain_inputs = {"query": question, "history": history, "question": question}
    return qa(chain_inputs)
# Stylesheet handed to gr.Blocks: limits the width of the main column and centres it
css = """
#col-container {max-width: 700px; margin-left: auto; margin-right: auto;}
"""
# Heading markup rendered at the top of the page through gr.HTML
title = """
<div style="text-align: center;max-width: 700px;">
<h1>Chat with your Documentation</h1>
<p style="text-align: center;">Chat with LangChain Documentation, <br />
You can ask questions about the LangChain docu ;)</p>
</div>
"""
# Assemble the page: heading, chat window, clear button and the question box
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.HTML(title)
        chatbot = gr.Chatbot([], elem_id="chatbot")
        clear = gr.Button("Clear")
        with gr.Row():
            question = gr.Textbox(
                label="Question",
                placeholder="Type your question and hit Enter ",
            )
    # Step 1 on submit: add_text appends the question to the chat and empties the box
    submitted = question.submit(
        fn=add_text,
        inputs=[chatbot, question],
        outputs=[chatbot, question],
        queue=False,
    )
    # Step 2: bot fills in the answer for the turn that was just appended
    submitted.then(bot, chatbot, chatbot)
    # The clear button resets the chat window by returning None for it
    clear.click(fn=lambda: None, inputs=None, outputs=chatbot, queue=False)
# Start the Gradio app without creating a public share link
demo.launch(share=False)