# Copyright 2023 DeepMind Technologies Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """A programs database that implements the evolutionary algorithm.""" from collections.abc import Mapping, Sequence import copy import dataclasses import time from typing import Any import numpy as np import scipy Signature = tuple[float, ...] ScoresPerTest = Mapping[Any, float] def _softmax(logits: np.ndarray, temperature: float) -> np.ndarray: """Returns the tempered softmax of 1D finite `logits`.""" if not np.all(np.isfinite(logits)): non_finites = set(logits[~np.isfinite(logits)]) raise ValueError(f'`logits` contains non-finite value(s): {non_finites}') if not np.issubdtype(logits.dtype, np.floating): logits = np.array(logits, dtype=np.float32) result = scipy.special.softmax(logits / temperature, axis=-1) # Ensure that probabilities sum to 1 to prevent error in `np.random.choice`. index = np.argmax(result) result[index] = 1 - np.sum(result[0:index]) - np.sum(result[index+1:]) return result def _reduce_score(scores_per_test: ScoresPerTest) -> float: """Reduces per-test scores into a single score.""" return scores_per_test[list(scores_per_test.keys())[-1]] def _get_signature(scores_per_test: ScoresPerTest) -> Signature: """Represents test scores as a canonical signature.""" return tuple(scores_per_test[k] for k in sorted(scores_per_test.keys())) @dataclasses.dataclass(frozen=True) class Prompt: """A prompt produced by the ProgramsDatabase, to be sent to Samplers. Attributes: code: The prompt, ending with the header of the function to be completed. version_generated: The function to be completed is `_v{version_generated}`. island_id: Identifier of the island that produced the implementations included in the prompt. Used to direct the newly generated implementation into the same island. """ code: str version_generated: int island_id: int @dataclasses.dataclass(frozen=True) class ProgramsDatabaseConfig: """Configuration of a ProgramsDatabase. Attributes: functions_per_prompt: Number of previous programs to include in prompts. num_islands: Number of islands to maintain as a diversity mechanism. reset_period: How often (in seconds) the weakest islands should be reset. cluster_sampling_temperature_init: Initial temperature for softmax sampling of clusters within an island. cluster_sampling_temperature_period: Period of linear decay of the cluster sampling temperature. """ functions_per_prompt: int = 2 num_islands: int = 4 reset_period: int = 60 * 60 cluster_sampling_temperature_init: float = 0.1 cluster_sampling_temperature_period: int = 5 class ProgramsDatabase: """A collection of programs, organized as islands.""" def __init__( self, config: ProgramsDatabaseConfig, ) -> None: self._config: ProgramsDatabaseConfig = config # Initialize empty islands. self._islands: list[Island] = [] for _ in range(config.num_islands): self._islands.append( Island(config.functions_per_prompt, config.cluster_sampling_temperature_init, config.cluster_sampling_temperature_period)) self._best_score_per_island: list[float] = ( [-float('inf')] * config.num_islands) self._best_program_per_island: list[int | None] = ( [None] * config.num_islands) self._best_scores_per_test_per_island: list[ScoresPerTest | None] = ( [None] * config.num_islands) self._last_reset_time: float = time.time() def get_seed(self) -> list[int]: """Returns a prompt containing implementations from one chosen island.""" island_id = np.random.randint(len(self._islands)) seed_ids = self._islands[island_id].get_seed() return island_id, seed_ids def _register_program_in_island( self, program: int, program_len: int, island_id: int, scores_per_test: ScoresPerTest, ) -> None: """Registers `program` in the specified island.""" self._islands[island_id].register_program(program, program_len, scores_per_test) score = _reduce_score(scores_per_test) if score > self._best_score_per_island[island_id]: self._best_program_per_island[island_id] = program self._best_scores_per_test_per_island[island_id] = scores_per_test self._best_score_per_island[island_id] = score def register_program( self, program: int, program_len: int, island_id: int | None, scores_per_test: ScoresPerTest, ) -> None: """Registers `program` in the database.""" # In an asynchronous implementation we should consider the possibility of # registering a program on an island that had been reset after the prompt # was generated. Leaving that out here for simplicity. if island_id is None: # This is a program added at the beginning, so adding it to all islands. for island_id in range(len(self._islands)): self._register_program_in_island(program, program_len, island_id, scores_per_test) else: self._register_program_in_island(program, program_len, island_id, scores_per_test) # Check whether it is time to reset an island. if (time.time() - self._last_reset_time > self._config.reset_period): self._last_reset_time = time.time() self.reset_islands() def reset_islands(self) -> None: """Resets the weaker half of islands.""" # We sort best scores after adding minor noise to break ties. indices_sorted_by_score: np.ndarray = np.argsort( self._best_score_per_island + np.random.randn(len(self._best_score_per_island)) * 1e-6) num_islands_to_reset = self._config.num_islands // 2 reset_islands_ids = indices_sorted_by_score[:num_islands_to_reset] keep_islands_ids = indices_sorted_by_score[num_islands_to_reset:] for island_id in reset_islands_ids: self._islands[island_id] = Island( self._template, self._function_to_evolve, self._config.functions_per_prompt, self._config.cluster_sampling_temperature_init, self._config.cluster_sampling_temperature_period) self._best_score_per_island[island_id] = -float('inf') founder_island_id = np.random.choice(keep_islands_ids) founder = self._best_program_per_island[founder_island_id] founder_scores = self._best_scores_per_test_per_island[founder_island_id] self._register_program_in_island(founder, island_id, founder_scores) class Island: """A sub-population of the programs database.""" def __init__( self, functions_per_prompt: int, cluster_sampling_temperature_init: float, cluster_sampling_temperature_period: int, ) -> None: self._functions_per_prompt: int = functions_per_prompt self._cluster_sampling_temperature_init = cluster_sampling_temperature_init self._cluster_sampling_temperature_period = ( cluster_sampling_temperature_period) self._clusters: dict[Signature, Cluster] = {} self._num_programs: int = 0 def register_program( self, program: int, program_len: int, scores_per_test: ScoresPerTest, ) -> None: """Stores a program on this island, in its appropriate cluster.""" signature = _get_signature(scores_per_test) if signature not in self._clusters: score = _reduce_score(scores_per_test) self._clusters[signature] = Cluster(score, [program], [program_len]) else: self._clusters[signature].register_program(program, program_len) self._num_programs += 1 def get_seed(self) -> tuple[str, int]: """Constructs a prompt containing functions from this island.""" signatures = list(self._clusters.keys()) cluster_scores = np.array( [self._clusters[signature].score for signature in signatures]) # Convert scores to probabilities using softmax with temperature schedule. period = self._cluster_sampling_temperature_period temperature = self._cluster_sampling_temperature_init * ( 1 - (self._num_programs % period) / period) probabilities = _softmax(cluster_scores, temperature) # At the beginning of an experiment when we have few clusters, place fewer # programs into the prompt. functions_per_prompt = min(len(self._clusters), self._functions_per_prompt) idx = np.random.choice( len(signatures), size=functions_per_prompt, p=probabilities) chosen_signatures = [signatures[i] for i in idx] implementations = [] scores = [] for signature in chosen_signatures: cluster = self._clusters[signature] implementations.append(cluster.sample_program()) scores.append(cluster.score) indices = np.argsort(scores) sorted_implementations = [implementations[i] for i in indices] return sorted_implementations class Cluster: """A cluster of programs on the same island and with the same Signature.""" def __init__(self, score: float, init_programs: list[int], init_program_lengths: list[int]) -> None: self._score = score # We store the indices of the programs in the cluster self._programs: list[int] = init_programs self._lengths: list[int] = init_program_lengths @property def score(self) -> float: """Reduced score of the signature that this cluster represents.""" return self._score def register_program(self, program_idx: int, program_length: int) -> None: """Adds `program` to the cluster.""" self._programs.append(program_idx) self._lengths.append(program_length) def sample_program(self) -> int: """Samples a program, giving higher probability to shorther programs.""" normalized_lengths = (np.array(self._lengths) - min(self._lengths)) / ( max(self._lengths) + 1e-6) probabilities = _softmax(-normalized_lengths, temperature=1.0) return np.random.choice(self._programs, p=probabilities)