import os import time import logging from dotenv import load_dotenv from collections import deque from typing import Dict, List, Optional import os ,re script_dir = os.path.dirname(os.path.abspath(__file__)) from dotenv import load_dotenv, find_dotenv load_dotenv(find_dotenv()) from typing import Union import warnings from langchain.callbacks.manager import CallbackManagerForLLMRun from langchain.chat_models.base import BaseChatModel from typing import Any, Iterator, List, Optional from huggingface_hub import login from tempfile import TemporaryDirectory from langchain_community.tools.eleven_labs.text2speech import ElevenLabsText2SpeechTool from langchain.utilities.serpapi import SerpAPIWrapper from langchain_community.embeddings import HuggingFaceEmbeddings from langchain_community.llms import HuggingFaceHub warnings.filterwarnings('ignore') HUGGINGFACE_TOKEN = os.getenv["HUGGINGFACE_TOKEN"] if os.path.exists("/home/codemonkeyxl/.cache/huggingface/token"): newsession_bool = False write_permission_bool = False else: newsession_bool = True write_permission_bool = False login(HUGGINGFACE_TOKEN, new_session= newsession_bool, write_permission= write_permission_bool ) import langchain from langchain.chains import LLMChain from langchain.prompts import PromptTemplate from langchain_community.vectorstores import FAISS from langchain_community.embeddings import HuggingFaceEmbeddings,HuggingFaceHubEmbeddings,HuggingFaceInferenceAPIEmbeddings from langchain.llms import BaseLLM from langchain.vectorstores.base import VectorStore from pydantic import BaseModel, Field import streamlit as st from langchain.llms.base import LLM tts = ElevenLabsText2SpeechTool() serp_search = SerpAPIWrapper() embeddings= HuggingFaceEmbeddings( model_name="all-MiniLM-L6-v2", model_kwargs = {'device': 'cpu'}, encode_kwargs = {'normalize_embeddings': True} ) openllm= HuggingFaceHub( repo_id="openchat/openchat_3.5", task="text-generation", model_kwargs = {"min_length": 16,"max_length":1000,"temperature":0.1, "max_new_tokens":512, "num_return_sequences":1 }) best_llm = HuggingFaceHub(repo_id="tiiuae/falcon-7b-instruct", task="text-generation", model_kwargs = {"min_length": 200,"max_length":1000,"temperature":0.1, "max_new_tokens":512, "num_return_sequences":1}) # Set Variables load_dotenv() class TaskCreationChain(LLMChain): """Chain to create tasks.""" #def __init__(self): # self.logger = logging.getLogger("TaskCreationChain") @classmethod def from_llm(cls, llm: BaseLLM, objective: str, verbose: bool = True) -> LLMChain: """Get the response parser.""" task_creation_template = ( "You are an task creation AI that uses the result of an execution agent" " to create new tasks with the following objective: {objective}," " The last completed task has the result: {result}." " This result was based on this task description: {task_description}." " These are incomplete tasks: {incomplete_tasks}." " Based on the result, create new tasks to be completed" " by the AI system that do not overlap with incomplete tasks." " Return the tasks as an array." ) prompt = PromptTemplate( template=task_creation_template, partial_variables={"objective": objective}, input_variables=["result", "task_description", "incomplete_tasks"], ) return cls(prompt=prompt, llm=llm, verbose=verbose) def get_next_task(self, result: Dict, task_description: str, task_list: List[str]) -> List[Dict]: """Get the next task.""" incomplete_tasks = ", ".join(task_list) response = self.run(result=result, task_description=task_description, incomplete_tasks=incomplete_tasks) new_tasks = response.split('\n') return [{"task_name": task_name} for task_name in new_tasks if task_name.strip()] class TaskPrioritizationChain(LLMChain): """Chain to prioritize tasks.""" #def __init__(self): # self.logger = logging.getLogger("TaskPrioritizationChain") @classmethod def from_llm(cls, llm: BaseLLM, objective: str, verbose: bool = True) -> LLMChain: """Get the response parser.""" task_prioritization_template = ( "You are an task prioritization AI tasked with cleaning the formatting of and reprioritizing" " the following tasks: {task_names}." " Consider the ultimate objective of your team: {objective}." " Do not remove any tasks. Return the result as a numbered list, like:" " #. First task" " #. Second task" " Start the task list with number {next_task_id}." ) prompt = PromptTemplate( template=task_prioritization_template, partial_variables={"objective": objective}, input_variables=["task_names", "next_task_id"], ) return cls(prompt=prompt, llm=llm, verbose=verbose) def prioritize_tasks(self, this_task_id: int, task_list: List[Dict]) -> List[Dict]: """Prioritize tasks.""" task_names = [t["task_name"] for t in task_list] next_task_id = int(this_task_id) + 1 response = self.run(task_names=task_names, next_task_id=next_task_id) new_tasks = response.split('\n') prioritized_task_list = [] for task_string in new_tasks: if not task_string.strip(): continue task_parts = task_string.strip().split(".", 1) if len(task_parts) == 2: task_id = task_parts[0].strip() task_name = task_parts[1].strip() prioritized_task_list.append({"task_id": task_id, "task_name": task_name}) return prioritized_task_list class ExecutionChain(LLMChain): """Chain to execute tasks.""" vectorstore: VectorStore = Field(init=False) #def __init__(self): # self.logger = logging.getLogger("ExecutionChain") @classmethod def from_llm(cls, llm: BaseLLM, vectorstore: VectorStore, verbose: bool = True) -> LLMChain: """Get the response parser.""" execution_template = ( "You are an AI who performs one task based on the following objective: {objective}." " Take into account these previously completed tasks: {context}." " Your task: {task}." " Response:" ) prompt = PromptTemplate( template=execution_template, input_variables=["objective", "context", "task"], ) return cls(prompt=prompt, llm=llm, verbose=verbose, vectorstore=vectorstore) def _get_top_tasks(self, query: str, k: int) -> List[str]: """Get the top k tasks based on the query.""" results = self.vectorstore.similarity_search_with_score(query, k=k) if not results: return [] sorted_results, _ = zip(*sorted(results, key=lambda x: x[1], reverse=True)) return [str(item.metadata['task']) for item in sorted_results] def execute_task(self, objective: str, task: str, k: int = 5) -> str: """Execute a task.""" context = self._get_top_tasks(query=objective, k=k) return self.run(objective=objective, context=context, task=task) class Message: exp: st.expander ai_icon = "./img/robot.png" def __init__(self, label: str): message_area, icon_area = st.columns([10, 1]) icon_area.image(self.ai_icon, caption="BabyAGI") # Expander self.exp = message_area.expander(label=label, expanded=True) def __enter__(self): return self def __exit__(self, ex_type, ex_value, trace): pass def write(self, content): self.exp.markdown(content) class BabyAGI(BaseModel): """Controller model for the BabyAGI agent.""" objective: str = Field(alias="objective") task_list: deque = Field(default_factory=deque) task_creation_chain: TaskCreationChain = Field(...) task_prioritization_chain: TaskPrioritizationChain = Field(...) execution_chain: ExecutionChain = Field(...) task_id_counter: int = Field(1) #def __init__(self): # Configure loggers for each chain #self.task_creation_logger = logging.getLogger("TaskCreationChain") #self.task_prioritization_logger = logging.getLogger("TaskPrioritizationChain") #self.execution_logger = logging.getLogger("ExecutionChain") def add_task(self, task: Dict): self.task_list.append(task) def print_task_list(self): with Message(label="Task List") as m: m.write("### Task List") for t in self.task_list: m.write("- " + str(t["task_id"]) + ": " + t["task_name"]) m.write("") def print_next_task(self, task: Dict): with Message(label="Next Task") as m: m.write("### Next Task") m.write("- " + str(task["task_id"]) + ": " + task["task_name"]) m.write("") def print_task_result(self, result: str): with Message(label="Task Result") as m: m.write("### Task Result") m.write(result) m.write("") def print_task_ending(self): with Message(label="Task Ending") as m: m.write("### Task Ending") m.write("") def print_iteration_number(self, iteration_number: int): with Message(label="Iteration Number") as m: m.write(f"### Iteration Number: {iteration_number}") def run(self, max_iterations: Optional[int] = None): """Run the agent.""" num_iters = 0 while True: self.print_iteration_number(num_iters + 1) # Add this line if self.task_list: self.print_task_list() # Step 1: Pull the first task task = self.task_list.popleft() self.print_next_task(task) # Step 2: Execute the task result = self.execution_chain.execute_task( self.objective, task["task_name"] ) this_task_id = int(task["task_id"]) self.print_task_result(result) # Step 3: Store the result in Pinecone result_id = f"result_{num_iters}_{task['task_id']}" self.execution_chain.vectorstore.add_texts( texts=[result], metadatas=[{"task": task["task_name"]}], ids=[result_id], ) #self.execution_logger.info(f"Task: {task['task_name']}, Result: {result}") # Log execution information # Step 4: Create new tasks and reprioritize task list new_tasks = self.task_creation_chain.get_next_task( result, task["task_name"], [t["task_name"] for t in self.task_list] ) for new_task in new_tasks: self.task_id_counter += 1 new_task.update({"task_id": self.task_id_counter}) self.add_task(new_task) self.task_list = deque( self.task_prioritization_chain.prioritize_tasks( this_task_id, list(self.task_list) ) ) # Log task creation information #self.task_creation_logger.info(f"Result: {result}, Task Description: {task['task_name']}, Incomplete Tasks: {', '.join([t['task_name'] for t in self.task_list])}") #self.task_prioritization_logger.info(f"This Task ID: {this_task_id}, Task List: {', '.join([t['task_name'] for t in self.task_list])}") num_iters += 1 if max_iterations is not None and num_iters == max_iterations: self.print_task_ending() break @classmethod def from_llm_and_objectives( cls, llm: BaseLLM, vectorstore: VectorStore, objective: str, first_task: str, verbose: bool = False, ) -> "BabyAGI": """Initialize the BabyAGI Controller.""" task_creation_chain = TaskCreationChain.from_llm( llm, objective, verbose=verbose ) task_prioritization_chain = TaskPrioritizationChain.from_llm( llm, objective, verbose=verbose ) execution_chain = ExecutionChain.from_llm(llm, vectorstore, verbose=verbose) controller = cls( objective=objective, task_creation_chain=task_creation_chain, task_prioritization_chain=task_prioritization_chain, execution_chain=execution_chain, ) #task_id = int(time.time()) #controller.add_task({"task_id": task_id, "task_name": first_task}) controller.add_task({"task_id": 1, "task_name": first_task}) return controller def main(): iteration_number = 0 # Add this line st.set_page_config( initial_sidebar_state="expanded", page_title="BabyAGI Streamlit", layout="wide", ) st.title("BabyAGI Streamlit") st.write(f"Iteration-{iteration_number}") goals = ["Make a small shooter in python OOP scripting", "Make a streamlit cheatsheet", "Make a advanced langchain examples sheet", "End poverty"] objective = st.selectbox("Select Ultimate goal", goals) #objective = st.text_input("Input Ultimate goal", "Solve world hunger") first_task = st.text_input("Input Where to start", "Develop a task list") max_iterations = st.number_input("Max iterations", value=3, min_value=1, step=1) button = st.button("Run") embedding_model = HuggingFaceInferenceAPIEmbeddings(api_key=os.getenv["HUGGINGFACE_TOKEN"]) vectorstore = FAISS.from_texts(["_"], embedding_model, metadatas=[{"task":first_task}]) if button: try: baby_agi.run(llm=best_llm, vectorstore=vectorstore, objective=objective, first_task=first_task, verbose=False, ) except Exception as e: st.error(e) if __name__ == "__main__": main()