import google.generativeai as genai
from pinecone import Pinecone, ServerlessSpec
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_pinecone import PineconeVectorStore
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_core.documents import Document
import PyPDF2
import pandas as pd
import logging
import time
from dotenv import load_dotenv
import os
import uuid
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Configure Gemini API
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
# Initialize Pinecone
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=PINECONE_API_KEY)
cloud = os.environ.get('PINECONE_CLOUD', 'aws')
region = os.environ.get('PINECONE_REGION', 'us-east-1')
spec = ServerlessSpec(cloud=cloud, region=region)
# Define index name and embedding dimension
index_name = "rag-donor-index"
embedding_dimension = 768 # For text-embedding-004
# Check if index exists, create if not
if index_name not in pc.list_indexes().names():
    logger.info(f"Creating Pinecone index: {index_name}")
    pc.create_index(
        name=index_name,
        dimension=embedding_dimension,
        metric="cosine",
        spec=spec
    )
    # Wait for the index to be ready before indexing documents
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)
    logger.info(f"Pinecone index {index_name} is ready.")
# Initialize embeddings
embeddings = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004", google_api_key=GEMINI_API_KEY)
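
# Optional sanity check (a sketch, not part of the upload): embed_query is part
# of LangChain's Embeddings interface, and the vector it returns should have
# embedding_dimension entries for text-embedding-004. It makes a network call,
# so it is left commented out:
# assert len(embeddings.embed_query("hello world")) == embedding_dimension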
# Function to process uploaded file (PDF, text, CSV, or XLSX) without saving locally
def process_uploaded_file(file_stream, filename):
    logger.info(f"Processing uploaded file: {filename}")
    try:
        name = filename.lower()
        if name.endswith('.pdf'):
            logger.info("Processing as PDF file.")
            pdf_reader = PyPDF2.PdfReader(file_stream)
            text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
        elif name.endswith(('.txt', '.md')):
            logger.info("Processing as text file.")
            text = file_stream.read().decode('utf-8', errors='replace')
        elif name.endswith('.csv'):
            logger.info("Processing as CSV file.")
            # Convert DataFrame to string representation
            text = pd.read_csv(file_stream).to_string()
        elif name.endswith('.xlsx'):
            logger.info("Processing as XLSX file.")
            # Convert DataFrame to string representation
            text = pd.read_excel(file_stream, engine='openpyxl').to_string()
        else:
            raise ValueError("Unsupported file type. Only PDF, text, CSV, and XLSX files are supported.")
        # Split the extracted text into overlapping chunks for embedding
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=100
        )
        chunks = text_splitter.split_text(text)
        documents = [
            Document(page_content=chunk, metadata={"source": filename, "chunk_id": str(uuid.uuid4())})
            for chunk in chunks
        ]
        logger.info(f"Extracted {len(documents)} chunks from {filename}.")
        return documents
    except Exception as e:
        logger.error(f"Error processing file {filename}: {str(e)}")
        raise Exception(f"Error processing file: {str(e)}") from e
# Function to index documents in Pinecone
def index_documents(documents, namespace="chatbot-knowledge", batch_size=50):
    logger.info(f"Indexing {len(documents)} documents in Pinecone.")
    try:
        vector_store = PineconeVectorStore(
            index_name=index_name,
            embedding=embeddings,
            namespace=namespace
        )
        # Batch documents to stay under Pinecone's per-request size limits
        total_batches = (len(documents) + batch_size - 1) // batch_size
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            batch_size_bytes = sum(len(doc.page_content.encode('utf-8')) for doc in batch)
            if batch_size_bytes > 4_000_000:
                logger.warning(f"Batch of {batch_size_bytes} bytes exceeds the Pinecone request limit. Splitting into smaller batches.")
                smaller_batch_size = max(1, batch_size // 2)
                total_sub_batches = (len(batch) + smaller_batch_size - 1) // smaller_batch_size
                for j in range(0, len(batch), smaller_batch_size):
                    smaller_batch = batch[j:j + smaller_batch_size]
                    vector_store.add_documents(smaller_batch)
                    logger.info(f"Indexed sub-batch {j // smaller_batch_size + 1} of {total_sub_batches}")
            else:
                vector_store.add_documents(batch)
                logger.info(f"Indexed batch {i // batch_size + 1} of {total_batches}")
        logger.info("Document indexing completed.")
        return vector_store
    except Exception as e:
        logger.error(f"Error indexing documents: {e}")
        raise Exception(f"Error indexing documents: {e}") from e
# RAG chatbot function
def rag_chatbot(query, namespace="chatbot-knowledge"):
    logger.info(f"Processing query: {query}")
    try:
        # Initialize vector store
        vector_store = PineconeVectorStore(
            index_name=index_name,
            embedding=embeddings,
            namespace=namespace
        )
        # Retrieve the top-k most similar chunks
        relevant_docs_with_scores = vector_store.similarity_search_with_score(query, k=3)
        for doc, score in relevant_docs_with_scores:
            logger.info(f"Score: {score:.4f} | Document: {doc.page_content}")
        # Combine context from retrieved documents
        context = "\n".join(doc.page_content for doc, _score in relevant_docs_with_scores)
        # Create prompt for Gemini
        prompt = f"""You are a helpful chatbot that answers questions based on provided context.

Context:
{context}

User Query: {query}

Provide a concise and accurate answer based on the context. If the context doesn't contain relevant information, say so and provide a general response if applicable.
"""
        # Initialize Gemini model and generate a response
        model = genai.GenerativeModel('gemini-1.5-flash')
        response = model.generate_content(prompt)
        logger.info("Generated response successfully.")
        return response.text
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        return f"Error processing query: {e}"