# Source: VersionRAG project, version_rag.py (uploaded file; revision f7db2f9)
# version_rag.py - Core VersionRAG Implementation (OpenAI Embeddings)
import chromadb
from chromadb.config import Settings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from typing import List, Dict, Optional
import os
from datetime import datetime
import uuid
class VersionRAG:
    """Version-aware RAG system backed by a per-tenant ChromaDB collection.

    Stores ordinary document content chunks plus explicit "change" chunks
    (additions / deletions / modifications between versions), so queries can
    be filtered by version and change history can be retrieved directly.
    An optional graph manager supplies document/version lookups.
    """

    # (changes-dict key, change_type metadata value, label embedded in chunk text)
    _CHANGE_KINDS = (
        ("additions", "addition", "ADDITION"),
        ("deletions", "deletion", "DELETION"),
        ("modifications", "modification", "MODIFICATION"),
    )
    # Cap on change snippets stored per kind, and minimum snippet length kept.
    _MAX_CHANGES_PER_KIND = 20
    _MIN_CHANGE_LEN = 10

    def __init__(self, user_id: str, model_name: str = "gpt-3.5-turbo",
                 embedding_model: str = "text-embedding-3-small"):
        """Create a tenant-scoped RAG instance.

        Args:
            user_id: Tenant identifier; namespaces the on-disk Chroma store,
                the collection name, and every chunk id.
            model_name: OpenAI chat model used for answer generation.
            embedding_model: OpenAI embedding model for chunks and queries.
        """
        self.user_id = user_id
        self.model_name = model_name
        # OpenAI embeddings (requires OPENAI_API_KEY in the environment).
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        # Per-tenant persistent ChromaDB store on local disk.
        persist_dir = f"./chroma_db_{user_id}"
        os.makedirs(persist_dir, exist_ok=True)
        self.chroma_client = chromadb.PersistentClient(path=persist_dir)
        # get_or_create avoids the previous get-then-create with a bare
        # except, which silently swallowed *every* error (even interrupts).
        # Metadata is only applied when the collection is first created.
        self.collection = self.chroma_client.get_or_create_collection(
            name=f"versionrag_{user_id}",
            metadata={"tenant_id": user_id},
        )
        # temperature=0 for deterministic, reproducible answers.
        self.llm = ChatOpenAI(
            model_name=model_name,
            temperature=0
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )
        # In-memory mirror of chunks added this session (not persisted).
        self.documents: List[str] = []
        self.metadatas: List[Dict] = []
        self.graph_manager = None

    def set_graph_manager(self, graph_manager):
        """Attach a graph manager used for document/version lookups."""
        self.graph_manager = graph_manager

    def _collect_change_chunks(self, change_info: Dict, metadata: Dict,
                               all_chunks: List[str],
                               all_chunk_metadatas: List[Dict],
                               all_ids: List[str]) -> None:
        """Append labelled change snippets for one document.

        Replaces three near-identical copy-pasted loops (additions /
        deletions / modifications) with one data-driven pass over
        ``_CHANGE_KINDS``. Mutates the three accumulator lists in place.
        """
        for key, change_type, label in self._CHANGE_KINDS:
            for snippet in change_info.get(key, [])[:self._MAX_CHANGES_PER_KIND]:
                # Skip trivially short fragments (whitespace, stray tokens).
                if len(snippet.strip()) > self._MIN_CHANGE_LEN:
                    change_metadata = metadata.copy()
                    change_metadata['tenant_id'] = self.user_id
                    change_metadata['doc_type'] = 'change'
                    change_metadata['change_type'] = change_type
                    all_chunks.append(f"[{label} in {metadata.get('version')}] {snippet}")
                    all_chunk_metadatas.append(change_metadata)
                    all_ids.append(f"{self.user_id}_change_{uuid.uuid4()}")

    def add_documents(self, texts: List[str], metadatas: List[Dict],
                      changes: Optional[List[Dict]] = None) -> None:
        """Add documents (and optional per-document change info) to the store.

        Args:
            texts: Raw document texts, one per document.
            metadatas: One metadata dict per text (expected keys seen in this
                file: 'version', 'filename', 'domain', 'topic').
            changes: Optional parallel list of dicts with 'additions',
                'deletions' and 'modifications' lists of snippet strings.
        """
        all_chunks: List[str] = []
        all_chunk_metadatas: List[Dict] = []
        all_ids: List[str] = []
        for idx, (text, metadata) in enumerate(zip(texts, metadatas)):
            for chunk in self.text_splitter.split_text(text):
                chunk_metadata = metadata.copy()
                chunk_metadata['tenant_id'] = self.user_id
                # chunk_id is a running index over *all* chunks accumulated
                # in this call (content and change chunks alike), matching
                # the original insertion-order semantics.
                chunk_metadata['chunk_id'] = len(all_chunks)
                chunk_metadata['doc_type'] = 'content'
                all_chunks.append(chunk)
                all_chunk_metadatas.append(chunk_metadata)
                all_ids.append(f"{self.user_id}_content_{uuid.uuid4()}")
            # Change info is optional and positional; a falsy entry is skipped.
            if changes and idx < len(changes) and changes[idx]:
                self._collect_change_chunks(changes[idx], metadata,
                                            all_chunks, all_chunk_metadatas,
                                            all_ids)
        if all_chunks:
            embeddings = self.embeddings.embed_documents(all_chunks)
            self.collection.add(
                embeddings=embeddings,
                documents=all_chunks,
                metadatas=all_chunk_metadatas,
                ids=all_ids
            )
            self.documents.extend(all_chunks)
            self.metadatas.extend(all_chunk_metadatas)

    def query(self, query: str, version_filter: Optional[str] = None,
              top_k: int = 5) -> Dict:
        """Answer a question over the tenant's content chunks.

        Args:
            query: Natural-language question.
            version_filter: If given, restrict retrieval to this version.
            top_k: Number of chunks to retrieve.

        Returns:
            Dict with 'answer', 'sources' and (on success) 'context'.
        """
        query_embedding = self.embeddings.embed_query(query)
        # Always scope to this tenant's content chunks; optionally by version.
        filters = [
            {"tenant_id": self.user_id},
            {"doc_type": "content"},
        ]
        if version_filter:
            filters.append({"version": version_filter})
        where = {"$and": filters}
        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k,
                where=where
            )
        except Exception as e:
            return {
                'answer': f"Error querying database: {str(e)}",
                'sources': []
            }
        if not results['documents'][0]:
            return {
                'answer': "No relevant documents found.",
                'sources': []
            }
        context_docs = results['documents'][0]
        context_metadatas = results['metadatas'][0]
        distances = results['distances'][0]
        # Prefix each chunk with its version/topic so the LLM can cite them.
        context = "\n\n".join([
            f"[Version {meta.get('version', 'N/A')} - {meta.get('topic', 'Unknown')}]\n{doc}"
            for doc, meta in zip(context_docs, context_metadatas)
        ])
        prompt = f"""Based on the following context, answer the question.
If the answer includes version-specific information, explicitly mention the version.
Be precise and cite the version when relevant.
Context:
{context}
Question: {query}
Answer:"""
        try:
            response = self.llm.invoke(prompt)
            answer = response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            answer = f"Error generating answer: {str(e)}"
        sources = []
        for doc, meta, dist in zip(context_docs, context_metadatas, distances):
            sources.append({
                'content': doc,
                'version': meta.get('version', 'N/A'),
                'filename': meta.get('filename', 'N/A'),
                'domain': meta.get('domain', 'N/A'),
                'topic': meta.get('topic', 'N/A'),
                # NOTE(review): 1 - distance is only a true similarity for
                # distances in [0, 1] (e.g. cosine space); the collection is
                # created with Chroma's default metric — confirm intent.
                'similarity': 1 - dist
            })
        return {
            'answer': answer,
            'sources': sources,
            'context': context
        }

    def version_inquiry(self, query: str) -> Dict:
        """List versions relevant to a query.

        Prefers the graph manager (exact document/version listing); falls
        back to a metadata scan over vector-search hits.

        Returns:
            Dict with 'answer', empty 'sources', and a 'versions' list.
        """
        if self.graph_manager:
            documents = self.graph_manager.get_all_documents()
            query_lower = query.lower()
            # Crude keyword match: any query word appearing in the doc name.
            relevant_docs = [
                doc for doc in documents
                if any(word in doc.lower() for word in query_lower.split())
            ]
            if relevant_docs:
                answer = f"Found version information for {len(relevant_docs)} document(s):\n\n"
                versions_found = []
                for doc in relevant_docs:
                    versions = self.graph_manager.get_document_versions(doc)
                    versions_found.extend(versions)
                    answer += f"**{doc}**\n"
                    answer += f"- Versions: {', '.join(versions)}\n"
                    for version in versions:
                        info = self.graph_manager.get_version_info(doc, version)
                        if info:
                            answer += f"  - {version}: {info.get('timestamp', 'N/A')}\n"
                    answer += "\n"
                return {
                    'answer': answer,
                    'sources': [],
                    'versions': list(set(versions_found))
                }
        # Fallback: harvest version metadata from a broad vector search.
        query_embedding = self.embeddings.embed_query(query)
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=20,
            where={
                "$and": [
                    {"tenant_id": self.user_id},
                    {"doc_type": "content"}
                ]
            }
        )
        versions = set()
        version_info = {}
        for meta in results['metadatas'][0]:
            version = meta.get('version', 'N/A')
            if version != 'N/A':
                versions.add(version)
                # Keep the first metadata seen for each version.
                if version not in version_info:
                    version_info[version] = {
                        'filename': meta.get('filename', 'N/A'),
                        'domain': meta.get('domain', 'N/A'),
                        'topic': meta.get('topic', 'N/A')
                    }
        version_list = ", ".join(sorted(versions))
        answer = f"Found {len(versions)} version(s): {version_list}\n\n"
        for version in sorted(versions):
            info = version_info[version]
            answer += f"- **{version}**: {info['topic']} ({info['domain']})\n"
        return {
            'answer': answer,
            'sources': [],
            'versions': list(versions)
        }

    def change_retrieval(self, query: str) -> Dict:
        """Retrieve change information (additions/deletions/modifications).

        Tries change-typed chunks first; if the filtered query fails or yields
        no change chunks, falls back to generic context plus an LLM summary.
        """
        query_embedding = self.embeddings.embed_query(query)
        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=10,
                where={
                    "$and": [
                        {"tenant_id": self.user_id},
                        {"doc_type": "change"}
                    ]
                }
            )
        except Exception:
            # Filtered query can fail on stores created before change chunks
            # existed; retry tenant-wide and filter doc_type in Python below.
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=10,
                where={"tenant_id": self.user_id}
            )
        if results['documents'][0] and results['metadatas'][0]:
            changes = []
            for doc, meta in zip(results['documents'][0], results['metadatas'][0]):
                if meta.get('doc_type') == 'change':
                    changes.append({
                        'content': doc,
                        'version': meta.get('version', 'N/A'),
                        'change_type': meta.get('change_type', 'unknown'),
                        'filename': meta.get('filename', 'N/A'),
                        'topic': meta.get('topic', 'N/A')
                    })
            if changes:
                answer = "Changes detected:\n\n"
                for change in changes[:5]:
                    answer += f"**[{change['version']} - {change['change_type'].upper()}]**\n"
                    answer += f"Topic: {change['topic']}\n"
                    answer += f"{change['content']}\n\n"
                return {
                    'answer': answer,
                    'sources': changes
                }
        # No change-typed chunks found: ask the LLM to infer changes from
        # ordinary content chunks.
        context_results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=5,
            where={"tenant_id": self.user_id}
        )
        if context_results['documents'][0]:
            context = "\n\n".join(context_results['documents'][0])
            prompt = f"""Based on the context, identify and describe any changes, additions, deletions, or modifications mentioned.
Context:
{context}
Question: {query}
Answer:"""
            try:
                response = self.llm.invoke(prompt)
                answer = response.content if hasattr(response, 'content') else str(response)
            except Exception:
                answer = "Unable to determine changes."
        else:
            answer = "No change information found."
        return {
            'answer': answer,
            'sources': context_results['metadatas'][0][:5] if context_results['metadatas'][0] else []
        }
class BaselineRAG:
    """Standard RAG system without version awareness.

    Comparison baseline for VersionRAG: same embedding/LLM stack and chunking,
    but no version metadata, no change tracking, and no query filtering.
    """

    def __init__(self, user_id: str, model_name: str = "gpt-3.5-turbo",
                 embedding_model: str = "text-embedding-3-small"):
        """Create a baseline RAG instance with a per-user Chroma store.

        Args:
            user_id: Namespaces the on-disk store, collection, and chunk ids.
            model_name: OpenAI chat model for answer generation.
            embedding_model: OpenAI embedding model for chunks and queries.
        """
        self.user_id = user_id
        self.model_name = model_name
        # OpenAI embeddings (requires OPENAI_API_KEY in the environment).
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        persist_dir = f"./chroma_baseline_{user_id}"
        os.makedirs(persist_dir, exist_ok=True)
        self.chroma_client = chromadb.PersistentClient(path=persist_dir)
        # get_or_create replaces the previous get-then-create with a bare
        # except, which hid every error (including interrupts).
        self.collection = self.chroma_client.get_or_create_collection(
            name=f"baseline_{user_id}"
        )
        # temperature=0 for deterministic answers.
        self.llm = ChatOpenAI(
            model_name=model_name,
            temperature=0
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    def add_documents(self, texts: List[str], metadatas: List[Dict]) -> None:
        """Chunk each text and add it to the vector store.

        Each chunk carries a copy of its document's metadata; ids are
        namespaced by user and made unique with a UUID.
        """
        all_chunks: List[str] = []
        all_metadatas: List[Dict] = []
        all_ids: List[str] = []
        for text, metadata in zip(texts, metadatas):
            for chunk in self.text_splitter.split_text(text):
                all_chunks.append(chunk)
                all_metadatas.append(metadata.copy())
                all_ids.append(f"baseline_{self.user_id}_{uuid.uuid4()}")
        if all_chunks:
            embeddings = self.embeddings.embed_documents(all_chunks)
            self.collection.add(
                embeddings=embeddings,
                documents=all_chunks,
                metadatas=all_metadatas,
                ids=all_ids
            )

    def query(self, query: str, top_k: int = 5) -> Dict:
        """Answer a question with plain top-k retrieval (no filtering).

        Returns:
            Dict with 'answer' and 'sources' (each source has 'content'
            and 'metadata' keys).
        """
        query_embedding = self.embeddings.embed_query(query)
        try:
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k
            )
        except Exception as e:
            return {
                'answer': f"Error: {str(e)}",
                'sources': []
            }
        if not results['documents'][0]:
            return {
                'answer': "No relevant documents found.",
                'sources': []
            }
        context = "\n\n".join(results['documents'][0])
        prompt = f"""Based on the following context, answer the question.
Context:
{context}
Question: {query}
Answer:"""
        try:
            response = self.llm.invoke(prompt)
            answer = response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            answer = f"Error: {str(e)}"
        sources = [
            {'content': doc, 'metadata': meta}
            for doc, meta in zip(results['documents'][0], results['metadatas'][0])
        ]
        return {
            'answer': answer,
            'sources': sources
        }