|
import math
import random
from collections import defaultdict

import numpy as np
import optuna

import dsp
import dspy
from dspy.evaluate.evaluate import Evaluate
from dspy.signatures import Signature
from dspy.teleprompt import BootstrapFewShot
from dspy.teleprompt.teleprompt import Teleprompter
|
|
|
""" |
|
USAGE SUGGESTIONS: |
|
|
|
The following code can be used to compile an optimized signature program using the BayesianSignatureOptimizer, and to evaluate it on an end task:
|
|
|
from dspy.teleprompt import BayesianSignatureOptimizer |
|
|
|
teleprompter = BayesianSignatureOptimizer(prompt_model=prompt_model, task_model=task_model, metric=metric, n=10, init_temperature=1.0) |
|
kwargs = dict(num_threads=NUM_THREADS, display_progress=True, display_table=0) |
|
compiled_prompt_opt = teleprompter.compile(program, devset=devset[:DEV_NUM], optuna_trials_num=100, max_bootstrapped_demos=3, max_labeled_demos=5, eval_kwargs=kwargs) |
|
evaluate = Evaluate(devset=evalset[:EVAL_NUM], metric=metric, **kwargs)

eval_score = evaluate(compiled_prompt_opt, **kwargs)
|
|
|
Note that this teleprompter takes in the following parameters: |
|
|
|
* prompt_model: The model used to generate new prompts. When unspecified, defaults to the model set in settings (i.e., dspy.settings.configure(lm=prompt_model)).

* task_model: The model used to run the task being optimized. When unspecified, defaults to the model set in settings (i.e., dspy.settings.configure(lm=task_model)).

* teacher_settings: Settings for the teacher model used when bootstrapping fewshot demonstrations. Defaults to an empty dict.

* metric: The task metric used for optimization.

* n: The number of new prompts and sets of fewshot examples to generate and evaluate. Default=10.

* init_temperature: The temperature used to generate new prompts. Higher values yield more creative prompts. Default=1.0.

* view_data_batch_size: The number of examples shown to the prompt model per batch when generating observations about the dataset. Default=10.

* verbose: Whether to print intermediate steps during optimization.

* track_stats: Whether to track statistics about the optimization process.
|
If True, the method will track a dictionary with a key corresponding to the trial number, |
|
and a value containing a dict with the following keys: |
|
* program: the program being evaluated at a given trial |
|
* score: the last average evaluated score for the program |
|
* pruned: whether or not this program was pruned |
|
This information will be returned as attributes of the best program. |
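For example, assuming compile() returns a best program and track_stats=True was set,
the logs could be inspected roughly like this:

    best_program = teleprompter.compile(program, devset=devset[:DEV_NUM], optuna_trials_num=100,
                                        max_bootstrapped_demos=3, max_labeled_demos=5, eval_kwargs=kwargs)
    for trial_num, log in best_program.trial_logs.items():
        print(trial_num, log["score"], log["pruned"])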
|
""" |
|
class BasicGenerateInstruction(Signature): |
|
"""You are an instruction optimizer for large language models. I will give you a ``signature`` of fields (inputs and outputs) in English. Your task is to propose an instruction that will lead a good language model to perform the task well. Don't be afraid to be creative.""" |
|
|
|
basic_instruction = dspy.InputField(desc="The initial instructions before optimization") |
|
proposed_instruction = dspy.OutputField(desc="The improved instructions for the language model") |
|
proposed_prefix_for_output_field = dspy.OutputField(desc="The string at the end of the prompt, which will help the model start solving the task") |
|
|
|
class BasicGenerateInstructionWithDataObservations(Signature): |
|
"""You are an instruction optimizer for large language models. I will give you a ``signature`` of fields (inputs and outputs) in English. I will also give you some ``observations`` I have made about the dataset and task. Your task is to propose an instruction that will lead a good language model to perform the task well. Don't be afraid to be creative.""" |
|
|
|
basic_instruction = dspy.InputField(desc="The initial instructions before optimization") |
|
observations = dspy.InputField(desc="Observations about the dataset and task") |
|
proposed_instruction = dspy.OutputField(desc="The improved instructions for the language model") |
|
proposed_prefix_for_output_field = dspy.OutputField(desc="The string at the end of the prompt, which will help the model start solving the task") |
|
|
|
class BasicGenerateInstructionWithExamples(dspy.Signature): |
|
("""You are an instruction optimizer for large language models. I will give you a ``signature`` of fields (inputs and outputs) in English. Specifically, I will also provide you with the current ``basic instruction`` that is being used for this task. I will also provide you with some ``examples`` of the expected inputs and outputs. |
|
|
|
Your task is to propose an instruction that will lead a good language model to perform the task well. Don't be afraid to be creative.""") |
|
|
|
basic_instruction = dspy.InputField(desc="The initial instructions before optimization") |
|
|
|
examples = dspy.InputField(format=dsp.passages2text, desc="Example(s) of the task") |
|
proposed_instruction = dspy.OutputField(desc="The improved instructions for the language model") |
|
proposed_prefix_for_output_field = dspy.OutputField(desc="The string at the end of the prompt, which will help the model start solving the task") |
|
|
|
class BasicGenerateInstructionWithExamplesAndDataObservations(dspy.Signature): |
|
("""You are an instruction optimizer for large language models. I will give you a ``signature`` of fields (inputs and outputs) in English. Specifically, I will also provide you with the current ``basic instruction`` that is being used for this task. I will also provide you with some ``observations`` I have made about the dataset and task, along with some ``examples`` of the expected inputs and outputs. |
|
|
|
Your task is to propose a new improved instruction and prefix for the output field that will lead a good language model to perform the task well. Don't be afraid to be creative.""") |
|
basic_instruction = dspy.InputField(desc="The initial instructions before optimization") |
|
observations = dspy.InputField(desc="Observations about the dataset and task") |
|
examples = dspy.InputField(format=dsp.passages2text, desc="Example(s) of the task") |
|
proposed_instruction = dspy.OutputField(desc="The improved instructions for the language model") |
|
proposed_prefix_for_output_field = dspy.OutputField(desc="The string at the end of the prompt, which will help the model start solving the task") |
|
|
|
class ObservationSummarizer(dspy.Signature): |
|
("""Given a series of observations I have made about my dataset, please summarize them into a brief 2-3 sentence summary which highlights only the most important details.""") |
|
observations = dspy.InputField(desc="Observations I have made about my dataset") |
|
    summary = dspy.OutputField(desc="Two- to three-sentence summary of only the most significant highlights of my observations")
|
|
|
class DatasetDescriptor(dspy.Signature): |
|
("""Given several examples from a dataset please write observations about trends that hold for most or all of the samples. """ |
|
"""Some areas you may consider in your observations: topics, content, syntax, conciceness, etc. """ |
|
"""It will be useful to make an educated guess as to the nature of the task this dataset will enable. Don't be afraid to be creative""") |
|
|
|
examples = dspy.InputField(desc="Sample data points from the dataset") |
|
    observations = dspy.OutputField(desc="Things that hold true for most or all of the data you observed")
|
|
|
class DatasetDescriptorWithPriorObservations(dspy.Signature): |
|
("""Given several examples from a dataset please write observations about trends that hold for most or all of the samples. """ |
|
"""I will also provide you with a few observations I have already made. Please add your own observations or if you feel the observations are comprehensive say 'COMPLETE' """ |
|
"""Some areas you may consider in your observations: topics, content, syntax, conciceness, etc. """ |
|
"""It will be useful to make an educated guess as to the nature of the task this dataset will enable. Don't be afraid to be creative""") |
|
|
|
examples = dspy.InputField(desc="Sample data points from the dataset") |
|
prior_observations = dspy.InputField(desc="Some prior observations I made about the data") |
|
    observations = dspy.OutputField(desc="Things that hold true for most or all of the data you observed, or COMPLETE if you have nothing to add")
|
|
|
class BayesianSignatureOptimizer(Teleprompter): |
|
    def __init__(self, prompt_model=None, task_model=None, teacher_settings=None, n=10, metric=None, init_temperature=1.0, verbose=False, track_stats=False, view_data_batch_size=10):
|
self.n = n |
|
self.metric = metric |
|
self.init_temperature = init_temperature |
|
self.prompt_model = prompt_model if prompt_model is not None else dspy.settings.lm |
|
self.task_model = task_model if task_model is not None else dspy.settings.lm |
|
self.verbose = verbose |
|
self.track_stats = track_stats |
|
        self.teacher_settings = teacher_settings if teacher_settings is not None else {}
|
self.view_data_batch_size = view_data_batch_size |
|
|
|
    def _print_full_program(self, program):
        if not self.verbose:
            return
        for i, predictor in enumerate(program.predictors()):
            print(f"Predictor {i}")
            if hasattr(predictor, 'extended_signature'):
                print(f"i: {predictor.extended_signature.instructions}")
                print(f"p: {predictor.extended_signature.fields[-1].name}")
            else:
                print(f"i: {predictor.extended_signature1.instructions}")
                print(f"p: {predictor.extended_signature1.fields[-1].name}")
            print("\n")
|
|
|
    def _print_model_history(self, model, n=1):
        if not self.verbose:
            return
        print(f"Model ({model}) History:")
        model.inspect_history(n=n)
|
|
|
def _observe_data(self, trainset): |
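        # Summarize the training set: describe the first batch of examples, fold in
        # observations from the remaining batches, then condense everything into a
        # short 2-3 sentence summary.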
|
upper_lim = min(len(trainset), self.view_data_batch_size) |
|
observation = dspy.Predict(DatasetDescriptor, n=1, temperature=1.0)(examples=(trainset[0:upper_lim].__repr__())) |
|
observations = observation["observations"] |
|
|
|
skips = 0 |
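        # Walk through the remaining batches; each "COMPLETE" reply counts as a skip,
        # and after 5 skips we assume the observations are exhaustive and stop early.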
|
for b in range(self.view_data_batch_size, len(trainset), self.view_data_batch_size): |
|
upper_lim = min(len(trainset), b+self.view_data_batch_size) |
|
output = dspy.Predict(DatasetDescriptorWithPriorObservations, n=1, temperature=1.0)(prior_observations=observations, examples=(trainset[b:upper_lim].__repr__())) |
|
if len(output["observations"]) >= 8 and output["observations"][:8].upper() == "COMPLETE": |
|
skips += 1 |
|
if skips >= 5: |
|
break |
|
continue |
|
observations += output["observations"] |
|
|
|
summary = dspy.Predict(ObservationSummarizer, n=1, temperature=1.0)(observations=observations) |
|
|
|
return summary.summary |
|
|
|
def _create_example_string(self, fields, example): |
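        # Render a single example as one "<field name><separator><value>" line per
        # signature field, preserving the order in which the fields are declared.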
|
|
|
|
|
output = [] |
|
for field in fields: |
|
name = field.name |
|
separator = field.separator |
|
input_variable = field.input_variable |
|
|
|
|
|
value = example.get(input_variable) |
|
|
|
|
|
field_str = f"{name}{separator}{value}" |
|
output.append(field_str) |
|
|
|
|
|
return '\n'.join(output) |
|
|
|
def _generate_first_N_candidates(self, module, N, view_data, view_examples, demo_candidates, devset): |
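        # Propose up to N candidate (instruction, prefix) pairs per predictor using the
        # prompt model, optionally conditioning on a summary of the devset and/or on the
        # bootstrapped demo sets collected in compile().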
|
candidates = {} |
|
evaluated_candidates = defaultdict(dict) |
|
|
|
if view_data: |
|
|
|
self.observations = None |
|
with dspy.settings.context(lm=self.prompt_model): |
|
self.observations = self._observe_data(devset).replace("Observations:","").replace("Summary:","") |
|
|
|
if view_examples: |
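            # Collect string renderings of the augmented (bootstrapped) demos for each
            # predictor; index 0 stays empty because it corresponds to the zero-shot demo set.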
|
example_sets = {} |
|
for predictor in module.predictors(): |
|
|
|
example_set = {} |
|
all_sets_of_examples = demo_candidates[id(predictor)] |
|
for example_set_i, set_of_examples in enumerate(all_sets_of_examples): |
|
if example_set_i != 0: |
|
for example in set_of_examples: |
|
if "augmented" in example.keys(): |
|
if example["augmented"]: |
|
if example_set_i not in example_set: |
|
example_set[example_set_i] = [] |
|
fields_to_use = predictor.signature.fields |
|
input_variable_names = [field.input_variable for field in fields_to_use] |
|
example_with_only_signature_fields = {key: value for key, value in example.items() if key in input_variable_names} |
|
example_string = self._create_example_string(fields_to_use, example_with_only_signature_fields) |
|
example_set[example_set_i].append(example_string) |
|
example_sets[id(predictor)] = example_set |
|
else: |
|
example_set[example_set_i] = [] |
|
example_sets[id(predictor)] = example_set |
|
|
|
|
|
for predictor in module.predictors(): |
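            # Seed with the predictor's current instruction and prefix, then ask the prompt
            # model for alternatives, picking the generation signature that matches the
            # available context (data observations and/or example demos).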
|
basic_instruction = None |
|
basic_prefix = None |
|
if (hasattr(predictor, 'extended_signature')): |
|
basic_instruction = predictor.extended_signature.instructions |
|
basic_prefix = predictor.extended_signature.fields[-1].name |
|
else: |
|
basic_instruction = predictor.extended_signature1.instructions |
|
basic_prefix = predictor.extended_signature1.fields[-1].name |
|
with dspy.settings.context(lm=self.prompt_model): |
|
|
|
if view_data and view_examples: |
|
instruct = None |
|
for i in range(1,self.n): |
|
new_instruct = dspy.Predict(BasicGenerateInstructionWithExamplesAndDataObservations, n=1, temperature=self.init_temperature)(basic_instruction=basic_instruction, observations=self.observations, examples=example_sets[id(predictor)][i]) |
|
if not instruct: |
|
instruct = new_instruct |
|
else: |
|
instruct.completions.proposed_instruction.extend(new_instruct.completions.proposed_instruction) |
|
instruct.completions.proposed_prefix_for_output_field.extend(new_instruct.completions.proposed_prefix_for_output_field) |
|
|
|
elif view_data: |
|
instruct = dspy.Predict(BasicGenerateInstructionWithDataObservations, n=N-1, temperature=self.init_temperature)(basic_instruction=basic_instruction, observations=self.observations) |
|
|
|
elif view_examples: |
|
instruct = None |
|
for i in range(1,self.n): |
|
new_instruct = dspy.Predict(BasicGenerateInstructionWithExamples, n=1, temperature=self.init_temperature)(basic_instruction=basic_instruction, examples=example_sets[id(predictor)][i]) |
|
if not instruct: |
|
instruct = new_instruct |
|
else: |
|
instruct.completions.proposed_instruction.extend(new_instruct.completions.proposed_instruction) |
|
instruct.completions.proposed_prefix_for_output_field.extend(new_instruct.completions.proposed_prefix_for_output_field) |
|
|
|
else: |
|
instruct = dspy.Predict(BasicGenerateInstruction, n=N-1, temperature=self.init_temperature)(basic_instruction=basic_instruction) |
|
|
|
|
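            # Keep the original instruction and prefix as candidate 0 so the optimizer can
            # always fall back to the unmodified signature.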
|
instruct.completions.proposed_instruction.insert(0, basic_instruction) |
|
instruct.completions.proposed_prefix_for_output_field.insert(0, basic_prefix) |
|
candidates[id(predictor)] = instruct.completions |
|
evaluated_candidates[id(predictor)] = {} |
|
|
|
if self.verbose: self._print_model_history(self.prompt_model) |
|
|
|
return candidates, evaluated_candidates |
|
|
|
def compile(self, student, *, devset, optuna_trials_num, max_bootstrapped_demos, max_labeled_demos, eval_kwargs, seed=42, view_data=True, view_examples=True): |
|
|
|
random.seed(seed) |
|
|
|
|
|
module = student.deepcopy() |
|
evaluate = Evaluate(devset=devset, metric=self.metric, **eval_kwargs) |
|
|
|
|
|
demo_candidates = {} |
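        # Build self.n candidate demo sets per predictor: set 0 is empty (zero-shot), and the
        # rest come from BootstrapFewShot runs over differently shuffled copies of the devset.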
|
for i in range(self.n): |
|
if i == 0: |
|
for module_p in module.predictors(): |
|
if id(module_p) not in demo_candidates.keys(): |
|
demo_candidates[id(module_p)] = [] |
|
demo_candidates[id(module_p)].append([]) |
|
else: |
|
if self.verbose: print(f"Creating basic bootstrap: {i}/{self.n-1}") |
|
|
|
|
|
rng = random.Random(i) |
|
shuffled_devset = devset[:] |
|
rng.shuffle(shuffled_devset) |
|
tp = BootstrapFewShot(metric = self.metric, max_bootstrapped_demos=max_bootstrapped_demos, max_labeled_demos=max_labeled_demos, teacher_settings=self.teacher_settings) |
|
candidate_program = tp.compile(student=module.deepcopy(), trainset=shuffled_devset) |
|
|
|
|
|
for module_p, candidate_p in zip(module.predictors(), candidate_program.predictors()): |
|
if id(module_p) not in demo_candidates.keys(): |
|
demo_candidates[id(module_p)] = [] |
|
demo_candidates[id(module_p)].append(candidate_p.demos) |
|
|
|
|
|
instruction_candidates, _ = self._generate_first_N_candidates(module, self.n, view_data, view_examples, demo_candidates, devset) |
|
|
|
|
|
best_score = float('-inf') |
|
best_program = None |
|
trial_num = 0 |
|
|
|
trial_logs = {} |
|
|
|
|
|
def create_objective(baseline_program, instruction_candidates, demo_candidates, evaluate, devset): |
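            # The returned objective picks one instruction and one demo set per predictor,
            # evaluates the resulting program on the devset in batches, and reports the running
            # score to Optuna so that unpromising trials can be pruned early.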
|
def objective(trial): |
|
nonlocal best_program, best_score, trial_num, trial_logs |
|
candidate_program = baseline_program.deepcopy() |
|
|
|
|
|
if self.verbose: print(f"Starting trial num: {trial_num}") |
|
trial_logs[trial_num] = {} |
|
|
|
for p_old, p_new in zip(baseline_program.predictors(), candidate_program.predictors()): |
|
|
|
|
|
p_instruction_candidates = instruction_candidates[id(p_old)] |
|
p_demo_candidates = demo_candidates[id(p_old)] |
|
|
|
|
|
instruction_idx = trial.suggest_categorical(f"{id(p_old)}_predictor_instruction",range(len(p_instruction_candidates))) |
|
demos_idx = trial.suggest_categorical(f"{id(p_old)}_predictor_demos",range(len(p_demo_candidates))) |
|
trial_logs[trial_num][f"{id(p_old)}_predictor_instruction"] = instruction_idx |
|
trial_logs[trial_num][f"{id(p_old)}_predictor_demos"] = demos_idx |
|
|
|
|
|
selected_candidate = p_instruction_candidates[instruction_idx] |
|
selected_instruction = selected_candidate.proposed_instruction.strip('"').strip() |
|
selected_prefix = selected_candidate.proposed_prefix_for_output_field.strip('"').strip() |
|
|
|
|
|
p_new.extended_signature.instructions = selected_instruction |
|
p_new.extended_signature.fields[-1] = p_new.extended_signature.fields[-1]._replace(name=selected_prefix) |
|
|
|
|
|
selected_demos = p_demo_candidates[demos_idx] |
|
|
|
|
|
p_new.demos = selected_demos |
|
|
|
if self.verbose: print("Evaling the following program:") |
|
self._print_full_program(candidate_program) |
|
trial_logs[trial_num]["program"] = candidate_program |
|
|
|
|
|
total_score = 0 |
|
batch_size = 100 |
|
num_batches = math.ceil(len(devset) / batch_size) |
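                # Evaluate batch by batch, keeping a running weighted average and reporting it
                # to Optuna after every batch so the pruner can cut off hopeless trials.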
|
|
|
for i in range(num_batches): |
|
start_index = i * batch_size |
|
end_index = min((i + 1) * batch_size, len(devset)) |
|
split_dev = devset[start_index:end_index] |
|
split_score = evaluate(candidate_program, devset=split_dev, display_table=0) |
|
if self.verbose: print(f"{i}st split score: {split_score}") |
|
|
|
total_score += split_score * len(split_dev) |
|
curr_weighted_avg_score = total_score / min((i+1)*100,len(devset)) |
|
if self.verbose: print(f"curr average score: {curr_weighted_avg_score}") |
|
|
|
trial.report(curr_weighted_avg_score, i) |
|
|
|
|
|
if trial.should_prune(): |
|
if self.verbose: print(f"Optuna decided to prune!") |
|
trial_logs[trial_num]["score"] = curr_weighted_avg_score |
|
trial_logs[trial_num]["pruned"] = True |
|
trial_num += 1 |
|
raise optuna.TrialPruned() |
|
|
|
if self.verbose: print(f"Fully evaled score: {curr_weighted_avg_score}") |
|
                if self.verbose: self._print_model_history(self.task_model, n=1)
|
score = curr_weighted_avg_score |
|
|
|
trial_logs[trial_num]["score"] = curr_weighted_avg_score |
|
trial_logs[trial_num]["pruned"] = False |
|
|
|
|
|
if score > best_score: |
|
best_score = score |
|
best_program = candidate_program.deepcopy() |
|
|
|
trial_num += 1 |
|
|
|
return score |
|
|
|
return objective |
|
|
|
|
|
objective_function = create_objective(module, instruction_candidates, demo_candidates, evaluate, devset) |
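        # Run TPE-based Bayesian optimization over the instruction / demo choices.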
|
sampler = optuna.samplers.TPESampler(seed=seed) |
|
study = optuna.create_study(direction="maximize", sampler=sampler) |
|
        study.optimize(objective_function, n_trials=optuna_trials_num)
|
|
|
if best_program is not None and self.track_stats: |
|
best_program.trial_logs = trial_logs |
|
|
|
return best_program |
|
|