import gradio as gr
import os
import hashlib
import datetime
from typing import Dict, List, Optional
import requests
import time
import uuid
from pinecone import Pinecone, ServerlessSpec
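# External services: Pinecone supplies vector storage plus hosted inference
# (embeddings and reranking); OpenRouter serves the chat model. Both are
# configured via environment variables.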
class RAGMemorySystem:
"""RAG system using Pinecone with integrated inference for embeddings and vector storage"""
def __init__(self):
        # Read the Pinecone API key from the environment; never hard-code secrets in source
        self.pinecone_api_key = os.getenv("PINECONE_API_KEY")
        self.pinecone_environment = os.getenv("PINECONE_ENVIRONMENT", "us-east-1")  # serverless indexes don't need a specific environment
# Use a consistent index name instead of generating unique ones
self.index_name = os.getenv("PINECONE_INDEX_NAME", "shared-ai-experiences")
# Pinecone inference model configuration
self.embedding_model = os.getenv("PINECONE_EMBEDDING_MODEL", "multilingual-e5-large")
self.rerank_model = os.getenv("PINECONE_RERANK_MODEL", "pinecone-rerank-v0")
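        # multilingual-e5-large produces 1024-dimensional vectors, which is why the
        # fallback index creation and the zero-vector fallbacks below use dimension 1024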
# Initialize OpenRouter
self.openrouter_api_key = os.getenv("OPENROUTER_API_KEY")
self.model_name = os.getenv("MODEL_NAME", "meta-llama/llama-3.2-3b-instruct:free")
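        # Any OpenRouter model ID can be used here; the ':free' suffix selects OpenRouter's no-cost tier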
# Initialize Pinecone client
self.pc = None
self.index = None
# Initialize Pinecone
self.init_pinecone()
def update_model(self, new_model: str):
"""Update the OpenRouter model"""
if new_model and new_model.strip():
self.model_name = new_model.strip()
return f"✅ Model updated to: {self.model_name}"
return "❌ Please enter a valid model name"
def init_pinecone(self):
"""Initialize Pinecone connection with integrated inference"""
try:
if self.pinecone_api_key:
# Initialize Pinecone client
self.pc = Pinecone(api_key=self.pinecone_api_key)
print(f"Attempting to connect to Pinecone...")
# Check existing indexes
try:
existing_indexes = [idx.name for idx in self.pc.list_indexes()]
print(f"Existing indexes: {existing_indexes}")
except Exception as list_error:
print(f"Error listing indexes: {list_error}")
existing_indexes = []
# Only create index if it doesn't exist
if self.index_name not in existing_indexes:
print(f"Index '{self.index_name}' not found. Creating new Pinecone index with integrated inference...")
try:
# Create index with integrated embedding model
index_model = self.pc.create_index_for_model(
name=self.index_name,
cloud="aws",
region="us-east-1",
embed={
"model": self.embedding_model,
"field_map": {"text": "content"} # Map 'text' field to 'content' field
}
)
print(f"Successfully created index with integrated inference: {self.index_name}")
print(f"Index details: {index_model}")
# Wait for index to be ready
print("Waiting for index to be ready...")
time.sleep(10)
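                        # Note: a fixed sleep is a crude readiness check; the SDK also
                        # allows polling self.pc.describe_index(self.index_name).status['ready']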
except Exception as create_error:
print(f"Error creating index with integrated inference: {create_error}")
# Fallback to traditional index creation
try:
print("Attempting fallback to traditional index creation...")
self.pc.create_index(
name=self.index_name,
dimension=1024, # multilingual-e5-large dimension
metric="cosine",
                            spec=ServerlessSpec(
                                cloud="aws",
                                region="us-east-1"
                            )
)
print(f"Created fallback traditional index: {self.index_name}")
time.sleep(5)
except Exception as fallback_error:
print(f"Failed to create fallback index: {fallback_error}")
self.index = None
return
else:
print(f"Index '{self.index_name}' already exists. Connecting to existing index...")
# Connect to the index (whether existing or newly created)
try:
self.index = self.pc.Index(self.index_name)
print(f"Successfully connected to Pinecone index: {self.index_name}")
# Test the connection and get stats
stats = self.index.describe_index_stats()
print(f"Index stats: {stats}")
# Check if this is an existing index with data
total_vectors = stats.get('total_vector_count', 0)
if total_vectors > 0:
print(f"Found existing index with {total_vectors} stored experiences. Continuing with shared knowledge base.")
else:
print("Index is empty. Ready to start building shared knowledge base.")
except Exception as connect_error:
print(f"Error connecting to index: {connect_error}")
self.index = None
else:
print("Warning: Pinecone API key not found. Memory storage disabled.")
self.index = None
except Exception as e:
print(f"Error initializing Pinecone: {e}")
self.index = None
    def create_embedding(self, text: str, input_type: str = "passage") -> List[float]:
        """Create an embedding using Pinecone's inference API.

        Pass input_type="passage" when storing documents and input_type="query"
        when embedding search queries (the e5 models are asymmetric).
        """
        try:
            if not self.pc:
                print("Pinecone client not available, returning zero vector")
                return [0.0] * 1024
            response = self.pc.inference.embed(
                model=self.embedding_model,
                inputs=[text],
                parameters={
                    "input_type": input_type,
                    "truncate": "END"
                }
            )
            if response and len(response.data) > 0:
                return response.data[0].values
            print("No embedding data received, returning zero vector")
            return [0.0] * 1024
        except Exception as e:
            print(f"Error creating embedding with Pinecone inference: {e}")
            return [0.0] * 1024  # Zero vector as a safe fallback
def store_experience(self, user_input: str, ai_response: str, context: str = "") -> str:
"""Store conversation experience in Pinecone using integrated inference"""
if not self.index:
return "Memory storage not available (Pinecone not configured)"
try:
# Create a unique ID for this experience
experience_id = hashlib.md5(
f"{user_input}_{ai_response}_{datetime.datetime.now()}_{uuid.uuid4()}".encode()
).hexdigest()
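            # The timestamp and uuid4 salt guarantee a unique ID even if the exact
            # same user/AI text pair is stored more than once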
# Create combined text for embedding
combined_text = f"User: {user_input}\nAI: {ai_response}\nContext: {context}"
            # Prefer the integrated-inference path (available when the index was
            # created with create_index_for_model). Records are flat field maps;
            # the 'content' field is embedded server-side via the index field_map.
            try:
                record = {
                    "_id": experience_id,
                    "content": combined_text,  # embedded automatically by the index
                    "user_input": user_input[:1000],
                    "ai_response": ai_response[:1000],
                    "context": context[:500],
                    "timestamp": datetime.datetime.now().isoformat(),
                    "interaction_type": "conversation",
                    "session_id": getattr(self, 'session_id', 'shared')
                }
                # "__default__" targets the index's default namespace
                self.index.upsert_records("__default__", [record])
                return f"✅ Experience stored with integrated inference, ID: {experience_id[:8]}..."
except Exception as integrated_error:
print(f"Integrated inference failed: {integrated_error}")
# Fallback to manual embedding
embedding = self.create_embedding(combined_text)
# Store in Pinecone with manual embedding
self.index.upsert([(experience_id, embedding, {
"user_input": user_input[:1000],
"ai_response": ai_response[:1000],
"context": context[:500],
"timestamp": datetime.datetime.now().isoformat(),
"interaction_type": "conversation",
"session_id": getattr(self, 'session_id', 'shared')
})])
return f"✅ Experience stored with manual embedding, ID: {experience_id[:8]}..."
except Exception as e:
return f"❌ Error storing experience: {e}"
def retrieve_relevant_experiences(self, query: str, top_k: int = 3) -> List[Dict]:
"""Retrieve relevant past experiences based on query using Pinecone inference"""
if not self.index:
return []
try:
            # Try integrated search first (the query text is embedded server-side)
            try:
                results = self.index.search_records(
                    namespace="__default__",
                    query={
                        "top_k": top_k,
                        "inputs": {"text": query}
                    }
                )
                relevant_experiences = []
                for hit in results["result"]["hits"]:
                    if hit["_score"] > 0.3:  # minimum similarity threshold
                        fields = hit["fields"]
                        relevant_experiences.append({
                            "score": hit["_score"],
                            "user_input": fields.get("user_input", ""),
                            "ai_response": fields.get("ai_response", ""),
                            "context": fields.get("context", ""),
                            "timestamp": fields.get("timestamp", ""),
                            "id": hit["_id"]
                        })
                return relevant_experiences
except Exception as integrated_error:
print(f"Integrated search failed: {integrated_error}")
            # Fallback: embed the query manually, then run a classic vector query
            query_embedding = self.create_embedding(query, input_type="query")
# Search Pinecone with manual embedding
results = self.index.query(
vector=query_embedding,
top_k=top_k,
include_metadata=True
)
relevant_experiences = []
for match in results.matches:
if match.score > 0.3:
relevant_experiences.append({
"score": match.score,
"user_input": match.metadata.get("user_input", ""),
"ai_response": match.metadata.get("ai_response", ""),
"context": match.metadata.get("context", ""),
"timestamp": match.metadata.get("timestamp", ""),
"id": match.id
})
return relevant_experiences
except Exception as e:
print(f"Error retrieving experiences: {e}")
return []
def rerank_results(self, query: str, documents: List[str]) -> List[Dict]:
"""Rerank results using Pinecone's reranking model"""
if not self.pc or not documents:
return []
try:
            # Use Pinecone's inference API for reranking (note: the parameter is
            # top_n, not top_k)
            response = self.pc.inference.rerank(
                model=self.rerank_model,
                query=query,
                documents=documents,
                top_n=min(len(documents), 5)  # rerank at most the top 5
            )
            reranked_results = []
            if response and hasattr(response, 'data'):
                for result in response.data:
                    reranked_results.append({
                        "document": result.document.text,  # plain-string docs are wrapped in a 'text' field
                        "score": result.score,  # relevance score from the reranker
                        "index": result.index
                    })
return reranked_results
except Exception as e:
print(f"Error reranking results: {e}")
return []
def call_openrouter(self, messages: List[Dict], temperature: float = 0.7) -> str:
"""Call OpenRouter API"""
if not self.openrouter_api_key:
return "Error: OpenRouter API key not configured. Please set the OPENROUTER_API_KEY environment variable."
try:
headers = {
"Authorization": f"Bearer {self.openrouter_api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://huggingface.co",
"X-Title": "AI RAG Memory System"
}
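            # OpenRouter exposes an OpenAI-compatible chat completions endpoint,
            # so the payload mirrors the OpenAI chat format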
data = {
"model": self.model_name,
"messages": messages,
"temperature": temperature,
"max_tokens": 1000
}
response = requests.post(
"https://openrouter.ai/api/v1/chat/completions",
headers=headers,
json=data,
timeout=30
)
if response.status_code == 200:
result = response.json()
return result["choices"][0]["message"]["content"]
else:
return f"API Error: {response.status_code} - {response.text}"
except Exception as e:
return f"Error calling OpenRouter: {e}"
    def generate_response_with_rag(self, user_input: str, conversation_history: Optional[List] = None) -> tuple:
"""Generate AI response using RAG with stored experiences and Pinecone inference"""
# Retrieve relevant past experiences
relevant_experiences = self.retrieve_relevant_experiences(user_input)
# Build context from relevant experiences
context_parts = []
if relevant_experiences:
context_parts.append("🧠 Relevant past experiences from the shared knowledge base (powered by Pinecone inference):")
# Extract documents for reranking
documents = [f"User: {exp['user_input']} AI: {exp['ai_response']}" for exp in relevant_experiences]
# Try to rerank the results
reranked = self.rerank_results(user_input, documents)
if reranked:
context_parts.append(f"\n🔄 Reranked results using {self.rerank_model}:")
for i, result in enumerate(reranked, 1):
context_parts.append(f"{i}. (Relevance: {result['score']:.3f}) {result['document'][:200]}...")
else:
# Fallback to original results
for i, exp in enumerate(relevant_experiences, 1):
context_parts.append(f"\n{i}. Previous interaction from shared knowledge (similarity: {exp['score']:.2f}):")
context_parts.append(f" 👤 User: {exp['user_input'][:200]}...")
context_parts.append(f" 🤖 AI: {exp['ai_response'][:200]}...")
context_parts.append(f" 🕒 Time: {exp['timestamp'][:19]}")
if exp['context']:
context_parts.append(f" 📝 Context: {exp['context'][:100]}...")
context_parts.append("")
else:
context_parts.append("🆕 No previous relevant experiences found in the shared knowledge base. This is a fresh conversation!")
context_str = "\n".join(context_parts)
# Build messages for the AI
messages = [
{
"role": "system",
"content": f"""You are an AI assistant with access to a shared knowledge base of past conversations and interactions through Pinecone's vector database with integrated inference.
IMPORTANT: The context below contains conversations from OTHER USERS and previous AI responses - this is NOT your personal memory, but rather a shared knowledge base that multiple users contribute to. Each conversation you have will also be added to this shared knowledge base for future users.
The embeddings are generated using {self.embedding_model} and results are reranked with {self.rerank_model}.
SHARED KNOWLEDGE BASE CONTEXT:
{context_str}
Guidelines for using shared knowledge:
- The experiences above are from OTHER USERS' conversations, not your own memories
- Use these shared experiences to provide helpful, informed responses
- When referencing past interactions, make it clear they came from the shared knowledge base
- Don't claim personal ownership of experiences that belong to other users
- Learn from the collective knowledge while maintaining your own conversational identity
- Be transparent that you're drawing from a shared pool of experiences
- Build upon the collective wisdom while providing fresh, contextual responses
- Acknowledge when information comes from the shared knowledge base vs. the current conversation
Remember: You're part of a learning system where each conversation contributes to helping future users, but you should be clear about the source of your knowledge."""
}
]
# Add conversation history if provided
if conversation_history:
for msg in conversation_history[-5:]: # Last 5 messages
messages.append(msg)
# Add current user input
messages.append({"role": "user", "content": user_input})
# Generate response
ai_response = self.call_openrouter(messages)
# Store this interaction as a new experience
storage_result = self.store_experience(user_input, ai_response, context_str)
return ai_response, context_str, storage_result
def chat_with_rag(message: str, history: Optional[List] = None) -> tuple:
"""Main chat function for Gradio interface"""
if not message.strip():
return "Please enter a message.", "", ""
# Convert Gradio history format to OpenAI format
conversation_history = []
if history:
for user_msg, ai_msg in history:
if user_msg:
conversation_history.append({"role": "user", "content": user_msg})
if ai_msg:
conversation_history.append({"role": "assistant", "content": ai_msg})
# Generate response with RAG
ai_response, context_used, storage_info = rag_system.generate_response_with_rag(
message, conversation_history
)
return ai_response, context_used, storage_info
def clear_conversation():
"""Clear the conversation history"""
return [], "", "", ""
def get_system_status():
"""Get current system status"""
status = []
# Check Pinecone connection
if rag_system.index:
try:
stats = rag_system.index.describe_index_stats()
total_vectors = stats.get('total_vector_count', 0)
status.append(f"✅ Pinecone: Connected ({total_vectors} experiences)")
status.append(f"🧠 Embedding: {rag_system.embedding_model}")
        except Exception as e:
            status.append(f"⚠️ Pinecone: Connected but stats unavailable ({e})")
else:
status.append("❌ Pinecone: Not connected")
# Check OpenRouter
if rag_system.openrouter_api_key:
status.append(f"✅ OpenRouter: {rag_system.model_name}")
else:
status.append("❌ OpenRouter: Not configured")
return "\n".join(status)
# Minimal CSS for clean appearance
minimal_css = """
/* Clean, minimal styling */
.gradio-container {
max-width: 1100px !important;
margin: 0 auto !important;
}
/* Remove excess padding and margins */
.block {
border: none !important;
box-shadow: none !important;
}
/* Simple header */
.header {
text-align: center;
padding: 1rem;
background: linear-gradient(90deg, #4f46e5, #7c3aed);
color: white;
border-radius: 8px;
margin-bottom: 1rem;
}
/* Clean chatbot styling */
.chatbot {
border: 1px solid #e5e7eb !important;
border-radius: 8px !important;
}
/* Simple input styling */
.input-box {
border: 1px solid #d1d5db !important;
border-radius: 6px !important;
}
/* Clean buttons */
.primary-btn {
background: #4f46e5 !important;
border: none !important;
border-radius: 6px !important;
color: white !important;
}
.secondary-btn {
background: #f3f4f6 !important;
border: 1px solid #d1d5db !important;
border-radius: 6px !important;
color: #374151 !important;
}
/* Context area */
.context-area {
background: #f9fafb !important;
border: 1px solid #e5e7eb !important;
border-radius: 6px !important;
font-family: monospace !important;
font-size: 12px !important;
}
/* Status display */
.status-display {
background: #f0f9ff !important;
border: 1px solid #bae6fd !important;
border-radius: 6px !important;
font-family: monospace !important;
font-size: 12px !important;
}
/* Memory info */
.memory-display {
background: #f0fdf4 !important;
border: 1px solid #bbf7d0 !important;
border-radius: 6px !important;
font-size: 12px !important;
}
/* Remove default gradio styling */
.gr-button {
font-size: 14px !important;
}
.gr-textbox {
font-size: 14px !important;
}
/* Tabs styling */
.tab-nav {
border-bottom: 1px solid #e5e7eb;
}
/* Collapsible sections */
.accordion {
border: 1px solid #e5e7eb;
border-radius: 6px;
margin: 0.5rem 0;
}
"""
# Initialize the RAG system
rag_system = RAGMemorySystem()
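# Note: this single module-level instance is shared by every Gradio session, so a
# model change made in one browser tab applies to all users of the Space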
# Create minimal Gradio interface
with gr.Blocks(
title="AI Assistant with RAG",
css=minimal_css,
theme=gr.themes.Soft()
) as demo:
# Simple header
gr.HTML("""
<div class="header">
<h2 style="margin: 0;">🤖 AI Assistant with RAG</h2>
<p style="margin: 5px 0 0 0; opacity: 0.9;">Powered by Pinecone Vector Search</p>
</div>
""")
# Main chat interface
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(
height=450,
show_label=False,
elem_classes=["chatbot"]
)
with gr.Row():
msg = gr.Textbox(
placeholder="Type your message...",
show_label=False,
scale=4,
elem_classes=["input-box"]
)
send_btn = gr.Button(
"Send",
variant="primary",
scale=1,
elem_classes=["primary-btn"]
)
with gr.Row():
clear_btn = gr.Button(
"Clear Chat",
variant="secondary",
elem_classes=["secondary-btn"]
)
with gr.Column(scale=1):
# Context display (collapsible)
with gr.Accordion("Knowledge Context", open=False):
context_display = gr.Textbox(
lines=8,
interactive=False,
show_label=False,
placeholder="Retrieved context appears here...",
elem_classes=["context-area"]
)
# Storage info
storage_info = gr.Textbox(
lines=1,
interactive=False,
show_label=False,
placeholder="Storage status...",
elem_classes=["memory-display"]
)
# Settings section (collapsible)
with gr.Accordion("Settings", open=False):
with gr.Row():
with gr.Column():
gr.Markdown("### Model Configuration")
with gr.Row():
model_input = gr.Textbox(
label="OpenRouter Model",
value=rag_system.model_name,
placeholder="Enter model name...",
scale=3
)
update_btn = gr.Button(
"Update",
variant="primary",
scale=1,
elem_classes=["primary-btn"]
)
model_status = gr.Textbox(
label="Current Model",
value=f"Using: {rag_system.model_name}",
interactive=False
)
gr.Markdown("""
**Free Models:**
- `meta-llama/llama-3.2-3b-instruct:free`
- `microsoft/phi-3-mini-128k-instruct:free`
- `google/gemma-2-9b-it:free`
""")
with gr.Column():
gr.Markdown("### System Status")
status_display = gr.Textbox(
value=get_system_status(),
lines=4,
interactive=False,
show_label=False,
elem_classes=["status-display"]
)
refresh_btn = gr.Button(
"Refresh",
variant="secondary",
elem_classes=["secondary-btn"]
)
# About section (collapsible)
with gr.Accordion("About", open=False):
gr.Markdown("""
### AI Assistant with RAG
This application uses **Retrieval-Augmented Generation** to provide more informed responses by:
- Storing conversations in a **Pinecone vector database**
- Retrieving relevant past experiences using **semantic search**
- Using **multilingual-e5-large** embeddings for understanding
- Reranking results with **pinecone-rerank-v0** for better relevance
        **Privacy:** Conversations are stored in a shared knowledge base and can surface in other users' sessions, so avoid sharing personal or sensitive information.
""")
# Event handlers
def respond(message, history):
if not message:
return history, "", "", ""
ai_response, context_used, storage_info_text = chat_with_rag(message, history)
if history is None:
history = []
history.append((message, ai_response))
return history, "", context_used, storage_info_text
def update_model_handler(new_model):
result = rag_system.update_model(new_model)
status = f"Using: {rag_system.model_name}"
return "", status, get_system_status()
# Wire up events
send_btn.click(
respond,
inputs=[msg, chatbot],
outputs=[chatbot, msg, context_display, storage_info]
)
msg.submit(
respond,
inputs=[msg, chatbot],
outputs=[chatbot, msg, context_display, storage_info]
)
clear_btn.click(
clear_conversation,
outputs=[chatbot, msg, context_display, storage_info]
)
update_btn.click(
update_model_handler,
inputs=[model_input],
outputs=[model_input, model_status, status_display]
)
refresh_btn.click(
get_system_status,
outputs=[status_display]
)
# Launch
if __name__ == "__main__":
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)