Spaces:
Build error
Build error
Add configuration, graph, runner, and tools modules to enhance agent functionality. Introduce a Configuration class for managing parameters, implement an AgentRunner for executing the agent graph, and create tools for general search and mathematical calculations. Update test_agent.py to reflect new import paths and improve overall code organization.
13388e5
unverified
import logging | |
import os | |
import re | |
import uuid | |
from langgraph.types import Command | |
from graph import agent_graph | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) # Default to INFO level | |
logger = logging.getLogger(__name__) | |
# Enable LiteLLM debug logging only if environment variable is set | |
import litellm | |
if os.getenv("LITELLM_DEBUG", "false").lower() == "true": | |
litellm.set_verbose = True | |
logger.setLevel(logging.DEBUG) | |
else: | |
litellm.set_verbose = False | |
logger.setLevel(logging.INFO) | |
class AgentRunner: | |
"""Runner class for the code agent.""" | |
def __init__(self): | |
"""Initialize the agent runner with graph and tools.""" | |
logger.info("Initializing AgentRunner") | |
self.graph = agent_graph | |
self.last_state = None # Store the last state for testing/debugging | |
self.thread_id = str( | |
uuid.uuid4() | |
) # Generate a unique thread_id for this runner | |
logger.info(f"Created AgentRunner with thread_id: {self.thread_id}") | |
def _extract_answer(self, state: dict) -> str: | |
"""Extract the answer from the state.""" | |
if not state: | |
return None | |
# First try to get answer from direct answer field | |
if "answer" in state and state["answer"]: | |
logger.info(f"Found answer in direct field: {state['answer']}") | |
return state["answer"] | |
# Then try to get answer from messages | |
if "messages" in state and state["messages"]: | |
for msg in reversed(state["messages"]): | |
if hasattr(msg, "content") and msg.content: | |
# Look for code blocks that might contain the answer | |
if "```" in msg.content: | |
# Extract code between ```py and ``` or ```python and ``` | |
code_match = re.search( | |
r"```(?:py|python)?\s*\n(.*?)\n```", msg.content, re.DOTALL | |
) | |
if code_match: | |
code = code_match.group(1) | |
# Look for final_answer call | |
final_answer_match = re.search( | |
r"final_answer\((.*?)\)", code | |
) | |
if final_answer_match: | |
answer = final_answer_match.group(1) | |
logger.info( | |
f"Found answer in final_answer call: {answer}" | |
) | |
return answer | |
# If no code block with final_answer, use the content | |
logger.info(f"Found answer in message: {msg.content}") | |
return msg.content | |
return None | |
def __call__(self, input_data) -> str: | |
"""Process a question through the agent graph and return the answer. | |
Args: | |
input_data: Either a question string or a Command object for resuming | |
Returns: | |
str: The agent's response | |
""" | |
try: | |
config = {"configurable": {"thread_id": self.thread_id}} | |
logger.info(f"Using config: {config}") | |
if isinstance(input_data, str): | |
# Initial question | |
logger.info(f"Processing initial question: {input_data}") | |
initial_state = { | |
"question": input_data, | |
"messages": [], | |
"answer": None, | |
"step_logs": [], | |
"is_complete": False, | |
"step_count": 0, | |
# Initialize new memory fields | |
"context": {}, | |
"memory_buffer": [], | |
"last_action": None, | |
"action_history": [], | |
"error_count": 0, | |
"success_count": 0, | |
} | |
logger.info(f"Initial state: {initial_state}") | |
# Use stream to get results | |
logger.info("Starting graph stream for initial question") | |
for chunk in self.graph.stream(initial_state, config): | |
logger.debug(f"Received chunk: {chunk}") | |
if isinstance(chunk, dict): | |
if "__interrupt__" in chunk: | |
logger.info("Detected interrupt in stream") | |
logger.info(f"Interrupt details: {chunk['__interrupt__']}") | |
# Let the graph handle the interrupt naturally | |
continue | |
answer = self._extract_answer(chunk) | |
if answer: | |
self.last_state = chunk | |
# If the state is complete, return the answer | |
if chunk.get("is_complete", False): | |
return answer | |
else: | |
logger.debug(f"Skipping chunk without answer: {chunk}") | |
else: | |
# Resuming from interrupt | |
logger.info(f"Resuming from interrupt with input: {input_data}") | |
for result in self.graph.stream(input_data, config): | |
logger.debug(f"Received resume result: {result}") | |
if isinstance(result, dict): | |
answer = self._extract_answer(result) | |
if answer: | |
self.last_state = result | |
# If the state is complete, return the answer | |
if result.get("is_complete", False): | |
return answer | |
else: | |
logger.debug(f"Skipping result without answer: {result}") | |
# If we get here, we didn't find an answer | |
logger.warning("No answer generated from stream") | |
return "No answer generated" | |
except Exception as e: | |
logger.error(f"Error processing input: {str(e)}") | |
raise | |
if __name__ == "__main__": | |
import argparse | |
from langgraph.types import Command | |
# Set up argument parser | |
parser = argparse.ArgumentParser(description="Run the agent with a question") | |
parser.add_argument("question", type=str, help="The question to ask the agent") | |
parser.add_argument( | |
"--resume", | |
type=str, | |
help="Value to resume with after an interrupt", | |
default=None, | |
) | |
args = parser.parse_args() | |
# Create agent runner | |
runner = AgentRunner() | |
if args.resume: | |
# Resume from interrupt with provided value | |
print(f"\nResuming with value: {args.resume}") | |
response = runner(Command(resume=args.resume)) | |
else: | |
# Initial run with question | |
print(f"\nAsking question: {args.question}") | |
response = runner(args.question) | |
print(f"\nFinal response: {response}") | |