CodePDE / program_database.py
LDA1020's picture
feat: code release
56c4b9b verified
# Copyright 2023 DeepMind Technologies Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""A programs database that implements the evolutionary algorithm."""
from collections.abc import Mapping, Sequence
import copy
import dataclasses
import time
from typing import Any
import numpy as np
import scipy
Signature = tuple[float, ...]
ScoresPerTest = Mapping[Any, float]
def _softmax(logits: np.ndarray, temperature: float) -> np.ndarray:
"""Returns the tempered softmax of 1D finite `logits`."""
if not np.all(np.isfinite(logits)):
non_finites = set(logits[~np.isfinite(logits)])
raise ValueError(f'`logits` contains non-finite value(s): {non_finites}')
if not np.issubdtype(logits.dtype, np.floating):
logits = np.array(logits, dtype=np.float32)
result = scipy.special.softmax(logits / temperature, axis=-1)
# Ensure that probabilities sum to 1 to prevent error in `np.random.choice`.
index = np.argmax(result)
result[index] = 1 - np.sum(result[0:index]) - np.sum(result[index+1:])
return result
def _reduce_score(scores_per_test: ScoresPerTest) -> float:
"""Reduces per-test scores into a single score."""
return scores_per_test[list(scores_per_test.keys())[-1]]
def _get_signature(scores_per_test: ScoresPerTest) -> Signature:
"""Represents test scores as a canonical signature."""
return tuple(scores_per_test[k] for k in sorted(scores_per_test.keys()))
@dataclasses.dataclass(frozen=True)
class Prompt:
"""A prompt produced by the ProgramsDatabase, to be sent to Samplers.
Attributes:
code: The prompt, ending with the header of the function to be completed.
version_generated: The function to be completed is `_v{version_generated}`.
island_id: Identifier of the island that produced the implementations
included in the prompt. Used to direct the newly generated implementation
into the same island.
"""
code: str
version_generated: int
island_id: int
@dataclasses.dataclass(frozen=True)
class ProgramsDatabaseConfig:
"""Configuration of a ProgramsDatabase.
Attributes:
functions_per_prompt: Number of previous programs to include in prompts.
num_islands: Number of islands to maintain as a diversity mechanism.
reset_period: How often (in seconds) the weakest islands should be reset.
cluster_sampling_temperature_init: Initial temperature for softmax sampling
of clusters within an island.
cluster_sampling_temperature_period: Period of linear decay of the cluster
sampling temperature.
"""
functions_per_prompt: int = 2
num_islands: int = 4
reset_period: int = 60 * 60
cluster_sampling_temperature_init: float = 0.1
cluster_sampling_temperature_period: int = 5
class ProgramsDatabase:
"""A collection of programs, organized as islands."""
def __init__(
self,
config: ProgramsDatabaseConfig,
) -> None:
self._config: ProgramsDatabaseConfig = config
# Initialize empty islands.
self._islands: list[Island] = []
for _ in range(config.num_islands):
self._islands.append(
Island(config.functions_per_prompt,
config.cluster_sampling_temperature_init,
config.cluster_sampling_temperature_period))
self._best_score_per_island: list[float] = (
[-float('inf')] * config.num_islands)
self._best_program_per_island: list[int | None] = (
[None] * config.num_islands)
self._best_scores_per_test_per_island: list[ScoresPerTest | None] = (
[None] * config.num_islands)
self._last_reset_time: float = time.time()
def get_seed(self) -> list[int]:
"""Returns a prompt containing implementations from one chosen island."""
island_id = np.random.randint(len(self._islands))
seed_ids = self._islands[island_id].get_seed()
return island_id, seed_ids
def _register_program_in_island(
self,
program: int,
program_len: int,
island_id: int,
scores_per_test: ScoresPerTest,
) -> None:
"""Registers `program` in the specified island."""
self._islands[island_id].register_program(program, program_len, scores_per_test)
score = _reduce_score(scores_per_test)
if score > self._best_score_per_island[island_id]:
self._best_program_per_island[island_id] = program
self._best_scores_per_test_per_island[island_id] = scores_per_test
self._best_score_per_island[island_id] = score
def register_program(
self,
program: int,
program_len: int,
island_id: int | None,
scores_per_test: ScoresPerTest,
) -> None:
"""Registers `program` in the database."""
# In an asynchronous implementation we should consider the possibility of
# registering a program on an island that had been reset after the prompt
# was generated. Leaving that out here for simplicity.
if island_id is None:
# This is a program added at the beginning, so adding it to all islands.
for island_id in range(len(self._islands)):
self._register_program_in_island(program, program_len, island_id, scores_per_test)
else:
self._register_program_in_island(program, program_len, island_id, scores_per_test)
# Check whether it is time to reset an island.
if (time.time() - self._last_reset_time > self._config.reset_period):
self._last_reset_time = time.time()
self.reset_islands()
def reset_islands(self) -> None:
"""Resets the weaker half of islands."""
# We sort best scores after adding minor noise to break ties.
indices_sorted_by_score: np.ndarray = np.argsort(
self._best_score_per_island +
np.random.randn(len(self._best_score_per_island)) * 1e-6)
num_islands_to_reset = self._config.num_islands // 2
reset_islands_ids = indices_sorted_by_score[:num_islands_to_reset]
keep_islands_ids = indices_sorted_by_score[num_islands_to_reset:]
for island_id in reset_islands_ids:
self._islands[island_id] = Island(
self._template,
self._function_to_evolve,
self._config.functions_per_prompt,
self._config.cluster_sampling_temperature_init,
self._config.cluster_sampling_temperature_period)
self._best_score_per_island[island_id] = -float('inf')
founder_island_id = np.random.choice(keep_islands_ids)
founder = self._best_program_per_island[founder_island_id]
founder_scores = self._best_scores_per_test_per_island[founder_island_id]
self._register_program_in_island(founder, island_id, founder_scores)
class Island:
"""A sub-population of the programs database."""
def __init__(
self,
functions_per_prompt: int,
cluster_sampling_temperature_init: float,
cluster_sampling_temperature_period: int,
) -> None:
self._functions_per_prompt: int = functions_per_prompt
self._cluster_sampling_temperature_init = cluster_sampling_temperature_init
self._cluster_sampling_temperature_period = (
cluster_sampling_temperature_period)
self._clusters: dict[Signature, Cluster] = {}
self._num_programs: int = 0
def register_program(
self,
program: int,
program_len: int,
scores_per_test: ScoresPerTest,
) -> None:
"""Stores a program on this island, in its appropriate cluster."""
signature = _get_signature(scores_per_test)
if signature not in self._clusters:
score = _reduce_score(scores_per_test)
self._clusters[signature] = Cluster(score, [program], [program_len])
else:
self._clusters[signature].register_program(program, program_len)
self._num_programs += 1
def get_seed(self) -> tuple[str, int]:
"""Constructs a prompt containing functions from this island."""
signatures = list(self._clusters.keys())
cluster_scores = np.array(
[self._clusters[signature].score for signature in signatures])
# Convert scores to probabilities using softmax with temperature schedule.
period = self._cluster_sampling_temperature_period
temperature = self._cluster_sampling_temperature_init * (
1 - (self._num_programs % period) / period)
probabilities = _softmax(cluster_scores, temperature)
# At the beginning of an experiment when we have few clusters, place fewer
# programs into the prompt.
functions_per_prompt = min(len(self._clusters), self._functions_per_prompt)
idx = np.random.choice(
len(signatures), size=functions_per_prompt, p=probabilities)
chosen_signatures = [signatures[i] for i in idx]
implementations = []
scores = []
for signature in chosen_signatures:
cluster = self._clusters[signature]
implementations.append(cluster.sample_program())
scores.append(cluster.score)
indices = np.argsort(scores)
sorted_implementations = [implementations[i] for i in indices]
return sorted_implementations
class Cluster:
"""A cluster of programs on the same island and with the same Signature."""
def __init__(self, score: float, init_programs: list[int], init_program_lengths: list[int]) -> None:
self._score = score
# We store the indices of the programs in the cluster
self._programs: list[int] = init_programs
self._lengths: list[int] = init_program_lengths
@property
def score(self) -> float:
"""Reduced score of the signature that this cluster represents."""
return self._score
def register_program(self, program_idx: int, program_length: int) -> None:
"""Adds `program` to the cluster."""
self._programs.append(program_idx)
self._lengths.append(program_length)
def sample_program(self) -> int:
"""Samples a program, giving higher probability to shorther programs."""
normalized_lengths = (np.array(self._lengths) - min(self._lengths)) / (
max(self._lengths) + 1e-6)
probabilities = _softmax(-normalized_lengths, temperature=1.0)
return np.random.choice(self._programs, p=probabilities)