|
import json |
|
import uuid |
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
from pydantic_core import PydanticCustomError |
|
from crewai.agents.cache import CacheHandler |
|
from crewai.tools.agent_tools import AgentTools |
|
from typing import Any, List, Optional |
|
from langchain.agents.format_scratchpad import format_log_to_str |
|
from langchain_openai import ChatOpenAI |
|
from langchain.memory import ConversationSummaryMemory |
|
from langchain.tools.render import render_text_description |
|
from langchain_core.runnables.config import RunnableConfig |
|
|
|
|
|
|
|
from pydantic import ( |
|
UUID4, |
|
BaseModel, |
|
ConfigDict, |
|
Field, |
|
InstanceOf, |
|
Json, |
|
field_validator, |
|
model_validator, |
|
) |
|
|
|
from crewai.agents import ( |
|
CacheHandler, |
|
CrewAgentExecutor, |
|
CrewAgentOutputParser, |
|
ToolsHandler, |
|
) |
|
class Gmix(BaseModel): |
|
"""Class that represents a group of agents, how they should work together and their tasks.""" |
|
|
|
__hash__ = object.__hash__ |
|
model_config = ConfigDict(arbitrary_types_allowed=True) |
|
tasks: List[Task] = Field(description="List of tasks", default_factory=list) |
|
agents: List[Agent] = Field( |
|
description="List of agents in this crew.", default_factory=list |
|
) |
|
process: Process = Field( |
|
description="Process that the crew will follow.", default=Process.sequential |
|
) |
|
verbose: Union[int, bool] = Field( |
|
description="Verbose mode for the Agent Execution", default=0 |
|
) |
|
config: Optional[Union[Json, Dict[str, Any]]] = Field( |
|
description="Configuration of the crew.", default=None |
|
) |
|
cache_handler: Optional[InstanceOf[CacheHandler]] = Field( |
|
default=CacheHandler(), description="An instance of the CacheHandler class." |
|
) |
|
id: UUID4 = Field( |
|
default_factory=uuid.uuid4, |
|
frozen=True, |
|
description="Unique identifier for the object, not set by user.", |
|
) |
|
|
|
@field_validator("id", mode="before") |
|
@classmethod |
|
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: |
|
if v: |
|
raise PydanticCustomError( |
|
"may_not_set_field", "This field is not to be set by the user.", {} |
|
) |
|
|
|
@classmethod |
|
@field_validator("config", mode="before") |
|
def check_config_type(cls, v: Union[Json, Dict[str, Any]]): |
|
if isinstance(v, Json): |
|
return json.loads(v) |
|
return v |
|
|
|
@model_validator(mode="after") |
|
def check_config(self): |
|
if not self.config and not self.tasks and not self.agents: |
|
raise PydanticCustomError( |
|
"missing_keys", "Either agents and task need to be set or config.", {} |
|
) |
|
|
|
if self.config: |
|
if not self.config.get("agents") or not self.config.get("tasks"): |
|
raise PydanticCustomError( |
|
"missing_keys_in_config", "Config should have agents and tasks", {} |
|
) |
|
|
|
self.agents = [Agent(**agent) for agent in self.config["agents"]] |
|
|
|
tasks = [] |
|
for task in self.config["tasks"]: |
|
task_agent = [agt for agt in self.agents if agt.role == task["agent"]][ |
|
0 |
|
] |
|
del task["agent"] |
|
tasks.append(Task(**task, agent=task_agent)) |
|
|
|
self.tasks = tasks |
|
|
|
if self.agents: |
|
for agent in self.agents: |
|
agent.set_cache_handler(self.cache_handler) |
|
return self |
|
|
|
def kickoff(self) -> str: |
|
"""Kickoff the crew to work on its tasks. |
|
|
|
Returns: |
|
Output of the crew for each task. |
|
""" |
|
for agent in self.agents: |
|
agent.cache_handler = self.cache_handler |
|
|
|
if self.process == Process.sequential: |
|
return self.__sequential_loop() |
|
|
|
def __sequential_loop(self) -> str: |
|
"""Loop that executes the sequential process. |
|
|
|
Returns: |
|
Output of the crew. |
|
""" |
|
task_outcome = None |
|
for task in self.tasks: |
|
|
|
if task.agent.allow_delegation: |
|
tools = AgentTools(agents=self.agents).tools() |
|
task.tools += tools |
|
|
|
self.__log("debug", f"Working Agent: {task.agent.role}") |
|
self.__log("info", f"Starting Task: {task.description} ...") |
|
|
|
task_outcome = task.execute(task_outcome) |
|
|
|
self.__log("debug", f"Task output: {task_outcome}") |
|
|
|
return task_outcome |
|
|
|
def __log(self, level, message): |
|
"""Log a message""" |
|
level_map = {"debug": 1, "info": 2} |
|
verbose_level = ( |
|
2 if isinstance(self.verbose, bool) and self.verbose else self.verbose |
|
) |
|
if verbose_level and level_map[level] <= verbose_level: |
|
print(message) |
|
|
|
|
|
class Agent(BaseModel): |
|
"""Represents an agent in a system. |
|
|
|
Each agent has a role, a goal, a backstory, and an optional language model (llm). |
|
The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents. |
|
|
|
Attributes: |
|
agent_executor: An instance of the CrewAgentExecutor class. |
|
role: The role of the agent. |
|
goal: The objective of the agent. |
|
backstory: The backstory of the agent. |
|
llm: The language model that will run the agent. |
|
memory: Whether the agent should have memory or not. |
|
verbose: Whether the agent execution should be in verbose mode. |
|
allow_delegation: Whether the agent is allowed to delegate tasks to other agents. |
|
""" |
|
|
|
__hash__ = object.__hash__ |
|
model_config = ConfigDict(arbitrary_types_allowed=True) |
|
id: UUID4 = Field( |
|
default_factory=uuid.uuid4, |
|
frozen=True, |
|
description="Unique identifier for the object, not set by user.", |
|
) |
|
role: str = Field(description="Role of the agent") |
|
goal: str = Field(description="Objective of the agent") |
|
backstory: str = Field(description="Backstory of the agent") |
|
llm: Optional[Any] = Field( |
|
default_factory=lambda: ChatOpenAI( |
|
temperature=0.7, |
|
model_name="gpt-4", |
|
), |
|
description="Language model that will run the agent.", |
|
) |
|
memory: bool = Field( |
|
default=True, description="Whether the agent should have memory or not" |
|
) |
|
verbose: bool = Field( |
|
default=False, description="Verbose mode for the Agent Execution" |
|
) |
|
allow_delegation: bool = Field( |
|
default=True, description="Allow delegation of tasks to agents" |
|
) |
|
tools: List[Any] = Field( |
|
default_factory=list, description="Tools at agents disposal" |
|
) |
|
agent_executor: Optional[InstanceOf[CrewAgentExecutor]] = Field( |
|
default=None, description="An instance of the CrewAgentExecutor class." |
|
) |
|
tools_handler: Optional[InstanceOf[ToolsHandler]] = Field( |
|
default=None, description="An instance of the ToolsHandler class." |
|
) |
|
cache_handler: Optional[InstanceOf[CacheHandler]] = Field( |
|
default=CacheHandler(), description="An instance of the CacheHandler class." |
|
) |
|
|
|
@field_validator("id", mode="before") |
|
@classmethod |
|
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: |
|
if v: |
|
raise PydanticCustomError( |
|
"may_not_set_field", "This field is not to be set by the user.", {} |
|
) |
|
|
|
@model_validator(mode="after") |
|
def check_agent_executor(self) -> "Agent": |
|
if not self.agent_executor: |
|
self.set_cache_handler(self.cache_handler) |
|
return self |
|
|
|
def execute_task( |
|
self, task: str, context: str = None, tools: List[Any] = None |
|
) -> str: |
|
"""Execute a task with the agent. |
|
|
|
Args: |
|
task: Task to execute. |
|
context: Context to execute the task in. |
|
tools: Tools to use for the task. |
|
|
|
Returns: |
|
Output of the agent |
|
""" |
|
if context: |
|
task = "\n".join( |
|
[task, "\nThis is the context you are working with:", context] |
|
) |
|
|
|
tools = tools or self.tools |
|
self.agent_executor.tools = tools |
|
|
|
return self.agent_executor.invoke( |
|
{ |
|
"input": task, |
|
"tool_names": self.__tools_names(tools), |
|
"tools": render_text_description(tools), |
|
}, |
|
RunnableConfig(callbacks=[self.tools_handler]), |
|
)["output"] |
|
|
|
def set_cache_handler(self, cache_handler) -> None: |
|
self.cache_handler = cache_handler |
|
self.tools_handler = ToolsHandler(cache=self.cache_handler) |
|
self.__create_agent_executor() |
|
|
|
def __create_agent_executor(self) -> CrewAgentExecutor: |
|
"""Create an agent executor for the agent. |
|
|
|
Returns: |
|
An instance of the CrewAgentExecutor class. |
|
""" |
|
agent_args = { |
|
"input": lambda x: x["input"], |
|
"tools": lambda x: x["tools"], |
|
"tool_names": lambda x: x["tool_names"], |
|
"agent_scratchpad": lambda x: format_log_to_str(x["intermediate_steps"]), |
|
} |
|
executor_args = { |
|
"tools": self.tools, |
|
"verbose": self.verbose, |
|
"handle_parsing_errors": True, |
|
} |
|
|
|
if self.memory: |
|
summary_memory = ConversationSummaryMemory( |
|
llm=self.llm, memory_key="chat_history", input_key="input" |
|
) |
|
executor_args["memory"] = summary_memory |
|
agent_args["chat_history"] = lambda x: x["chat_history"] |
|
prompt = Prompts.TASK_EXECUTION_WITH_MEMORY_PROMPT |
|
else: |
|
prompt = Prompts.TASK_EXECUTION_PROMPT |
|
|
|
execution_prompt = prompt.partial( |
|
goal=self.goal, |
|
role=self.role, |
|
backstory=self.backstory, |
|
) |
|
|
|
bind = self.llm.bind(stop=["\nObservation"]) |
|
inner_agent = ( |
|
agent_args |
|
| execution_prompt |
|
| bind |
|
| CrewAgentOutputParser( |
|
tools_handler=self.tools_handler, cache=self.cache_handler |
|
) |
|
) |
|
self.agent_executor = CrewAgentExecutor(agent=inner_agent, **executor_args) |
|
|
|
@staticmethod |
|
def __tools_names(tools) -> str: |
|
return ", ".join([t.name for t in tools]) |
|
|
|
|
|
|
|
class Task(BaseModel): |
|
"""Class that represent a task to be executed.""" |
|
__hash__ = object.__hash__ |
|
description: str = Field(description="Description of the actual task.") |
|
agent: Optional[Agent] = Field( |
|
description="Agent responsible for the task.", default=None |
|
) |
|
tools: List[Any] = Field( |
|
default_factory=list, |
|
description="Tools the agent are limited to use for this task.", |
|
) |
|
id: UUID4 = Field( |
|
default_factory=uuid.uuid4, |
|
frozen=True, |
|
description="Unique identifier for the object, not set by user.", |
|
) |
|
@field_validator("id", mode="before") |
|
@classmethod |
|
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: |
|
if v: |
|
raise PydanticCustomError( |
|
"may_not_set_field", "This field is not to be set by the user.", {} |
|
) |
|
@model_validator(mode="after") |
|
def check_tools(self): |
|
if not self.tools and (self.agent and self.agent.tools): |
|
self.tools.extend(self.agent.tools) |
|
return self |
|
def execute(self, context: str = None) -> str: |
|
"""Execute the task. |
|
Returns: |
|
Output of the task. |
|
""" |
|
if self.agent: |
|
return self.agent.execute_task( |
|
task=self.description, context=context, tools=self.tools |
|
) |
|
else: |
|
raise Exception( |
|
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Gmix using a specific process that support that, either consensual or hierarchical." |
|
) |
|
|
|
|
|
|
|
|
|
class Process(str, Enum): |
|
""" |
|
Class representing the different processes that can be used to tackle tasks |
|
""" |
|
sequential = "sequential" |
|
|
|
|
|
|
|
|
|
|
|
"""Prompts for generic agent.""" |
|
class Prompts(BaseModel): |
|
"""Prompts for generic agent.""" |
|
TASK_SLICE: ClassVar[str] = dedent( |
|
"""\ |
|
Begin! This is VERY important to you, your job depends on it! |
|
Current Task: {input}""" |
|
) |
|
SCRATCHPAD_SLICE: ClassVar[str] = "\n{agent_scratchpad}" |
|
MEMORY_SLICE: ClassVar[str] = dedent( |
|
"""\ |
|
This is the summary of your work so far: |
|
{chat_history}""" |
|
) |
|
ROLE_PLAYING_SLICE: ClassVar[str] = dedent( |
|
"""\ |
|
You are {role}. |
|
{backstory} |
|
Your personal goal is: {goal}""" |
|
) |
|
TOOLS_SLICE: ClassVar[str] = dedent( |
|
"""\ |
|
TOOLS: |
|
------ |
|
You have access to the following tools: |
|
{tools} |
|
To use a tool, please use the exact following format: |
|
``` |
|
Thought: Do I need to use a tool? Yes |
|
Action: the action to take, should be one of [{tool_names}], just the name. |
|
Action Input: the input to the action |
|
Observation: the result of the action |
|
``` |
|
When you have a response for your task, or if you do not need to use a tool, you MUST use the format: |
|
``` |
|
Thought: Do I need to use a tool? No |
|
Final Answer: [your response here] |
|
```""" |
|
) |
|
VOTING_SLICE: ClassVar[str] = dedent( |
|
"""\ |
|
You are working on a crew with your co-workers and need to decide who will execute the task. |
|
These are your format instructions: |
|
{format_instructions} |
|
These are your co-workers and their roles: |
|
{coworkers}""" |
|
) |
|
TASK_EXECUTION_WITH_MEMORY_PROMPT: ClassVar[str] = PromptTemplate.from_template( |
|
ROLE_PLAYING_SLICE + TOOLS_SLICE + MEMORY_SLICE + TASK_SLICE + SCRATCHPAD_SLICE |
|
) |
|
TASK_EXECUTION_PROMPT: ClassVar[str] = PromptTemplate.from_template( |
|
ROLE_PLAYING_SLICE + TOOLS_SLICE + TASK_SLICE + SCRATCHPAD_SLICE |
|
) |
|
CONSENSUNS_VOTING_PROMPT: ClassVar[str] = PromptTemplate.from_template( |
|
ROLE_PLAYING_SLICE + VOTING_SLICE + TASK_SLICE + SCRATCHPAD_SLICE |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|