Spaces:
Sleeping
Sleeping
File size: 4,473 Bytes
025a7d5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import json
import uuid
from typing import Any, Dict, List, Optional, Union
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
Json,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.agents.cache import CacheHandler
from crewai.process import Process
from crewai.task import Task
from crewai.tools.agent_tools import AgentTools
class Crew(BaseModel):
"""Class that represents a group of agents, how they should work together and their tasks."""
__hash__ = object.__hash__
model_config = ConfigDict(arbitrary_types_allowed=True)
tasks: List[Task] = Field(description="List of tasks", default_factory=list)
agents: List[Agent] = Field(
description="List of agents in this crew.", default_factory=list
)
process: Process = Field(
description="Process that the crew will follow.", default=Process.sequential
)
verbose: Union[int, bool] = Field(
description="Verbose mode for the Agent Execution", default=0
)
config: Optional[Union[Json, Dict[str, Any]]] = Field(
description="Configuration of the crew.", default=None
)
cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
default=CacheHandler(), description="An instance of the CacheHandler class."
)
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@classmethod
@field_validator("config", mode="before")
def check_config_type(cls, v: Union[Json, Dict[str, Any]]):
if isinstance(v, Json):
return json.loads(v)
return v
@model_validator(mode="after")
def check_config(self):
if not self.config and not self.tasks and not self.agents:
raise PydanticCustomError(
"missing_keys", "Either agents and task need to be set or config.", {}
)
if self.config:
if not self.config.get("agents") or not self.config.get("tasks"):
raise PydanticCustomError(
"missing_keys_in_config", "Config should have agents and tasks", {}
)
self.agents = [Agent(**agent) for agent in self.config["agents"]]
tasks = []
for task in self.config["tasks"]:
task_agent = [agt for agt in self.agents if agt.role == task["agent"]][
0
]
del task["agent"]
tasks.append(Task(**task, agent=task_agent))
self.tasks = tasks
if self.agents:
for agent in self.agents:
agent.set_cache_handler(self.cache_handler)
return self
def kickoff(self) -> str:
"""Kickoff the crew to work on its tasks.
Returns:
Output of the crew for each task.
"""
for agent in self.agents:
agent.cache_handler = self.cache_handler
if self.process == Process.sequential:
return self.__sequential_loop()
def __sequential_loop(self) -> str:
"""Loop that executes the sequential process.
Returns:
Output of the crew.
"""
task_outcome = None
for task in self.tasks:
# Add delegation tools to the task if the agent allows it
if task.agent.allow_delegation:
tools = AgentTools(agents=self.agents).tools()
task.tools += tools
self.__log("debug", f"Working Agent: {task.agent.role}")
self.__log("info", f"Starting Task: {task.description} ...")
task_outcome = task.execute(task_outcome)
self.__log("debug", f"Task output: {task_outcome}")
return task_outcome
def __log(self, level, message):
"""Log a message"""
level_map = {"debug": 1, "info": 2}
verbose_level = (
2 if isinstance(self.verbose, bool) and self.verbose else self.verbose
)
if verbose_level and level_map[level] <= verbose_level:
print(message)
|