import os
import time

import pandas as pd
import requests

from aiflows import flow_verse, logging
from aiflows.backends.api_info import ApiInfo
from aiflows.utils import serving
from aiflows.utils.colink_utils import start_colink_server
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
from aiflows.workers import run_dispatch_worker_thread
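# Declare the FlowVerse module this script depends on; "revision" points at a
# local checkout of FunSearchFlowModule (a relative path) rather than a remote
# commit hash.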
dependencies = [
    {
        "url": "aiflows/FunSearchFlowModule",
        "revision": "../FunSearchFlowModule",
    }
]
flow_verse.sync_dependencies(dependencies)
from flow_modules.aiflows.FunSearchFlowModule.Loader import Loader
logging.set_verbosity_debug()
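# Load a CodeForces problem from the gzipped JSONL dataset and build:
#  - a dict of numbered public + hidden tests, and
#  - a human-readable problem description string for prompting.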
def load_problem(id, ds_location="./data/codeforces.jsonl.gz"):
    def make_problem_descriptions_str(row):
        def write_public_tests_individual_io_str(row):
            public_tests = row.public_tests_individual_io
            tests = ""
            for i, test in enumerate(public_tests):
                test_input = test[0]
                test_output = test[1]
                tests += f"Test {i+1}:\n Input: {test_input}\n Output: '{test_output}'\n"
            return tests

        problem_description = row.problem_description
        input_description = row.input_description
        output_description = row.output_description
        public_tests = write_public_tests_individual_io_str(row)

        problem_description_str = f"Problem Description:\n{problem_description}\n\n"
        input_description_str = f"Input Description:\n{input_description}\n\n"
        output_description_str = f"Output Description:\n{output_description}\n\n"
        public_tests_str = f"Public Tests:\n{public_tests}\n"

        return problem_description_str + input_description_str + output_description_str + public_tests_str

    df = pd.read_json(ds_location, lines=True, compression="gzip")
    row = df[df.id == id].iloc[0]
    assert not row.non_unique_output, "Problem has non-unique output. Not supported yet."

    problem_description = make_problem_descriptions_str(row)

    # Number public tests first, then hidden tests, under keys "test_1", "test_2", ...
    tests = {}
    test_counter = 1
    for public_test in row.public_tests_individual_io:
        tests[f"test_{test_counter}"] = {"tests_inputs": public_test[0], "expected_outputs": public_test[1]}
        test_counter += 1
    for hidden_test in row.hidden_tests_io:
        tests[f"test_{test_counter}"] = {"tests_inputs": hidden_test[0], "expected_outputs": hidden_test[1]}
        test_counter += 1

    return tests, problem_description
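# Fetch the CodeForces dataset used by the cc_flows project if it is not already
# cached locally.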
def download_codeforces_data(data_folder_path, file_name):
    print("Downloading data....")
    os.makedirs(data_folder_path, exist_ok=True)
    url = "https://github.com/epfl-dlab/cc_flows/raw/main/data/codeforces/codeforces.jsonl.gz"
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(os.path.join(data_folder_path, file_name), "wb") as file:
            for chunk in response:
                file.write(chunk)
        print("Download complete")
    else:
        print("Failed to download data", response.status_code)
def get_configs(problem_id, ds_location="./data/codeforces.jsonl.gz"):
    tests, problem_description = load_problem(problem_id, ds_location)

    path = os.path.join(".", "demo.yaml")
    funsearch_cfg = read_yaml_file(path)

    evaluate_function_file_path: str = "./cf_functions.py"
    evaluate_function_name: str = "evaluate"
    evolve_function_name: str = "solve_function"

    loader = Loader(file_path=evaluate_function_file_path, target_name=evaluate_function_name)
    evaluate_function: str = loader.load_target()
    evaluate_file_full_content = loader.load_full_file()
    # Prepend the problem description as a module docstring so the LLM sees it.
    evaluate_file_full_content = f'"""{problem_description}"""\n\n' + evaluate_file_full_content

    # ~~~~~~~~~~ ProgramDBFlow overrides ~~~~~~~~~~
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_function"] = evaluate_function
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_file_full_content"] = evaluate_file_full_content
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["artifact_to_evolve_name"] = evolve_function_name

    # Seed solution: return the first test's expected output if one exists,
    # otherwise fall back to a constant string.
    if len(tests) > 0:
        first_test = tests["test_1"]
        dummy_solution = (
            f"def {evolve_function_name}(input) -> str:"
            + "\n    \"\"\"Attempt at solving the problem given the input; returns the predicted output (see the top of the file for the problem description).\"\"\""
            + f"\n    return '{first_test['expected_outputs']}'\n"
        )
    else:
        dummy_solution = (
            f"def {evolve_function_name}(input) -> str:"
            + "\n    \"\"\"Attempt at solving the problem given the input; returns the predicted output (see the top of the file for the problem description).\"\"\""
            + "\n    return '0.0'\n"
        )

    # ~~~~~~~~~~ EvaluatorFlow overrides ~~~~~~~~~~
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["py_file"] = evaluate_file_full_content
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["run_error_score"] = -1
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["function_to_run_name"] = evaluate_function_name
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["test_inputs"] = tests
    # Hide test inputs from the LLM (necessary for hidden tests; mirrors a real contest setup).
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["use_test_input_as_key"] = False

    # ~~~~~~~~~~ SamplerFlow overrides ~~~~~~~~~~
    funsearch_cfg["subflows_config"]["SamplerFlow"]["system_message_prompt_template"]["partial_variables"] = {
        "evaluate_name": evaluate_function_name,
        "evolve_name": evolve_function_name,
        "artifacts_per_prompt": 2,
    }

    return funsearch_cfg, dummy_solution
FLOW_MODULES_PATH = "./"
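# Driver: start a local CoLink server, serve FunSearch, spawn dispatch workers,
# seed the program database, run the search, and finally print the best programs.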
if __name__ == "__main__":
    cl = start_colink_server()

    problem_id = "1789B"  # put the problem id here
    if not os.path.exists("./data/codeforces.jsonl.gz"):
        download_codeforces_data("./data", "codeforces.jsonl.gz")
    funsearch_cfg, dummy_solution = get_configs(problem_id)
    api_information = [ApiInfo(backend_used="openai",
                               api_key=os.getenv("OPENAI_API_KEY"))]

    # Serve the FunSearch flow on the CoLink server.
    serving.recursive_serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.FunSearchFlowModule.FunSearch",
        flow_endpoint="FunSearch",
    )
    # # Serve the rest
    # serving.recursive_serve_flow(
    #     cl=cl,
    #     flow_type="FunSearch_served",
    #     default_config=funsearch_cfg,
    #     default_state=None,
    #     default_dispatch_point="coflows_dispatch",
    # )
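    # Dispatch workers pull queued flow messages and execute them; with several
    # threads, multiple Sampler/Evaluator steps can run concurrently (an
    # assumption based on the aiflows worker model, not stated in the original).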
    n_workers = 10
    for i in range(n_workers):
        run_dispatch_worker_thread(cl)
    quick_load_api_keys(funsearch_cfg, api_information, key="api_infos")

    # Mount the served FunSearch flow and get a proxy to it, overriding its default config.
    funsearch_proxy = serving.get_flow_instance(
        cl=cl,
        flow_endpoint="FunSearch",
        config_overrides=funsearch_cfg,
    )
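    # The proxy communicates with the served flow via packaged input messages;
    # each dict below names a sender ("from"), an operation, and its payload.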
    # Register the dummy seed solution in the program database.
    data = {
        "from": "SamplerFlow",
        "operation": "register_program",
        "api_output": dummy_solution,
    }
    input_message = funsearch_proxy.package_input_message(data=data)
    funsearch_proxy.send_message(input_message)
    # Start the FunSearch loop with 5 samplers.
    data = {
        "from": "FunSearch",
        "operation": "start",
        "content": {"num_samplers": 5},
    }
    input_message = funsearch_proxy.package_input_message(data=data)
    funsearch_proxy.send_message(input_message)
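    # Queue a stop request right after starting; the long sleep below gives
    # in-flight sampling and evaluation time to finish before results are fetched.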
    data = {
        "from": "FunSearch",
        "operation": "stop",
        "content": {},
    }
    input_message = funsearch_proxy.package_input_message(data=data)
    funsearch_proxy.send_message(input_message)
    wait_time = 1000
    print(f"Waiting {wait_time} seconds before requesting result...")
    time.sleep(wait_time)
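    # Unlike the fire-and-forget messages above, this request waits on a reply
    # future so the best programs found on each island can be printed.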
    data = {
        "from": "FunSearch",
        "operation": "get_best_programs_per_island",
        "content": {},
    }
    input_message = funsearch_proxy.package_input_message(data=data)
    future = funsearch_proxy.get_reply_future(input_message)
    print("waiting for response....")
    response = future.get_data()
    print(response)