# Spaces status (page header captured with this source): Runtime error
from datasets import load_dataset

# Load the first 70% of the training split.
# Alternative source (disabled):
# dataset = load_dataset('wikimedia/wikipedia', "20231101.en", split='train[:70%]')
dataset = load_dataset('lucadiliello/wikipedia_512_pretraining', split='train[:70%]')
# from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
# # Define the quantization configuration for 4-bit
# quantization_config = BitsAndBytesConfig(
#     load_in_4bit=True,  # Enable 4-bit precision
#     bnb_4bit_quant_type="nf4",  # Use the NF4 quantization type (good for reducing memory)
#     bnb_4bit_use_double_quant=True,  # Enables double quantization to improve accuracy
#     bnb_4bit_compute_dtype="float16"  # Use float16 for faster computation
# )
# # Load the tokenizer
# tokenizer = AutoTokenizer.from_pretrained('TinyLlama/TinyLlama-1.1B-Chat-v1.0')
# # Load the model with the quantization configuration
# model = AutoModelForCausalLM.from_pretrained(
#     'TinyLlama/TinyLlama-1.1B-Chat-v1.0',
#     quantization_config=quantization_config,  # Apply the 4-bit quantization config
#     device_map='auto'  # Automatically map model to available devices (e.g., GPU/CPU)
# )
# # Enable gradient checkpointing to reduce memory usage during training
# model.gradient_checkpointing_enable()
########################################################### gpt2 ####################################################
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Define the quantization configuration for 4-bit
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,  # Enable 4-bit precision
    bnb_4bit_quant_type="nf4",  # Use the NF4 quantization type (good for reducing memory)
    bnb_4bit_use_double_quant=True,  # Enables double quantization to improve accuracy
    bnb_4bit_compute_dtype="float16",  # Use float16 for faster computation
)

# Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained('gpt2')

# Load the model with the quantization configuration
model = AutoModelForCausalLM.from_pretrained(
    'gpt2',
    quantization_config=quantization_config,  # Apply the 4-bit quantization config
    device_map='auto',  # Automatically map model to available devices (e.g., GPU/CPU)
)

# Enable gradient checkpointing to reduce memory usage during training
model.gradient_checkpointing_enable()
from peft import LoraConfig, get_peft_model
import bitsandbytes as bnb

# Configure LoRA adapters on top of the 4-bit model.
# The target module names below are GPT-2 layer names; the disabled line holds
# the names used with the alternative (q_proj / v_proj) model.
# lora_config = LoraConfig(r=16, lora_alpha=32, target_modules=["q_proj", "v_proj"], lora_dropout=0.05, bias="none")
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    target_modules=["attn.c_attn", "mlp.c_fc", "mlp.c_proj"],
    lora_dropout=0.05,
    bias="none",
)
peft_model = get_peft_model(model, lora_config)
# Set the pad token (using eos_token or adding a new special token).
# Padding to a fixed length below requires the tokenizer to have one.
if tokenizer.pad_token is None:
    # Option 1: Use eos_token as pad_token
    tokenizer.pad_token = tokenizer.eos_token
    # Option 2: Add [PAD] as a new pad token if needed
    # tokenizer.add_special_tokens({'pad_token': '[PAD]'})
# Tokenize the dataset with optimized settings
def tokenize_function(examples):
    """Tokenize the ``text`` field of a batch for language-model training.

    Args:
        examples: Mapping with a ``'text'`` entry (a string or list of strings).

    Returns:
        The tokenizer output, truncated and padded to exactly 150 tokens
        per example.
    """
    return tokenizer(
        examples['text'],
        truncation=True,
        padding='max_length',
        max_length=150,
    )
# Work on at most the first 100,000 rows; clamp so a smaller split does not
# make ``select`` fail on out-of-range indices.
tokenized_dataset = dataset.select(range(min(100000, len(dataset)))).map(tokenize_function, batched=True)
# Label value that the Transformers language-modeling loss skips.
_IGNORE_INDEX = -100


def _mask_padding(token_ids, attention_mask):
    """Return ``token_ids`` with positions whose mask is 0 set to ``_IGNORE_INDEX``."""
    return [tok if keep else _IGNORE_INDEX for tok, keep in zip(token_ids, attention_mask)]


def prepare_labels(batch):
    """Add a ``labels`` entry derived from ``input_ids`` for causal-LM training.

    The pad token in this script is the EOS token, so padded positions cannot
    be told apart by token id. When an ``attention_mask`` is present, padded
    positions are replaced by -100 so the loss is not computed on padding.
    Without a mask, the labels are a plain copy of ``input_ids``.

    Args:
        batch: Mapping with ``input_ids`` (one list of ids, or a list of such
            lists when mapped with ``batched=True``) and optionally a
            matching ``attention_mask``.

    Returns:
        The same ``batch`` object with ``labels`` added.
    """
    input_ids = batch["input_ids"]
    attention = batch["attention_mask"] if "attention_mask" in batch else None

    if attention is None:
        batch["labels"] = input_ids.copy()
    elif input_ids and isinstance(input_ids[0], (list, tuple)):
        # Batched call: one row of ids per example.
        batch["labels"] = [
            _mask_padding(row, mask) for row, mask in zip(input_ids, attention)
        ]
    else:
        # Single example: a flat list of ids.
        batch["labels"] = _mask_padding(input_ids, attention)
    return batch
# Apply the transformation to add labels
tokenized_dataset = tokenized_dataset.map(prepare_labels, batched=True)
# Step 1: Set up the embedding model used to build the FAISS vector database
from datasets import Dataset
from transformers import AutoModel, AutoTokenizer
import faiss
import numpy as np
import torch  # used just below; the file's other `import torch` comes after this first use
from tqdm import tqdm  # Import tqdm for progress bar

# Load your tokenizer and model
embedding_model_name = "sentence-transformers/all-mpnet-base-v2"
embedding_model = AutoModel.from_pretrained(embedding_model_name)
embedding_tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)

# Move the model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
embedding_model.to(device)
# Function to generate embeddings in batches
def embed_text_batch(texts, batch_size=16):
    """Embed ``texts`` with the module-level embedding model, batch by batch.

    Each text's embedding is the mean of its token states weighted by the
    attention mask. An unweighted mean would also average the padding added to
    shorter texts, making an embedding depend on which texts share its batch.

    Args:
        texts: Sliceable sequence of strings.
        batch_size: Number of texts per forward pass.

    Returns:
        A numpy array with one row per text (an empty array when ``texts`` is
        empty).
    """
    pooled_batches = []
    for start in tqdm(range(0, len(texts), batch_size), desc="Generating embeddings"):
        batch_texts = list(texts[start:start + batch_size])
        # Tokenize and move inputs to the model's device
        inputs = embedding_tokenizer(
            batch_texts, padding=True, truncation=True, return_tensors="pt"
        ).to(device)
        with torch.no_grad():
            token_states = embedding_model(**inputs).last_hidden_state
            # Zero out padded positions, then divide by the real token count.
            mask = inputs["attention_mask"].unsqueeze(-1).to(token_states.dtype)
            pooled = (token_states * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
        # Move results back to CPU so they can be stacked as numpy arrays
        pooled_batches.append(pooled.cpu().numpy())
    return np.concatenate(pooled_batches) if pooled_batches else np.array([])
# Step 1: Process the dataset in batches
texts = tokenized_dataset["text"]
batch_size = 16  # Adjust based on Colab memory
embeddings = embed_text_batch(texts, batch_size=batch_size)

# Step 2: Add embeddings as a new column to the dataset
tokenized_dataset = tokenized_dataset.add_column("embeddings", embeddings.tolist())

# Step 3: Create a flat L2 FAISS index sized to the embedding dimension
dimension = embeddings.shape[1]  # Dimension of embeddings
faiss_index = faiss.IndexFlatL2(dimension)

# Step 4: Add embeddings to FAISS index (row i of the index is row i of the dataset)
faiss_index.add(embeddings)

# Step 5: Save the dataset and FAISS index
tokenized_dataset.save_to_disk("wikipedia_dataset_with_embeddings")
faiss.write_index(faiss_index, "wikipedia_faiss.index")
print("FAISS index and dataset saved successfully.")
def embed_query(query):
    """Embed a single query string with the module-level embedding model.

    Args:
        query: The query text.

    Returns:
        A numpy array with one row: the mean of the query's token states.
        The query is tokenized as a batch of one, so no padding is added and
        the plain mean covers only real tokens.
    """
    # Tokenize the query and move the tensors to the model's device
    inputs = embedding_tokenizer(
        [query], padding=True, truncation=True, return_tensors="pt"
    ).to(device)
    with torch.no_grad():
        token_states = embedding_model(**inputs).last_hidden_state
    query_embedding = token_states.mean(dim=1).cpu().numpy()
    return query_embedding
def search_faiss(query_embedding, faiss_index, top_k=5):
    """Search ``faiss_index`` for the nearest neighbours of ``query_embedding``.

    Args:
        query_embedding: Query vectors, one per row.
        faiss_index: Object exposing ``search(vectors, k)``.
        top_k: Number of neighbours requested per query.

    Returns:
        ``(distances, indices)`` exactly as returned by ``faiss_index.search``.
    """
    distances, indices = faiss_index.search(query_embedding, top_k)
    return distances, indices
def get_top_answer(indices, dataset):
    """Return the text of the best match from a FAISS search result.

    Args:
        indices: 2-D index result; only the first entry of the first row
            (the top hit for the first query) is used.
        dataset: Mapping-like object whose ``"text"`` entry is indexable by
            row number.

    Returns:
        The matching text, or ``None`` when the top index is negative.
        FAISS reports a missing result as -1, which would otherwise wrap
        around and return the last row.
    """
    top_index = int(indices[0][0])  # plain int: numpy integers are not accepted everywhere
    if top_index < 0:
        return None
    return dataset["text"][top_index]
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import faiss
import numpy as np

# Assuming embeddings and faiss_index are already created as in your previous code
# Load the pre-trained LLM for generation (you can replace it with a different one)
llm_model_name = "facebook/bart-large-cnn"  # Example: You can use GPT-3, BART, T5, etc.
llm_model = AutoModelForSeq2SeqLM.from_pretrained(llm_model_name)
llm_tokenizer = AutoTokenizer.from_pretrained(llm_model_name)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
llm_model.to(device)

# Embedding model used for creating the vector database (same as the one used to generate embeddings for dataset)
embedding_model_name = "sentence-transformers/all-mpnet-base-v2"
embedding_tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
embedding_model = AutoModel.from_pretrained(embedding_model_name)
embedding_model.to(device)
# Function to embed a query (redefinition; same logic as the earlier embed_query in this file)
def embed_query(query):
    """Embed a single query string with the module-level embedding model.

    Args:
        query: The query text.

    Returns:
        A numpy array with one row: the mean of the query's token states.
        The query is tokenized as a batch of one, so no padding is added and
        the plain mean covers only real tokens.
    """
    inputs = embedding_tokenizer(
        [query], padding=True, truncation=True, return_tensors="pt"
    ).to(device)
    with torch.no_grad():
        token_states = embedding_model(**inputs).last_hidden_state
    query_embedding = token_states.mean(dim=1).cpu().numpy()
    return query_embedding
# Function to search FAISS index and retrieve top k results
# (redefinition; same logic as the earlier search_faiss in this file)
def search_faiss(query_embedding, faiss_index, top_k=5):
    """Search ``faiss_index`` for the nearest neighbours of ``query_embedding``.

    Args:
        query_embedding: Query vectors, one per row.
        faiss_index: Object exposing ``search(vectors, k)``.
        top_k: Number of neighbours requested per query.

    Returns:
        ``(distances, indices)`` exactly as returned by ``faiss_index.search``.
    """
    distances, indices = faiss_index.search(query_embedding, top_k)
    return distances, indices
# Function to generate an answer using the LLM based on the retrieved documents
def generate_answer(query, retrieved_texts):
    """Generate an answer to ``query`` from the retrieved passages.

    Args:
        query: The user's question.
        retrieved_texts: Iterable of passage strings used as context.

    Returns:
        The decoded model output with special tokens removed.
    """
    # Combine the query and the retrieved texts into a single input
    context = " ".join(retrieved_texts)
    input_text = f"Question: {query}\nContext: {context}\nAnswer:"

    # Tokenize (input truncated to 512 tokens) and move to the model's device
    inputs = llm_tokenizer(
        input_text, return_tensors="pt", max_length=512, truncation=True
    ).to(device)
    with torch.no_grad():
        # Pass the full encoding so the attention mask accompanies the ids.
        generated_ids = llm_model.generate(**inputs, max_length=150)

    # Decode the generated response
    return llm_tokenizer.decode(generated_ids[0], skip_special_tokens=True)
# Function to retrieve the texts from the dataset based on FAISS index results
def get_retrieved_texts(indices, dataset, top_k=5):
    """Return the texts for the first ``top_k`` hits of the first query.

    Args:
        indices: 2-D index result; only the first row is used.
        dataset: Mapping-like object whose ``'text'`` entry is indexable by
            row number.
        top_k: Maximum number of hits to look up.

    Returns:
        List of texts in hit order. Negative indices are skipped: FAISS
        reports a missing result as -1, which would otherwise wrap around
        and return the last row.
    """
    # Read the column once instead of once per hit.
    texts = dataset['text']
    return [texts[int(idx)] for idx in indices[0][:top_k] if int(idx) >= 0]
# End-to-end retrieval-augmented generation
def rag_pipeline(question, faiss_index, dataset, top_k=3):
    """Answer ``question`` using passages retrieved from ``faiss_index``.

    Args:
        question: The user's question.
        faiss_index: Index searched for passages similar to the question.
        dataset: Dataset whose rows correspond to the index entries.
        top_k: Number of passages to retrieve.

    Returns:
        The generated answer string.
    """
    # Step 1: Embed the query
    query_embedding = embed_query(question)

    # Step 2: Search the FAISS index for the top K similar documents
    _distances, indices = search_faiss(query_embedding, faiss_index, top_k=top_k)

    # Step 3: Retrieve the top K relevant documents from the dataset
    retrieved_texts = get_retrieved_texts(indices, dataset, top_k=top_k)

    # Step 4: Generate the answer using the retrieved texts and the LLM
    return generate_answer(question, retrieved_texts)
# Import the necessary modules
from langchain_community.llms import Ollama

# Load the Ollama model
gen_model = Ollama(model="llama2")
# Canned replies, keyed by the lower-cased question. Built once at import
# time instead of on every call.
_PREDEFINED_RESPONSES = {
    "hi": "Hello! How can I assist you today?",
    "hello": "Hi there! 😊 What can I help you with?",
    "who made you?": "I was created by Vinmay and his team.",
    "what is your purpose?": "I'm here to assist you with educational queries and provide information.",
    # Add more predefined responses as needed
}


# Define a function to get predefined responses for specific queries
def get_predefined_response(question):
    """Return the canned reply for ``question``, or ``None`` if there is none.

    Matching ignores case and surrounding whitespace, the same normalization
    the Gradio ``generate_response(user_input)`` in this file applies.

    Args:
        question: The user's question.

    Returns:
        The predefined reply string, or ``None`` when nothing matches.
    """
    normalized_question = question.lower().strip()
    return _PREDEFINED_RESPONSES.get(normalized_question)
# Context-based response generation with a predefined-response shortcut.
# NOTE: a later definition of generate_response(user_input) in this file
# rebinds this name.
def generate_response(markdown, question, user_instructions=None, max_new_tokens=250, temperature=0.9, top_p=0.95):
    """Answer ``question`` from ``markdown`` context using the Ollama model.

    Args:
        markdown: Context text placed in the prompt.
        question: The user's question.
        user_instructions: Optional extra instructions appended to the prompt.
        max_new_tokens: Forwarded to the model as ``max_tokens``.
        temperature: Sampling temperature forwarded to the model.
        top_p: Nucleus-sampling value forwarded to the model.

    Returns:
        A predefined reply when one matches, otherwise the model's text, or
        ``"Unexpected response format."`` if the model result is neither a
        string nor a dict with ``"choices"``.
    """
    # Check for predefined response first
    predefined_response = get_predefined_response(question)
    if predefined_response:
        return predefined_response

    instruction_text = f" Please follow these instructions: {user_instructions}" if user_instructions else ""
    # The prompt used to interpolate `predefined_response` here, but that value
    # is always None on this path, so the model was told to "referNone".
    # That sentence is removed.
    prompt = (
        "Using the provided context, please generate a unique and insightful answer that directly addresses the question:\n\n"
        f"Context:\n{markdown}\n\n"
        f"Question: {question}\n"
        f"{instruction_text}\n"
        "Please synthesize your response by integrating the information with your own understanding: "
    )

    # Call the Ollama model using the `invoke` method
    # NOTE(review): confirm the Ollama wrapper honours `max_tokens`; Ollama's own
    # option for capping output length is named `num_predict`.
    response = gen_model.invoke(prompt, max_tokens=max_new_tokens, temperature=temperature, top_p=top_p)

    # Check if the response is a string (direct generated text) or a dictionary (with metadata)
    if isinstance(response, str):
        return response  # Return the raw text if it's a string
    if isinstance(response, dict) and "choices" in response:
        return response["choices"][0]["text"]  # Extract the text from the structured response
    return "Unexpected response format."
# # Example usage
# markdown = "The sky appears blue due to the scattering of light by the atmosphere."
# question = "Hi"
# response = generate_response(markdown, question)
# print(f"Model Response: {response}")
import gradio as gr
from langchain_community.llms import Ollama

# Load the Ollama model
gen_model = Ollama(model="llama2")

# Define the manual responses (keys are lower-cased, stripped user inputs)
manual_responses = {
    "hi": "Hello! How can I assist you today?",
    "hello": "Hi there! What would you like to know?",
    "who made you?": "I was created by OpenAI.",
    "what is your purpose?": "I'm here to assist with educational queries!",
}
# Function to generate responses (this is the callback wired into the Gradio interface)
def generate_response(user_input):
    """Return a reply to ``user_input``.

    Args:
        user_input: The text typed by the user.

    Returns:
        The matching entry of ``manual_responses`` when the lower-cased,
        stripped input is a key there; otherwise the Ollama model's answer
        with surrounding whitespace removed.
    """
    # Normalize user input for matching
    normalized_input = user_input.lower().strip()

    # Check for manual responses
    if normalized_input in manual_responses:
        return manual_responses[normalized_input]

    # For other questions, generate a response using the model
    prompt = f"Please provide a detailed answer to the following question:\n\nQuestion: {user_input}\n"
    response = gen_model.invoke(prompt)
    return response.strip()
# Create the Gradio interface
iface = gr.Interface(
    fn=generate_response,
    inputs=gr.Textbox(label="Ask a Question"),
    outputs=gr.Textbox(label="Response"),
    title="Q&A System",
    description="Ask me anything and I will respond accordingly.",
)

# Launch the Gradio app
if __name__ == "__main__":
    iface.launch(share=True, inline=False)  # Use share=True to make it public if needed