Spaces:
Sleeping
Sleeping
File size: 5,370 Bytes
d8d14f1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger("company-swarm")
@dataclass
class Company(BaseSwarm):
"""
Represents a company with a hierarchical organizational structure.
"""
org_chart: List[List[Agent]]
shared_instructions: str = None
ceo: Optional[Agent] = None
agents: List[Agent] = field(default_factory=list)
agent_interactions: Dict[str, List[str]] = field(
default_factory=dict
)
def __post_init__(self):
self._parse_org_chart(self.org_chart)
def add(self, agent: Agent) -> None:
"""
Adds an agent to the company.
Args:
agent (Agent): The agent to be added.
Raises:
ValueError: If an agent with the same ID already exists in the company.
"""
try:
if any(
existing_agent.id == agent.id
for existing_agent in self.agents
):
raise ValueError(
f"Agent with id {agent.id} already exists in the"
" company."
)
self.agents.append(agent)
except Exception as error:
logger.error(
f"[ERROR][CLASS: Company][METHOD: add] {error}"
)
raise error
def get(self, agent_name: str) -> Agent:
"""
Retrieves an agent from the company by name.
Args:
agent_name (str): The name of the agent to retrieve.
Returns:
Agent: The retrieved agent.
Raises:
ValueError: If an agent with the specified name does not exist in the company.
"""
try:
for agent in self.agents:
if agent.name == agent_name:
return agent
raise ValueError(
f"Agent with name {agent_name} does not exist in the"
" company."
)
except Exception as error:
logger.error(
f"[ERROR][CLASS: Company][METHOD: get] {error}"
)
raise error
def remove(self, agent: Agent) -> None:
"""
Removes an agent from the company.
Args:
agent (Agent): The agent to be removed.
"""
try:
self.agents.remove(agent)
except Exception as error:
logger.error(
f"[ERROR][CLASS: Company][METHOD: remove] {error}"
)
raise error
def _parse_org_chart(
self, org_chart: Union[List[Agent], List[List[Agent]]]
) -> None:
"""
Parses the organization chart and adds agents to the company.
Args:
org_chart (Union[List[Agent], List[List[Agent]]]): The organization chart
representing the hierarchy of agents.
Raises:
ValueError: If more than one CEO is found in the org chart or if an invalid
agent is encountered.
"""
try:
for node in org_chart:
if isinstance(node, Agent):
if self.ceo:
raise ValueError("1 CEO is only allowed")
self.ceo = node
self.add(node)
elif isinstance(node, list):
for agent in node:
if not isinstance(agent, Agent):
raise ValueError(
"Invalid agent in org chart"
)
self.add(agent)
for i, agent in enumerate(node):
if i == len(node) - 1:
continue
for other_agent in node[i + 1]:
self.__init_task(agent, other_agent)
except Exception as error:
logger.error(
"[ERROR][CLASS: Company][METHOD: _parse_org_chart]"
f" {error}"
)
raise error
def _init_interaction(
self,
agent1: Agent,
agent2: Agent,
) -> None:
"""
Initializes the interaction between two agents.
Args:
agent1 (Agent): The first agent involved in the interaction.
agent2 (Agent): The second agent involved in the interaction.
Returns:
None
"""
if agent1.ai_name not in self.agents_interactions:
self.agents_interactions[agent1.ai_name] = []
self.agents_interactions[agent1.ai_name].append(
agent2.ai_name
)
def run(self):
"""
Run the company
"""
for (
agent_name,
interaction_agents,
) in self.agents_interactions.items():
agent = self.get(agent_name)
for interaction_agent in interaction_agents:
task_description = (
f"Task for {agent_name} to interact with"
f" {interaction_agent}"
)
print(f"{task_description} is being executed")
agent.run(task_description)
|