# Page header captured with this listing:
#   Spaces: Running on Zero
#   File size: 6,256 Bytes
#   0f43f8a
import os
from dotenv import load_dotenv
import time
from datetime import datetime
from skills.skill_registry import SkillRegistry
from tasks.task_registry import TaskRegistry
from ongoing_tasks import ongoing_tasks
load_dotenv() # Load environment variables from .env file
api_keys = {
'openai': os.getenv('OPENAI_API_KEY'),
'serpapi': os.getenv('SERPAPI_API_KEY')
#'airtable': os.getenv('AIRTABLE_API_KEY')
}
OBJECTIVE = "Research Yohei Nakajima and write a poem about him."
LOAD_SKILLS = ['web_search', 'text_completion', 'code_reader','google_jobs_api_search','image_generation','startup_analysis','play_music','game_generation']
#add web_search and documentation_search after you add SERPAPI_API_KEY in your secrets. airtable_search once you've added your AIRTABLE_API_KEY, and add base/table/column data to airtable_search.py, etc...
REFLECTION = False #Experimental reflection step between each task run (when running tasklist)
def run_single_task(task_id, task, skill_registry, task_outputs, OBJECTIVE, task_registry):
"""Execute a single task and update its status"""
task_output = task_registry.execute_task(task_id, task, skill_registry, task_outputs, OBJECTIVE)
task_outputs[task_id]["output"] = task_output
task_outputs[task_id]["completed"] = True
task_outputs[task_id]["description"] = task.get('description', 'No description available')
task_outputs[task_id]["skill"] = task.get('skill', 'No skill information available')
if task_output:
task_registry.update_tasks({"id": task_id, "status": "completed", "result": task_output})
completed_task = task_registry.get_task(task_id)
print(f"Task #{task_id}: {completed_task.get('task')} [COMPLETED][{completed_task.get('skill')}]")
if REFLECTION:
new_tasks, insert_after_ids, tasks_to_update = task_registry.reflect_on_output(task_output, skill_descriptions)
for new_task, after_id in zip(new_tasks, insert_after_ids):
task_registry.add_task(new_task, after_id)
if isinstance(tasks_to_update, dict) and tasks_to_update:
tasks_to_update = [tasks_to_update]
for task_to_update in tasks_to_update:
task_registry.update_tasks(task_to_update)
def run_main_loop(OBJECTIVE, LOAD_SKILLS, api_keys, REFLECTION=False):
"""Main execution loop"""
try:
skill_descriptions = ",".join(f"[{skill.name}: {skill.description}]" for skill in global_skill_registry.skills.values())
task_registry = TaskRegistry()
task_registry.create_tasklist(OBJECTIVE, skill_descriptions)
skill_names = [skill.name for skill in global_skill_registry.skills.values()]
session_summary = f"OBJECTIVE:{OBJECTIVE}.#SKILLS:{','.join(skill_names)}.#"
task_outputs = {task["id"]: {"completed": False, "output": None} for task in task_registry.get_tasks()}
task_output = None # Initialize task_output to None
while not all(task["completed"] for task in task_outputs.values()):
tasks = task_registry.get_tasks()
task_registry.print_tasklist(tasks)
for task in tasks:
if task["id"] not in task_outputs:
task_outputs[task["id"]] = {"completed": False, "output": None}
ready_tasks = [(task["id"], task) for task in tasks if all((dep in task_outputs and task_outputs[dep]["completed"]) for dep in task.get('dependent_task_ids', [])) and not task_outputs[task["id"]]["completed"]]
for task_id, task in ready_tasks:
run_single_task(task_id, task, global_skill_registry, task_outputs, OBJECTIVE, task_registry)
time.sleep(0.1)
# Assuming the last task in tasks has the latest output. Adjust if your use case is different.
last_task_id = tasks[-1]["id"] if tasks else None
task_output = task_outputs[last_task_id]["output"] if last_task_id else None
task_registry.reflect_on_final(OBJECTIVE, task_registry.get_tasks(), task_output, skill_descriptions)
global_skill_registry.reflect_skills(OBJECTIVE, task_registry.get_tasks(), task_output, skill_descriptions)
with open(f'output/output_{datetime.now().strftime("%d_%m_%Y_%H_%M_%S")}.txt', 'w') as file:
file.write(session_summary)
print("...file saved.")
print("END")
return task_output # Return the last task output
except Exception as e:
return f"An error occurred: {e}"
# Removed repeated logic for initiating skill registry
global_skill_registry = SkillRegistry(api_keys=api_keys, main_loop_function=run_main_loop, skill_names=LOAD_SKILLS)
def execute_skill(skill_name, objective, task_id):
"""Execute a single skill"""
skill = global_skill_registry.get_skill(skill_name)
if skill:
try:
result = skill.execute(objective, "", objective)
ongoing_tasks[task_id].update({"status": "completed", "output": result})
except Exception as e:
ongoing_tasks[task_id].update({"status": "error", "error": str(e)})
return task_id
return "Skill not found :("
def execute_task_list(objective, api_keys, task_id):
"""Execute a list of tasks"""
try:
task_registry = TaskRegistry()
result = run_main_loop(objective, get_skills(), api_keys)
ongoing_tasks[task_id].update({"status": "completed", "output": result})
return task_registry.get_tasks(), task_id
except Exception as e:
ongoing_tasks[task_id].update({"status": "error", "error": str(e)})
print(f"Error in execute_task_list: {e}")
return task_id
def get_skills():
"""Return the global skill registry"""
# Removed repeated logic for initiating skill registry
global global_skill_registry
print("Returning GLOBAL SKILL REGISTRY")
return global_skill_registry
# Removed repeated logic for initiating skill registry
global_skill_registry = SkillRegistry(api_keys=api_keys, main_loop_function=run_main_loop, skill_names=LOAD_SKILLS)
if __name__ == "__main__":
run_main_loop(OBJECTIVE, LOAD_SKILLS, api_keys, REFLECTION) |