import google.generativeai as genai
from pinecone import Pinecone, ServerlessSpec
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_pinecone import PineconeVectorStore
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_core.documents import Document
import PyPDF2
import pandas as pd
import logging
import time
from dotenv import load_dotenv
import os
import uuid

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Configure Gemini API
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise RuntimeError("GEMINI_API_KEY is not set in the environment or .env file.")
genai.configure(api_key=GEMINI_API_KEY)

# Initialize Pinecone
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
if not PINECONE_API_KEY:
    raise RuntimeError("PINECONE_API_KEY is not set in the environment or .env file.")
pc = Pinecone(api_key=PINECONE_API_KEY)
cloud = os.environ.get('PINECONE_CLOUD', 'aws')
region = os.environ.get('PINECONE_REGION', 'us-east-1')
spec = ServerlessSpec(cloud=cloud, region=region)

# Define index name and embedding dimension
index_name = "rag-donor-index"
embedding_dimension = 768  # For text-embedding-004

# Check if index exists, create if not
if index_name not in pc.list_indexes().names():
    logger.info(f"Creating Pinecone index: {index_name}")
    pc.create_index(
        name=index_name,
        dimension=embedding_dimension,
        metric="cosine",
        spec=spec
    )
    # Block until the index reports ready (asyncio.sleep in synchronous code
    # returns an un-awaited coroutine and never actually waits, so use time.sleep)
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)

logger.info(f"Pinecone index {index_name} is ready.")

# Initialize embeddings
embeddings = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004", google_api_key=GEMINI_API_KEY)

# Helper to split raw text into LangChain Documents tagged with source metadata
def _split_text_to_documents(text, filename):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=100
    )
    chunks = text_splitter.split_text(text)
    return [
        Document(page_content=chunk, metadata={"source": filename, "chunk_id": str(uuid.uuid4())})
        for chunk in chunks
    ]

# Function to process an uploaded file (PDF, text, CSV, or XLSX) without saving it locally
def process_uploaded_file(file_stream, filename):
    logger.info(f"Processing uploaded file: {filename}")
    try:
        if filename.lower().endswith('.pdf'):
            logger.info("Processing as PDF file.")
            pdf_reader = PyPDF2.PdfReader(file_stream)
            text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
            documents = _split_text_to_documents(text, filename)
            logger.info(f"Extracted {len(documents)} chunks from PDF.")
            return documents

        elif filename.lower().endswith(('.txt', '.md')):
            logger.info("Processing as text file.")
            content = file_stream.read().decode('utf-8', errors='replace')
            documents = _split_text_to_documents(content, filename)
            logger.info(f"Extracted {len(documents)} chunks from text file.")
            return documents

        elif filename.lower().endswith('.csv'):
            logger.info("Processing as CSV file.")
            df = pd.read_csv(file_stream)
            # Flatten the DataFrame to a plain-text representation before chunking
            documents = _split_text_to_documents(df.to_string(), filename)
            logger.info(f"Extracted {len(documents)} chunks from CSV.")
            return documents

        elif filename.lower().endswith('.xlsx'):
            logger.info("Processing as XLSX file.")
            df = pd.read_excel(file_stream, engine='openpyxl')
            # Flatten the DataFrame to a plain-text representation before chunking
            documents = _split_text_to_documents(df.to_string(), filename)
            logger.info(f"Extracted {len(documents)} chunks from XLSX.")
            return documents

        else:
            raise ValueError("Unsupported file type. Only PDF, text, CSV, and XLSX files are supported.")

    except Exception as e:
        logger.error(f"Error processing file {filename}: {e}")
        raise Exception(f"Error processing file: {e}") from e

# Function to index documents in Pinecone
def index_documents(documents, namespace="chatbot-knowledge", batch_size=50):
    logger.info(f"Indexing {len(documents)} documents in Pinecone.")
    try:
        vector_store = PineconeVectorStore(
            index_name=index_name,
            embedding=embeddings,
            namespace=namespace
        )
        
        # Batch documents to stay under Pinecone's per-request size limits
        total_batches = (len(documents) + batch_size - 1) // batch_size
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            batch_size_bytes = sum(len(doc.page_content.encode('utf-8')) for doc in batch)
            if batch_size_bytes > 4_000_000:
                logger.warning(f"Batch size {batch_size_bytes} bytes exceeds Pinecone limit. Reducing batch size.")
                smaller_batch_size = max(1, batch_size // 2)
                sub_batches = (len(batch) + smaller_batch_size - 1) // smaller_batch_size
                for j in range(0, len(batch), smaller_batch_size):
                    smaller_batch = batch[j:j + smaller_batch_size]
                    vector_store.add_documents(smaller_batch)
                    logger.info(f"Indexed sub-batch {j // smaller_batch_size + 1} of {sub_batches}")
            else:
                vector_store.add_documents(batch)
                logger.info(f"Indexed batch {i // batch_size + 1} of {total_batches}")
        
        logger.info("Document indexing completed.")
        return vector_store
    
    except Exception as e:
        logger.error(f"Error indexing documents: {e}")
        raise Exception(f"Error indexing documents: {e}")

# RAG chatbot function
def rag_chatbot(query, namespace="chatbot-knowledge"):
    logger.info(f"Processing query: {query}")
    try:
        # Initialize vector store
        vector_store = PineconeVectorStore(
            index_name=index_name,
            embedding=embeddings,
            namespace=namespace
        )
        
        # Retrieve relevant documents
        relevant_docs_with_scores = vector_store.similarity_search_with_score(query, k=3)
        for doc, score in relevant_docs_with_scores:
            logger.info(f"Score: {score:.4f} | Document: {doc.page_content}")
        
        # Combine context from retrieved documents
        context = "\n".join([doc.page_content for doc, score in relevant_docs_with_scores])
        
        # Create prompt for Gemini
        prompt = f"""You are a helpful chatbot that answers questions based on provided context.

Context:

{context}



User Query: {query}



Provide a concise and accurate answer based on the context. If the context doesn't contain relevant information, say so and provide a general response if applicable.

"""
        
        # Initialize Gemini model
        model = genai.GenerativeModel('gemini-1.5-flash')
        
        # Generate response
        response = model.generate_content(prompt)
        logger.info("Generated response successfully.")
        return response.text
    
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        return f"Error processing query: {e}"