File size: 4,473 Bytes
025a7d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import json
import uuid
from typing import Any, Dict, List, Optional, Union

from pydantic import (
    UUID4,
    BaseModel,
    ConfigDict,
    Field,
    InstanceOf,
    Json,
    field_validator,
    model_validator,
)
from pydantic_core import PydanticCustomError

from crewai.agent import Agent
from crewai.agents.cache import CacheHandler
from crewai.process import Process
from crewai.task import Task
from crewai.tools.agent_tools import AgentTools


class Crew(BaseModel):
    """Class that represents a group of agents, how they should work together and their tasks.

    A crew is built either from explicit ``agents`` and ``tasks`` or from a
    ``config`` mapping (or JSON string) that has ``"agents"`` and ``"tasks"``
    entries; when ``config`` is given, the agents and tasks are created from it.

    Attributes:
        tasks: List of tasks.
        agents: List of agents in this crew.
        process: Process that the crew will follow (sequential by default).
        verbose: Verbosity of the execution log; ``True`` is treated as level 2.
        config: Configuration of the crew, as a dict or a JSON string.
        cache_handler: Cache handler handed to every agent of the crew.
        id: Unique identifier of the crew; generated, never set by the user.
    """

    __hash__ = object.__hash__
    model_config = ConfigDict(arbitrary_types_allowed=True)
    tasks: List[Task] = Field(description="List of tasks", default_factory=list)
    agents: List[Agent] = Field(
        description="List of agents in this crew.", default_factory=list
    )
    process: Process = Field(
        description="Process that the crew will follow.", default=Process.sequential
    )
    verbose: Union[int, bool] = Field(
        description="Verbose mode for the Agent Execution", default=0
    )
    config: Optional[Union[Json, Dict[str, Any]]] = Field(
        description="Configuration of the crew.", default=None
    )
    # Built through a factory, like the other fields, instead of instantiating
    # a handler once at import time.
    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
        default_factory=CacheHandler,
        description="An instance of the CacheHandler class.",
    )
    id: UUID4 = Field(
        default_factory=uuid.uuid4,
        frozen=True,
        description="Unique identifier for the object, not set by user.",
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Reject any truthy ``id`` supplied by the caller.

        Raises:
            PydanticCustomError: If a truthy value was passed for ``id``.
        """
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "This field is not to be set by the user.", {}
            )

    # ``@field_validator`` has to be the outermost decorator: when it is
    # wrapped by ``@classmethod`` pydantic does not register the validator.
    @field_validator("config", mode="before")
    @classmethod
    def check_config_type(cls, v: Union[Json, Dict[str, Any]]):
        """Decode a JSON-encoded ``config`` before field validation.

        Args:
            v: The raw ``config`` value given by the caller.

        Returns:
            The decoded JSON document when ``v`` is a string or bytes value,
            otherwise ``v`` unchanged.
        """
        # A raw JSON document arrives as text/bytes; ``Json`` is only the
        # annotation, input values are never instances of it.
        if isinstance(v, (str, bytes, bytearray)):
            return json.loads(v)
        return v

    @model_validator(mode="after")
    def check_config(self):
        """Validate the crew setup and build agents/tasks from ``config``.

        When ``config`` is set, ``agents`` and ``tasks`` are replaced by
        objects created from ``config["agents"]`` and ``config["tasks"]``;
        each task entry names its agent by role under the ``"agent"`` key.
        Finally every agent receives the crew's cache handler.

        Returns:
            The validated crew instance.

        Raises:
            PydanticCustomError: If neither config nor agents/tasks are given,
                if the config lacks agents or tasks, or if a task refers to a
                role that no configured agent has.
        """
        if not self.config and not self.tasks and not self.agents:
            raise PydanticCustomError(
                "missing_keys", "Either agents and task need to be set or config.", {}
            )

        if self.config:
            if not self.config.get("agents") or not self.config.get("tasks"):
                raise PydanticCustomError(
                    "missing_keys_in_config", "Config should have agents and tasks", {}
                )

            self.agents = [Agent(**agent) for agent in self.config["agents"]]

            tasks = []
            for task in self.config["tasks"]:
                role = task.get("agent")
                # First agent with a matching role wins.
                task_agent = next(
                    (agt for agt in self.agents if agt.role == role), None
                )
                if task_agent is None:
                    raise PydanticCustomError(
                        "unknown_agent_in_config",
                        "Task refers to agent role '{role}' which is not defined "
                        "in the config agents.",
                        {"role": str(role)},
                    )
                # Work on a copy so the caller's config dict is left intact
                # and can be reused to build another crew.
                task_kwargs = {
                    key: value for key, value in task.items() if key != "agent"
                }
                tasks.append(Task(**task_kwargs, agent=task_agent))

            self.tasks = tasks

        if self.agents:
            for agent in self.agents:
                agent.set_cache_handler(self.cache_handler)
        return self

    def kickoff(self) -> str:
        """Kickoff the crew to work on its tasks.

        Returns:
            Output of the last task for the sequential process. For any other
            process value nothing is executed and ``None`` is returned.
        """
        # NOTE(review): validation hands the handler over through
        # ``agent.set_cache_handler``; here the attribute is assigned
        # directly - confirm both paths are meant to be equivalent.
        for agent in self.agents:
            agent.cache_handler = self.cache_handler

        if self.process == Process.sequential:
            return self.__sequential_loop()

    def __sequential_loop(self) -> str:
        """Loop that executes the sequential process.

        Tasks run in list order and each task receives the outcome of the
        previous one (``None`` for the first task).

        Returns:
            Output of the last executed task, or ``None`` when there are no tasks.
        """
        task_outcome = None
        for task in self.tasks:
            # Add delegation tools to the task if the agent allows it
            # NOTE(review): the tools are appended on every run, so calling
            # kickoff repeatedly presumably accumulates duplicates - confirm.
            if task.agent.allow_delegation:
                tools = AgentTools(agents=self.agents).tools()
                task.tools += tools

            self.__log("debug", f"Working Agent: {task.agent.role}")
            self.__log("info", f"Starting Task: {task.description} ...")

            task_outcome = task.execute(task_outcome)

            self.__log("debug", f"Task output: {task_outcome}")

        return task_outcome

    def __log(self, level, message):
        """Print ``message`` when the crew's verbosity allows ``level``.

        Args:
            level: Either ``"debug"`` (numeric level 1) or ``"info"`` (level 2).
            message: Text to print.
        """
        level_map = {"debug": 1, "info": 2}
        # ``verbose=True`` is shorthand for the highest level (2).
        verbose_level = (
            2 if isinstance(self.verbose, bool) and self.verbose else self.verbose
        )
        if verbose_level and level_map[level] <= verbose_level:
            print(message)