| """End-to-end test simulating CLI interrupt during subagent execution. |
| |
| Reproduces the exact scenario: |
| 1. Parent agent calls delegate_task |
| 2. Child agent is running (simulated with a slow tool) |
| 3. User "types a message" (simulated by calling parent.interrupt from another thread) |
| 4. Child should detect the interrupt and stop |
| |
| This tests the COMPLETE path including _run_single_child, _active_children |
| registration, interrupt propagation, and child detection. |
| """ |
|
|
| import json |
| import os |
| import queue |
| import threading |
| import time |
| import unittest |
| from unittest.mock import MagicMock, patch, PropertyMock |
|
|
| from tools.interrupt import set_interrupt, is_interrupted |
|
|
|
|
| class TestCLISubagentInterrupt(unittest.TestCase): |
| """Simulate exact CLI scenario.""" |
|
|
| def setUp(self): |
| set_interrupt(False) |
|
|
| def tearDown(self): |
| set_interrupt(False) |
|
|
| def test_full_delegate_interrupt_flow(self): |
| """Full integration: parent runs delegate_task, main thread interrupts.""" |
| from run_agent import AIAgent |
|
|
| interrupt_detected = threading.Event() |
| child_started = threading.Event() |
| child_api_call_count = 0 |
|
|
| |
| parent = AIAgent.__new__(AIAgent) |
| parent._interrupt_requested = False |
| parent._interrupt_message = None |
| parent._active_children = [] |
| parent._active_children_lock = threading.Lock() |
| parent.quiet_mode = True |
| parent.model = "test/model" |
| parent.base_url = "http://localhost:1" |
| parent.api_key = "test" |
| parent.provider = "test" |
| parent.api_mode = "chat_completions" |
| parent.platform = "cli" |
| parent.enabled_toolsets = ["terminal", "file"] |
| parent.providers_allowed = None |
| parent.providers_ignored = None |
| parent.providers_order = None |
| parent.provider_sort = None |
| parent.max_tokens = None |
| parent.reasoning_config = None |
| parent.prefill_messages = None |
| parent._session_db = None |
| parent._delegate_depth = 0 |
| parent._delegate_spinner = None |
| parent.tool_progress_callback = None |
|
|
| |
| original_children = parent._active_children |
|
|
| |
| |
| def mock_child_run_conversation(user_message, **kwargs): |
| child_started.set() |
| |
| child = parent._active_children[-1] if parent._active_children else None |
| |
| |
| for i in range(100): |
| if child and child._interrupt_requested: |
| interrupt_detected.set() |
| return { |
| "final_response": "Interrupted!", |
| "messages": [], |
| "api_calls": 1, |
| "completed": False, |
| "interrupted": True, |
| "interrupt_message": child._interrupt_message, |
| } |
| time.sleep(0.1) |
| |
| return { |
| "final_response": "Finished without interrupt", |
| "messages": [], |
| "api_calls": 5, |
| "completed": True, |
| "interrupted": False, |
| } |
|
|
| |
| from tools.delegate_tool import _run_single_child |
| from run_agent import IterationBudget |
|
|
| parent.iteration_budget = IterationBudget(max_total=100) |
|
|
| |
| delegate_result = [None] |
| delegate_error = [None] |
|
|
| def run_delegate(): |
| try: |
| with patch('run_agent.AIAgent') as MockAgent: |
| mock_instance = MagicMock() |
| mock_instance._interrupt_requested = False |
| mock_instance._interrupt_message = None |
| mock_instance._active_children = [] |
| mock_instance._active_children_lock = threading.Lock() |
| mock_instance.quiet_mode = True |
| mock_instance.run_conversation = mock_child_run_conversation |
| mock_instance.interrupt = lambda msg=None: setattr(mock_instance, '_interrupt_requested', True) or setattr(mock_instance, '_interrupt_message', msg) |
| mock_instance.tools = [] |
| MockAgent.return_value = mock_instance |
|
|
| |
| parent._active_children.append(mock_instance) |
|
|
| result = _run_single_child( |
| task_index=0, |
| goal="Do something slow", |
| child=mock_instance, |
| parent_agent=parent, |
| ) |
| delegate_result[0] = result |
| except Exception as e: |
| delegate_error[0] = e |
|
|
| agent_thread = threading.Thread(target=run_delegate, daemon=True) |
| agent_thread.start() |
|
|
| |
| assert child_started.wait(timeout=5), "Child never started!" |
|
|
| |
| time.sleep(0.2) |
| |
| print(f"Parent has {len(parent._active_children)} active children") |
| assert len(parent._active_children) >= 1, f"Expected child in _active_children, got {len(parent._active_children)}" |
|
|
| |
| parent.interrupt("Hey stop that") |
| |
| print(f"Parent._interrupt_requested: {parent._interrupt_requested}") |
| for i, child in enumerate(parent._active_children): |
| print(f"Child {i}._interrupt_requested: {child._interrupt_requested}") |
|
|
| |
| detected = interrupt_detected.wait(timeout=3.0) |
| |
| |
| agent_thread.join(timeout=5) |
|
|
| if delegate_error[0]: |
| raise delegate_error[0] |
|
|
| assert detected, "Child never detected the interrupt!" |
| result = delegate_result[0] |
| assert result is not None, "Delegate returned no result" |
| assert result["status"] == "interrupted", f"Expected 'interrupted', got '{result['status']}'" |
| print(f"✓ Interrupt detected! Result: {result}") |
|
|
|
|
| if __name__ == "__main__": |
| unittest.main() |
|
|