import math
from itertools import product

import pandas as pd
import numpy as np
import shapely
from bokeh.models import Span, Label, ColumnDataSource, Whisker
from bokeh.plotting import figure, show
from shapely.geometry import Polygon
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn

# Prompt pattern ids that have exported runs for each task.
task_patterns = {
    "CB": [0, 3],
    "RTE": [0, 3],
    "BoolQ": [0, 3, 5],
    "MNLI": [0, 3],
    "COPA": [0, 1],
    "WSC": [0, 1, 2],
    "WiC": [0, 1],
    "MultiRC": [0, 1, 2],
}

# Number of seeds (repetitions) available per task.
task_reps = {
    "CB": 4,
    "RTE": 4,
    "BoolQ": 4,
    "MNLI": 4,
    "COPA": 4,
    "WSC": 4,
    "WiC": 4,
    "MultiRC": 4,
}

# Default pattern id per task.
task_best_pattern = {
    "CB": 0,
    "RTE": 0,
    "BoolQ": 0,
    "MNLI": 0,
    "COPA": 1,
    "WSC": 0,
    "WiC": 0,
    "MultiRC": 1,
}

# Metric abbreviation as it appears inside the exported CSV column names.
task_metric_short = {
    "CB": "f1-macro",
    "RTE": "acc",
    "BoolQ": "acc",
    "MNLI": "acc",
    "COPA": "acc",
    "WSC": "acc",
    "WiC": "acc",
    "MultiRC": "f1",
}

# Human-readable metric name, used as an axis label / column title.
task_metrics = {
    "CB": "F1-macro",
    "RTE": "accuracy",
    "BoolQ": "accuracy",
    "MNLI": "accuracy",
    "COPA": "accuracy",
    "WSC": "accuracy",
    "WiC": "accuracy",
    "MultiRC": "F1",
}

# Whether "neutral verbalizer" runs exist for the task (read by get_data).
# NOTE(review): the lower-case "multirc" key duplicates "MultiRC"; it is kept
# in case a caller outside this file looks it up.
task_neutral = {
    "CB": True,
    "RTE": True,
    "BoolQ": True,
    "MNLI": True,
    "COPA": False,
    "WSC": False,
    "multirc": True,
    "WiC": True,
    "MultiRC": True,
}

neutral_tasks = [
    "BoolQ",
    "CB",
    "MNLI",
    "MultiRC",
    "RTE",
    "WiC",
]

tasks = sorted(task_patterns)

# Colour ramps (strong -> light) for prompting, control and head runs.
pvp_colors = ["goldenrod", "blanchedalmond", "floralwhite"]
ctl_colors = ["crimson", "salmon", "mistyrose"]
clf_colors = ["indigo", "plum", "thistle"]


def prompt_boolq(passage, question, pattern):
    """Render a BoolQ example as text for prompt pattern 0, 1 or 2.

    Returns ``None`` for any other ``pattern`` value.
    """
    if pattern == 0:
        return f"""{passage} Based on the previous passage, {question} [YES/NO]"""
    if pattern == 1:
        return f"""{passage} Question: {question} Answer: [YES/NO]"""
    if pattern == 2:
        return f"""Based on the following passage, {question} [YES/NO] {passage}"""
    # Unknown pattern ids yield None, exactly as the implicit fall-through did.
    return None


def advantage_text(advantage):
    """Describe a data advantage; negative values are credited to the head."""
    model_type = """Head""" if advantage < 0 else """Prompting"""
    return f"""{model_type} advantage: {abs(advantage):.2f} data points"""


def average_advantage_text(advantage):
    """Describe an averaged data advantage; negative values favour the head."""
    model_type = """head""" if advantage < 0 else """prompting"""
    return f"""Average {model_type} advantage: {abs(advantage):.2f} data points"""


def naming_convention(task, seed, pvp_index=None, neutral=False):
    """Build the CSV column name of one exported run.

    Args:
        task: Task name; must be a key of ``task_metric_short``.
        seed: Seed index of the run.
        pvp_index: Prompt pattern id, or ``None`` for the head ("CLF") model.
        neutral: When true, name the "neutral verbalizer" variant.
    """
    method = f"PVP {pvp_index}" if pvp_index is not None else "CLF"
    model = "roberta"
    verbalizer_part = " neutral verbalizer" if neutral else ""
    return f"{method} {model}{verbalizer_part} seed {seed} - test-{task_metric_short[task]}-all-p"


def _stack_runs(df, task, pvp_index=None, neutral=False):
    """Stack the per-seed columns of one set-up into a (rows, seeds) array."""
    per_seed = [df[naming_convention(task, seed, pvp_index, neutral)] for seed in range(task_reps[task])]
    return np.transpose(np.array(per_seed))


def get_data(task):
    """Download the exported results of ``task`` and organise them by set-up.

    Returns:
        A tuple ``(training_points, head_performances, pattern_performances)``:
        the ``training_points`` column of the CSV, the head runs stacked one
        column per seed, and a dict ``pattern id -> {"normal": array}`` that
        also holds a ``"neutral"`` array when ``task_neutral[task]`` is true.
    """
    url = f"https://raw.githubusercontent.com/TevenLeScao/pet/master/exported_results/{task.lower()}/wandb_export.csv"
    df = pd.read_csv(url)
    training_points = df["training_points"]
    head_performances = _stack_runs(df, task)
    pattern_performances = {}
    for pattern in task_patterns[task]:
        pattern_performances[pattern] = {"normal": _stack_runs(df, task, pattern)}
        if task_neutral[task]:
            pattern_performances[pattern]["neutral"] = _stack_runs(df, task, pattern, True)
    return training_points, head_performances, pattern_performances


def reduct(performances, reduction="accmax", final_pattern=0, verbalizer="normal", exclude=None):
    """Combine the different runs of one experimental set-up into one curve.

    Args:
        performances: 2-D array reduced along axis 1, or a dict indexed as
            ``performances[final_pattern][verbalizer]`` that yields one.
        reduction: ``"avg"`` (NaN-aware mean), ``"std"`` (NaN-aware standard
            deviation), ``"max"`` (NaN-aware maximum) or ``"accmax"`` (running
            maximum of the per-row maximum, which makes the curve monotonic).
        final_pattern: Pattern key used when ``performances`` is a dict.
        verbalizer: Verbalizer key used when ``performances`` is a dict.
        exclude: Column index/indices removed before reducing, or ``None``.

    Raises:
        AssertionError: If ``reduction`` is not one of the supported names.
    """
    if isinstance(performances, dict):
        performances = performances[final_pattern][verbalizer]
    if exclude is not None:
        performances = np.delete(performances, exclude, axis=1)

    if reduction == "avg":
        return np.nanmean(performances, axis=1)
    if reduction == "std":
        return np.nanstd(performances, axis=1)
    if reduction == "max":
        return np.nanmax(performances, axis=1)
    if reduction == "accmax":
        return np.maximum.accumulate(np.nanmax(performances, axis=1))
    # Raised explicitly (rather than via `assert`) so the check survives
    # `python -O`; the exception type is kept for backward compatibility.
    raise AssertionError("unrecognized reduction method")
def _segment_start(perf, results):
    """Return the index just before the first value strictly above ``perf``.

    When no value exceeds ``perf`` the start of the last segment
    (``len(results) - 2``) is returned.

    Raises:
        ValueError: If ``results`` is empty or its first value already
            exceeds ``perf``.
    """
    if len(results) == 0:
        raise ValueError("cannot locate a performance level on an empty curve")
    for idx, value in enumerate(results):
        if value > perf:
            if idx == 0:
                raise ValueError(f"value {perf} too small")
            return idx - 1
    return len(results) - 2


def find_surrounding_points(perf, clf_results, pvp_results):
    """Locate, on both curves, the segment in which ``perf`` is first reached.

    Returns:
        ``(clf_idx, pvp_idx)``: for each curve, the index of the point that
        precedes the first value strictly greater than ``perf``.

    Raises:
        ValueError: If either curve is empty or starts above ``perf``.
    """
    return _segment_start(perf, clf_results), _segment_start(perf, pvp_results)


def interpolate(perf, x1, x2, y1, y2):
    """Linearly interpolate the x at which the line (x1, y1)-(x2, y2) hits ``perf``.

    ``y1`` and ``y2`` must differ; the expression divides by ``y2 - y1``.
    """
    return x1 + (perf - y1) * (x2 - x1) / (y2 - y1)


def interpolate_from_idx(perf, idx, results, training_points):
    """Interpolate the training size reaching ``perf`` on segment ``idx``."""
    return interpolate(perf, training_points[idx], training_points[idx + 1], results[idx], results[idx + 1])


def interpolate_from_perf(perf, overlapping_range, training_points, clf_results, pvp_results):
    """Return the training sizes at which each curve reaches ``perf``.

    Returns:
        ``(clf_size, pvp_size)``.

    Raises:
        ValueError: If ``perf`` lies outside ``overlapping_range``.
    """
    if not overlapping_range[0] <= perf <= overlapping_range[1]:
        raise ValueError(f"perf {perf} not in acceptable bounds {overlapping_range}")
    clf_idx, pvp_idx = find_surrounding_points(perf, clf_results, pvp_results)
    clf_size = interpolate_from_idx(perf, clf_idx, clf_results, training_points)
    pvp_size = interpolate_from_idx(perf, pvp_idx, pvp_results, training_points)
    return clf_size, pvp_size


def data_difference(perf, overlapping_range, training_points, clf_results, pvp_results):
    """Return the head training size minus the prompting training size at ``perf``."""
    clf_size, pvp_size = interpolate_from_perf(perf, overlapping_range, training_points, clf_results, pvp_results)
    return clf_size - pvp_size


def calculate_overlap(clf_results, pvp_results, full_range=False):
    """Return the ``(low, high)`` performance range of the two curves.

    With ``full_range`` the range spans both curves (union); otherwise it is
    the range covered by both of them (intersection).
    """
    lows = (min(clf_results), min(pvp_results))
    highs = (max(clf_results), max(pvp_results))
    if full_range:
        return (min(lows), max(highs))
    return (max(lows), min(highs))


def calculate_range(overlapping_range, number_of_points):
    """Lazily yield ``number_of_points`` evenly spaced values strictly inside the range."""
    low, high = overlapping_range[0], overlapping_range[1]
    return (low + i / (number_of_points + 1) * (high - low) for i in range(1, number_of_points + 1))


def calculate_differences(integral_range, overlapping_range, training_points, clf_results, pvp_results):
    """Evaluate ``data_difference`` at every performance level of ``integral_range``."""
    return [
        data_difference(y, overlapping_range, training_points, clf_results, pvp_results) for y in integral_range
    ]


def calculate_offset(training_points, clf_results, pvp_results, number_of_points=1000):
    """Average the data difference over the overlapping performance range.

    The result is the mean of ``data_difference`` sampled at
    ``number_of_points`` evenly spaced levels inside the overlap.
    """
    overlapping_range = calculate_overlap(clf_results, pvp_results)
    integral_range = calculate_range(overlapping_range, number_of_points)
    differences = calculate_differences(integral_range, overlapping_range, training_points, clf_results, pvp_results)
    return sum(differences) / number_of_points


def intersection_with_range(training_points, results, band):
    """Intersect the area under a curve (down to y = 0) with ``band``."""
    outline = list(zip(training_points, results))
    # Close the outline along the x axis so it encloses the area under the curve.
    outline += [(training_points[-1], 0), (training_points[0], 0)]
    return Polygon(outline).intersection(band)


def fill_polygon(fig, polygon, color, label=None, alpha=1.0):
    """Draw ``polygon`` (or each part of a multi-part geometry) as filled patches.

    Empty geometries and line strings are skipped. ``label`` is accepted so
    existing callers keep working, but it is not forwarded to the patches.
    """
    if polygon.is_empty or isinstance(polygon, shapely.geometry.LineString):
        return
    parts = [polygon] if isinstance(polygon, Polygon) else polygon.geoms
    for part in parts:
        if isinstance(part, shapely.geometry.LineString):
            continue
        xs, ys = part.exterior.xy
        fig.patch(xs, ys, color=color, alpha=alpha)


label_order = {
    "head run": 0,
    "head advantage": 1,
    "control run": 2,
    "optimization advantage": 3,
    "prompting run": 4,
    "semantics advantage": 5,
    "region of comparison": 6,
}


def metric_tap(
    event, overlapping_range, training_points, clf_results, pvp_results, advantage_box, advantage_plot
):
    """Tap callback: show the data advantage at the clicked performance level.

    Updates ``advantage_box.text`` and adds (or moves) a dashed horizontal
    ``Span`` at ``event.y`` on ``advantage_plot``. Clicks whose level cannot
    be evaluated (``ValueError`` from ``data_difference``) are ignored.
    """
    metric_value = event.y
    try:
        advantage_value = data_difference(metric_value, overlapping_range, training_points, clf_results, pvp_results)
    except ValueError:
        # clicking outside the region
        return

    advantage_box.text = advantage_text(advantage_value)
    line_color = clf_colors[0] if advantage_value < 0 else pvp_colors[0]
    last_renderer = advantage_plot.renderers[-1]
    if isinstance(last_renderer, Span):
        # A marker from a previous tap exists: move and recolour it.
        last_renderer.location = metric_value
        last_renderer.line_color = line_color
    else:
        metric_line = Span(
            location=metric_value,
            line_alpha=0.7,
            dimension="width",
            line_color=line_color,
            line_dash="dashed",
            line_width=1,
        )
        advantage_plot.renderers.extend([metric_line])


def _performance_figure(task, title, x_log_scale):
    """Create the 800x400 figure with the axis labels shared by the plots."""
    # NOTE(review): `plot_height`/`plot_width` and the `legend=` glyph keyword
    # used below appear to have been renamed in newer Bokeh releases; kept
    # unchanged to stay compatible with the version this file targets.
    fig = figure(
        plot_height=400,
        plot_width=800,
        max_height=400,
        max_width=800,
        x_axis_type="log" if x_log_scale else "linear",
        title=title,
    )
    fig.xaxis.axis_label = "training subset size"
    fig.yaxis.axis_label = task_metrics[task]
    return fig


def _draw_runs(fig, training_points, runs):
    """Draw the markers of every run, then their connecting lines.

    ``runs`` is a sequence of ``(results, color, legend)`` tuples.
    """
    for results, color, legend in runs:
        fig.circle(training_points, results, color=color, legend=legend)
    for results, color, _ in runs:
        fig.line(training_points, results, color=color, alpha=1)


def _rectangle_corners(training_points, y_range):
    """Corners of the rectangle spanning all training sizes and ``y_range``."""
    x_min, x_max = training_points[0], training_points[-1]
    y_min, y_max = y_range[0], y_range[1]
    return [(x_min, y_min), (x_min, y_max), (x_max, y_max), (x_max, y_min)]


def _hatch_comparison_region(fig, training_points, overlapping_range):
    """Draw the hatched, unfilled rectangle marking the comparison region."""
    corners = _rectangle_corners(training_points, overlapping_range)
    fig.patch(
        [x for x, _ in corners],
        [y for _, y in corners],
        color="black",
        fill_alpha=0,
        line_width=0,
        legend="comparison region",
        hatch_alpha=0.14,
        hatch_scale=40,
        hatch_pattern="/",
    )


def _finish_figure(fig, training_points, overlapping_range, middle_y):
    """Add the band borders, the "End of dataset" marker and place the legend."""
    x_min, x_max = training_points[0], training_points[-1]
    for bound in (overlapping_range[0], overlapping_range[1]):
        fig.line([x_min, x_max], [bound, bound], color="dimgrey")
    vline = Span(location=x_max, dimension="height", line_color="black", line_width=2.5, line_dash="dashed")
    end_label = Label(x=x_max, y=middle_y, text="End of dataset", angle=90, angle_units="deg", text_align="center")
    fig.renderers.extend([vline, end_label])
    fig.legend.location = "bottom_right"
    return fig


def plot_polygons_bokeh(task, training_points, clf_results, pvp_results, clf_colors, pvp_colors, x_log_scale=False):
    """Plot head vs. prompting curves and shade the area each one wins.

    Areas inside the overlapping performance range are shaded strongly, the
    remaining areas between the curves faintly.

    Returns:
        The Bokeh figure.
    """
    overlapping_range = calculate_overlap(clf_results, pvp_results, False)
    full_range = calculate_overlap(clf_results, pvp_results, True)
    middle_y = (full_range[0] + full_range[1]) / 2

    fig = _performance_figure(
        task, "Performance over training subset sizes of head and prompting methods", x_log_scale
    )
    _draw_runs(
        fig,
        training_points,
        [(clf_results, clf_colors[0], "head run"), (pvp_results, pvp_colors[0], "prompting run")],
    )
    _hatch_comparison_region(fig, training_points, overlapping_range)

    band = Polygon(_rectangle_corners(training_points, overlapping_range))
    full_band = Polygon(_rectangle_corners(training_points, full_range))
    clf_polygon = intersection_with_range(training_points, clf_results, band)
    pvp_polygon = intersection_with_range(training_points, pvp_results, band)
    full_clf_polygon = intersection_with_range(training_points, clf_results, full_band)
    full_pvp_polygon = intersection_with_range(training_points, pvp_results, full_band)

    clf_inside_area = clf_polygon.difference(pvp_polygon)
    pvp_inside_area = pvp_polygon.difference(clf_polygon)
    clf_outside_area = (full_clf_polygon.difference(full_pvp_polygon)).difference(clf_inside_area)
    pvp_outside_area = (full_pvp_polygon.difference(full_clf_polygon)).difference(pvp_inside_area)

    fill_polygon(fig, clf_outside_area, clf_colors[1], alpha=0.13)
    fill_polygon(fig, pvp_outside_area, pvp_colors[1], alpha=0.18)
    fill_polygon(
        fig, clf_inside_area, clf_colors[1], alpha=0.4, label="head advantage" if task == "WiC" else None
    )
    fill_polygon(fig, pvp_inside_area, pvp_colors[1], alpha=0.4, label="prompting advantage")

    return _finish_figure(fig, training_points, overlapping_range, middle_y)


def plot_three_polygons_bokeh(
    task, training_points, clf_results, pvp_results, ctl_results, clf_colors, pvp_colors, ctl_colors, x_log_scale=False
):
    """Plot head, prompting and null-verbalizer curves with shaded advantages.

    The comparison range is computed from the head and prompting curves only.

    Returns:
        The Bokeh figure.
    """
    overlapping_range = calculate_overlap(clf_results, pvp_results, False)
    full_range = calculate_overlap(clf_results, pvp_results, True)
    middle_y = (full_range[0] + full_range[1]) / 2

    fig = _performance_figure(
        task,
        "Performance over training subset sizes of head, prompting and prompting with a null verbalizer",
        x_log_scale,
    )
    _draw_runs(
        fig,
        training_points,
        [
            (clf_results, clf_colors[0], "head run"),
            (pvp_results, pvp_colors[0], "prompting run"),
            (ctl_results, ctl_colors[0], "null verbalizer run"),
        ],
    )
    _hatch_comparison_region(fig, training_points, overlapping_range)

    band = Polygon(_rectangle_corners(training_points, overlapping_range))
    full_band = Polygon(_rectangle_corners(training_points, full_range))
    clf_polygon = intersection_with_range(training_points, clf_results, band)
    pvp_polygon = intersection_with_range(training_points, pvp_results, band)
    ctl_polygon = intersection_with_range(training_points, ctl_results, band)
    full_clf_polygon = intersection_with_range(training_points, clf_results, full_band)
    full_pvp_polygon = intersection_with_range(training_points, pvp_results, full_band)
    full_ctl_polygon = intersection_with_range(training_points, ctl_results, full_band)

    clf_inside_area = clf_polygon.difference(ctl_polygon)
    pvp_inside_area = pvp_polygon.difference(clf_polygon).difference(ctl_polygon)
    ctl_inside_area = ctl_polygon.difference(clf_polygon)
    clf_outside_area = (full_clf_polygon.difference(full_ctl_polygon)).difference(clf_inside_area)
    # NOTE(review): this subtracts the band-limited `ctl_polygon`, not
    # `full_ctl_polygon`; left unchanged because the intent is not clear.
    pvp_outside_area = (full_pvp_polygon.difference(full_clf_polygon).difference(ctl_polygon)).difference(
        pvp_inside_area
    )
    # Remove the control curve's own in-band area (previously the prompting
    # one, which never overlaps it) so that area is not shaded twice.
    ctl_outside_area = (full_ctl_polygon.difference(full_clf_polygon)).difference(ctl_inside_area)

    fill_polygon(
        fig, clf_inside_area, clf_colors[1], alpha=0.4, label="head advantage" if task == "WiC" else None
    )
    fill_polygon(fig, pvp_inside_area, pvp_colors[1], alpha=0.4, label="prompting advantage")
    fill_polygon(fig, ctl_inside_area, ctl_colors[1], alpha=0.4, label="null verbalizer advantage")
    fill_polygon(fig, clf_outside_area, clf_colors[1], alpha=0.13)
    fill_polygon(fig, pvp_outside_area, pvp_colors[1], alpha=0.18)
    fill_polygon(fig, ctl_outside_area, ctl_colors[1], alpha=0.13)

    return _finish_figure(fig, training_points, overlapping_range, middle_y)


def pattern_graph(task):
    """Plot, per prompt pattern, the median curve and the min-max band over seeds.

    Downloads the exported results of ``task``, replaces missing values with
    the mean of the other seeds of the same set-up and adds a small Gaussian
    jitter (standard deviation 0.01) to every value before plotting.

    Returns:
        The Bokeh figure.
    """
    fig = figure(
        plot_height=400,
        plot_width=800,
        max_height=400,
        max_width=800,
        x_axis_type="log",
        title="Performance over training subset sizes of different prompt patterns",
    )
    fig.xaxis.axis_label = "training subset size"
    fig.yaxis.axis_label = task_metrics[task]
    url = f"https://raw.githubusercontent.com/TevenLeScao/pet/master/exported_results/{task.lower()}/wandb_export.csv"
    df = pd.read_csv(url)

    patterns = task_patterns[task]
    n_patterns = len(patterns)
    n_seeds = task_reps[task]
    n_points = len(df["training_points"])
    metric = task_metrics[task]

    columns = [naming_convention(task, seed, pattern) for pattern in patterns for seed in range(n_seeds)]
    data = np.array(df[columns])

    # One row per (training size, pattern), one column per seed: fill each
    # missing value with the mean of the remaining seeds of its row.
    data = data.reshape(-1, n_seeds)
    seed_means = np.nanmean(data, axis=1)
    nan_rows, nan_cols = np.where(np.isnan(data))
    data[nan_rows, nan_cols] = np.take(seed_means, nan_rows)

    # Flatten column by column: all training sizes of (pattern 0, seed 0),
    # then of (pattern 0, seed 1), and so on.
    data = data.reshape(n_points, -1).transpose().reshape(-1)
    data = data + np.random.normal(0, 0.01, len(data))

    expanded_training_points = np.array(list(df["training_points"]) * n_seeds * n_patterns)
    pattern = np.array([i // (len(data) // n_patterns) for i in range(len(data))])
    # Seed ids follow the same column-by-column layout as `data`.
    seed = np.tile(np.repeat(np.arange(n_seeds), n_points), n_patterns)

    long_df = pd.DataFrame(
        np.stack((expanded_training_points, pattern, seed, data), axis=1),
        columns=["training_points", "pattern", "seed", metric],
    )
    long_df["pattern"] = long_df["pattern"].astype(int).astype(str)

    pattern_colors = ["royalblue", "darkturquoise", "darkviolet"]
    for i, (_, pattern_df) in enumerate(long_df.groupby("pattern")):
        by_training_points = pattern_df.groupby("training_points")
        x = [training_point for training_point, _ in by_training_points]
        y_max = [np.max(group[metric]) for _, group in by_training_points]
        y_min = [np.min(group[metric]) for _, group in by_training_points]
        y = [np.median(group[metric]) for _, group in by_training_points]
        fig.circle(x, y, color=pattern_colors[i], alpha=1, legend=f"Pattern {i}")
        fig.line(x, y, color=pattern_colors[i], alpha=1)
        fig.varea(x=x, y1=y_max, y2=y_min, color=pattern_colors[i], alpha=0.11)
    return fig


def cubic_easing(t):
    """Cubic ease-in-out: ``4 t^3`` below 0.5, ``0.5 (2t - 2)^3 + 1`` from 0.5 on."""
    if t < 0.5:
        return 4 * t * t * t
    p = 2 * t - 2
    return 0.5 * p * p * p + 1


def circ_easing(t):
    """Circular ease-in-out built from two square-root arcs that meet at ``t = 0.5``."""
    if t < 0.5:
        return 0.5 * (1 - math.sqrt(1 - 4 * (t * t)))
    return 0.5 * (math.sqrt(-((2 * t) - 3) * ((2 * t) - 1)) + 1)