|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""A programs database that implements the evolutionary algorithm.""" |
|
from collections.abc import Mapping, Sequence |
|
import copy |
|
import dataclasses |
|
import time |
|
from typing import Any |
|
|
|
import numpy as np |
|
import scipy |
|
|
|
# Canonical, hashable summary of a program's per-test scores: the scores listed
# in sorted-key order (see `_get_signature`). Islands use it as the key under
# which programs are grouped into clusters.
Signature = tuple[float, ...]
# Maps a test key to the score obtained on that test.
ScoresPerTest = Mapping[Any, float]
|
|
|
|
|
def _softmax(logits: np.ndarray, temperature: float) -> np.ndarray:
    """Returns the tempered softmax of 1D finite `logits`.

    Args:
      logits: 1D array of finite values. Non-floating dtypes are converted to
        `float32` before the softmax is computed.
      temperature: Non-zero divisor applied to `logits` before the softmax.

    Returns:
      A new array with one probability per logit. The largest entry is
      recomputed as one minus the sum of the others so the result sums to one
      as closely as floating point allows.

    Raises:
      ValueError: If `logits` contains non-finite values or `temperature` is
        zero.
    """
    # Import the submodule explicitly: a bare `import scipy` does not load
    # `scipy.special` on every SciPy version.
    from scipy import special

    finite_mask = np.isfinite(logits)
    if not np.all(finite_mask):
        non_finites = set(logits[~finite_mask])
        raise ValueError(f'`logits` contains non-finite value(s): {non_finites}')
    if temperature == 0:
        # Dividing by zero would silently produce NaN probabilities that only
        # fail later, far from the cause.
        raise ValueError('`temperature` must be non-zero.')
    if not np.issubdtype(logits.dtype, np.floating):
        logits = np.array(logits, dtype=np.float32)

    result = special.softmax(logits / temperature, axis=-1)

    # Absorb the rounding error into the largest probability so that the
    # entries sum to one.
    index = np.argmax(result)
    result[index] = 1 - np.sum(result[0:index]) - np.sum(result[index + 1:])
    return result
|
|
|
|
|
def _reduce_score(scores_per_test: ScoresPerTest) -> float:
    """Collapses per-test scores into one number: the score of the last test.

    "Last" refers to the iteration order of the mapping's keys.
    """
    test_keys = list(scores_per_test.keys())
    final_key = test_keys[-1]
    return scores_per_test[final_key]
|
|
|
|
|
def _get_signature(scores_per_test: ScoresPerTest) -> Signature:
    """Builds the canonical signature: the scores listed in sorted-key order."""
    ordered_scores = []
    for test_key in sorted(scores_per_test.keys()):
        ordered_scores.append(scores_per_test[test_key])
    return tuple(ordered_scores)
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class Prompt:
    """Immutable prompt handed from the ProgramsDatabase to Samplers.

    Attributes:
      code: Prompt text; its final part is the header of the function that is
        to be completed.
      version_generated: Version suffix of the function to complete, i.e. the
        function is named `_v{version_generated}`.
      island_id: Island whose implementations were used to build the prompt.
        The newly generated implementation is routed back to this island.
    """

    code: str
    version_generated: int
    island_id: int
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class ProgramsDatabaseConfig:
    """Immutable settings for a ProgramsDatabase.

    Attributes:
      functions_per_prompt: How many previous programs go into each prompt.
      num_islands: How many islands are kept; islands act as a diversity
        mechanism.
      reset_period: Interval, in seconds, between resets of the weakest
        islands.
      cluster_sampling_temperature_init: Starting temperature of the softmax
        used to sample clusters inside an island.
      cluster_sampling_temperature_period: Period over which the cluster
        sampling temperature decays linearly.
    """

    functions_per_prompt: int = 2
    num_islands: int = 4
    reset_period: int = 60 * 60
    cluster_sampling_temperature_init: float = 0.1
    cluster_sampling_temperature_period: int = 5
|
|
|
|
|
class ProgramsDatabase:
    """A collection of programs, organized as islands.

    Programs are referenced by integer identifiers. For every island the
    database remembers the identifier, length and per-test scores of the best
    program registered so far; that record is what seeds an island after a
    reset.
    """

    def __init__(
        self,
        config: ProgramsDatabaseConfig,
    ) -> None:
        """Creates `config.num_islands` empty islands.

        Args:
          config: Settings for the island count, prompt size, cluster sampling
            temperature and reset period.
        """
        self._config: ProgramsDatabaseConfig = config

        num_islands = config.num_islands
        self._islands: list[Island] = [
            self._new_island() for _ in range(num_islands)]
        self._best_score_per_island: list[float] = (
            [-float('inf')] * num_islands)
        self._best_program_per_island: list[int | None] = (
            [None] * num_islands)
        # The length is needed to re-register a founder program after a reset.
        self._best_program_len_per_island: list[int | None] = (
            [None] * num_islands)
        self._best_scores_per_test_per_island: list[ScoresPerTest | None] = (
            [None] * num_islands)

        self._last_reset_time: float = time.time()

    def _new_island(self) -> 'Island':
        """Builds an empty island from the database configuration."""
        return Island(
            self._config.functions_per_prompt,
            self._config.cluster_sampling_temperature_init,
            self._config.cluster_sampling_temperature_period)

    def get_seed(self) -> tuple[int, list[int]]:
        """Samples seed programs from one island chosen uniformly at random.

        Returns:
          A pair `(island_id, seed_ids)`: the index of the chosen island and
          whatever that island's `get_seed` returned.
        """
        island_id = np.random.randint(len(self._islands))
        seed_ids = self._islands[island_id].get_seed()
        return island_id, seed_ids

    def _register_program_in_island(
        self,
        program: int,
        program_len: int,
        island_id: int,
        scores_per_test: ScoresPerTest,
    ) -> None:
        """Registers `program` in the specified island.

        Also updates the island's best-so-far record when the reduced score of
        `scores_per_test` strictly exceeds the island's current best score.
        """
        self._islands[island_id].register_program(
            program, program_len, scores_per_test)
        score = _reduce_score(scores_per_test)
        if score > self._best_score_per_island[island_id]:
            self._best_program_per_island[island_id] = program
            self._best_program_len_per_island[island_id] = program_len
            self._best_scores_per_test_per_island[island_id] = scores_per_test
            self._best_score_per_island[island_id] = score

    def register_program(
        self,
        program: int,
        program_len: int,
        island_id: int | None,
        scores_per_test: ScoresPerTest,
    ) -> None:
        """Registers `program` in the database.

        Args:
          program: Identifier of the program.
          program_len: Length of the program.
          island_id: Island to register the program in, or `None` to register
            it in every island.
          scores_per_test: Scores of the program, one per test.

        After registering, the weaker half of the islands is reset if more
        than `config.reset_period` seconds have passed since the last reset.
        """
        if island_id is None:
            for target_island_id in range(len(self._islands)):
                self._register_program_in_island(
                    program, program_len, target_island_id, scores_per_test)
        else:
            self._register_program_in_island(
                program, program_len, island_id, scores_per_test)

        if time.time() - self._last_reset_time > self._config.reset_period:
            self._last_reset_time = time.time()
            self.reset_islands()

    def reset_islands(self) -> None:
        """Resets the weaker half of islands.

        The `num_islands // 2` islands with the lowest best score are replaced
        by empty islands. Each of them is then seeded with the best program of
        a randomly chosen surviving island; no seed is registered when that
        surviving island has not recorded any program yet.
        """
        # A small random perturbation is added before sorting (presumably to
        # break ties between equal scores at random). NumPy converts the score
        # list to an array for the element-wise addition.
        indices_sorted_by_score: np.ndarray = np.argsort(
            self._best_score_per_island +
            np.random.randn(len(self._best_score_per_island)) * 1e-6)
        num_islands_to_reset = self._config.num_islands // 2
        reset_islands_ids = indices_sorted_by_score[:num_islands_to_reset]
        keep_islands_ids = indices_sorted_by_score[num_islands_to_reset:]
        for raw_island_id in reset_islands_ids:
            island_id = int(raw_island_id)
            self._islands[island_id] = self._new_island()
            self._best_score_per_island[island_id] = -float('inf')
            self._best_program_per_island[island_id] = None
            self._best_program_len_per_island[island_id] = None
            self._best_scores_per_test_per_island[island_id] = None

            founder_island_id = int(np.random.choice(keep_islands_ids))
            founder = self._best_program_per_island[founder_island_id]
            founder_len = self._best_program_len_per_island[founder_island_id]
            founder_scores = (
                self._best_scores_per_test_per_island[founder_island_id])
            if founder is None or founder_scores is None:
                # The surviving island is still empty: nothing to seed with.
                continue
            self._register_program_in_island(
                founder, founder_len, island_id, founder_scores)
|
|
|
|
|
class Island:
    """A sub-population of the programs database.

    Programs are grouped into clusters keyed by the signature of their
    per-test scores. Seeds are drawn by first sampling clusters with a
    tempered softmax over cluster scores and then sampling one program from
    each chosen cluster.
    """

    def __init__(
        self,
        functions_per_prompt: int,
        cluster_sampling_temperature_init: float,
        cluster_sampling_temperature_period: int,
    ) -> None:
        """Creates an empty island.

        Args:
          functions_per_prompt: Maximum number of programs returned by
            `get_seed`.
          cluster_sampling_temperature_init: Softmax temperature at the start
            of each period.
          cluster_sampling_temperature_period: Number of registered programs
            after which the temperature schedule restarts.
        """
        self._functions_per_prompt: int = functions_per_prompt
        self._cluster_sampling_temperature_init = (
            cluster_sampling_temperature_init)
        self._cluster_sampling_temperature_period = (
            cluster_sampling_temperature_period)

        self._clusters: dict[Signature, Cluster] = {}
        self._num_programs: int = 0

    def register_program(
        self,
        program: int,
        program_len: int,
        scores_per_test: ScoresPerTest,
    ) -> None:
        """Stores a program on this island, in its appropriate cluster.

        A new cluster is created the first time a signature is seen; its score
        is the reduced score of `scores_per_test`.
        """
        signature = _get_signature(scores_per_test)
        if signature not in self._clusters:
            score = _reduce_score(scores_per_test)
            self._clusters[signature] = Cluster(
                score, [program], [program_len])
        else:
            self._clusters[signature].register_program(program, program_len)
        self._num_programs += 1

    def get_seed(self) -> list[int]:
        """Samples programs from this island to seed a new prompt.

        Returns:
          Up to `functions_per_prompt` programs (never more than the number of
          clusters), one per sampled cluster, ordered by ascending cluster
          score. Clusters are sampled with replacement, so the same cluster
          may contribute more than once. Empty when the island holds no
          programs.
        """
        if not self._clusters:
            # Nothing to sample from; the softmax below rejects empty input.
            return []

        signatures = list(self._clusters.keys())
        cluster_scores = np.array(
            [self._clusters[signature].score for signature in signatures])

        # The temperature decays linearly from its initial value towards zero
        # over one period of registered programs, then starts over.
        period = self._cluster_sampling_temperature_period
        temperature = self._cluster_sampling_temperature_init * (
            1 - (self._num_programs % period) / period)
        probabilities = _softmax(cluster_scores, temperature)

        functions_per_prompt = min(
            len(self._clusters), self._functions_per_prompt)

        idx = np.random.choice(
            len(signatures), size=functions_per_prompt, p=probabilities)
        chosen_signatures = [signatures[i] for i in idx]
        implementations = []
        scores = []
        for signature in chosen_signatures:
            cluster = self._clusters[signature]
            implementations.append(cluster.sample_program())
            scores.append(cluster.score)

        indices = np.argsort(scores)
        sorted_implementations = [implementations[i] for i in indices]
        return sorted_implementations
|
|
|
|
|
class Cluster:
    """A cluster of programs on the same island and with the same Signature."""

    def __init__(
        self,
        score: float,
        init_programs: list[int],
        init_program_lengths: list[int],
    ) -> None:
        """Creates a cluster holding the given programs.

        Args:
          score: Reduced score shared by every program of this cluster.
          init_programs: Identifiers of the initial programs.
          init_program_lengths: Length of each initial program, in the same
            order as `init_programs`.
        """
        self._score = score
        # Copy the inputs so that `register_program` never mutates lists owned
        # by the caller.
        self._programs: list[int] = list(init_programs)
        self._lengths: list[int] = list(init_program_lengths)

    @property
    def score(self) -> float:
        """Reduced score of the signature that this cluster represents."""
        return self._score

    def register_program(self, program_idx: int, program_length: int) -> None:
        """Adds the program `program_idx`, of length `program_length`."""
        self._programs.append(program_idx)
        self._lengths.append(program_length)

    def sample_program(self) -> int:
        """Samples a program, giving higher probability to shorter programs.

        Lengths are shifted by the minimum length and scaled by the maximum
        length; the negated result is passed through a softmax with
        temperature 1 to obtain the sampling probabilities.
        """
        # The small constant keeps the division defined when the maximum
        # length is zero.
        normalized_lengths = (np.array(self._lengths) - min(self._lengths)) / (
            max(self._lengths) + 1e-6)
        probabilities = _softmax(-normalized_lengths, temperature=1.0)
        return np.random.choice(self._programs, p=probabilities)