"""
Core context management module for efficient-context library.
"""
import logging
import uuid
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field
from efficient_context.compression.base import BaseCompressor
from efficient_context.chunking.base import BaseChunker
from efficient_context.retrieval.base import BaseRetriever
from efficient_context.memory.memory_manager import MemoryManager
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Document(BaseModel):
"""A document to be processed by the context manager."""
id: str = Field(..., description="Unique identifier for the document")
content: str = Field(..., description="Text content of the document")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Optional metadata for the document")
class ContextManager:
"""
Main class for managing context efficiently for LLMs in CPU-constrained environments.
This class orchestrates the compression, chunking, retrieval, and memory management
components to optimize context handling for LLMs running on limited hardware.
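
    Example (a minimal sketch using the default components):
        manager = ContextManager(max_context_size=2048)
        doc_id = manager.add_document("Long background text about the topic ...")
        context = manager.generate_context("What is this text about?")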
"""
def __init__(
self,
compressor: Optional[BaseCompressor] = None,
chunker: Optional[BaseChunker] = None,
retriever: Optional[BaseRetriever] = None,
memory_manager: Optional[MemoryManager] = None,
max_context_size: int = 4096,
):
"""
Initialize the context manager with configurable components.
Args:
compressor: Component for compressing context content
chunker: Component for chunking content
retriever: Component for retrieving relevant chunks
memory_manager: Component for managing memory usage
max_context_size: Maximum size of context in tokens
"""
        # Default components are imported here rather than at module level, so
        # they are only loaded when actually used as defaults (and to avoid
        # circular imports between this module and the component packages).
        from efficient_context.compression import SemanticDeduplicator
        from efficient_context.chunking import SemanticChunker
        from efficient_context.retrieval import CPUOptimizedRetriever
        from efficient_context.memory import MemoryManager
self.compressor = compressor or SemanticDeduplicator()
self.chunker = chunker or SemanticChunker()
self.retriever = retriever or CPUOptimizedRetriever()
self.memory_manager = memory_manager or MemoryManager()
self.max_context_size = max_context_size
self.documents = {}
self.chunks = []
logger.info("Context Manager initialized with max context size: %d", max_context_size)
def add_document(self, document: Union[Document, Dict, str], document_id: Optional[str] = None) -> str:
"""
Add a document to the context manager.
Args:
document: Document to add (can be a Document object, dict, or string content)
document_id: Optional ID for the document (generated if not provided)
Returns:
document_id: ID of the added document
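
        Example (illustrative; all three input forms are accepted):
            cm.add_document("plain text; an ID is generated automatically")
            cm.add_document({"id": "doc-1", "content": "...", "metadata": {"source": "web"}})
            cm.add_document(Document(id="doc-2", content="..."))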
"""
# Convert input to Document object
        if isinstance(document, str):
            if document_id is None:
                document_id = str(uuid.uuid4())
            doc = Document(id=document_id, content=document)
        elif isinstance(document, dict):
            if 'id' in document:
                document_id = document['id']
            elif document_id is None:
                document_id = str(uuid.uuid4())
doc = Document(
id=document_id,
content=document.get('content', ''),
metadata=document.get('metadata', {})
)
        else:
            # Assumed to already be a Document instance; anything else will
            # fail on attribute access below.
            doc = document
            document_id = doc.id
# Store the document
self.documents[document_id] = doc
# Process the document
with self.memory_manager.optimize_memory():
# Compress the document
compressed_content = self.compressor.compress(doc.content)
# Chunk the compressed content
doc_chunks = self.chunker.chunk(compressed_content, metadata=doc.metadata, document_id=doc.id)
# Index the chunks for retrieval
self.retriever.index_chunks(doc_chunks)
# Store the chunks
self.chunks.extend(doc_chunks)
logger.info("Added document with ID %s (%d chunks)", document_id, len(doc_chunks))
return document_id
def add_documents(self, documents: List[Union[Document, Dict, str]]) -> List[str]:
"""
Add multiple documents to the context manager.
Args:
documents: List of documents to add
Returns:
document_ids: List of IDs of added documents
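
        Example (illustrative):
            ids = cm.add_documents(["first text", {"content": "second text"}])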
"""
document_ids = []
for doc in documents:
doc_id = self.add_document(doc)
document_ids.append(doc_id)
return document_ids
def generate_context(self, query: str, max_size: Optional[int] = None) -> str:
"""
Generate optimized context for a given query.
Args:
query: The query for which to generate context
max_size: Maximum size of the context (defaults to self.max_context_size)
Returns:
context: Optimized context for the query
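
        Example (illustrative; prompt assembly is up to the caller):
            context = cm.generate_context("How does chunking work?", max_size=1024)
            prompt = f"Context:\n{context}\n\nQuestion: How does chunking work?"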
"""
max_size = max_size or self.max_context_size
        with self.memory_manager.optimize_memory():
            # Retrieve relevant chunks. Note that top_k is a chunk count, not a
            # token budget; passing max_size directly would over-retrieve, so we
            # derive a rough chunk cap (assuming ~50 tokens per chunk on average).
            relevant_chunks = self.retriever.retrieve(query, top_k=max(1, max_size // 50))

            # Combine chunks into a single context string
            context_parts = [chunk.content for chunk in relevant_chunks]
            combined_context = "\n\n".join(context_parts)

            # Final compression pass if we are still over budget
            # (word count serves as a rough token proxy)
            if len(combined_context.split()) > max_size:
                combined_context = self.compressor.compress(combined_context, target_size=max_size)

        logger.info("Generated context of size ~%d tokens for query", len(combined_context.split()))
return combined_context
def clear(self):
"""Clear all documents and chunks from the context manager."""
self.documents = {}
self.chunks = []
self.retriever.clear()
logger.info("Context manager cleared")