|
import os |
|
|
|
import hydra |
|
|
|
import aiflows |
|
from aiflows.backends.api_info import ApiInfo |
|
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys |
|
|
|
from aiflows import logging |
|
from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache |
|
|
|
from aiflows.utils import serving |
|
from aiflows.workers import run_dispatch_worker_thread |
|
from aiflows.messages import FlowMessage |
|
from aiflows.interfaces import KeyInterface |
|
from aiflows.utils.colink_utils import start_colink_server |
|
from aiflows import flow_verse |
|
import pandas as pd |
|
import sys |
|
from copy import deepcopy |
|
import requests |
|
import time |
|
|
|
# Flow-verse dependency spec: pull the FunSearch flow module. The "revision"
# points at a local sibling checkout rather than a remote revision.
dependencies = [
    {
        "url": "aiflows/FunSearchFlowModule",
        "revision": "../FunSearchFlowModule"
    }
]
# Sync (download/link) the module above so `flow_modules...` becomes importable.
flow_verse.sync_dependencies(dependencies)
# Must come after sync_dependencies: the package only exists once synced.
from flow_modules.aiflows.FunSearchFlowModule.Loader import Loader

# Verbose logging for the whole demo run.
logging.set_verbosity_debug()
|
|
|
|
|
def load_problem(id, ds_location = "./data/codeforces.jsonl.gz"):
    """Load one codeforces problem from the local gzipped jsonl dataset.

    Returns a ``(tests, problem_description)`` pair. ``tests`` maps
    ``"test_1"``, ``"test_2"``, ... (public tests first, then hidden ones,
    numbered consecutively) to dicts with ``"tests_inputs"`` and
    ``"expected_outputs"``. ``problem_description`` is a human-readable
    statement assembled from the dataset row.
    """

    def build_description(row):
        # Render the public tests, numbered from 1.
        rendered_tests = ""
        for idx, io_pair in enumerate(row.public_tests_individual_io):
            rendered_tests += f"Test {idx+1}:\n Input: {io_pair[0]}\n Output: \'{io_pair[1]}\'\n"

        sections = [
            f"Problem Description:\n{row.problem_description}\n\n",
            f"Input Description:\n{row.input_description}\n\n",
            f"Output Description:\n{row.output_description}\n\n",
            f"Public Tests:\n{rendered_tests}\n",
        ]
        return "".join(sections)

    frame = pd.read_json(ds_location, lines=True, compression='gzip')
    row = frame[frame.id == id].iloc[0]

    # Scoring compares outputs exactly, so problems with non-unique
    # answers cannot be evaluated yet.
    assert row.non_unique_output == False, "Problem has non unique output. Not supported yet"

    problem_description = build_description(row)

    # Public tests come first, hidden tests follow, sharing one counter.
    all_io_pairs = list(row.public_tests_individual_io) + list(row.hidden_tests_io)
    tests = {}
    for counter, io_pair in enumerate(all_io_pairs, start=1):
        tests[f"test_{counter}"] = {
            "tests_inputs": io_pair[0],
            "expected_outputs": io_pair[1],
        }

    return tests, problem_description
|
|
|
def download_codeforces_data(data_folder_path, file_name):
    """Download the codeforces dataset archive into ``data_folder_path/file_name``.

    Streams the download in fixed-size chunks so the whole archive is never
    held in memory. On HTTP failure it prints the status code instead of
    raising (deliberate best-effort behavior, matching the caller's usage).
    """
    print("Downloading data....")
    os.makedirs(data_folder_path, exist_ok=True)
    url = "https://github.com/epfl-dlab/cc_flows/raw/main/data/codeforces/codeforces.jsonl.gz"
    # Context manager guarantees the streamed connection is released even if
    # the disk write fails (the original leaked the response object).
    with requests.get(url, stream=True) as response:
        if response.status_code == 200:
            with open(os.path.join(data_folder_path, file_name), 'wb') as file:
                # iter_content with an explicit chunk size avoids the tiny,
                # unpredictable chunks produced by iterating the response directly.
                for chunk in response.iter_content(chunk_size=1 << 16):
                    file.write(chunk)
            print("Download complete")
        else:
            print("Failed to download data", response.status_code)
|
|
|
|
|
def get_configs(problem_id, ds_location = "./data/codeforces.jsonl.gz"):
    """Build the FunSearch flow config for one codeforces problem.

    Loads the problem and its tests, reads the base config from ``demo.yaml``,
    wires the evaluate/evolve function sources into the ProgramDBFlow,
    EvaluatorFlow and SamplerFlow subflow configs, and synthesizes a dummy
    seed solution.

    Returns:
        (funsearch_cfg, dummy_solution): the patched config dict and the
        source code (str) of a trivial ``solve_function`` used to seed
        the program database.
    """
    tests, problem_description = load_problem(problem_id, ds_location)

    path = os.path.join(".", "demo.yaml")
    funsearch_cfg = read_yaml_file(path)

    # Names/locations of the evaluation harness and the function to evolve.
    evaluate_function_file_path: str = "./cf_functions.py"
    evaluate_function_name: str = "evaluate"
    evolve_function_name: str = "solve_function"

    loader = Loader(file_path=evaluate_function_file_path, target_name=evaluate_function_name)
    evaluate_function: str = loader.load_target()
    evaluate_file_full_content = loader.load_full_file()

    # Prepend the problem statement as a module docstring so the sampler's
    # prompts carry the full problem context.
    evaluate_file_full_content = f"\"\"\"{problem_description}\"\"\"\n\n" + evaluate_file_full_content

    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_function"] = evaluate_function
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_file_full_content"] = evaluate_file_full_content
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["artifact_to_evolve_name"] = evolve_function_name

    if len(tests) > 0:
        # Seed with a constant function that already passes the first test.
        first_test = tests["test_1"]

        dummy_solution = f"def {evolve_function_name}(input) -> str:" +\
            "\n    \"\"\"Attempt at solving the problem given the input input and returns the predicted output (see the top of the file for problem description)\"\"\"" +\
            f"\n    return \'{first_test['expected_outputs']}\'\n"

    else:
        # BUGFIX: the original emitted `return 0.0""` here, which is a
        # SyntaxError in the generated program (float literal followed by a
        # string literal). Return the string '0.0' instead, consistent with
        # the declared `-> str` signature and the branch above.
        dummy_solution = f"def {evolve_function_name}(input) -> str:" +\
            "\n    \"\"\"Attempt at solving the problem given the input input and returns the predicted output (see the top of the file for problem description)\"\"\"" +\
            f"\n    return \'0.0\'\n"

    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["py_file"] = evaluate_file_full_content
    # Score assigned to candidates that crash during evaluation.
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["run_error_score"] = -1
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["function_to_run_name"] = evaluate_function_name
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["test_inputs"] = tests
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["use_test_input_as_key"] = False

    # Prompt template variables for the LLM sampler.
    funsearch_cfg["subflows_config"]["SamplerFlow"]["system_message_prompt_template"]["partial_variables"] = \
        {
            "evaluate_name": evaluate_function_name,
            "evolve_name": evolve_function_name,
            "artifacts_per_prompt": 2
        }

    return funsearch_cfg, dummy_solution
|
|
|
|
|
# Root path for locally synced flow modules.
# NOTE(review): not referenced anywhere in this script — presumably kept for
# parity with other demos; confirm before removing.
FLOW_MODULES_PATH = "./"


if __name__ == "__main__":

    # Start a local CoLink server; `cl` is the handle used for serving
    # flows, spawning workers and obtaining flow proxies below.
    cl = start_colink_server()

    # Codeforces problem to solve.
    problem_id = "1789B"

    # Download the dataset on first run only.
    if not os.path.exists("./data/codeforces.jsonl.gz"):
        download_codeforces_data("./data", "codeforces.jsonl.gz")

    funsearch_cfg, dummy_solution = get_configs(problem_id)

    # Credentials come from the environment, never hard-coded.
    api_information = [ApiInfo(backend_used="openai",
                               api_key = os.getenv("OPENAI_API_KEY"))]

    # Serve the FunSearch flow (and, recursively, its subflows) on colink.
    serving.recursive_serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.FunSearchFlowModule.FunSearch",
        flow_endpoint="FunSearch",
    )

    # Worker threads that process dispatched flow messages.
    n_workers = 10
    for i in range(n_workers):
        run_dispatch_worker_thread(cl)

    # Inject the API keys into the config just before instantiation.
    quick_load_api_keys(funsearch_cfg, api_information, key="api_infos")
    # NOTE(review): dead assignment — get_flow_instance below receives
    # funsearch_cfg directly; confirm and remove.
    config_overrides = None

    # Proxy object used to exchange messages with the served flow.
    funsearch_proxy = serving.get_flow_instance(
        cl=cl,
        flow_endpoint="FunSearch",
        config_overrides=funsearch_cfg,
    )

    # Seed the program database with the dummy solution so the samplers
    # have an initial artifact to evolve from.
    data = {
        "from": "SamplerFlow",
        "operation": "register_program",
        "api_output": dummy_solution
    }

    input_message = funsearch_proxy.package_input_message(data = data)

    funsearch_proxy.send_message(input_message)

    # Start the search with 5 parallel samplers.
    data = {
        "from": "FunSearch",
        "operation": "start",
        "content": {"num_samplers": 5},
    }

    input_message = funsearch_proxy.package_input_message(data = data)

    funsearch_proxy.send_message(input_message)

    # Request a stop right after the start message.
    # NOTE(review): presumably the flow finishes in-flight work before
    # stopping — confirm against FunSearch's message handling.
    data = {
        "from": "FunSearch",
        "operation": "stop",
        "content": {},
    }

    input_message = funsearch_proxy.package_input_message(data = data)

    funsearch_proxy.send_message(input_message)

    # Fixed grace period for the search to run before results are requested.
    wait_time = 1000
    print(f"Waiting {wait_time} seconds before requesting result...")
    time.sleep(wait_time)

    # Ask for the best program found on each island of the program database.
    data = {
        "from": "FunSearch",
        "operation": "get_best_programs_per_island",
        "content": {}
    }

    input_message = funsearch_proxy.package_input_message(data = data)

    # Block until the flow replies, then print the result.
    future = funsearch_proxy.get_reply_future(input_message)
    print("waiting for response....")
    response = future.get_data()
    print(response)
|
|