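# NOTE(review): the three lines below appear to be web-page residue captured
# together with the file, not part of the module; kept here as comments.
#   WAWAA's picture
#   Upload folder using huggingface_hub
#   4962437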
import faiss
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.tools.human.tool import HumanInputRun
from langchain.vectorstores import FAISS
from langchain_experimental.autonomous_agents import AutoGPT
from swarms.agents.message import Message
from swarms.tools.autogpt import (
ReadFileTool,
WriteFileTool,
compile,
process_csv,
load_qa_with_sources_chain,
WebpageQATool
)
from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator
# Root directory passed as `root_dir` to the file write/read tools of the worker.
ROOT_DIR = "./data/"
#main
class Worker:
    """
    Autonomous agent worker: llm + tools + memory.

    Useful for when you need to spawn an autonomous agent instance as a worker
    to accomplish complex tasks. The worker wires together a language model,
    a tool list and a FAISS-backed vector memory into an AutoGPT agent.

    Parameters:
    - `ai_name` (str): The name of the AI worker.
    - `ai_role` (str): The role of the AI worker.
    - `external_tools` (list): Extra tools appended to the built-in tool list (optional).
    - `human_in_the_loop` (bool): Forwarded to the AutoGPT agent (default: False).
    - `temperature` (float): Stored on the instance (default: 0.5); it is not
      forwarded anywhere by this class.
    - `llm`: Language model instance handed to the web-page QA chain and to
      the AutoGPT agent.
      NOTE(review): defaults to None, but it is passed straight through to
      both consumers, so a real model presumably has to be supplied - confirm.
    - `openai_api_key` (str): The OpenAI API key used for the embeddings model (optional).

    #Usage
    ```
    from swarms import Worker

    node = Worker(
        ai_name="Optimus Prime",
        llm=llm,  # a pre-built chat model instance
        openai_api_key="my_key",
    )

    task = "What were the winning boston marathon times for the past 5 years (ending in 2022)? Generate a table of the year, name, country of origin, and times."
    response = node.run(task)
    print(response)
    ```
    """

    def __init__(
        self,
        ai_name: str = "Autobot Swarm Worker",
        ai_role: str = "Worker in a swarm",
        external_tools=None,
        human_in_the_loop=False,
        temperature: float = 0.5,
        llm=None,
        openai_api_key: str = None,
    ):
        self.temperature = temperature
        self.human_in_the_loop = human_in_the_loop
        self.llm = llm
        self.openai_api_key = openai_api_key
        self.ai_name = ai_name
        self.ai_role = ai_role

        # Conversation/task state must exist before any of receive(), send(),
        # add() or chat() is called; these attributes are only ever appended to
        # elsewhere in the class.
        self.task_queue = []
        self.history = []
        self.reset()

        # Order matters: the agent needs both the tools and the vector store.
        self.setup_tools(external_tools)
        self.setup_memory()
        self.setup_agent()

    def reset(self):
        """
        Reset the message history to its initial preamble.
        """
        self.message_history = ["Here is the conversation so far"]

    @property
    def name(self):
        """Name of the worker (same value as `ai_name`)."""
        return self.ai_name

    def receive(
        self,
        name: str,
        message: str
    ) -> None:
        """
        Receive a message and update the message history.

        Parameters:
        - `name` (str): The name of the sender.
        - `message` (str): The received message.
        """
        self.message_history.append(f"{name}: {message}")

    def receieve(
        self,
        name: str,
        message: str
    ) -> None:
        """
        Backward-compatible alias (original misspelling) of `receive`.

        Parameters:
        - `name` (str): The name of the sender.
        - `message` (str): The received message.
        """
        self.receive(name, message)

    def send(self) -> str:
        """
        Run the agent on the accumulated message history.

        Returns:
        - The value returned by the agent's `run` call.
        """
        # The history list is passed positionally, the same way run() and
        # __call__() pass their goal list, and the result is returned to
        # honour the `-> str` annotation.
        return self.agent.run(self.message_history)

    def add(self, task, priority=0):
        """
        Append a task to the worker's task queue.

        Parameters:
        - `task`: The task to enqueue.
        - `priority` (int): Priority stored alongside the task (default: 0).
          The queue is a plain list of `(priority, task)` tuples; no ordering
          is applied here.
        """
        self.task_queue.append((priority, task))

    def setup_tools(self, external_tools):
        """
        Set up tools for the worker.

        Parameters:
        - `external_tools` (list): List of external tools (optional).

        Example:
        ```
        external_tools = [MyTool1(), MyTool2()]
        worker = Worker(openai_api_key="my_key",
                        ai_name="My Worker",
                        ai_role="Worker",
                        external_tools=external_tools,
                        human_in_the_loop=False,
                        temperature=0.5)
        ```
        """
        query_website_tool = WebpageQATool(
            qa_chain=load_qa_with_sources_chain(self.llm)
        )

        self.tools = [
            WriteFileTool(root_dir=ROOT_DIR),
            ReadFileTool(root_dir=ROOT_DIR),
            process_csv,
            query_website_tool,
            HumanInputRun(),
            compile,
            # VQAinference,
        ]

        if external_tools is not None:
            self.tools.extend(external_tools)

    def setup_memory(self):
        """
        Set up the FAISS vector store used as the worker's memory.

        Raises:
        - `RuntimeError`: If the embeddings model or the index cannot be created.
        """
        try:
            embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
            # Dimension of the flat L2 index.
            # NOTE(review): presumably has to match the length of the vectors
            # produced by the embeddings model - confirm if the model changes.
            embedding_size = 1536
            index = faiss.IndexFlatL2(embedding_size)

            self.vectorstore = FAISS(
                embeddings_model.embed_query,
                index,
                InMemoryDocstore({}), {}
            )
        except Exception as error:
            raise RuntimeError(
                f"Error setting up memory perhaps try try tuning the embedding size: {error}"
            ) from error

    def setup_agent(self):
        """
        Set up the autonomous agent from the configured llm, tools and memory.

        Raises:
        - `RuntimeError`: If the agent cannot be constructed.
        """
        try:
            self.agent = AutoGPT.from_llm_and_tools(
                ai_name=self.ai_name,
                ai_role=self.ai_role,
                tools=self.tools,
                llm=self.llm,
                memory=self.vectorstore.as_retriever(search_kwargs={"k": 8}),
                human_in_the_loop=self.human_in_the_loop
            )
        except Exception as error:
            raise RuntimeError(f"Error setting up agent: {error}") from error

    def _execute(self, task):
        """Run the agent on a single task, wrapping any failure in RuntimeError."""
        try:
            return self.agent.run([task])
        except Exception as error:
            raise RuntimeError(f"Error while running agent: {error}") from error

    @log_decorator
    @error_decorator
    @timing_decorator
    def run(
        self,
        task: str = None
    ):
        """
        Run the autonomous agent on a given task.

        Parameters:
        - `task`: The task to be processed.

        Returns:
        - `result`: The result of the agent's processing.

        Raises:
        - `RuntimeError`: If the agent raises while processing the task.
        """
        return self._execute(task)

    @log_decorator
    @error_decorator
    @timing_decorator
    def __call__(
        self,
        task: str = None
    ):
        """
        Make the worker callable to run the agent on a given task.

        Parameters:
        - `task`: The task to be processed.

        Returns:
        - `results`: The results of the agent's processing.

        Raises:
        - `RuntimeError`: If the agent raises while processing the task.
        """
        return self._execute(task)

    def health_check(self):
        """Placeholder; currently performs no check and returns None."""
        pass

    @log_decorator
    @error_decorator
    @timing_decorator
    def chat(
        self,
        msg: str = None,
        streaming: bool = False
    ):
        """
        Run chat

        Args:
            msg (str, optional): Message to send to the agent. Defaults to None.
            streaming (bool, optional): Whether to stream the response. Defaults to False.

        Returns:
            The agent's response; a generator of whitespace-separated tokens
            when `streaming` is True; or an error message string if
            processing the message failed.

        Usage:
        --------------
        agent = Worker()
        agent.chat("Hello")
        """
        # add users message to the history
        self.history.append(
            Message(
                "User",
                msg
            )
        )

        # process msg
        try:
            # Wrapped in a list for consistency with run() and __call__().
            response = self.agent.run([msg])

            # add agent's response to the history
            self.history.append(
                Message(
                    "Agent",
                    response
                )
            )

            if streaming:
                return self._stream_response(response)
            return response

        except Exception as error:
            error_message = f"Error processing message: {str(error)}"

            # add error to history
            self.history.append(
                Message(
                    "Agent",
                    error_message
                )
            )

            return error_message

    def _stream_response(
        self,
        response: str = None
    ):
        """
        Yield the response token by token (word by word)

        Yields nothing when `response` is None or empty.

        Usage:
        --------------
        for token in _stream_response(response):
            print(token)
        """
        if not response:
            return
        yield from response.split()