import os
import logging
from typing import List, Optional, Union, Literal
from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.core import Document, VectorStoreIndex, Settings
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
# Setup logging
logger = logging.getLogger(__name__)
# Configure LlamaIndex Settings (optional, but good practice)
# Ensure embedding model is set if not using default OpenAI
# Settings.embed_model = ... # Example: HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
# Settings.llm = ... # Can set a default LLM here if needed
# Helper function to load prompt from file
def load_prompt_from_file(filename: str, default_prompt: str) -> str:
"""Loads a prompt from a text file."""
try:
script_dir = os.path.dirname(__file__)
prompt_path = os.path.join(script_dir, filename)
with open(prompt_path, "r") as f:
prompt = f.read()
logger.info(f"Successfully loaded prompt from {prompt_path}")
return prompt
except FileNotFoundError:
logger.warning(f"Prompt file {filename} not found at {prompt_path}. Using default.")
return default_prompt
except Exception as e:
logger.error(f"Error loading prompt file {filename}: {e}", exc_info=True)
return default_prompt
# --- Internal Context Index Management ---
# Store index and text globally for simplicity in this example
# In a real application, consider a more robust state management approach
_context_index: Optional[VectorStoreIndex] = None
_context_text: Optional[str] = None
_context_source: Optional[str] = None # e.g., filename or description
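# A minimal sketch of a more encapsulated alternative (illustrative only; this
# class is not used by the module):
#
#     from dataclasses import dataclass
#
#     @dataclass
#     class ContextStore:
#         index: Optional[VectorStoreIndex] = None
#         text: Optional[str] = None
#         source: Optional[str] = None
#
#     _store = ContextStore()  # one instance per agent/session instead of bare globals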
def _build_or_get_index(text: Optional[str] = None, source: Optional[str] = "loaded_context") -> Optional[VectorStoreIndex]:
"""Builds or retrieves the VectorStoreIndex for the loaded context."""
global _context_index, _context_text, _context_source
if text is not None and (text != _context_text or _context_index is None):
logger.info(f"Building new context index from text (length: {len(text)} chars). Source: {source}")
_context_text = text
_context_source = source
try:
            # Use SentenceSplitter for chunking; pass it per-index rather than
            # mutating the global Settings as a side effect
            splitter = SentenceSplitter(chunk_size=1024, chunk_overlap=200)
            documents = [Document(text=_context_text)]
            _context_index = VectorStoreIndex.from_documents(
                documents, transformations=[splitter], show_progress=True
            )
logger.info("Context index built successfully.")
except Exception as e:
logger.error(f"Failed to build context index: {e}", exc_info=True)
_context_index = None
_context_text = None
_context_source = None
return None
elif _context_index is None:
logger.warning("No context loaded or index built yet.")
return None
return _context_index
def load_text_context(text: str, source: str = "provided_text") -> str:
"""Loads text into the agent's context and builds an index. Replaces existing context."""
logger.info(f"Loading new text context (length: {len(text)} chars). Source: {source}")
index = _build_or_get_index(text=text, source=source)
if index:
return f"Successfully loaded and indexed text context from {source} (Length: {len(text)} chars)."
else:
return "Error: Failed to load or index the provided text context."
# --- Tool Functions ---
def summarize_long_context(detail_level: Literal["brief", "standard", "detailed"] = "standard",
max_length: Optional[int] = None,
min_length: Optional[int] = None) -> str:
"""Summarizes the currently loaded long text context.
Args:
detail_level (str): Level of detail: "brief" (1-2 sentences), "standard" (1-2 paragraphs), "detailed" (multiple paragraphs).
max_length (Optional[int]): Approximate maximum words (overrides detail_level if set).
min_length (Optional[int]): Approximate minimum words.
Returns:
str: The summary or an error message.
"""
global _context_text, _context_source
if _context_text is None:
return "Error: No long context has been loaded yet. Use 'load_text_context' first."
logger.info(f"Summarizing loaded context (Source: {_context_source}, Length: {len(_context_text)} chars). Detail: {detail_level}")
# Determine length guidance based on detail_level if max/min not set
if max_length is None:
if detail_level == "brief":
max_length = 50
min_length = min_length or 10
elif detail_level == "detailed":
max_length = 500
min_length = min_length or 150
else: # standard
max_length = 200
min_length = min_length or 50
min_length = min_length or int(max_length * 0.3) # Default min length
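    # Resulting word-count targets: brief ~10-50, standard ~50-200, detailed ~150-500.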
# LLM configuration
llm_model = os.getenv("CONTEXT_LLM_MODEL", "gemini-2.5-pro-preview-03-25") # Use Pro for potentially long context
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
logger.error("GEMINI_API_KEY not found for summarization LLM.")
return "Error: GEMINI_API_KEY not set."
# Truncate input text only if extremely long, as Pro handles large contexts
# Let the LLM handle context window limits if possible
# max_input_chars = 100000 # Example high limit
# text_to_summarize = _context_text[:max_input_chars] if len(_context_text) > max_input_chars else _context_text
text_to_summarize = _context_text # Rely on LLM context window
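    # If inputs can exceed the model's context window, one fallback (sketched here,
    # not implemented) is map-reduce summarization: summarize each SentenceSplitter
    # chunk separately, then summarize the concatenation of the chunk summaries.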
prompt = (
f"Summarize the following text concisely, focusing on the main points and key information. "
f"Aim for a length between {min_length} and {max_length} words. "
f"The requested level of detail is '{detail_level}'.\n\n"
f"TEXT:\n{text_to_summarize}\n\nSUMMARY:"
)
try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=llm_model, temperature=0.05)
logger.info(f"Using summarization LLM: {llm_model}")
response = llm.complete(prompt)
summary = response.text.strip()
logger.info(f"Summarization successful (output length: {len(summary.split())} words).")
return summary
except Exception as e:
logger.error(f"LLM call failed during summarization: {e}", exc_info=True)
return f"Error during summarization: {e}"
def extract_key_information(query: str, max_results: int = 10) -> Union[List[str], str]:
"""Extracts specific information or answers a question based on the loaded long context using the index.
Args:
query (str): The question or description of information to extract (e.g., "List all decisions made", "What was mentioned about Project X?").
max_results (int): Maximum number of distinct pieces of information or text snippets to return.
Returns:
List[str]: A list of extracted text snippets or answers, or str: Error message.
"""
logger.info(f"Extracting information for query: {query} from loaded context. Max results: {max_results}")
index = _build_or_get_index() # Get existing index
if index is None:
return "Error: No context loaded or index available. Use 'load_text_context' first."
try:
# Use a query engine for extraction
# Configure retriever for potentially broader search
retriever = VectorIndexRetriever(index=index, similarity_top_k=max_results * 2) # Retrieve more initially
# Configure response synthesis (optional, can customize prompt)
# response_synthesizer = ...
        query_engine = RetrieverQueryEngine.from_args(
            retriever=retriever,
            # response_synthesizer=response_synthesizer,  # optional customization
            # llm=Settings.llm,  # use default or specify
        )
# Formulate a prompt that encourages extraction rather than synthesis if needed
extraction_prompt = f"Based *only* on the provided context, extract the key information or answer the following query. List distinct findings or provide relevant text snippets. Query: {query}"
response = query_engine.query(extraction_prompt)
# Process response - might need refinement based on LLM output format
# Assuming response.response contains the extracted info, potentially needing splitting
# This part is heuristic and depends on how the LLM responds to the extraction prompt.
extracted_items = [item.strip() for item in response.response.split("\n") if item.strip()]
# Limit results if necessary
final_results = extracted_items[:max_results]
logger.info(f"Extraction successful. Found {len(final_results)} items.")
return final_results if final_results else ["No specific information found matching the query in the context."]
except Exception as e:
logger.error(f"Error during information extraction: {e}", exc_info=True)
return f"Error during extraction: {e}"
def filter_by_relevance(topic: str, threshold: float = 0.75) -> str:
"""Filters the loaded long context, retaining sections relevant to the topic using the index.
Args:
topic (str): The topic or query to filter relevance by.
threshold (float): Similarity threshold (0.0 to 1.0) for relevance. Higher means more strict.
Returns:
str: The filtered text containing only relevant sections, or an error message.
"""
logger.info(f"Filtering loaded context for relevance to topic: {topic}. Threshold: {threshold}")
index = _build_or_get_index() # Get existing index
if index is None:
return "Error: No context loaded or index available. Use 'load_text_context' first."
try:
retriever = VectorIndexRetriever(index=index, similarity_top_k=20) # Retrieve a decent number of candidates
retrieved_nodes = retriever.retrieve(topic)
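        # Note: retrieval scores depend on the configured embedding model and
        # similarity metric, so a threshold tuned for one embed_model may not
        # transfer to another.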
relevant_texts = []
        for node_with_score in retrieved_nodes:
            score = node_with_score.score or 0.0  # score can be None for some retrievers
            if score >= threshold:
                relevant_texts.append(node_with_score.node.get_content())
            else:
                # Results are ordered by descending score, so we can stop early
                break
if not relevant_texts:
logger.info("No sections found meeting the relevance threshold.")
return "No content found matching the specified relevance threshold for the topic."
# Combine relevant sections (consider adding separators)
filtered_text = "\n\n---\n\n".join(relevant_texts)
logger.info(f"Filtering successful. Combined relevant text length: {len(filtered_text)} chars.")
return filtered_text
except Exception as e:
logger.error(f"Error during relevance filtering: {e}", exc_info=True)
return f"Error during filtering: {e}"
def query_context_index(query: str) -> str:
"""Answers a specific question based on the information contained within the loaded long context using the index.
Args:
query (str): The question to answer.
Returns:
str: The answer derived from the context, or an error/"not found" message.
"""
logger.info(f"Querying loaded context index with: {query}")
index = _build_or_get_index() # Get existing index
if index is None:
return "Error: No context loaded or index available. Use 'load_text_context' first."
try:
query_engine = index.as_query_engine(similarity_top_k=5) # Default query engine
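        # as_query_engine() synthesizes the answer with Settings.llm; if no LLM is
        # configured, LlamaIndex falls back to its OpenAI default.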
response = query_engine.query(query)
answer = response.response.strip()
logger.info("Context query successful.")
# Check if the LLM indicated it couldn't answer
if "don't know" in answer.lower() or "no information" in answer.lower() or "context does not mention" in answer.lower():
logger.warning(f"Query response suggests information not found: {answer}")
return f"The loaded context does not seem to contain the answer to: {query}"
return answer
except Exception as e:
logger.error(f"Error during context query: {e}", exc_info=True)
return f"Error querying context: {e}"
# --- Tool Definitions ---
load_context_tool = FunctionTool.from_defaults(
fn=load_text_context,
name="load_text_context",
description=(
"Loads/replaces the long text context for the agent and builds an internal index. "
"Input: text (str), Optional: source (str). Output: Status message (str)."
),
)
summarize_context_tool = FunctionTool.from_defaults(
fn=summarize_long_context,
name="summarize_long_context",
description=(
"Summarizes the currently loaded long text context. "
"Input: Optional: detail_level ('brief', 'standard', 'detailed'), max_length (int), min_length (int). Output: Summary (str) or error."
),
)
extract_info_tool = FunctionTool.from_defaults(
fn=extract_key_information,
name="extract_key_information",
description=(
"Extracts specific information or answers questions from the loaded context using its index. "
"Input: query (str), Optional: max_results (int). Output: List[str] of findings or error string."
),
)
filter_context_tool = FunctionTool.from_defaults(
fn=filter_by_relevance,
name="filter_by_relevance",
description=(
"Filters the loaded context to retain only sections relevant to a topic, using the index. "
"Input: topic (str), Optional: threshold (float 0-1). Output: Filtered text (str) or error."
),
)
query_context_tool = FunctionTool.from_defaults(
fn=query_context_index,
name="query_context_index",
description=(
"Answers a specific question based *only* on the loaded long context using its index. "
"Input: query (str). Output: Answer (str) or error/'not found' message."
),
)
# --- Agent Initialization ---
def initialize_long_context_management_agent() -> ReActAgent:
"""Initializes the Long Context Management Agent."""
logger.info("Initializing LongContextManagementAgent...")
# Configuration for the agent's main LLM
agent_llm_model = os.getenv("CONTEXT_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25") # Needs to handle planning
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
logger.error("GEMINI_API_KEY not found for LongContextManagementAgent.")
raise ValueError("GEMINI_API_KEY must be set for LongContextManagementAgent")
try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model, temperature=0.05)
logger.info(f"Using agent LLM: {agent_llm_model}")
Settings.llm = llm # Set default LLM for LlamaIndex components used by tools
# Load system prompt
default_system_prompt = ("You are LongContextManagementAgent... [Default prompt content - replace with actual]" # Placeholder
)
system_prompt = load_prompt_from_file("../prompts/long_context_management_agent_prompt.txt", default_system_prompt)
if system_prompt == default_system_prompt:
logger.warning("Using default/fallback system prompt for LongContextManagementAgent.")
# Define available tools
tools = [
load_context_tool,
summarize_context_tool,
extract_info_tool,
filter_context_tool,
query_context_tool
]
# Define valid handoff targets
valid_handoffs = [
"planner_agent", # To return results
"text_analyzer_agent", # If further analysis of extracted/filtered text is needed
"reasoning_agent",
"research_agent"
]
agent = ReActAgent(
name="long_context_management_agent",
description=(
"Manages and processes long textual context efficiently. Handles large documents, transcripts, or datasets "
"by summarizing (`summarize_long_context`), extracting key information (`extract_key_information`), "
"filtering relevant content (`filter_by_relevance`), and answering questions based on the context (`query_context_index`). "
"Supports internal indexing for efficient retrieval and repeated queries. Optimized for chunked input processing "
"and contextual distillation. Only relies on the provided input and avoids external augmentation unless explicitly requested."
),
tools=tools,
llm=llm,
system_prompt=system_prompt,
can_handoff_to=valid_handoffs,
)
logger.info("LongContextManagementAgent initialized successfully.")
return agent
except Exception as e:
logger.error(f"Error during LongContextManagementAgent initialization: {e}", exc_info=True)
raise
# Example usage (for testing if run directly)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Set LlamaIndex log level higher to reduce noise during testing
logging.getLogger("llama_index.core.indices.vector_store").setLevel(logging.WARNING)
logging.getLogger("llama_index.core.query_engine").setLevel(logging.WARNING)
logging.getLogger("llama_index.core.token_counter").setLevel(logging.ERROR) # Suppress token counting logs
logger.info("Running long_context_management_agent.py directly for testing...")
# Check required keys
required_keys = ["GEMINI_API_KEY"]
missing_keys = [key for key in required_keys if not os.getenv(key)]
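    # Note: unless Settings.embed_model / Settings.llm are configured, the index
    # and query engines fall back to LlamaIndex's OpenAI defaults, so this
    # standalone test may also require OPENAI_API_KEY.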
if missing_keys:
print(f"Error: Required environment variable(s) not set: {', '.join(missing_keys)}. Cannot run test.")
else:
try:
# Example long text
long_text = """
Meeting Minutes - Project Phoenix - April 28, 2025
Attendees: Alice, Bob, Charlie, David
Agenda: Review Q1 results, Plan Q2 roadmap, Budget allocation
Q1 Results Discussion:
Alice presented the sales figures. Sales increased by 15% compared to Q4 2024, exceeding the target of 10%.
Bob highlighted the success of the marketing campaign launched in February. Customer acquisition cost decreased by 5%.
Charlie noted a slight dip in user engagement metrics in March, possibly due to a recent UI change.
Action Item: David to investigate the engagement dip.
Q2 Roadmap Planning:
The team discussed potential features for Q2. Feature A (enhanced reporting) was prioritized.
Feature B (mobile app improvements) was deferred to Q3.
Alice emphasized the need for stability improvements. Bob suggested focusing on performance optimization.
Decision: Q2 focus will be on Feature A and performance/stability improvements.
Budget Allocation:
Charlie presented the proposed budget.
An additional $50,000 was requested for cloud infrastructure scaling due to increased usage.
David questioned the necessity of the full amount.
After discussion, the team approved an additional $40,000 for infrastructure.
Decision: Allocate $40,000 extra for Q2 infrastructure.
Next Steps:
David to report on engagement metrics by May 5th.
Alice to finalize Q2 feature specifications by May 10th.
Meeting adjourned.
""" * 5 # Make it longer
# Test loading context
print("\nTesting load_text_context...")
load_status = load_text_context(long_text, source="Meeting Minutes Test")
print(load_status)
if "Error" not in load_status:
# Test summarization
print("\nTesting summarize_long_context (brief)...")
summary_brief = summarize_long_context(detail_level="brief")
print(f"Brief Summary: {summary_brief}")
# Test extraction
print("\nTesting extract_key_information (decisions)...")
decisions = extract_key_information(query="List all decisions made in the meeting")
print(f"Decisions Extracted: {decisions}")
# Test filtering
print("\nTesting filter_by_relevance (budget)...")
budget_text = filter_by_relevance(topic="budget allocation", threshold=0.7)
print(f"Filtered Budget Text (first 300 chars):\n{budget_text[:300]}...")
# Test querying
print("\nTesting query_context_index (Q1 sales)...")
sales_query = "What was the sales increase in Q1?"
sales_answer = query_context_index(sales_query)
print(f"Answer to '{sales_query}': {sales_answer}")
print("\nTesting query_context_index (non-existent info)...")
non_existent_query = "Who is the CEO?"
non_existent_answer = query_context_index(non_existent_query)
print(f"Answer to '{non_existent_query}': {non_existent_answer}")
# Initialize the agent (optional)
# test_agent = initialize_long_context_management_agent()
# print("\nLong Context Management Agent initialized successfully for testing.")
except Exception as e:
print(f"Error during testing: {e}")