import os
import hydra
import aiflows
from aiflows.backends.api_info import ApiInfo
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
from aiflows import logging
from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
from aiflows.utils import serving
from aiflows.workers import run_dispatch_worker_thread
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.utils.colink_utils import start_colink_server
from aiflows import flow_verse
import pandas as pd
import sys
from copy import deepcopy
import requests
import time
dependencies = [
    {
        "url": "aiflows/FunSearchFlowModule",
        "revision": "../FunSearchFlowModule"
    }
]
flow_verse.sync_dependencies(dependencies)
from flow_modules.aiflows.FunSearchFlowModule.Loader import Loader
logging.set_verbosity_debug()
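# Load a single Codeforces problem: returns its tests (public + hidden) and a
# textual problem description assembled from the dataset row.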
def load_problem(id, ds_location="./data/codeforces.jsonl.gz"):
    def make_problem_descriptions_str(row):
        def write_public_tests_individual_io_str(row):
            public_tests = row.public_tests_individual_io
            tests = ""
            for i, test in enumerate(public_tests):
                input = test[0]
                output = test[1]
                tests += f"Test {i+1}:\n Input: {input}\n Output: \'{output}\'\n"
            return tests

        problem_description = row.problem_description
        input_descriptions = row.input_description
        output_descriptions = row.output_description
        public_tests = write_public_tests_individual_io_str(row)
        problem_description_str = f"Problem Description:\n{problem_description}\n\n"
        input_description_str = f"Input Description:\n{input_descriptions}\n\n"
        output_description_str = f"Output Description:\n{output_descriptions}\n\n"
        public_tests_str = f"Public Tests:\n{public_tests}\n"
        final_str = problem_description_str + input_description_str + output_description_str + public_tests_str
        return final_str

    df = pd.read_json(ds_location, lines=True, compression='gzip')
    row = df[df.id == id].iloc[0]
    assert not row.non_unique_output, "Problem has non-unique outputs. Not supported yet."
    problem_description = make_problem_descriptions_str(row)

    tests = {}
    test_counter = 1
    for public_test in row.public_tests_individual_io:
        tests["test_" + str(test_counter)] = {"tests_inputs": public_test[0], "expected_outputs": public_test[1]}
        test_counter += 1
    for hidden_test in row.hidden_tests_io:
        tests["test_" + str(test_counter)] = {"tests_inputs": hidden_test[0], "expected_outputs": hidden_test[1]}
        test_counter += 1
    return tests, problem_description
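# Download the Codeforces dataset (from the cc_flows repository) if it is not already on disk.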
def download_codeforces_data(data_folder_path, file_name):
    print("Downloading data....")
    os.makedirs(data_folder_path, exist_ok=True)
    url = "https://github.com/epfl-dlab/cc_flows/raw/main/data/codeforces/codeforces.jsonl.gz"
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(os.path.join(data_folder_path, file_name), 'wb') as file:
            for chunk in response:
                file.write(chunk)
        print("Download complete")
    else:
        print("Failed to download data", response.status_code)
def get_configs(problem_id, ds_location="./data/codeforces.jsonl.gz"):
    tests, problem_description = load_problem(problem_id, ds_location)
    path = os.path.join(".", "demo.yaml")
    funsearch_cfg = read_yaml_file(path)

    evaluate_function_file_path: str = "./cf_functions.py"
    evaluate_function_name: str = "evaluate"
    evolve_function_name: str = "solve_function"
    loader = Loader(file_path=evaluate_function_file_path, target_name=evaluate_function_name)
    evaluate_function: str = loader.load_target()
    evaluate_file_full_content = loader.load_full_file()
    evaluate_file_full_content = f"\"\"\"{problem_description}\"\"\"\n\n" + evaluate_file_full_content

    # ~~~~~ ProgramDBFlow overrides ~~~~~
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_function"] = evaluate_function
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_file_full_content"] = evaluate_file_full_content
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["artifact_to_evolve_name"] = evolve_function_name

    if len(tests) > 0:
        first_test = tests["test_1"]
        dummy_solution = f"def {evolve_function_name}(input) -> str:" + \
            "\n    \"\"\"Attempt at solving the problem given the input `input`; returns the predicted output (see the top of the file for the problem description)\"\"\"" + \
            f"\n    return \'{first_test['expected_outputs']}\'\n"
    else:
        dummy_solution = f"def {evolve_function_name}(input) -> str:" + \
            "\n    \"\"\"Attempt at solving the problem given the input `input`; returns the predicted output (see the top of the file for the problem description)\"\"\"" + \
            "\n    return \'0.0\'\n"

    # ~~~~~ EvaluatorFlow overrides ~~~~~
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["py_file"] = evaluate_file_full_content
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["run_error_score"] = -1
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["function_to_run_name"] = evaluate_function_name
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["test_inputs"] = tests
    # Hide the test inputs from the LLM (necessary for hidden tests; mirrors the setup of a real contest).
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["use_test_input_as_key"] = False

    # ~~~~~ SamplerFlow overrides ~~~~~
    funsearch_cfg["subflows_config"]["SamplerFlow"]["system_message_prompt_template"]["partial_variables"] = {
        "evaluate_name": evaluate_function_name,
        "evolve_name": evolve_function_name,
        "artifacts_per_prompt": 2
    }
    return funsearch_cfg, dummy_solution
FLOW_MODULES_PATH = "./"
if __name__ == "__main__":
    cl = start_colink_server()
    problem_id = "1789B"  # put the problem id here
    if not os.path.exists("./data/codeforces.jsonl.gz"):
        download_codeforces_data("./data", "codeforces.jsonl.gz")
    funsearch_cfg, dummy_solution = get_configs(problem_id)
    # Serve the FunSearch flow (and, recursively, its subflows) on the CoLink server.
    api_information = [ApiInfo(backend_used="openai",
                               api_key=os.getenv("OPENAI_API_KEY"))]
    serving.recursive_serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.FunSearchFlowModule.FunSearch",
        flow_endpoint="FunSearch",
    )
    # # Serve the rest
    # serving.recursive_serve_flow(
    #     cl=cl,
    #     flow_type="FunSearch_served",
    #     default_config=funsearch_cfg,
    #     default_state=None,
    #     default_dispatch_point="coflows_dispatch",
    # )
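    # Spawn worker threads that dispatch and execute the tasks of the served flows.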
    n_workers = 10
    for i in range(n_workers):
        run_dispatch_worker_thread(cl)

    quick_load_api_keys(funsearch_cfg, api_information, key="api_infos")

    # Get a proxy to the served FunSearch flow, overriding its default config with ours.
    funsearch_proxy = serving.get_flow_instance(
        cl=cl,
        flow_endpoint="FunSearch",
        config_overrides=funsearch_cfg,
    )
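    # Seed the program database with the trivial baseline solution before starting the search.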
    data = {
        "from": "SamplerFlow",
        "operation": "register_program",
        "api_output": dummy_solution
    }
    input_message = funsearch_proxy.package_input_message(data=data)
    funsearch_proxy.send_message(input_message)
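    # Start the evolutionary search with 5 sampler flows running in parallel.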
    data = {
        "from": "FunSearch",
        "operation": "start",
        "content": {"num_samplers": 5},
    }
    input_message = funsearch_proxy.package_input_message(data=data)
    funsearch_proxy.send_message(input_message)
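    # Ask FunSearch to stop, then give the already-dispatched sampler/evaluator work time to finish.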
    data = {
        "from": "FunSearch",
        "operation": "stop",
        "content": {},
    }
    input_message = funsearch_proxy.package_input_message(data=data)
    funsearch_proxy.send_message(input_message)

    wait_time = 1000
    print(f"Waiting {wait_time} seconds before requesting result...")
    time.sleep(wait_time)
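    # Fetch the best program found on each island of the program database and print the reply.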
    data = {
        "from": "FunSearch",
        "operation": "get_best_programs_per_island",
        "content": {}
    }
    input_message = funsearch_proxy.package_input_message(data=data)
    future = funsearch_proxy.get_reply_future(input_message)
    print("waiting for response....")
    response = future.get_data()
    print(response)