Sami Marreed
feat: docker-v1 with optimized frontend
0646b18
import json
import uuid
from typing import Literal, Optional, Dict, Callable
from langchain_core.messages import HumanMessage, ToolCall, BaseMessage
from loguru import logger
from cuga.backend.activity_tracker.tracker import ActivityTracker, Step
from cuga.backend.cuga_graph.nodes.shared.base_agent import create_partial
from cuga.backend.cuga_graph.nodes.chat.chat_agent.chat_agent import ChatAgent
from cuga.backend.cuga_graph.nodes.shared.base_node import BaseNode
from cuga.backend.cuga_graph.nodes.human_in_the_loop.followup_model import (
create_flow_approve,
create_new_flow_approve,
)
from cuga.backend.cuga_graph.state.agent_state import AgentState
from cuga.backend.cuga_graph.utils.nodes_names import NodeNames, ActionIds
from langgraph.types import Command
from cuga.config import settings
# Module-level activity tracker; ChatNode.node_handler records each regular
# chat reply on it via tracker.collect_step().
tracker = ActivityTracker()
# Feature flag read once at import time from settings.features.save_reuse.
# When truthy, ChatNode.node_handler sends every tool call produced by the chat
# agent to NodeNames.SUGGEST_HUMAN_ACTIONS for approval instead of acting on it.
ENABLE_SAVE_REUSE = settings.features.save_reuse
class ChatHumanInTheLoopHandler:
    """Dispatch chat-specific human-in-the-loop responses by their ``action_id``.

    Handlers live in a registry keyed by action id. Each handler is called as
    ``handler(state, node_name)`` and returns a ``Command``.
    """

    def __init__(self):
        # Only FLOW_APPROVE is wired up by default. Further chat-specific
        # actions (for instance one backed by _handle_chat_continue) can be
        # registered later through add_action_handler().
        registry: Dict[str, Callable] = {}
        registry[ActionIds.FLOW_APPROVE] = self._handle_tool_execute
        self._action_handlers: Dict[str, Callable] = registry

    def handle_human_response(self, state: AgentState, node_name: str) -> Command:
        """Run the handler registered for ``state.hitl_response.action_id``.

        When no handler is registered for that id, the state is forwarded
        unchanged to ``NodeNames.FINAL_ANSWER_AGENT``.
        """
        requested_action = state.hitl_response.action_id
        try:
            chosen = self._action_handlers[requested_action]
        except KeyError:
            # Unknown action for the chat node: fall back to the final answer.
            return Command(update=state.model_dump(), goto=NodeNames.FINAL_ANSWER_AGENT)
        return chosen(state, node_name)

    def add_action_handler(self, action_id: str, handler: Callable):
        """Register ``handler`` for ``action_id``, replacing any existing entry."""
        self._action_handlers[action_id] = handler

    @staticmethod
    def _route(state: AgentState, node_name: str, destination) -> Command:
        """Mark ``node_name`` as the sender, then emit a Command towards ``destination``."""
        # The sender must be set before model_dump() so the update carries it.
        state.sender = node_name
        return Command(update=state.model_dump(), goto=destination)

    def _handle_tool_execute(self, state: AgentState, node_name: str) -> Command:
        """Tool execution approved: record the sender and go to WAIT_FOR_RESPONSE."""
        return self._route(state, node_name, NodeNames.WAIT_FOR_RESPONSE)

    def _handle_chat_continue(self, state: AgentState, node_name: str) -> Command:
        """Continue the conversation: record the sender and go to FINAL_ANSWER_AGENT."""
        return self._route(state, node_name, NodeNames.FINAL_ANSWER_AGENT)
class ChatNode(BaseNode):
    """Graph node that runs the chat agent and routes on its reply.

    Instances should be built with the async :meth:`create` factory, which
    creates the ``ChatAgent`` and binds :meth:`node_handler` to it (together
    with the HITL handler and the agent name) as ``self.node``.
    """

    def __init__(self):
        super().__init__()
        # Populated by create(); None until then.
        self.chat_agent: Optional[ChatAgent] = None
        self.hitl_handler = ChatHumanInTheLoopHandler()
        # Flipped to True at the end of create().
        self._initialized = False

    @classmethod
    async def create(cls):
        """Build and initialise a node.

        Creates the ``ChatAgent``, awaits its ``setup()`` only when
        ``settings.features.chat`` is truthy, and stores the bound handler
        in ``instance.node``.

        Returns:
            The initialised instance.
        """
        instance = cls()
        instance.chat_agent = ChatAgent()
        if settings.features.chat:
            await instance.chat_agent.setup()
        instance.node = create_partial(
            ChatNode.node_handler,
            agent=instance.chat_agent,
            hitl_handler=instance.hitl_handler,
            name=instance.chat_agent.name,
        )
        instance._initialized = True
        return instance

    @staticmethod
    def format_function_call(func_dict):
        """Render a tool-call mapping as a Python-style call string.

        Example: ``{"name": "search", "args": {"q": "it's", "k": 3}}`` becomes
        ``search(q="it's", k=3)``.

        Args:
            func_dict: Mapping with a ``"name"`` entry and an optional
                ``"args"`` mapping of keyword arguments.

        Returns:
            ``name(k1=v1, k2=v2, ...)``; ``name()`` when there are no args.

        Raises:
            KeyError: If ``"name"`` is missing.
        """
        name = func_dict["name"]
        # A call without arguments may omit "args" or carry None.
        args = func_dict.get("args") or {}

        def format_value(v):
            # repr() quotes strings and escapes embedded quotes/newlines, so
            # top-level strings are rendered the same way as strings nested
            # inside lists and dicts.
            if isinstance(v, (str, list, dict)):
                return repr(v)
            return str(v)

        arg_strings = [f"{k}={format_value(v)}" for k, v in args.items()]
        return f"{name}({', '.join(arg_strings)})"

    @staticmethod
    def _parse_tool_result(raw):
        """Decode ``raw`` as JSON when it is a string; otherwise return it as-is.

        A string that is not valid JSON is returned unchanged.
        """
        if not isinstance(raw, str):
            return raw
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return raw

    @staticmethod
    def _task_input_from_call(call):
        """Build the task-analyzer input from an ``execute_task`` tool call.

        Returns the ``task`` argument, followed by the ``relevant_variables``
        argument on a second line when that argument is non-empty.
        """
        args = call.get("args")
        task = args.get("task")
        variables_rel = args.get("relevant_variables")
        if variables_rel:
            return f"task: {task}" + f"\n relevant variables from history: {variables_rel}"
        return task

    @staticmethod
    async def node_handler(
        state: AgentState, agent: ChatAgent, hitl_handler: ChatHumanInTheLoopHandler, name: str
    ) -> Command[Literal["FinalAnswerAgent", "TaskAnalyzerAgent", "SuggestHumanActions"]]:
        """Run one pass of the chat node and decide where the graph goes next.

        Branches, in order:

        1. ``state.sender`` is WAIT_FOR_RESPONSE with action FLOW_APPROVE:
           execute the approved tool, store its result as a variable and go to
           the final answer agent.
        2. ``state.sender`` is WAIT_FOR_RESPONSE with action NEW_FLOW_APPROVE:
           use the tool's ``user_task`` argument as the new input and go to the
           task analyzer.
        3. ``settings.features.chat`` is falsy: go straight to the task analyzer.
        4. Otherwise invoke the chat agent with the input. With
           ``ENABLE_SAVE_REUSE`` set, any tool call is sent for human approval.
           Without it, an ``execute_task`` call is forwarded to the task
           analyzer. Anything else is treated as a plain chat reply.

        Args:
            state: Mutable agent state; it is updated in place and dumped into
                the returned command.
            agent: Chat agent used for ``invoke`` and ``execute_tool``.
            hitl_handler: Bound by :meth:`create`; not used by this handler.
            name: Name recorded as ``state.sender`` and as the tracker step name.

        Returns:
            A ``Command`` carrying the dumped state and the next node.
        """
        # --- Resume after a human answered an approval prompt ---------------
        if state.sender == NodeNames.WAIT_FOR_RESPONSE:
            action_id = state.hitl_response.action_id

            if action_id == ActionIds.FLOW_APPROVE:
                tool = ToolCall(**state.hitl_response.additional_data.tool)
                raw_result = await agent.execute_tool(tool)
                parsed_result = ChatNode._parse_tool_result(raw_result)

                tool_name = tool.get("name")
                tool_args = tool.get("args")

                # Store the result under a short random name so it can be
                # presented as the planner answer.
                var_name = f"tool_result_{str(uuid.uuid4())[:5]}"
                state.variables_manager.add_variable(
                    parsed_result, var_name, f"Result of tool {tool_name} with args {tool_args}"
                )
                state.sender = "ChatAgentTool"
                state.last_planner_answer = state.variables_manager.present_variable(var_name)
                return Command(update=state.model_dump(), goto=NodeNames.FINAL_ANSWER_AGENT)

            if action_id == ActionIds.NEW_FLOW_APPROVE:
                logger.debug("tool call in chat node")
                tool = ToolCall(**state.hitl_response.additional_data.tool)
                state.input = tool.get("args").get("user_task")
                state.sender = "ChatAgent"
                return Command(update=state.model_dump(), goto=NodeNames.TASK_ANALYZER_AGENT)

        # Every remaining path reports this node as the sender.
        state.sender = name

        # Chat feature disabled: hand the input directly to the task analyzer.
        if not settings.features.chat:
            return Command(update=state.model_dump(), goto=NodeNames.TASK_ANALYZER_AGENT)

        # --- Process chat input ---------------------------------------------
        state.chat_agent_messages.append(HumanMessage(content=state.input))
        res: BaseMessage = await agent.invoke(state.chat_agent_messages, state)
        state.chat_agent_messages.append(res)

        # BaseMessage itself does not define tool_calls (AI messages do), so
        # read it defensively and treat its absence as "no tool calls".
        tool_calls = getattr(res, "tool_calls", None)
        first_call = tool_calls[0] if tool_calls else None

        # Save/reuse enabled: every tool call requires human approval first.
        if ENABLE_SAVE_REUSE and first_call is not None:
            state.final_answer = res.content
            if first_call.get("name") == "run_new_flow":
                state.hitl_action = create_new_flow_approve(tool=first_call)
            else:
                state.hitl_action = create_flow_approve(tool=first_call)
            return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS)

        # Save/reuse disabled: an execute_task call goes to the task analyzer.
        if (
            not ENABLE_SAVE_REUSE
            and first_call is not None
            and first_call.get("name") == "execute_task"
        ):
            logger.debug(f"tool call in chat node {first_call}")
            state.input = ChatNode._task_input_from_call(first_call)
            return Command(update=state.model_dump(), goto="TaskAnalyzerAgent")

        # --- Regular chat response ------------------------------------------
        # res is also the last entry of chat_agent_messages, so the placeholder
        # substitution below is visible through both references.
        res.content = state.variables_manager.replace_variables_placeholders(res.content)
        state.messages.append(res)
        tracker.collect_step(
            step=Step(
                name=name,
                data=res.content,
                current_url=state.url,
            )
        )
        state.final_answer = res.content
        return Command(update=state.model_dump(), goto=NodeNames.FINAL_ANSWER_AGENT)