import faiss from langchain.docstore import InMemoryDocstore from langchain.embeddings import OpenAIEmbeddings from langchain.tools.human.tool import HumanInputRun from langchain.vectorstores import FAISS from langchain_experimental.autonomous_agents import AutoGPT from swarms.agents.message import Message from swarms.tools.autogpt import ( ReadFileTool, WriteFileTool, compile, process_csv, load_qa_with_sources_chain, WebpageQATool ) from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator #cache ROOT_DIR = "./data/" #main class Worker: """ Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks, it can search the internet or spawn child multi-modality models to process and generate images and text or audio and so on Parameters: - `model_name` (str): The name of the language model to be used (default: "gpt-4"). - `openai_api_key` (str): The OpenAI API key (optional). - `ai_name` (str): The name of the AI worker. - `ai_role` (str): The role of the AI worker. - `external_tools` (list): List of external tools (optional). - `human_in_the_loop` (bool): Enable human-in-the-loop interaction (default: False). - `temperature` (float): The temperature parameter for response generation (default: 0.5). - `llm` (ChatOpenAI): Pre-initialized ChatOpenAI model instance (optional). - `openai` (bool): If True, use the OpenAI language model; otherwise, use `llm` (default: True). #Usage ``` from swarms import Worker node = Worker( ai_name="Optimus Prime", ) task = "What were the winning boston marathon times for the past 5 years (ending in 2022)? Generate a table of the year, name, country of origin, and times." response = node.run(task) print(response) ``` llm + tools + memory """ def __init__( self, ai_name: str = "Autobot Swarm Worker", ai_role: str = "Worker in a swarm", external_tools = None, human_in_the_loop = False, temperature: float = 0.5, llm = None, openai_api_key: str = None, ): self.temperature = temperature self.human_in_the_loop = human_in_the_loop self.llm = llm self.openai_api_key = openai_api_key self.ai_name = ai_name self.ai_role = ai_role self.setup_tools(external_tools) self.setup_memory() self.setup_agent() def reset(self): """ Reset the message history. """ self.message_history = ["Here is the conversation so far"] @property def name(self): return self.ai_name def receieve( self, name: str, message: str ) -> None: """ Receive a message and update the message history. Parameters: - `name` (str): The name of the sender. - `message` (str): The received message. """ self.message_history.append(f"{name}: {message}") def send(self) -> str: self.agent.run(task=self.message_history) def add(self, task, priority=0): self.task_queue.append((priority, task)) def setup_tools(self, external_tools): """ Set up tools for the worker. Parameters: - `external_tools` (list): List of external tools (optional). Example: ``` external_tools = [MyTool1(), MyTool2()] worker = Worker(model_name="gpt-4", openai_api_key="my_key", ai_name="My Worker", ai_role="Worker", external_tools=external_tools, human_in_the_loop=False, temperature=0.5) ``` """ query_website_tool = WebpageQATool( qa_chain=load_qa_with_sources_chain(self.llm) ) self.tools = [ WriteFileTool(root_dir=ROOT_DIR), ReadFileTool(root_dir=ROOT_DIR), process_csv, query_website_tool, HumanInputRun(), compile, # VQAinference, ] if external_tools is not None: self.tools.extend(external_tools) def setup_memory(self): """ Set up memory for the worker. """ try: embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key) embedding_size = 1536 index = faiss.IndexFlatL2(embedding_size) self.vectorstore = FAISS( embeddings_model.embed_query, index, InMemoryDocstore({}), {} ) except Exception as error: raise RuntimeError(f"Error setting up memory perhaps try try tuning the embedding size: {error}") def setup_agent(self): """ Set up the autonomous agent. """ try: self.agent = AutoGPT.from_llm_and_tools( ai_name=self.ai_name, ai_role=self.ai_role, tools=self.tools, llm=self.llm, memory=self.vectorstore.as_retriever(search_kwargs={"k": 8}), human_in_the_loop=self.human_in_the_loop ) except Exception as error: raise RuntimeError(f"Error setting up agent: {error}") @log_decorator @error_decorator @timing_decorator def run( self, task: str = None ): """ Run the autonomous agent on a given task. Parameters: - `task`: The task to be processed. Returns: - `result`: The result of the agent's processing. """ try: result = self.agent.run([task]) return result except Exception as error: raise RuntimeError(f"Error while running agent: {error}") @log_decorator @error_decorator @timing_decorator def __call__( self, task: str = None ): """ Make the worker callable to run the agent on a given task. Parameters: - `task`: The task to be processed. Returns: - `results`: The results of the agent's processing. """ try: results = self.agent.run([task]) return results except Exception as error: raise RuntimeError(f"Error while running agent: {error}") def health_check(self): pass @log_decorator @error_decorator @timing_decorator def chat( self, msg: str = None, streaming: bool = False ): """ Run chat Args: msg (str, optional): Message to send to the agent. Defaults to None. language (str, optional): Language to use. Defaults to None. streaming (bool, optional): Whether to stream the response. Defaults to False. Returns: str: Response from the agent Usage: -------------- agent = MultiModalAgent() agent.chat("Hello") """ #add users message to the history self.history.append( Message( "User", msg ) ) #process msg try: response = self.agent.run(msg) #add agent's response to the history self.history.append( Message( "Agent", response ) ) #if streaming is = True if streaming: return self._stream_response(response) else: response except Exception as error: error_message = f"Error processing message: {str(error)}" #add error to history self.history.append( Message( "Agent", error_message ) ) return error_message def _stream_response( self, response: str = None ): """ Yield the response token by token (word by word) Usage: -------------- for token in _stream_response(response): print(token) """ for token in response.split(): yield token