# Concurrent Agents API Reference
This document covers the API for running multiple agents concurrently using several execution strategies. The implementation uses `asyncio` with `uvloop` as the event loop and a `ThreadPoolExecutor` to run agent calls in worker threads.
## Table of Contents
- [Core Functions](#core-functions)
- [Advanced Functions](#advanced-functions)
- [Usage Examples](#usage-examples)
- [Resource Monitoring](#resource-monitoring)
- [Performance Considerations](#performance-considerations)
- [Error Handling](#error-handling)
## Core Functions
### run_agents_concurrently()
Primary function for running multiple agents on the same task concurrently. Agents are processed in batches, using both `uvloop` and a `ThreadPoolExecutor`.
#### Arguments
| Parameter | Type | Required | Default | Description |
|-------------|----------------|----------|----------------|-------------|
| agents | List[AgentType]| Yes | - | List of Agent instances to run concurrently |
| task | str | Yes | - | Task string to execute |
| batch_size | int | No | CPU count | Number of agents to run in parallel in each batch |
| max_workers | int | No | CPU count * 2 | Maximum number of threads in the executor |
#### Returns
`List[Any]`: List of outputs from each agent
#### Flow Diagram
```mermaid
graph TD
A[Start] --> B[Initialize ThreadPoolExecutor]
B --> C[Split Agents into Batches]
C --> D[Process Batch]
D --> E{More Batches?}
E -->|Yes| D
E -->|No| F[Combine Results]
F --> G[Return Results]
subgraph "Batch Processing"
D --> H[Run Agents Async]
H --> I[Wait for Completion]
I --> J[Collect Batch Results]
end
```
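The flow above can be approximated with the standard library alone. The following is a minimal sketch, not the library's implementation: `EchoAgent` and `run_concurrently_sketch` are hypothetical names, the only thing assumed about an agent is a blocking `run(task)` method, and the plain `asyncio` loop is used instead of `uvloop`.

```python
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List


class EchoAgent:
    """Hypothetical stand-in for an Agent; only a blocking run(task) is assumed."""

    def __init__(self, name: str) -> None:
        self.name = name

    def run(self, task: str) -> str:
        return f"{self.name}: {task}"


async def _run_batches(agents, task, batch_size, executor) -> List[Any]:
    loop = asyncio.get_running_loop()
    results: List[Any] = []
    # Split agents into batches and process one batch at a time.
    for start in range(0, len(agents), batch_size):
        batch = agents[start:start + batch_size]
        # Hand each blocking agent.run call to a worker thread.
        futures = [loop.run_in_executor(executor, agent.run, task) for agent in batch]
        # gather waits for the whole batch and keeps the batch order.
        results.extend(await asyncio.gather(*futures))
    return results


def run_concurrently_sketch(agents, task, batch_size=None, max_workers=None):
    cpu_count = os.cpu_count() or 1
    batch_size = batch_size or cpu_count        # documented default: CPU count
    max_workers = max_workers or cpu_count * 2  # documented default: CPU count * 2
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return asyncio.run(_run_batches(agents, task, batch_size, executor))


demo_agents = [EchoAgent(f"agent-{i}") for i in range(5)]
demo_outputs = run_concurrently_sketch(demo_agents, "ping", batch_size=2)
```

With `batch_size=2`, the five demo agents are processed as batches of 2, 2, and 1, and the combined result list follows the order of the input list.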
### run_agents_sequentially()
Runs multiple agents sequentially for baseline comparison or simple use cases.
#### Arguments
| Parameter | Type | Required | Default | Description |
|-----------|----------------|----------|---------|-------------|
| agents | List[AgentType]| Yes | - | List of Agent instances to run |
| task | str | Yes | - | Task string to execute |
#### Returns
`List[Any]`: List of outputs from each agent
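For comparison, a sequential baseline reduces to a plain loop. This is a minimal sketch under the assumption that each agent exposes a blocking `run(task)` method; `EchoAgent` and `run_sequentially_sketch` are hypothetical names used only for illustration.

```python
from typing import Any, List


class EchoAgent:
    """Hypothetical stand-in for an Agent; only a blocking run(task) is assumed."""

    def __init__(self, name: str) -> None:
        self.name = name

    def run(self, task: str) -> str:
        return f"{self.name}: {task}"


def run_sequentially_sketch(agents, task: str) -> List[Any]:
    # Each agent finishes before the next one starts.
    return [agent.run(task) for agent in agents]


baseline_outputs = run_sequentially_sketch(
    [EchoAgent("a"), EchoAgent("b")], "ping"
)
```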
## Advanced Functions
### run_agents_with_different_tasks()
Runs multiple agents with different tasks concurrently.
#### Arguments
| Parameter | Type | Required | Default | Description |
|-----------------|-------------------------------|----------|----------------|-------------|
| agent_task_pairs| List[tuple[AgentType, str]] | Yes | - | List of (agent, task) tuples |
| batch_size | int | No | CPU count | Number of agents to run in parallel |
| max_workers | int | No | CPU count * 2 | Maximum number of threads |
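One way to picture the pairing is that each `(agent, task)` tuple becomes its own unit of work inside a batch. The sketch below is illustrative only: `EchoAgent` and `run_pairs_sketch` are hypothetical, a blocking `run(task)` method is assumed, and returning outputs in the order of the input pairs is an assumption, since the return value is not documented here.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List, Tuple


class EchoAgent:
    """Hypothetical stand-in for an Agent; only a blocking run(task) is assumed."""

    def __init__(self, name: str) -> None:
        self.name = name

    def run(self, task: str) -> str:
        return f"{self.name}: {task}"


async def _run_pairs(pairs, batch_size, executor) -> List[Any]:
    loop = asyncio.get_running_loop()
    results: List[Any] = []
    for start in range(0, len(pairs), batch_size):
        batch = pairs[start:start + batch_size]
        # Unlike the single-task case, every agent receives its own task string.
        futures = [loop.run_in_executor(executor, agent.run, task) for agent, task in batch]
        results.extend(await asyncio.gather(*futures))
    return results


def run_pairs_sketch(pairs: List[Tuple[Any, str]], batch_size: int = 2, max_workers: int = 4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return asyncio.run(_run_pairs(pairs, batch_size, executor))


pair_outputs = run_pairs_sketch(
    [(EchoAgent("tech"), "Analyze tech stocks"),
     (EchoAgent("energy"), "Analyze energy stocks"),
     (EchoAgent("retail"), "Analyze retail stocks")]
)
```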
### run_agents_with_timeout()
Runs multiple agents concurrently with timeout limits.
#### Arguments
| Parameter | Type | Required | Default | Description |
|-------------|----------------|----------|----------------|-------------|
| agents | List[AgentType]| Yes | - | List of Agent instances |
| task | str | Yes | - | Task string to execute |
| timeout | float | Yes | - | Timeout in seconds for each agent |
| batch_size | int | No | CPU count | Number of agents to run in parallel |
| max_workers | int | No | CPU count * 2 | Maximum number of threads |
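A per-agent timeout can be sketched with `asyncio.wait_for` around each worker-thread call. This is an illustration, not the library's code: `SleepyAgent` and `run_with_timeout_sketch` are hypothetical, and returning `None` for an agent that times out is an assumption, because the documented behavior on timeout is not stated here.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List, Optional


class SleepyAgent:
    """Hypothetical stand-in for an Agent: run() blocks for `delay` seconds."""

    def __init__(self, name: str, delay: float) -> None:
        self.name = name
        self.delay = delay

    def run(self, task: str) -> str:
        time.sleep(self.delay)
        return f"{self.name}: {task}"


async def _run_one(executor, agent, task, timeout) -> Optional[Any]:
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(executor, agent.run, task)
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        # Assumption: a timed-out agent yields None. The worker thread is not
        # interrupted; only the wait for its result is abandoned.
        return None


async def _run_all(agents, task, timeout, batch_size, executor) -> List[Optional[Any]]:
    results: List[Optional[Any]] = []
    for start in range(0, len(agents), batch_size):
        batch = agents[start:start + batch_size]
        results.extend(
            await asyncio.gather(*(_run_one(executor, a, task, timeout) for a in batch))
        )
    return results


def run_with_timeout_sketch(agents, task, timeout, batch_size=2, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return asyncio.run(_run_all(agents, task, timeout, batch_size, executor))


timeout_outputs = run_with_timeout_sketch(
    [SleepyAgent("fast", 0.0), SleepyAgent("slow", 0.6)], "ping", timeout=0.2
)
```

Because Python threads cannot be cancelled from outside, the slow agent in this sketch still runs to completion in the background; the timeout only bounds how long the caller waits for its result.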
## Usage Examples
```python
from swarms import (
    Agent,
    run_agents_concurrently,
    run_agents_with_different_tasks,
    run_agents_with_timeout,
)
from swarm_models import OpenAIChat

model = OpenAIChat(
    model_name="gpt-4o-mini",
    temperature=0.0,
)

# Initialize agents
agents = [
    Agent(
        agent_name=f"Analysis-Agent-{i}",
        system_prompt="You are a financial analysis expert",
        llm=model,
        max_loops=1,
    )
    for i in range(5)
]

# Basic concurrent execution
task = "Analyze the impact of rising interest rates on tech stocks"
outputs = run_agents_concurrently(agents, task)

# Running with timeout
outputs_with_timeout = run_agents_with_timeout(
    agents=agents,
    task=task,
    timeout=30.0,
    batch_size=2,
)

# Running different tasks
task_pairs = [
    (agents[0], "Analyze tech stocks"),
    (agents[1], "Analyze energy stocks"),
    (agents[2], "Analyze retail stocks"),
]
different_outputs = run_agents_with_different_tasks(task_pairs)
```
## Resource Monitoring
### ResourceMetrics
A dataclass for system resource metrics.
#### Properties
| Property | Type | Description |
|----------------|-------|-------------|
| cpu_percent | float | Current CPU usage percentage |
| memory_percent | float | Current memory usage percentage |
| active_threads | int | Number of active threads |
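The three properties map naturally onto a small dataclass. The sketch below is an assumption about its shape, not the library's definition: `ResourceMetricsSketch` and `snapshot` are hypothetical names, and the CPU and memory readings are passed in by the caller. A real implementation would probably read them from a system library such as `psutil`, but that is not confirmed by this reference.

```python
import threading
from dataclasses import dataclass


@dataclass
class ResourceMetricsSketch:
    """Hypothetical mirror of the documented ResourceMetrics properties."""

    cpu_percent: float
    memory_percent: float
    active_threads: int


def snapshot(cpu_percent: float, memory_percent: float) -> ResourceMetricsSketch:
    # CPU and memory readings are supplied by the caller in this sketch;
    # the thread count comes from the standard library.
    return ResourceMetricsSketch(
        cpu_percent=cpu_percent,
        memory_percent=memory_percent,
        active_threads=threading.active_count(),
    )


metrics = snapshot(cpu_percent=12.5, memory_percent=40.0)
```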
### run_agents_with_resource_monitoring()
Runs agents with system resource monitoring and adaptive batch sizing.
#### Arguments
| Parameter | Type | Required | Default | Description |
|------------------|----------------|----------|---------|-------------|
| agents | List[AgentType]| Yes | - | List of Agent instances |
| task | str | Yes | - | Task string to execute |
| cpu_threshold | float | No | 90.0 | Max CPU usage percentage |
| memory_threshold | float | No | 90.0 | Max memory usage percentage |
| check_interval | float | No | 1.0 | Resource check interval in seconds |
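How the batch size adapts is not specified in this reference. As one possible policy, the sketch below halves the batch size whenever a reading exceeds either threshold and grows it by one otherwise. `next_batch_size`, `plan_batches`, the `max_size` cap, and the policy itself are all assumptions made for illustration. Readings are injected through a `sample` callable so the example stays deterministic, and the `check_interval` wait between checks is omitted.

```python
from typing import Callable, List, Tuple


def next_batch_size(
    current: int,
    cpu_percent: float,
    memory_percent: float,
    cpu_threshold: float = 90.0,
    memory_threshold: float = 90.0,
    max_size: int = 8,
) -> int:
    # Assumed policy: back off quickly under pressure, recover slowly.
    if cpu_percent > cpu_threshold or memory_percent > memory_threshold:
        return max(1, current // 2)
    return min(max_size, current + 1)


def plan_batches(
    n_agents: int,
    sample: Callable[[], Tuple[float, float]],
    start_size: int = 4,
) -> List[int]:
    """Return the size of each batch that would be run for n_agents agents."""
    sizes: List[int] = []
    size = start_size
    remaining = n_agents
    while remaining > 0:
        cpu, mem = sample()  # one (cpu_percent, memory_percent) reading per batch
        size = next_batch_size(size, cpu, mem)
        take = min(size, remaining)
        sizes.append(take)
        remaining -= take
    return sizes


# Two overloaded readings followed by idle readings.
readings = iter([(95.0, 50.0), (95.0, 50.0), (10.0, 10.0), (10.0, 10.0)])
demo_sizes = plan_batches(10, lambda: next(readings, (10.0, 10.0)))
# demo_sizes == [2, 1, 2, 3, 2]
```

The batch shrinks from 4 to 2 to 1 while CPU usage is above the 90.0 threshold, then grows again once the readings drop.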
## Performance Considerations
- All functions are decorated with `@profile_func` for performance monitoring
- Default batch sizes and worker counts are derived from the number of CPU cores (`batch_size` defaults to the CPU count, `max_workers` to CPU count * 2)
- Resource monitoring helps prevent system overload
- `uvloop` is a drop-in replacement for the default `asyncio` event loop and typically provides better performance
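How `uvloop` is wired in is not shown in this reference. A common pattern, sketched here as an assumption, is to create a `uvloop` event loop when the package is installed and fall back to the standard `asyncio` loop otherwise. `new_fast_loop` and `_add` are hypothetical helpers.

```python
import asyncio


def new_fast_loop() -> asyncio.AbstractEventLoop:
    """Return a uvloop event loop if uvloop is installed, else a standard one."""
    try:
        import uvloop  # optional third-party dependency
    except ImportError:
        return asyncio.new_event_loop()
    return uvloop.new_event_loop()


async def _add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # yield to the loop once
    return a + b


loop = new_fast_loop()
try:
    demo_sum = loop.run_until_complete(_add(2, 3))
finally:
    loop.close()
```

The calling code is identical for both loop types, which is what makes the fallback safe on platforms where `uvloop` is unavailable.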
## Error Handling
- Functions handle asyncio event loop creation/retrieval
- Timeout mechanism prevents infinite waiting
- Resource monitoring allows for adaptive performance adjustment