Spaces:
Running
Running
| import json | |
| import re | |
| from typing import Literal | |
| from cuga.backend.activity_tracker.tracker import ActivityTracker, Step | |
| from cuga.backend.cuga_graph.nodes.api.api_planner_agent.api_planner_agent import APIPlannerAgent | |
| from cuga.backend.cuga_graph.nodes.api.api_planner_agent.prompts.load_prompt import ( | |
| APIPlannerOutput, | |
| ActionName, | |
| APIPlannerInput, | |
| ) | |
| from cuga.backend.cuga_graph.nodes.api.shortlister_agent.shortlister_agent import ShortlisterAgent | |
| from cuga.backend.cuga_graph.nodes.shared.base_agent import create_partial | |
| from cuga.backend.cuga_graph.nodes.shared.base_node import BaseNode | |
| from cuga.backend.cuga_graph.state.agent_state import AgentState, SubTaskHistory | |
| from langgraph.types import Command | |
| from cuga.backend.cuga_graph.state.api_planner_history import HistoricalAction | |
| from loguru import logger | |
| from cuga.backend.tools_env.registry.utils.api_utils import get_apis | |
| from langchain_core.tools import tool | |
| from cuga.backend.llm.models import LLMManager | |
| from cuga.config import settings | |
| from cuga.configurations.instructions_manager import InstructionsManager | |
| from cuga.backend.cuga_graph.nodes.api.tasks.reflection import reflection_task | |
| from cuga.backend.cuga_graph.nodes.human_in_the_loop.followup_model import ( | |
| FollowUpAction, | |
| ActionType, | |
| ) | |
| from cuga.backend.cuga_graph.utils.nodes_names import NodeNames, ActionIds | |
| instructions_manager = InstructionsManager() | |
| tracker = ActivityTracker() | |
| llm_manager = LLMManager() | |
| if settings.advanced_features.enable_fact: | |
| from cuga.backend.memory.memory import Memory | |
| memory = Memory() | |
| # --- Minimal tolerant planner parser (handles double-encoded JSON, code fences, minor key typos) --- | |
| def _parse_planner_output_or_raise(raw: str) -> APIPlannerOutput: | |
| """ | |
| Robust to: | |
| - plain JSON object | |
| - double-encoded JSON (a JSON string containing JSON) | |
| - code fences (```json ... ```) or extra text around JSON | |
| Pure parsing retries only; does not re-ask the LLM. | |
| """ | |
| s = (raw or "").strip() | |
| # Strip code fences if present | |
| if s.startswith("```"): | |
| s = re.sub(r"^```[a-zA-Z]*\n?", "", s) | |
| s = re.sub(r"\n?```$", "", s).strip() | |
| last_err = None | |
| for _ in range(3): | |
| try: | |
| obj = json.loads(s) | |
| except Exception as e: | |
| last_err = e | |
| # Try to slice the outermost {...} | |
| first, last = s.find("{"), s.rfind("}") | |
| if first != -1 and last > first: | |
| s = s[first : last + 1].strip() | |
| continue | |
| break | |
| # If first loads produced a JSON string, decode again (double-encoded case) | |
| if isinstance(obj, str) and obj.strip().startswith("{"): | |
| s = obj.strip() | |
| continue | |
| return APIPlannerOutput(**obj) | |
| raise last_err or ValueError("Planner output could not be parsed") | |
| def think(thought: str): | |
| """ | |
| Use this tool to reflect and reason strategically. | |
| :param thought: | |
| :return: | |
| """ | |
| return thought | |
| class ApiPlanner(BaseNode): | |
| def __init__(self, router_agent: APIPlannerAgent): | |
| super().__init__() | |
| self.name = router_agent.name | |
| self.guidance = reflection_task(llm=llm_manager.get_model(settings.agent.planner.model)) | |
| self.agent = router_agent | |
| self.node = create_partial( | |
| ApiPlanner.node_handler, | |
| agent=self.agent, | |
| strategic_agent=self.guidance, | |
| name=self.name, | |
| ) | |
| def collect_history(state: AgentState, action: str, step: APIPlannerInput): | |
| obj = HistoricalAction(action_taken=action, input_to_agent=step, agent_output=None) | |
| state.api_planner_history.append(obj) | |
| def should_use_fast_mode_early(state: AgentState) -> bool: | |
| """Determine if fast mode (CugaLite) should be used before any LLM calls. | |
| Args: | |
| state: Current agent state | |
| Returns: | |
| True if fast mode should be used | |
| """ | |
| # Use state lite_mode if set, otherwise fallback to settings | |
| lite_mode = state.lite_mode if state.lite_mode is not None else settings.advanced_features.lite_mode | |
| if lite_mode and settings.advanced_features.mode in ['api', 'hybrid']: | |
| logger.info( | |
| f"Fast mode enabled (state={state.lite_mode}, settings={settings.advanced_features.lite_mode}) and mode is API or Hybrid - routing to CugaLite from APIPlannerAgent" | |
| ) | |
| return True | |
| return False | |
| async def count_tools_for_app(app_name: str) -> int: | |
| """Count total number of tools for a specific app. | |
| Args: | |
| app_name: Name of the app to count tools for | |
| Returns: | |
| Total number of tools for the specified app | |
| """ | |
| try: | |
| apis = await get_apis(app_name) | |
| if apis: | |
| return len(apis.keys()) | |
| return 0 | |
| except Exception as e: | |
| logger.debug(f"Could not count tools for app {app_name}: {e}") | |
| return 0 | |
| async def node_handler( | |
| state: AgentState, agent: APIPlannerAgent, strategic_agent, name: str | |
| ) -> Command[ | |
| Literal[ | |
| 'APICodePlannerAgent', | |
| 'ShortlisterAgent', | |
| 'PlanControllerAgent', | |
| 'SuggestHumanActions', | |
| 'CugaLite', | |
| ] | |
| ]: | |
| # Check fast mode early to skip LLM calls | |
| if ApiPlanner.should_use_fast_mode_early(state): | |
| logger.info("Fast mode enabled - checking tool threshold for current app") | |
| # Get current app from state.sub_task_app (API planner assumes single app) | |
| if state.sub_task_app: | |
| current_app_name = state.sub_task_app | |
| tool_count = await ApiPlanner.count_tools_for_app(current_app_name) | |
| threshold = settings.advanced_features.lite_mode_tool_threshold | |
| logger.info(f"Current app '{current_app_name}' tools: {tool_count}, Threshold: {threshold}") | |
| if tool_count < threshold: | |
| logger.info( | |
| f"Tool count ({tool_count}) below threshold ({threshold}) - routing to CugaLite" | |
| ) | |
| logger.info(f"APIPlannerAgent routing with state.sub_task: {state.sub_task}") | |
| logger.info(f"APIPlannerAgent routing with state.sub_task_app: {state.sub_task_app}") | |
| return Command(update=state.model_dump(), goto="CugaLite") | |
| # Handle human consultation response (only if HITL is enabled) | |
| if settings.advanced_features.api_planner_hitl: | |
| if state.sender == NodeNames.WAIT_FOR_RESPONSE and state.hitl_response: | |
| if state.hitl_response.action_id == ActionIds.CONSULT_WITH_HUMAN: | |
| human_response = ( | |
| state.hitl_response.text_response | |
| or state.hitl_response.selected_values | |
| or "No response provided" | |
| ) | |
| consultation_record = { | |
| "question": state.api_planner_human_consultations[-1].get("question", "") | |
| if state.api_planner_human_consultations | |
| else "", | |
| "response": human_response, | |
| "timestamp": state.hitl_response.timestamp, | |
| } | |
| state.api_planner_human_consultations.append(consultation_record) | |
| logger.debug(f"Human consultation response received: {human_response}") | |
| state.sender = name | |
| if settings.advanced_features.enable_fact: | |
| logger.info("Retrieving facts stored in memory") | |
| filters = { | |
| "user_id": state.user_id, | |
| } | |
| retrieved_facts = memory.search_for_facts( | |
| namespace_id='memory', query=state.input, filters=filters | |
| ) | |
| if retrieved_facts: | |
| for fact in retrieved_facts: | |
| if "variable_name" in fact.content: | |
| mem_dict = json.loads(fact.content) | |
| state.variables_manager.add_variable( | |
| name=mem_dict.get("variable_name"), | |
| description=mem_dict.get("description", ""), | |
| value=mem_dict.get("value"), | |
| ) | |
| # First time visit | |
| if ( | |
| state.api_last_step | |
| and state.api_last_step == ActionName.CODER_AGENT | |
| and settings.features.code_output_reflection | |
| ): | |
| res_2 = await strategic_agent.ainvoke( | |
| { | |
| "instructions": instructions_manager.get_instructions("api_reflection"), | |
| "current_task": state.sub_task, | |
| "agent_history": str(state.api_planner_history), | |
| "shortlister_agent_output": "N/A", # This would need to be populated from actual shortlister output | |
| "coder_agent_output": f"Variables history: {state.variables_manager.get_variables_summary(last_n=5)}\n\nUser information ( User already logged in ): {state.pi}\n\nCurrent datetime: {tracker.current_date}", | |
| } | |
| ) | |
| summary = res_2.content | |
| state.guidance = summary | |
| tracker.collect_step(step=Step(name=name, data=summary)) | |
| logger.debug(f"Guidance:\n{summary}") | |
| res = await agent.run(state) | |
| state.guidance = None | |
| state.messages.append(res) | |
| try: | |
| res = APIPlannerOutput(**json.loads(res.content)) | |
| except Exception as e1: | |
| logger.warning(f"Strict parse failed: {e1}; trying tolerant parse...") | |
| res = _parse_planner_output_or_raise(res.content) | |
| tracker.collect_step(step=Step(name=name, data=res.model_dump_json())) | |
| logger.debug("api_planner output:\n {}".format(res.model_dump_json(indent=4))) | |
| if res.action == ActionName.CODER_AGENT: | |
| state.api_last_step = ActionName.CODER_AGENT | |
| logger.debug("Current task is: code") | |
| state.coder_task = res.action_input_coder_agent.task_description | |
| state.coder_variables = res.action_input_coder_agent.context_variables_from_history | |
| state.coder_relevant_apis = res.action_input_coder_agent.relevant_apis | |
| state.api_shortlister_planner_filtered_apis = json.dumps( | |
| ShortlisterAgent.filter_by_api_names( | |
| state.api_shortlister_all_filtered_apis, | |
| [api.api_name for api in res.action_input_coder_agent.relevant_apis], | |
| ), | |
| indent=2, | |
| ) | |
| ApiPlanner.collect_history( | |
| state=state, action=res.action.value, step=res.action_input_coder_agent | |
| ) | |
| return Command(update=state.model_dump(), goto="APICodePlannerAgent") | |
| if res.action == ActionName.API_FILTERING_AGENT: | |
| state.api_last_step = ActionName.API_FILTERING_AGENT | |
| logger.debug("Current task is: shortlisting") | |
| ApiPlanner.collect_history( | |
| state=state, action=res.action.value, step=res.action_input_shortlisting_agent | |
| ) | |
| state.shortlister_relevant_apps = [res.action_input_shortlisting_agent.app_name] | |
| state.shortlister_query = f"**Input task**: {res.action_input_shortlisting_agent.task_description}\n\nTask context:{state.sub_task}" | |
| logger.debug(state.model_dump()) | |
| return Command(update=state.model_dump(), goto="ShortlisterAgent") | |
| if res.action == ActionName.CONCLUDE_TASK: | |
| state.api_last_step = ActionName.CONCLUDE_TASK | |
| state.guidance = None | |
| logger.debug("Current task is: conclude") | |
| ApiPlanner.collect_history( | |
| state=state, action=res.action.value, step=res.action_input_conclude_task | |
| ) | |
| state.stm_all_history.append( | |
| SubTaskHistory( | |
| sub_task=state.format_subtask(), | |
| steps=[], | |
| final_answer=res.action_input_conclude_task.final_response, | |
| ) | |
| ) | |
| state.last_planner_answer = res.action_input_conclude_task.final_response | |
| state.sender = "APIPlannerAgent" | |
| return Command(update=state.model_dump(), goto="PlanControllerAgent") | |
| if settings.advanced_features.api_planner_hitl and res.action == ActionName.CONSULT_WITH_HUMAN: | |
| state.api_last_step = ActionName.CONSULT_WITH_HUMAN | |
| logger.debug("Current task is: consult with human") | |
| ApiPlanner.collect_history( | |
| state=state, action=res.action.value, step=res.action_input_consult_with_human | |
| ) | |
| consultation_input = { | |
| "question": res.action_input_consult_with_human.question, | |
| "context": res.action_input_consult_with_human.context, | |
| "suggested_options": res.action_input_consult_with_human.suggested_options, | |
| } | |
| state.api_planner_human_consultations.append(consultation_input) | |
| options = None | |
| action_type = ActionType.NATURAL_LANGUAGE | |
| if res.action_input_consult_with_human.suggested_options: | |
| from cuga.backend.cuga_graph.nodes.human_in_the_loop.followup_model import ( | |
| SelectOption, | |
| ) | |
| action_type = ActionType.SELECT | |
| options = [ | |
| SelectOption(value=opt, label=opt) | |
| for opt in res.action_input_consult_with_human.suggested_options | |
| ] | |
| state.hitl_action = FollowUpAction( | |
| action_id=ActionIds.CONSULT_WITH_HUMAN, | |
| action_name="Human Consultation", | |
| description=res.action_input_consult_with_human.question, | |
| type=action_type, | |
| callback_url="/consult", | |
| placeholder="Please provide your response...", | |
| options=options, | |
| ) | |
| state.sender = name | |
| return Command(update=state.model_dump(), goto="SuggestHumanActions") | |
| return Command(update=state.model_dump(), goto="APICodePlannerAgent") | |
| # state.api_planner_codeagent_filtered_schemas_plan = res.content | |
| # return state | |