sw-api / tests /structs /test_concurrent_workflow.py
patrickbdevaney's picture
v1 attempt at hf space api
ffcf62f
from concurrent.futures import Future
from unittest.mock import Mock, create_autospec, patch
from swarms.structs import Agent, ConcurrentWorkflow, Task
def test_add():
workflow = ConcurrentWorkflow(max_workers=2)
task = Mock(spec=Task)
workflow.add(task)
assert task in workflow.tasks
def test_run():
workflow = ConcurrentWorkflow(max_workers=2)
task1 = create_autospec(Task)
task2 = create_autospec(Task)
workflow.add(task1)
workflow.add(task2)
with patch(
"concurrent.futures.ThreadPoolExecutor"
) as mock_executor:
future1 = Future()
future1.set_result(None)
future2 = Future()
future2.set_result(None)
mock_executor.return_value.__enter__.return_value.submit.side_effect = [
future1,
future2,
]
mock_executor.return_value.__enter__.return_value.as_completed.return_value = [
future1,
future2,
]
workflow.run()
task1.execute.assert_called_once()
task2.execute.assert_called_once()
def test_execute_task():
workflow = ConcurrentWorkflow(max_workers=2)
task = create_autospec(Task)
workflow._execute_task(task)
task.execute.assert_called_once()
def test_agent_execution():
workflow = ConcurrentWorkflow(max_workers=2)
agent = create_autospec(Agent)
task = Task(agent)
workflow.add(task)
workflow._execute_task(task)
agent.execute.assert_called_once()