Spaces:
Runtime error
Runtime error
File size: 6,017 Bytes
4962437 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
import logging
import os
import faiss
from langchain import LLMChain, OpenAI, PromptTemplate
from langchain.agents import AgentExecutor, Tool, ZeroShotAgent
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain_experimental.autonomous_agents import BabyAGI
from pydantic import ValidationError
# Configure the root logger once at import time: INFO and above, with a
# timestamp and level name in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# ---------- Boss Node ----------
class Boss:
    """
    Plan and execute an objective by driving a BabyAGI loop over a tool-using agent.

    On construction the Boss validates the objective and then builds, in order:
    the language model, a FAISS-backed vector store used as BabyAGI memory, a
    ZeroShotAgent executor exposing a goal-decomposition tool and a
    worker-delegation tool, and finally the BabyAGI controller itself.

    Args:
        objective: Non-empty description of what the Boss should achieve.
        api_key: OpenAI API key. Falls back to the ``OPENAI_API_KEY``
            environment variable when not given.
        max_iterations: Forwarded to ``BabyAGI.from_llm`` as ``max_iterations``.
        human_in_the_loop: Forwarded unchanged to ``BabyAGI.from_llm``.
        boss_system_prompt: Prompt template for the "Goal Decomposition Tool".
            If it declares no template variable, an ``{objective}`` slot is
            appended so the tool input has somewhere to go.
        llm_class: Class instantiated with ``openai_api_key`` and
            ``temperature=0.5`` to obtain the language model.
        worker_node: Callable registered as the ``func`` of the
            "Swarm Worker Agent" tool.
        verbose: Forwarded to the ``AgentExecutor``.
        embedding_size: Dimensionality of the FAISS index. It must match the
            length of the vectors produced by the embeddings model; 1536 is
            the size of OpenAI's ``text-embedding-ada-002`` vectors.

    Raises:
        ValueError: If ``objective`` is empty.

    Example:
        # Setup
        api_key = "YOUR_OPENAI_API_KEY"  # Replace with your OpenAI API Key.
        os.environ["OPENAI_API_KEY"] = api_key

        # Objective for the Boss
        objective = "Analyze website user behavior patterns over the past month."

        # Create a Boss instance
        boss = Boss(
            objective=objective,
            boss_system_prompt="You are the main controller of a data analysis swarm...",
            api_key=api_key,
            worker_node=WorkerNode
        )

        # Run the Boss to process the objective
        boss.run()
    """

    def __init__(
        self,
        objective: str,
        api_key=None,
        max_iterations: int = 5,
        human_in_the_loop=None,
        boss_system_prompt: str = "You are a boss planner in a swarm...",
        llm_class=OpenAI,
        worker_node=None,
        verbose: bool = False,
        embedding_size: int = 1536,
    ) -> None:
        # Store parameters
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.objective = objective
        self.max_iterations = max_iterations
        self.boss_system_prompt = boss_system_prompt
        self.llm_class = llm_class
        self.verbose = verbose
        self.embedding_size = embedding_size

        # Validate the objective first so bad input fails before any
        # model/vector-store objects are constructed.
        self.task = self._create_task(self.objective)

        # Initialization methods (order matters: later steps use self.llm,
        # self.vectorstore and self.agent_executor).
        self.llm = self._initialize_llm()
        self.vectorstore = self._initialize_vectorstore()
        self.agent_executor = self._initialize_agent_executor(worker_node)
        self.baby_agi = self._initialize_baby_agi(human_in_the_loop)

    def _initialize_llm(self):
        """
        Instantiate the language model.

        Uses ``self.llm_class`` with ``self.api_key`` and a fixed
        temperature of 0.5.

        Raises:
            Exception: Whatever the model class raises; logged, then re-raised.
        """
        try:
            return self.llm_class(openai_api_key=self.api_key, temperature=0.5)
        except Exception as e:
            logging.error(f"Failed to initialize language model: {e}")
            raise

    def _initialize_vectorstore(self):
        """
        Build an empty in-memory FAISS vector store for BabyAGI's task memory.

        The flat L2 index is created with ``self.embedding_size`` dimensions,
        which has to equal the length of the vectors returned by the
        embeddings model or adding texts to the store fails.

        Raises:
            Exception: Any construction error; logged, then re-raised.
        """
        try:
            embeddings_model = OpenAIEmbeddings(openai_api_key=self.api_key)
            index = faiss.IndexFlatL2(self.embedding_size)
            return FAISS(
                embeddings_model.embed_query,
                index,
                InMemoryDocstore({}), {}
            )
        except Exception as e:
            logging.error(f"Failed to initialize vector store: {e}")
            raise

    def _initialize_agent_executor(self, worker_node):
        """
        Build the AgentExecutor that BabyAGI uses to execute each task.

        The agent is a ZeroShotAgent with two tools: one that runs the boss
        system prompt through the LLM to decompose a goal, and one that hands
        work to ``worker_node``.

        Args:
            worker_node: Callable used as the worker tool's ``func``.
        """
        todo_prompt = PromptTemplate.from_template(self.boss_system_prompt)
        if not todo_prompt.input_variables:
            # The tool calls ``todo_chain.run(<single string>)``, which needs
            # the prompt to declare exactly one input variable. A plain
            # system prompt declares none, so give the tool input a slot.
            todo_prompt = PromptTemplate.from_template(
                self.boss_system_prompt + "\nObjective: {objective}"
            )
        todo_chain = LLMChain(llm=self.llm, prompt=todo_prompt)

        tools = [
            Tool(
                name="Goal Decomposition Tool",
                func=todo_chain.run,
                description="Use Case: Decompose ambitious goals into as many explicit and well defined tasks for an AI agent to follow. Rules and Regulations, don't use this tool too often only in the beginning when the user grants you a mission."
            ),
            Tool(
                name="Swarm Worker Agent",
                func=worker_node,
                description="Use Case: When you want to delegate and assign the decomposed goal sub tasks to a worker agent in your swarm, Rules and Regulations, Provide a task specification sheet to the worker agent. It can use the browser, process csvs and generate content"
            ),
        ]

        suffix = """Question: {task}\n{agent_scratchpad}"""
        prefix = """You are a Boss in a swarm who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}.\n """

        prompt = ZeroShotAgent.create_prompt(
            tools,
            prefix=prefix,
            suffix=suffix,
            input_variables=["objective", "task", "context", "agent_scratchpad"],
        )

        llm_chain = LLMChain(llm=self.llm, prompt=prompt)
        # ``allowed_tools`` expects tool *names*, not Tool objects.
        tool_names = [tool.name for tool in tools]
        agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)
        return AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=self.verbose)

    def _initialize_baby_agi(self, human_in_the_loop):
        """
        Create the BabyAGI controller from the already-built components.

        Args:
            human_in_the_loop: Forwarded unchanged to ``BabyAGI.from_llm``.

        Raises:
            ValidationError: If BabyAGI rejects the supplied configuration.
            Exception: Any other construction error. Both are logged and
                re-raised.
        """
        try:
            return BabyAGI.from_llm(
                llm=self.llm,
                vectorstore=self.vectorstore,
                task_execution_chain=self.agent_executor,
                max_iterations=self.max_iterations,
                human_in_the_loop=human_in_the_loop
            )
        except ValidationError as e:
            logging.error(f"Validation Error while initializing BabyAGI: {e}")
            raise
        except Exception as e:
            logging.error(f"Unexpected Error while initializing BabyAGI: {e}")
            raise

    def _create_task(self, objective):
        """
        Wrap the objective in the input dict passed to BabyAGI.

        Args:
            objective: The objective text.

        Returns:
            ``{"objective": objective}``.

        Raises:
            ValueError: If ``objective`` is empty (falsy).
        """
        if not objective:
            logging.error("Objective cannot be empty.")
            raise ValueError("Objective cannot be empty.")
        return {"objective": objective}

    def run(self):
        """
        Execute the BabyAGI loop for the stored task.

        Returns:
            Whatever the BabyAGI call returns.

        Raises:
            ValueError: If no task is set.
            Exception: Any error raised during execution; logged, then
                re-raised.
        """
        if not self.task:
            logging.error("Task cannot be empty.")
            raise ValueError("Task cannot be empty.")
        try:
            return self.baby_agi(self.task)
        except Exception as e:
            logging.error(f"Error while executing task: {e}")
            raise
|