import os
import time
import uuid
import logging

import google.generativeai as genai
import pandas as pd
import PyPDF2
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_pinecone import PineconeVectorStore
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_core.documents import Document

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Configure Gemini API
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)

# Initialize Pinecone
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=PINECONE_API_KEY)
cloud = os.environ.get('PINECONE_CLOUD', 'aws')
region = os.environ.get('PINECONE_REGION', 'us-east-1')
spec = ServerlessSpec(cloud=cloud, region=region)

# Define index name and embedding dimension
index_name = "rag-donor-index"
embedding_dimension = 768  # For text-embedding-004

# Check if the index exists; create it if not
if index_name not in pc.list_indexes().names():
    logger.info(f"Creating Pinecone index: {index_name}")
    pc.create_index(
        name=index_name,
        dimension=embedding_dimension,
        metric="cosine",
        spec=spec
    )
    # Wait for the index to be ready (time.sleep, since this is a synchronous loop)
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)
    logger.info(f"Pinecone index {index_name} is ready.")

# Initialize embeddings
embeddings = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004", google_api_key=GEMINI_API_KEY)


# Process an uploaded file (PDF, text, CSV, or XLSX) without saving it locally
def process_uploaded_file(file_stream, filename):
    logger.info(f"Processing uploaded file: {filename}")
    try:
        if filename.lower().endswith('.pdf'):
            logger.info("Processing as PDF file.")
            pdf_reader = PyPDF2.PdfReader(file_stream)
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text() or ""
            # Split PDF content into chunks
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=100
            )
            chunks = text_splitter.split_text(text)
            documents = [
                Document(page_content=chunk, metadata={"source": filename, "chunk_id": str(uuid.uuid4())})
                for chunk in chunks
            ]
            logger.info(f"Extracted {len(documents)} chunks from PDF.")
            return documents

        elif filename.lower().endswith(('.txt', '.md')):
            logger.info("Processing as text file.")
            content = file_stream.read().decode('utf-8', errors='replace')
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=100
            )
            chunks = text_splitter.split_text(content)
            documents = [
                Document(page_content=chunk, metadata={"source": filename, "chunk_id": str(uuid.uuid4())})
                for chunk in chunks
            ]
            logger.info(f"Extracted {len(documents)} chunks from text file.")
            return documents

        elif filename.lower().endswith('.csv'):
            logger.info("Processing as CSV file.")
            df = pd.read_csv(file_stream)
            # Convert DataFrame to string representation
            text = df.to_string()
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=100
            )
            chunks = text_splitter.split_text(text)
            documents = [
                Document(page_content=chunk, metadata={"source": filename, "chunk_id": str(uuid.uuid4())})
                for chunk in chunks
            ]
            logger.info(f"Extracted {len(documents)} chunks from CSV.")
            return documents

        elif filename.lower().endswith('.xlsx'):
            logger.info("Processing as XLSX file.")
            df = pd.read_excel(file_stream, engine='openpyxl')
            # Convert DataFrame to string representation
            text = df.to_string()
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=100
            )
            chunks = text_splitter.split_text(text)
            documents = [
                Document(page_content=chunk, metadata={"source": filename, "chunk_id": str(uuid.uuid4())})
                for chunk in chunks
            ]
            logger.info(f"Extracted {len(documents)} chunks from XLSX.")
            return documents

        else:
            raise ValueError("Unsupported file type. Only PDF, text, CSV, and XLSX files are supported.")
    except Exception as e:
        logger.error(f"Error processing file {filename}: {str(e)}")
        raise Exception(f"Error processing file: {str(e)}")


# Index documents in Pinecone
def index_documents(documents, namespace="chatbot-knowledge", batch_size=50):
    logger.info(f"Indexing {len(documents)} documents in Pinecone.")
    try:
        vector_store = PineconeVectorStore(
            index_name=index_name,
            embedding=embeddings,
            namespace=namespace
        )
        # Batch documents to avoid Pinecone request size limits
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            batch_size_bytes = sum(len(doc.page_content.encode('utf-8')) for doc in batch)
            if batch_size_bytes > 4_000_000:
                logger.warning(f"Batch size {batch_size_bytes} bytes exceeds Pinecone limit. Reducing batch size.")
                smaller_batch_size = batch_size // 2
                for j in range(0, len(batch), smaller_batch_size):
                    smaller_batch = batch[j:j + smaller_batch_size]
                    vector_store.add_documents(smaller_batch)
                    logger.info(f"Indexed batch {j // smaller_batch_size + 1} of {len(batch) // smaller_batch_size + 1}")
            else:
                vector_store.add_documents(batch)
                logger.info(f"Indexed batch {i // batch_size + 1} of {len(documents) // batch_size + 1}")
        logger.info("Document indexing completed.")
        return vector_store
    except Exception as e:
        logger.error(f"Error indexing documents: {e}")
        raise Exception(f"Error indexing documents: {e}")


# RAG chatbot function
def rag_chatbot(query, namespace="chatbot-knowledge"):
    logger.info(f"Processing query: {query}")
    try:
        # Initialize vector store
        vector_store = PineconeVectorStore(
            index_name=index_name,
            embedding=embeddings,
            namespace=namespace
        )
        # Retrieve relevant documents with similarity scores
        relevant_docs_with_scores = vector_store.similarity_search_with_score(query, k=3)
        for doc, score in relevant_docs_with_scores:
            logger.info(f"Score: {score:.4f} | Document: {doc.page_content}")
        # Combine context from retrieved documents
        context = "\n".join([doc.page_content for doc, score in relevant_docs_with_scores])
        # Create prompt for Gemini
        prompt = f"""You are a helpful chatbot that answers questions based on provided context.

Context:
{context}

User Query: {query}

Provide a concise and accurate answer based on the context. If the context doesn't contain relevant information, say so and provide a general response if applicable.
"""
        # Initialize Gemini model
        model = genai.GenerativeModel('gemini-1.5-flash')
        # Generate response
        response = model.generate_content(prompt)
        logger.info("Generated response successfully.")
        return response.text
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        return f"Error processing query: {e}"
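

# Minimal usage sketch showing how the three functions above fit together:
# process a file, index its chunks, then ask a question against them.
# The file path and query below are hypothetical examples, and this assumes
# valid GEMINI_API_KEY and PINECONE_API_KEY values are present in the environment.
if __name__ == "__main__":
    sample_path = "donor_report.pdf"  # hypothetical local file, for illustration only
    with open(sample_path, "rb") as f:
        docs = process_uploaded_file(f, os.path.basename(sample_path))
    index_documents(docs)
    answer = rag_chatbot("Who were the top donors last year?")
    print(answer)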