import json
import uuid
from enum import Enum
from textwrap import dedent
from typing import Any, ClassVar, Dict, List, Optional, Union

from langchain.agents.format_scratchpad import format_log_to_str
from langchain.memory import ConversationSummaryMemory
from langchain.prompts import PromptTemplate
from langchain.tools.render import render_text_description
from langchain_core.runnables.config import RunnableConfig
from langchain_openai import ChatOpenAI
from pydantic import (
    UUID4,
    BaseModel,
    ConfigDict,
    Field,
    InstanceOf,
    Json,
    field_validator,
    model_validator,
)
from pydantic_core import PydanticCustomError

from crewai.agents.cache import CacheHandler
from crewai.tools.agent_tools import AgentTools
from crewai.agents import (
    CacheHandler,
    CrewAgentExecutor,
    CrewAgentOutputParser,
    ToolsHandler,
)


# Content from agent.py
class Agent(BaseModel):
    """Represents an agent in a system.

    Each agent has a role, a goal, a backstory, and an optional language model
    (llm). The agent can also have memory, can operate in verbose mode, and can
    delegate tasks to other agents.

    Attributes:
        agent_executor: An instance of the CrewAgentExecutor class.
        role: The role of the agent.
        goal: The objective of the agent.
        backstory: The backstory of the agent.
        llm: The language model that will run the agent.
        memory: Whether the agent should have memory or not.
        verbose: Whether the agent execution should be in verbose mode.
        allow_delegation: Whether the agent is allowed to delegate tasks to
            other agents.
    """

    __hash__ = object.__hash__
    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: UUID4 = Field(
        default_factory=uuid.uuid4,
        frozen=True,
        description="Unique identifier for the object, not set by user.",
    )
    role: str = Field(description="Role of the agent")
    goal: str = Field(description="Objective of the agent")
    backstory: str = Field(description="Backstory of the agent")
    llm: Optional[Any] = Field(
        default_factory=lambda: ChatOpenAI(
            temperature=0.7,
            model_name="gpt-4",
        ),
        description="Language model that will run the agent.",
    )
    memory: bool = Field(
        default=True, description="Whether the agent should have memory or not"
    )
    verbose: bool = Field(
        default=False, description="Verbose mode for the Agent Execution"
    )
    allow_delegation: bool = Field(
        default=True, description="Allow delegation of tasks to agents"
    )
    tools: List[Any] = Field(
        default_factory=list, description="Tools at agents disposal"
    )
    agent_executor: Optional[InstanceOf[CrewAgentExecutor]] = Field(
        default=None, description="An instance of the CrewAgentExecutor class."
    )
    tools_handler: Optional[InstanceOf[ToolsHandler]] = Field(
        default=None, description="An instance of the ToolsHandler class."
    )
    # A factory builds the handler per instance instead of constructing one
    # object at class-definition time.
    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
        default_factory=CacheHandler,
        description="An instance of the CacheHandler class.",
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Reject any truthy, caller-supplied ``id``.

        Raises:
            PydanticCustomError: If a truthy value was passed for ``id``.
        """
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "This field is not to be set by the user.", {}
            )

    @model_validator(mode="after")
    def check_agent_executor(self) -> "Agent":
        """Build the tools handler and executor when none was supplied."""
        if not self.agent_executor:
            self.set_cache_handler(self.cache_handler)
        return self

    def execute_task(
        self,
        task: str,
        context: Optional[str] = None,
        tools: Optional[List[Any]] = None,
    ) -> str:
        """Execute a task with the agent.

        Args:
            task: Task to execute.
            context: Context to execute the task in.
            tools: Tools to use for the task; falls back to the agent's own
                tools when empty or omitted.

        Returns:
            Output of the agent.
        """
        if context:
            task = "\n".join(
                [task, "\nThis is the context you are working with:", context]
            )

        tools = tools or self.tools
        self.agent_executor.tools = tools

        return self.agent_executor.invoke(
            {
                "input": task,
                "tool_names": self.__tools_names(tools),
                "tools": render_text_description(tools),
            },
            RunnableConfig(callbacks=[self.tools_handler]),
        )["output"]

    def set_cache_handler(self, cache_handler) -> None:
        """Store ``cache_handler`` and rebuild the tools handler and executor.

        Args:
            cache_handler: Cache handler the agent should use from now on.
        """
        self.cache_handler = cache_handler
        self.tools_handler = ToolsHandler(cache=self.cache_handler)
        self.__create_agent_executor()

    def __create_agent_executor(self) -> None:
        """Create the agent executor and store it on ``self.agent_executor``.

        The memory-aware prompt and a ``ConversationSummaryMemory`` are used
        when ``self.memory`` is true; otherwise the plain task prompt is used.
        """
        agent_args = {
            "input": lambda x: x["input"],
            "tools": lambda x: x["tools"],
            "tool_names": lambda x: x["tool_names"],
            "agent_scratchpad": lambda x: format_log_to_str(x["intermediate_steps"]),
        }
        executor_args = {
            "tools": self.tools,
            "verbose": self.verbose,
            "handle_parsing_errors": True,
        }

        if self.memory:
            summary_memory = ConversationSummaryMemory(
                llm=self.llm, memory_key="chat_history", input_key="input"
            )
            executor_args["memory"] = summary_memory
            agent_args["chat_history"] = lambda x: x["chat_history"]
            prompt = Prompts.TASK_EXECUTION_WITH_MEMORY_PROMPT
        else:
            prompt = Prompts.TASK_EXECUTION_PROMPT

        execution_prompt = prompt.partial(
            goal=self.goal,
            role=self.role,
            backstory=self.backstory,
        )

        bind = self.llm.bind(stop=["\nObservation"])
        inner_agent = (
            agent_args
            | execution_prompt
            | bind
            | CrewAgentOutputParser(
                tools_handler=self.tools_handler, cache=self.cache_handler
            )
        )
        self.agent_executor = CrewAgentExecutor(agent=inner_agent, **executor_args)

    @staticmethod
    def __tools_names(tools) -> str:
        """Return the tools' ``name`` attributes joined by ``", "``."""
        return ", ".join([t.name for t in tools])


# Content from task.py
class Task(BaseModel):
    """Class that represent a task to be executed."""

    __hash__ = object.__hash__

    description: str = Field(description="Description of the actual task.")
    agent: Optional[Agent] = Field(
        description="Agent responsible for the task.", default=None
    )
    tools: List[Any] = Field(
        default_factory=list,
        description="Tools the agent are limited to use for this task.",
    )
    id: UUID4 = Field(
        default_factory=uuid.uuid4,
        frozen=True,
        description="Unique identifier for the object, not set by user.",
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Reject any truthy, caller-supplied ``id``.

        Raises:
            PydanticCustomError: If a truthy value was passed for ``id``.
        """
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "This field is not to be set by the user.", {}
            )

    @model_validator(mode="after")
    def check_tools(self):
        """Inherit the agent's tools when the task declares none of its own."""
        if not self.tools and (self.agent and self.agent.tools):
            self.tools.extend(self.agent.tools)
        return self

    def execute(self, context: Optional[str] = None) -> str:
        """Execute the task.

        Args:
            context: Optional context forwarded to the agent.

        Returns:
            Output of the task.

        Raises:
            Exception: If no agent is assigned to the task.
        """
        if self.agent:
            return self.agent.execute_task(
                task=self.description, context=context, tools=self.tools
            )
        else:
            raise Exception(
                f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Gmix using a specific process that support that, either consensual or hierarchical."
            )


# Content from process.py
class Process(str, Enum):
    """Class representing the different processes that can be used to tackle tasks."""

    sequential = "sequential"
    # TODO: consensual = 'consensual'
    # TODO: hierarchical = 'hierarchical'


# Content from prompts.py
class Prompts(BaseModel):
    """Prompts for generic agent."""

    # NOTE(review): the line breaks and blank lines inside the templates below
    # were reconstructed from a whitespace-collapsed copy of this file --
    # confirm the exact layout against the intended prompt text.
    TASK_SLICE: ClassVar[str] = dedent(
        """\


        Begin! This is VERY important to you, your job depends on it!

        Current Task: {input}"""
    )

    SCRATCHPAD_SLICE: ClassVar[str] = "\n{agent_scratchpad}"

    MEMORY_SLICE: ClassVar[str] = dedent(
        """\


        This is the summary of your work so far:
        {chat_history}"""
    )

    ROLE_PLAYING_SLICE: ClassVar[str] = dedent(
        """\
        You are {role}.
        {backstory}

        Your personal goal is: {goal}"""
    )

    TOOLS_SLICE: ClassVar[str] = dedent(
        """\


        TOOLS:
        ------
        You have access to the following tools:

        {tools}

        To use a tool, please use the exact following format:

        ```
        Thought: Do I need to use a tool? Yes
        Action: the action to take, should be one of [{tool_names}], just the name.
        Action Input: the input to the action
        Observation: the result of the action
        ```

        When you have a response for your task, or if you do not need to use a tool, you MUST use the format:

        ```
        Thought: Do I need to use a tool? No
        Final Answer: [your response here]
        ```"""
    )

    VOTING_SLICE: ClassVar[str] = dedent(
        """\


        You are working on a crew with your co-workers and need to decide who will execute the task.

        These are your format instructions:
        {format_instructions}

        These are your co-workers and their roles:
        {coworkers}"""
    )

    # The composed prompts are PromptTemplate objects (Agent calls .partial()
    # on them), so they are annotated as such rather than as str.
    TASK_EXECUTION_WITH_MEMORY_PROMPT: ClassVar[PromptTemplate] = (
        PromptTemplate.from_template(
            ROLE_PLAYING_SLICE
            + TOOLS_SLICE
            + MEMORY_SLICE
            + TASK_SLICE
            + SCRATCHPAD_SLICE
        )
    )

    TASK_EXECUTION_PROMPT: ClassVar[PromptTemplate] = PromptTemplate.from_template(
        ROLE_PLAYING_SLICE + TOOLS_SLICE + TASK_SLICE + SCRATCHPAD_SLICE
    )

    CONSENSUNS_VOTING_PROMPT: ClassVar[PromptTemplate] = PromptTemplate.from_template(
        ROLE_PLAYING_SLICE + VOTING_SLICE + TASK_SLICE + SCRATCHPAD_SLICE
    )


# Content from crew.py
# Defined last because its field annotations and defaults reference Task,
# Agent and Process, which must already exist when the class body runs.
class Gmix(BaseModel):
    """Class that represents a group of agents, how they should work together and their tasks."""

    __hash__ = object.__hash__
    model_config = ConfigDict(arbitrary_types_allowed=True)

    tasks: List[Task] = Field(description="List of tasks", default_factory=list)
    agents: List[Agent] = Field(
        description="List of agents in this crew.", default_factory=list
    )
    process: Process = Field(
        description="Process that the crew will follow.", default=Process.sequential
    )
    verbose: Union[int, bool] = Field(
        description="Verbose mode for the Agent Execution", default=0
    )
    config: Optional[Union[Json, Dict[str, Any]]] = Field(
        description="Configuration of the crew.", default=None
    )
    # A factory builds the handler per instance instead of constructing one
    # object at class-definition time.
    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
        default_factory=CacheHandler,
        description="An instance of the CacheHandler class.",
    )
    id: UUID4 = Field(
        default_factory=uuid.uuid4,
        frozen=True,
        description="Unique identifier for the object, not set by user.",
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Reject any truthy, caller-supplied ``id``.

        Raises:
            PydanticCustomError: If a truthy value was passed for ``id``.
        """
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "This field is not to be set by the user.", {}
            )

    # ``@field_validator`` must be the outermost decorator (above
    # ``@classmethod``); the reverse order does not follow pydantic's
    # documented usage and the validator is not picked up.
    @field_validator("config", mode="before")
    @classmethod
    def check_config_type(cls, v: Union[Json, Dict[str, Any]]):
        """Decode a JSON-encoded config into a Python object.

        Args:
            v: Raw ``config`` value as passed by the caller.

        Returns:
            The decoded object when ``v`` is JSON text, otherwise ``v``
            unchanged.
        """
        # ``Json`` is a pydantic annotation, not the runtime type of incoming
        # data, so raw JSON arrives here as str/bytes.
        if isinstance(v, (str, bytes, bytearray)):
            return json.loads(v)
        return v

    @model_validator(mode="after")
    def check_config(self):
        """Validate the crew setup and build agents/tasks from ``config``.

        Returns:
            The validated model instance.

        Raises:
            PydanticCustomError: If neither config nor agents/tasks are given,
                if the config lacks ``agents`` or ``tasks``, or if a task
                names a role that no configured agent has.
        """
        if not self.config and not self.tasks and not self.agents:
            raise PydanticCustomError(
                "missing_keys", "Either agents and task need to be set or config.", {}
            )

        if self.config:
            if not self.config.get("agents") or not self.config.get("tasks"):
                raise PydanticCustomError(
                    "missing_keys_in_config", "Config should have agents and tasks", {}
                )

            self.agents = [Agent(**agent) for agent in self.config["agents"]]

            tasks = []
            for task in self.config["tasks"]:
                role = task["agent"]
                # First agent with a matching role wins.
                task_agent = next(
                    (agt for agt in self.agents if agt.role == role), None
                )
                if task_agent is None:
                    raise PydanticCustomError(
                        "unknown_agent_role",
                        "No agent with role '{role}' is defined in config.",
                        {"role": role},
                    )
                # Copy instead of ``del task["agent"]`` so the caller's config
                # dict is left intact and can be reused.
                task_kwargs = {
                    key: value for key, value in task.items() if key != "agent"
                }
                tasks.append(Task(**task_kwargs, agent=task_agent))

            self.tasks = tasks

        if self.agents:
            for agent in self.agents:
                agent.set_cache_handler(self.cache_handler)
        return self

    def kickoff(self) -> str:
        """Kickoff the crew to work on its tasks.

        Returns:
            Output of the crew for each task.
        """
        for agent in self.agents:
            agent.cache_handler = self.cache_handler

        if self.process == Process.sequential:
            return self.__sequential_loop()

    def __sequential_loop(self) -> str:
        """Loop that executes the sequential process.

        Each task receives the previous task's output as its context.

        Returns:
            Output of the crew (the output of the last task).
        """
        task_outcome = None
        for task in self.tasks:
            agent = task.agent
            # Add delegation tools to the task if the agent allows it.
            # A task without an agent falls through to ``task.execute`` so its
            # descriptive error is the one raised.
            if agent is not None and agent.allow_delegation:
                # Skip tools whose name is already present so repeated
                # ``kickoff()`` calls do not pile up duplicate delegation tools.
                known_names = {tool.name for tool in task.tools}
                for tool in AgentTools(agents=self.agents).tools():
                    if tool.name not in known_names:
                        task.tools.append(tool)
                        known_names.add(tool.name)

            role = agent.role if agent is not None else None
            self.__log("debug", f"Working Agent: {role}")
            self.__log("info", f"Starting Task: {task.description} ...")

            task_outcome = task.execute(task_outcome)

            self.__log("debug", f"Task output: {task_outcome}")

        return task_outcome

    def __log(self, level, message):
        """Log a message.

        Args:
            level: Either ``"debug"`` or ``"info"``.
            message: Text to print when the verbosity allows it.
        """
        level_map = {"debug": 1, "info": 2}
        # ``verbose=True`` is treated as level 2; ints are used as given.
        verbose_level = (
            2 if isinstance(self.verbose, bool) and self.verbose else self.verbose
        )
        if verbose_level and level_map[level] <= verbose_level:
            print(message)