Spaces:
Build error
Build error
Enhance AgentRunner and graph functionality by adding answer extraction logic and improving logging throughout the processing flow. Update the handling of interrupts and state management to ensure clarity in debug output. Refactor the should_continue function in graph.py to better manage completion states and improve user interaction.
218633c
unverified
import logging
import os
import uuid
from typing import Optional

from langgraph.types import Command

from graph import agent_graph
# Logging setup: INFO is the baseline for the root configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

import litellm

# LITELLM_DEBUG=true (compared case-insensitively) turns on LiteLLM's verbose
# flag and DEBUG output for this module's logger; any other value, or an unset
# variable, leaves LiteLLM quiet and this logger at INFO.
litellm.set_verbose = os.getenv("LITELLM_DEBUG", "false").lower() == "true"
logger.setLevel(logging.DEBUG if litellm.set_verbose else logging.INFO)
class AgentRunner:
    """Runner class for the code agent.

    An instance holds a reference to ``agent_graph`` and a ``thread_id``
    generated once with ``uuid.uuid4()``. The id is sent to the graph as
    ``config["configurable"]["thread_id"]`` on every call. Calling the
    instance streams the graph and returns the first answer found in the
    streamed chunks.
    """

    def __init__(self):
        """Bind the agent graph and create a fresh thread id for this runner."""
        logger.info("Initializing AgentRunner")
        self.graph = agent_graph
        self.last_state = None  # Last chunk an answer was taken from (testing/debugging)
        self.thread_id = str(uuid.uuid4())  # Unique thread_id for this runner
        logger.info(f"Created AgentRunner with thread_id: {self.thread_id}")

    @staticmethod
    def _answer_from_mapping(state: dict) -> Optional[str]:
        """Return the answer held directly in ``state``, or ``None``.

        A truthy ``state["answer"]`` wins. Otherwise ``state["messages"]`` is
        scanned from newest to oldest and the first truthy ``content``
        attribute is returned.
        """
        direct = state.get("answer")
        if direct:
            logger.info(f"Found answer in direct field: {direct}")
            return direct

        for msg in reversed(state.get("messages") or []):
            content = getattr(msg, "content", None)
            if content:
                logger.info(f"Found answer in message: {content}")
                return content
        return None

    def _extract_answer(self, state: Optional[dict]) -> Optional[str]:
        """Extract the answer from a state dict or a streamed chunk.

        Args:
            state: Either a flat state dict (with ``answer`` / ``messages``
                keys) or a chunk that wraps such dicts one level down.

        Returns:
            The answer, or ``None`` when ``state`` is empty or holds none.
        """
        if not state:
            return None

        answer = self._answer_from_mapping(state)
        if answer:
            return answer

        # A dict that already exposes the state keys is a flat state: do not
        # look inside its other fields (e.g. "context") for an answer.
        if "answer" in state or "messages" in state:
            return None

        # NOTE(review): graph.stream() is called without stream_mode; a
        # compiled StateGraph presumably yields {node_name: update} chunks in
        # that case - confirm against graph.py. Look one level down so those
        # per-node updates are not silently ignored.
        for update in state.values():
            if isinstance(update, dict):
                answer = self._answer_from_mapping(update)
                if answer:
                    return answer
        return None

    def _first_answer_from_resume(self, stream) -> Optional[str]:
        """Consume a resumed stream and return the first answer, if any.

        The chunk the answer came from is stored in ``self.last_state``.
        """
        for result in stream:
            logger.debug(f"Received resume result: {result}")
            if not isinstance(result, dict):
                continue
            answer = self._extract_answer(result)
            if answer:
                self.last_state = result
                return answer
            logger.debug(f"Skipping result without answer: {result}")
        return None

    def __call__(self, input_data) -> str:
        """Process a question through the agent graph and return the answer.

        Args:
            input_data: Either a question string or a Command object for resuming

        Returns:
            str: The agent's response, or the literal ``"No answer generated"``
            when the stream finishes without one.

        Raises:
            Exception: Anything raised while streaming is logged with its
                traceback and re-raised unchanged.
        """
        try:
            config = {"configurable": {"thread_id": self.thread_id}}
            logger.info(f"Using config: {config}")

            if isinstance(input_data, str):
                # Initial question
                logger.info(f"Processing initial question: {input_data}")
                initial_state = {
                    "question": input_data,
                    "messages": [],
                    "answer": None,
                    "step_logs": [],
                    "is_complete": False,
                    "step_count": 0,
                    # Initialize new memory fields
                    "context": {},
                    "memory_buffer": [],
                    "last_action": None,
                    "action_history": [],
                    "error_count": 0,
                    "success_count": 0,
                }
                logger.info(f"Initial state: {initial_state}")

                # Use stream to get interrupt information
                logger.info("Starting graph stream for initial question")
                for chunk in self.graph.stream(initial_state, config):
                    logger.debug(f"Received chunk: {chunk}")
                    if not isinstance(chunk, dict):
                        continue

                    if "__interrupt__" in chunk:
                        logger.info("Detected interrupt in stream")
                        logger.info(f"Interrupt details: {chunk['__interrupt__']}")
                        # If we hit an interrupt, resume once with 'c'. A
                        # further interrupt is left for the caller, who can
                        # pass a Command to this runner.
                        logger.info("Resuming with 'c' command")
                        answer = self._first_answer_from_resume(
                            self.graph.stream(Command(resume="c"), config)
                        )
                        if answer:
                            return answer
                        continue

                    answer = self._extract_answer(chunk)
                    if answer:
                        self.last_state = chunk
                        return answer
                    logger.debug(f"Skipping chunk without answer: {chunk}")
            else:
                # Resuming from interrupt
                logger.info(f"Resuming from interrupt with input: {input_data}")
                answer = self._first_answer_from_resume(
                    self.graph.stream(input_data, config)
                )
                if answer:
                    return answer

            # If we get here, we didn't find an answer
            logger.warning("No answer generated from stream")
            return "No answer generated"
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Error processing input: %s", e)
            raise