import os
import re
import time
import logging
import warnings
from collections import deque
from tempfile import TemporaryDirectory
from typing import Any, Dict, Iterator, List, Optional, Union

from dotenv import load_dotenv, find_dotenv
from huggingface_hub import login
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.chat_models.base import BaseChatModel
from langchain.utilities.serpapi import SerpAPIWrapper
from langchain_community.tools.eleven_labs.text2speech import ElevenLabsText2SpeechTool
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFaceHub

script_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(find_dotenv())

warnings.filterwarnings('ignore')
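# Read the Hugging Face token from the environment and log in to the Hub;
# a new session is only requested when no cached token file is present.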
HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
if os.path.exists("/home/codemonkeyxl/.cache/huggingface/token"):
    newsession_bool = False
    write_permission_bool = False
else:
    newsession_bool = True
    write_permission_bool = False
login(HUGGINGFACE_TOKEN, new_session=newsession_bool, write_permission=write_permission_bool)
import langchain
import streamlit as st
from pydantic import BaseModel, Field
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.llms import BaseLLM
from langchain.llms.base import LLM
from langchain.vectorstores.base import VectorStore
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceHubEmbeddings, HuggingFaceInferenceAPIEmbeddings
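# Shared tools and models: ElevenLabs text-to-speech, SerpAPI search, a local
# sentence-transformers embedding model, and two Hugging Face Hub LLMs.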
tts = ElevenLabsText2SpeechTool()
serp_search = SerpAPIWrapper()
embeddings = HuggingFaceEmbeddings(
    model_name="all-MiniLM-L6-v2",
    model_kwargs={'device': 'cpu'},
    encode_kwargs={'normalize_embeddings': True},
)
openllm = HuggingFaceHub(
    repo_id="openchat/openchat_3.5",
    task="text-generation",
    model_kwargs={"min_length": 16, "max_length": 1000, "temperature": 0.1, "max_new_tokens": 512, "num_return_sequences": 1},
)
best_llm = HuggingFaceHub(
    repo_id="tiiuae/falcon-7b-instruct",
    task="text-generation",
    model_kwargs={"min_length": 200, "max_length": 1000, "temperature": 0.1, "max_new_tokens": 512, "num_return_sequences": 1},
)

# Set Variables
load_dotenv()
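# BabyAGI is assembled from three LLM chains: TaskCreationChain proposes new tasks from
# the last result, TaskPrioritizationChain reorders the backlog, and ExecutionChain
# performs a single task using context retrieved from the vector store.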
class TaskCreationChain(LLMChain):
    """Chain to create tasks."""
    #def __init__(self):
    #    self.logger = logging.getLogger("TaskCreationChain")

    @classmethod
    def from_llm(cls, llm: BaseLLM, objective: str, verbose: bool = True) -> LLMChain:
        """Get the response parser."""
        task_creation_template = (
            "You are a task creation AI that uses the result of an execution agent"
            " to create new tasks with the following objective: {objective}."
            " The last completed task has the result: {result}."
            " This result was based on this task description: {task_description}."
            " These are incomplete tasks: {incomplete_tasks}."
            " Based on the result, create new tasks to be completed"
            " by the AI system that do not overlap with incomplete tasks."
            " Return the tasks as an array."
        )
        prompt = PromptTemplate(
            template=task_creation_template,
            partial_variables={"objective": objective},
            input_variables=["result", "task_description", "incomplete_tasks"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

    def get_next_task(self, result: Dict, task_description: str, task_list: List[str]) -> List[Dict]:
        """Get the next task."""
        incomplete_tasks = ", ".join(task_list)
        response = self.run(result=result, task_description=task_description, incomplete_tasks=incomplete_tasks)
        new_tasks = response.split('\n')
        return [{"task_name": task_name} for task_name in new_tasks if task_name.strip()]
class TaskPrioritizationChain(LLMChain):
    """Chain to prioritize tasks."""
    #def __init__(self):
    #    self.logger = logging.getLogger("TaskPrioritizationChain")

    @classmethod
    def from_llm(cls, llm: BaseLLM, objective: str, verbose: bool = True) -> LLMChain:
        """Get the response parser."""
        task_prioritization_template = (
            "You are a task prioritization AI tasked with cleaning the formatting of and reprioritizing"
            " the following tasks: {task_names}."
            " Consider the ultimate objective of your team: {objective}."
            " Do not remove any tasks. Return the result as a numbered list, like:"
            " #. First task"
            " #. Second task"
            " Start the task list with number {next_task_id}."
        )
        prompt = PromptTemplate(
            template=task_prioritization_template,
            partial_variables={"objective": objective},
            input_variables=["task_names", "next_task_id"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

    def prioritize_tasks(self, this_task_id: int, task_list: List[Dict]) -> List[Dict]:
        """Prioritize tasks."""
        task_names = [t["task_name"] for t in task_list]
        next_task_id = int(this_task_id) + 1
        response = self.run(task_names=task_names, next_task_id=next_task_id)
        new_tasks = response.split('\n')
        prioritized_task_list = []
        for task_string in new_tasks:
            if not task_string.strip():
                continue
            task_parts = task_string.strip().split(".", 1)
            if len(task_parts) == 2:
                task_id = task_parts[0].strip()
                task_name = task_parts[1].strip()
                prioritized_task_list.append({"task_id": task_id, "task_name": task_name})
        return prioritized_task_list
class ExecutionChain(LLMChain):
    """Chain to execute tasks."""
    vectorstore: VectorStore = Field(init=False)
    #def __init__(self):
    #    self.logger = logging.getLogger("ExecutionChain")

    class Config:
        # VectorStore is not a pydantic model, so arbitrary field types must be allowed.
        arbitrary_types_allowed = True

    @classmethod
    def from_llm(cls, llm: BaseLLM, vectorstore: VectorStore, verbose: bool = True) -> LLMChain:
        """Get the response parser."""
        execution_template = (
            "You are an AI who performs one task based on the following objective: {objective}."
            " Take into account these previously completed tasks: {context}."
            " Your task: {task}."
            " Response:"
        )
        prompt = PromptTemplate(
            template=execution_template,
            input_variables=["objective", "context", "task"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose, vectorstore=vectorstore)

    def _get_top_tasks(self, query: str, k: int) -> List[str]:
        """Get the top k tasks based on the query."""
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        if not results:
            return []
        sorted_results, _ = zip(*sorted(results, key=lambda x: x[1], reverse=True))
        return [str(item.metadata['task']) for item in sorted_results]

    def execute_task(self, objective: str, task: str, k: int = 5) -> str:
        """Execute a task."""
        context = self._get_top_tasks(query=objective, k=k)
        return self.run(objective=objective, context=context, task=task)
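# Streamlit UI helper: renders each agent message inside an expander next to the robot icon.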
class Message:
    exp: st.expander
    ai_icon = "./img/robot.png"

    def __init__(self, label: str):
        message_area, icon_area = st.columns([10, 1])
        icon_area.image(self.ai_icon, caption="BabyAGI")
        # Expander
        self.exp = message_area.expander(label=label, expanded=True)

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, trace):
        pass

    def write(self, content):
        self.exp.markdown(content)
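# Controller that owns the task queue and wires the three chains into the agent's main loop.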
class BabyAGI(BaseModel):
    """Controller model for the BabyAGI agent."""
    objective: str = Field(alias="objective")
    task_list: deque = Field(default_factory=deque)
    task_creation_chain: TaskCreationChain = Field(...)
    task_prioritization_chain: TaskPrioritizationChain = Field(...)
    execution_chain: ExecutionChain = Field(...)
    task_id_counter: int = Field(1)

    class Config:
        # Allow field types (e.g. deque) that pydantic does not validate natively.
        arbitrary_types_allowed = True

    #def __init__(self):
    #    # Configure loggers for each chain
    #    self.task_creation_logger = logging.getLogger("TaskCreationChain")
    #    self.task_prioritization_logger = logging.getLogger("TaskPrioritizationChain")
    #    self.execution_logger = logging.getLogger("ExecutionChain")
    def add_task(self, task: Dict):
        self.task_list.append(task)

    def print_task_list(self):
        with Message(label="Task List") as m:
            m.write("### Task List")
            for t in self.task_list:
                m.write("- " + str(t["task_id"]) + ": " + t["task_name"])
            m.write("")

    def print_next_task(self, task: Dict):
        with Message(label="Next Task") as m:
            m.write("### Next Task")
            m.write("- " + str(task["task_id"]) + ": " + task["task_name"])
            m.write("")

    def print_task_result(self, result: str):
        with Message(label="Task Result") as m:
            m.write("### Task Result")
            m.write(result)
            m.write("")

    def print_task_ending(self):
        with Message(label="Task Ending") as m:
            m.write("### Task Ending")
            m.write("")

    def print_iteration_number(self, iteration_number: int):
        with Message(label="Iteration Number") as m:
            m.write(f"### Iteration Number: {iteration_number}")
    def run(self, max_iterations: Optional[int] = None):
        """Run the agent."""
        num_iters = 0
        while True:
            self.print_iteration_number(num_iters + 1)
            if self.task_list:
                self.print_task_list()

                # Step 1: Pull the first task
                task = self.task_list.popleft()
                self.print_next_task(task)

                # Step 2: Execute the task
                result = self.execution_chain.execute_task(
                    self.objective, task["task_name"]
                )
                this_task_id = int(task["task_id"])
                self.print_task_result(result)

                # Step 3: Store the result in the vector store
                result_id = f"result_{num_iters}_{task['task_id']}"
                self.execution_chain.vectorstore.add_texts(
                    texts=[result],
                    metadatas=[{"task": task["task_name"]}],
                    ids=[result_id],
                )
                #self.execution_logger.info(f"Task: {task['task_name']}, Result: {result}")  # Log execution information

                # Step 4: Create new tasks and reprioritize the task list
                new_tasks = self.task_creation_chain.get_next_task(
                    result, task["task_name"], [t["task_name"] for t in self.task_list]
                )
                for new_task in new_tasks:
                    self.task_id_counter += 1
                    new_task.update({"task_id": self.task_id_counter})
                    self.add_task(new_task)
                self.task_list = deque(
                    self.task_prioritization_chain.prioritize_tasks(
                        this_task_id, list(self.task_list)
                    )
                )
                # Log task creation information
                #self.task_creation_logger.info(f"Result: {result}, Task Description: {task['task_name']}, Incomplete Tasks: {', '.join([t['task_name'] for t in self.task_list])}")
                #self.task_prioritization_logger.info(f"This Task ID: {this_task_id}, Task List: {', '.join([t['task_name'] for t in self.task_list])}")
            num_iters += 1
            if max_iterations is not None and num_iters == max_iterations:
                self.print_task_ending()
                break
    @classmethod
    def from_llm_and_objectives(
        cls,
        llm: BaseLLM,
        vectorstore: VectorStore,
        objective: str,
        first_task: str,
        verbose: bool = False,
    ) -> "BabyAGI":
        """Initialize the BabyAGI Controller."""
        task_creation_chain = TaskCreationChain.from_llm(
            llm, objective, verbose=verbose
        )
        task_prioritization_chain = TaskPrioritizationChain.from_llm(
            llm, objective, verbose=verbose
        )
        execution_chain = ExecutionChain.from_llm(llm, vectorstore, verbose=verbose)
        controller = cls(
            objective=objective,
            task_creation_chain=task_creation_chain,
            task_prioritization_chain=task_prioritization_chain,
            execution_chain=execution_chain,
        )
        #task_id = int(time.time())
        #controller.add_task({"task_id": task_id, "task_name": first_task})
        controller.add_task({"task_id": 1, "task_name": first_task})
        return controller
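# Streamlit entry point: collect the objective and first task, build a FAISS vector store,
# and run the agent for the requested number of iterations.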
def main():
    iteration_number = 0
    st.set_page_config(
        initial_sidebar_state="expanded",
        page_title="BabyAGI Streamlit",
        layout="wide",
    )
    st.title("BabyAGI Streamlit")
    st.write(f"Iteration-{iteration_number}")
    goals = [
        "Make a small shooter in python OOP scripting",
        "Make a streamlit cheatsheet",
        "Make an advanced langchain examples sheet",
        "End poverty",
    ]
    objective = st.selectbox("Select Ultimate goal", goals)
    #objective = st.text_input("Input Ultimate goal", "Solve world hunger")
    first_task = st.text_input("Input Where to start", "Develop a task list")
    max_iterations = st.number_input("Max iterations", value=3, min_value=1, step=1)
    button = st.button("Run")
    embedding_model = HuggingFaceInferenceAPIEmbeddings(api_key=os.getenv("HUGGINGFACE_TOKEN"))
    vectorstore = FAISS.from_texts(["_"], embedding_model, metadatas=[{"task": first_task}])
    if button:
        try:
            # Build the controller from the selected objective, then run it.
            baby_agi = BabyAGI.from_llm_and_objectives(
                llm=best_llm,
                vectorstore=vectorstore,
                objective=objective,
                first_task=first_task,
                verbose=False,
            )
            baby_agi.run(max_iterations=max_iterations)
        except Exception as e:
            st.error(e)


if __name__ == "__main__":
    main()