eaglelandsonce's picture
Update gmixai.py
f870d7d
raw
history blame
14.2 kB
import json
import uuid
from enum import Enum
from textwrap import dedent
from typing import Any, Dict, List, Optional, Union
from typing import Any, List, Optional
from typing import ClassVar

from pydantic_core import PydanticCustomError
from crewai.agents.cache import CacheHandler
from crewai.tools.agent_tools import AgentTools
from langchain.agents.format_scratchpad import format_log_to_str
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationSummaryMemory
from langchain.prompts import PromptTemplate
from langchain.tools.render import render_text_description
from langchain_core.runnables.config import RunnableConfig
# Content from crew.py
from pydantic import (
    UUID4,
    BaseModel,
    ConfigDict,
    Field,
    InstanceOf,
    Json,
    field_validator,
    model_validator,
)
from crewai.agents import (
    CacheHandler,
    CrewAgentExecutor,
    CrewAgentOutputParser,
    ToolsHandler,
)
class Gmix(BaseModel):
    """Group of agents, the tasks they work on, and the process used to run them.

    A Gmix is built either from explicit ``agents`` and ``tasks`` lists or from
    a ``config`` mapping with ``"agents"`` and ``"tasks"`` entries; in the
    latter case the ``Agent`` and ``Task`` objects are created during
    validation (see ``check_config``).
    """

    __hash__ = object.__hash__
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Task, Agent and Process are defined further down in this module, so they
    # are referenced lazily here (string annotations and a default factory).
    # Naming them directly would raise NameError while this class body runs.
    tasks: List["Task"] = Field(description="List of tasks", default_factory=list)
    agents: List["Agent"] = Field(
        description="List of agents in this crew.", default_factory=list
    )
    process: "Process" = Field(
        description="Process that the crew will follow.",
        default_factory=lambda: Process.sequential,
    )
    verbose: Union[int, bool] = Field(
        description="Verbose mode for the Agent Execution", default=0
    )
    config: Optional[Union[Json, Dict[str, Any]]] = Field(
        description="Configuration of the crew.", default=None
    )
    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
        default=CacheHandler(), description="An instance of the CacheHandler class."
    )
    id: UUID4 = Field(
        default_factory=uuid.uuid4,
        frozen=True,
        description="Unique identifier for the object, not set by user.",
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Reject any truthy ``id`` supplied by the caller.

        Raises:
            PydanticCustomError: if a truthy value is passed for ``id``.
        """
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "This field is not to be set by the user.", {}
            )

    # ``field_validator`` has to be the outermost decorator: with ``classmethod``
    # on top pydantic does not register the validator and it never runs.
    @field_validator("config", mode="before")
    @classmethod
    def check_config_type(cls, v: Union[Json, Dict[str, Any]]):
        """Decode a JSON-encoded ``config`` into Python data before validation.

        Args:
            v: Raw ``config`` input; either JSON text or an already-built mapping.

        Returns:
            The decoded JSON when ``v`` is text/bytes, otherwise ``v`` unchanged.
        """
        # Raw input is never an instance of the ``Json`` marker type, so test
        # for the textual forms that ``json.loads`` accepts instead.
        if isinstance(v, (str, bytes, bytearray)):
            return json.loads(v)
        return v

    @model_validator(mode="after")
    def check_config(self):
        """Validate the crew setup and build agents/tasks from ``config`` if given.

        When ``config`` is set, ``agents`` is replaced by ``Agent`` objects built
        from ``config["agents"]`` and ``tasks`` by ``Task`` objects built from
        ``config["tasks"]``, each task being bound to the first agent whose
        ``role`` equals the task's ``"agent"`` entry. Finally the crew's cache
        handler is pushed to every agent.

        Returns:
            The validated instance.

        Raises:
            PydanticCustomError: if neither config nor agents/tasks are given,
                if config lacks ``agents`` or ``tasks``, or if a task names an
                agent role that no configured agent has.
        """
        if not self.config and not self.tasks and not self.agents:
            raise PydanticCustomError(
                "missing_keys", "Either agents and task need to be set or config.", {}
            )

        if self.config:
            if not self.config.get("agents") or not self.config.get("tasks"):
                raise PydanticCustomError(
                    "missing_keys_in_config", "Config should have agents and tasks", {}
                )

            self.agents = [Agent(**agent) for agent in self.config["agents"]]

            tasks = []
            for task in self.config["tasks"]:
                # Work on a copy so the caller's config is left untouched and
                # can be validated again without losing its "agent" entries.
                task_kwargs = dict(task)
                role = task_kwargs.pop("agent", None)
                task_agent = next(
                    (agt for agt in self.agents if agt.role == role), None
                )
                if task_agent is None:
                    raise PydanticCustomError(
                        "unknown_task_agent",
                        "Task references an agent role that is not defined in config: {role}",
                        {"role": str(role)},
                    )
                tasks.append(Task(**task_kwargs, agent=task_agent))
            self.tasks = tasks

        if self.agents:
            for agent in self.agents:
                agent.set_cache_handler(self.cache_handler)
        return self

    def kickoff(self) -> str:
        """Kickoff the crew to work on its tasks.

        Returns:
            Output of the sequential loop when ``process`` is
            ``Process.sequential``; any other process falls through and
            yields ``None``.
        """
        for agent in self.agents:
            agent.cache_handler = self.cache_handler

        if self.process == Process.sequential:
            return self.__sequential_loop()

    def __sequential_loop(self) -> str:
        """Run the tasks one after another, feeding each output to the next task.

        Returns:
            Output of the last task, or ``None`` when there are no tasks.
        """
        task_outcome = None
        for task in self.tasks:
            # Add delegation tools to the task if the agent allows it.
            # NOTE(review): these are appended to ``task.tools`` on every run,
            # so repeated kickoffs keep growing the list -- confirm intended.
            if task.agent.allow_delegation:
                tools = AgentTools(agents=self.agents).tools()
                task.tools += tools

            self.__log("debug", f"Working Agent: {task.agent.role}")
            self.__log("info", f"Starting Task: {task.description} ...")

            # The previous task's output is passed in as context for this one.
            task_outcome = task.execute(task_outcome)

            self.__log("debug", f"Task output: {task_outcome}")
        return task_outcome

    def __log(self, level, message):
        """Print ``message`` if the crew's verbosity admits ``level``.

        Args:
            level: ``"debug"`` (rank 1) or ``"info"`` (rank 2).
            message: Text to print.
        """
        level_map = {"debug": 1, "info": 2}
        # ``verbose=True`` is treated as level 2; integers are used as given.
        verbose_level = (
            2 if isinstance(self.verbose, bool) and self.verbose else self.verbose
        )
        if verbose_level and level_map[level] <= verbose_level:
            print(message)
class Agent(BaseModel):
    """A single agent: a role, a goal, a backstory and the LLM that drives it.

    Attributes:
        id: Generated identifier; callers may not supply one.
        role: The role of the agent.
        goal: The objective of the agent.
        backstory: The backstory of the agent.
        llm: The language model that will run the agent.
        memory: Whether the agent should have memory or not.
        verbose: Whether the agent execution should be in verbose mode.
        allow_delegation: Whether the agent may delegate tasks to other agents.
        tools: Tools at the agent's disposal.
        agent_executor: The CrewAgentExecutor built for this agent.
        tools_handler: The ToolsHandler passed as a callback on execution.
        cache_handler: The CacheHandler shared with the tools handler and parser.
    """

    __hash__ = object.__hash__
    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: UUID4 = Field(
        description="Unique identifier for the object, not set by user.",
        default_factory=uuid.uuid4,
        frozen=True,
    )
    role: str = Field(description="Role of the agent")
    goal: str = Field(description="Objective of the agent")
    backstory: str = Field(description="Backstory of the agent")
    llm: Optional[Any] = Field(
        description="Language model that will run the agent.",
        default_factory=lambda: ChatOpenAI(temperature=0.7, model_name="gpt-4"),
    )
    memory: bool = Field(
        description="Whether the agent should have memory or not", default=True
    )
    verbose: bool = Field(
        description="Verbose mode for the Agent Execution", default=False
    )
    allow_delegation: bool = Field(
        description="Allow delegation of tasks to agents", default=True
    )
    tools: List[Any] = Field(
        description="Tools at agents disposal", default_factory=list
    )
    agent_executor: Optional[InstanceOf[CrewAgentExecutor]] = Field(
        description="An instance of the CrewAgentExecutor class.", default=None
    )
    tools_handler: Optional[InstanceOf[ToolsHandler]] = Field(
        description="An instance of the ToolsHandler class.", default=None
    )
    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
        description="An instance of the CacheHandler class.", default=CacheHandler()
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Refuse a caller-provided ``id``; raises PydanticCustomError if truthy."""
        if not v:
            return
        raise PydanticCustomError(
            "may_not_set_field", "This field is not to be set by the user.", {}
        )

    @model_validator(mode="after")
    def check_agent_executor(self) -> "Agent":
        """Build the executor (via the cache handler setup) when none was given."""
        if self.agent_executor:
            return self
        self.set_cache_handler(self.cache_handler)
        return self

    def execute_task(
        self, task: str, context: str = None, tools: List[Any] = None
    ) -> str:
        """Run ``task`` through the agent executor and return its output.

        Args:
            task: Task to execute.
            context: Optional context appended to the task text.
            tools: Tools for this run; the agent's own tools are used when
                this is empty or omitted.

        Returns:
            The ``"output"`` entry of the executor's result.
        """
        prompt_input = task
        if context:
            pieces = (task, "\nThis is the context you are working with:", context)
            prompt_input = "\n".join(pieces)

        active_tools = tools if tools else self.tools
        self.agent_executor.tools = active_tools

        payload = {
            "input": prompt_input,
            "tool_names": self.__tools_names(active_tools),
            "tools": render_text_description(active_tools),
        }
        run_config = RunnableConfig(callbacks=[self.tools_handler])
        result = self.agent_executor.invoke(payload, run_config)
        return result["output"]

    def set_cache_handler(self, cache_handler) -> None:
        """Adopt ``cache_handler`` and rebuild the tools handler and executor."""
        self.cache_handler = cache_handler
        self.tools_handler = ToolsHandler(cache=self.cache_handler)
        self.__create_agent_executor()

    def __create_agent_executor(self) -> CrewAgentExecutor:
        """Assemble the prompt -> LLM -> parser chain and store its executor.

        The result is assigned to ``self.agent_executor``; nothing is returned.
        """
        input_mappers = {
            "input": lambda x: x["input"],
            "tools": lambda x: x["tools"],
            "tool_names": lambda x: x["tool_names"],
            "agent_scratchpad": lambda x: format_log_to_str(x["intermediate_steps"]),
        }
        executor_kwargs = {
            "tools": self.tools,
            "verbose": self.verbose,
            "handle_parsing_errors": True,
        }

        if not self.memory:
            template = Prompts.TASK_EXECUTION_PROMPT
        else:
            # Memory-enabled agents keep a running summary under "chat_history".
            executor_kwargs["memory"] = ConversationSummaryMemory(
                llm=self.llm, memory_key="chat_history", input_key="input"
            )
            input_mappers["chat_history"] = lambda x: x["chat_history"]
            template = Prompts.TASK_EXECUTION_WITH_MEMORY_PROMPT

        filled_prompt = template.partial(
            goal=self.goal,
            role=self.role,
            backstory=self.backstory,
        )
        stopped_llm = self.llm.bind(stop=["\nObservation"])
        output_parser = CrewAgentOutputParser(
            tools_handler=self.tools_handler, cache=self.cache_handler
        )

        chain = input_mappers | filled_prompt
        chain = chain | stopped_llm
        chain = chain | output_parser
        self.agent_executor = CrewAgentExecutor(agent=chain, **executor_kwargs)

    @staticmethod
    def __tools_names(tools) -> str:
        """Return the tools' ``name`` attributes joined by ``", "``."""
        return ", ".join(tool.name for tool in tools)
# Content from task.py
class Task(BaseModel):
    """A unit of work: a description, an optional agent, and the tools to use."""

    __hash__ = object.__hash__

    description: str = Field(description="Description of the actual task.")
    agent: Optional[Agent] = Field(
        default=None, description="Agent responsible for the task."
    )
    tools: List[Any] = Field(
        description="Tools the agent are limited to use for this task.",
        default_factory=list,
    )
    id: UUID4 = Field(
        description="Unique identifier for the object, not set by user.",
        default_factory=uuid.uuid4,
        frozen=True,
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Refuse a caller-provided ``id``; raises PydanticCustomError if truthy."""
        if not v:
            return
        raise PydanticCustomError(
            "may_not_set_field", "This field is not to be set by the user.", {}
        )

    @model_validator(mode="after")
    def check_tools(self):
        """Fall back to the agent's tools when the task lists none of its own."""
        inherited = self.agent.tools if self.agent else None
        if inherited and not self.tools:
            self.tools.extend(inherited)
        return self

    def execute(self, context: str = None) -> str:
        """Have the assigned agent carry out this task.

        Args:
            context: Optional context forwarded to the agent.

        Returns:
            Output of the task, as returned by the agent.

        Raises:
            Exception: if no agent is assigned to the task.
        """
        if not self.agent:
            raise Exception(
                f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Gmix using a specific process that support that, either consensual or hierarchical."
            )
        return self.agent.execute_task(
            task=self.description, context=context, tools=self.tools
        )
# Content from process.py
class Process(str, Enum):
    """Enumerates the processes a crew can use to work through its tasks.

    Members are also ``str`` instances, so they compare equal to their
    plain string value.
    """

    sequential = "sequential"

    # Not implemented yet:
    # TODO: consensual = 'consensual'
    # TODO: hierarchical = 'hierarchical'
# Content from prompts.py
"""Prompts for generic agent."""
class Prompts(BaseModel):
    """Prompt fragments ("slices") and the templates assembled from them.

    The ``*_SLICE`` attributes are plain format strings; the ``*_PROMPT``
    attributes are ``PromptTemplate`` objects built by combining slices.
    """

    TASK_SLICE: ClassVar[str] = dedent(
        """\
    Begin! This is VERY important to you, your job depends on it!
    Current Task: {input}"""
    )

    # Starts with its own newline, so it is appended directly after the task.
    SCRATCHPAD_SLICE: ClassVar[str] = "\n{agent_scratchpad}"

    MEMORY_SLICE: ClassVar[str] = dedent(
        """\
    This is the summary of your work so far:
    {chat_history}"""
    )

    ROLE_PLAYING_SLICE: ClassVar[str] = dedent(
        """\
    You are {role}.
    {backstory}
    Your personal goal is: {goal}"""
    )

    TOOLS_SLICE: ClassVar[str] = dedent(
        """\
    TOOLS:
    ------
    You have access to the following tools:
    {tools}
    To use a tool, please use the exact following format:
    ```
    Thought: Do I need to use a tool? Yes
    Action: the action to take, should be one of [{tool_names}], just the name.
    Action Input: the input to the action
    Observation: the result of the action
    ```
    When you have a response for your task, or if you do not need to use a tool, you MUST use the format:
    ```
    Thought: Do I need to use a tool? No
    Final Answer: [your response here]
    ```"""
    )

    VOTING_SLICE: ClassVar[str] = dedent(
        """\
    You are working on a crew with your co-workers and need to decide who will execute the task.
    These are your format instructions:
    {format_instructions}
    These are your co-workers and their roles:
    {coworkers}"""
    )

    # The slices carry no leading/trailing blank lines, so gluing them with a
    # bare "+" fuses section boundaries (e.g. "...{goal}TOOLS:"). A blank line
    # is inserted between the body slices instead.
    SLICE_SEPARATOR: ClassVar[str] = "\n\n"

    TASK_EXECUTION_WITH_MEMORY_PROMPT: ClassVar[PromptTemplate] = (
        PromptTemplate.from_template(
            SLICE_SEPARATOR.join(
                [ROLE_PLAYING_SLICE, TOOLS_SLICE, MEMORY_SLICE, TASK_SLICE]
            )
            + SCRATCHPAD_SLICE
        )
    )

    TASK_EXECUTION_PROMPT: ClassVar[PromptTemplate] = PromptTemplate.from_template(
        SLICE_SEPARATOR.join([ROLE_PLAYING_SLICE, TOOLS_SLICE, TASK_SLICE])
        + SCRATCHPAD_SLICE
    )

    # The misspelled attribute name is kept as-is: it is part of the class's
    # public interface.
    CONSENSUNS_VOTING_PROMPT: ClassVar[PromptTemplate] = PromptTemplate.from_template(
        SLICE_SEPARATOR.join([ROLE_PLAYING_SLICE, VOTING_SLICE, TASK_SLICE])
        + SCRATCHPAD_SLICE
    )