File size: 18,554 Bytes
ab13cee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
import os
import random
import uuid
from collections import Counter, defaultdict
from shutil import rmtree
from typing import Dict, List, Optional, Tuple

import pandas as pd
import pkg_resources
import yaml
from jinja2 import BaseLoader, Environment, meta


# Maximum number of characters kept for each Jinja template variable when
# truncation is enabled (see Template.apply).
# For scale: 300 words x 4.7 avg characters per word + 300 spaces = 1710
# characters, so the 2048 limit leaves headroom above a ~300-word field.
TEXT_VAR_LENGTH = 2048

# Local path to the folder containing the templates
TEMPLATES_FOLDER_PATH = pkg_resources.resource_filename(__name__, "templates")

# Shared Jinja environment used by this module to parse and render template
# strings (via env.from_string / env.parse).
env = Environment(loader=BaseLoader)

# Allow the python function zip()
env.globals.update(zip=zip)

# These are users whose datasets should be included in the results returned by
# filter_english_datasets (regardless of their metadata).
# In this module, a top-level templates folder named after one of these users
# is expanded into one "<user>/<entry>" dataset per entry it contains
# (see TemplateCollection._collect_datasets).
INCLUDED_USERS = {"Zaid", "craffel"}


def highlight(input):
    """
    Jinja filter that wraps a value in a colored HTML ``<span>``.

    Template.apply appends this filter to every ``{{ ... }}`` expression when
    highlighting is requested. When truncation is disabled there is no
    preceding ``| string`` filter, so the value may be an int, list, None, etc.

    :param input: value substituted for a template variable
    :return: the text of ``input`` surrounded by a span with color #F08080
    """
    # Plain concatenation raises TypeError for non-string values; convert them
    # to text first. Strings (and str subclasses) are passed through untouched.
    if not isinstance(input, str):
        input = str(input)
    return "<span style='color: #F08080'>" + input + "</span>"


def choice(choices):
    """
    Jinja filter: pick one element of ``choices`` at random.

    Thin wrapper around ``random.choice``, so it draws from the module-level
    random generator and fails the same way on an empty sequence.

    :param choices: non-empty sequence to draw from
    :return: one randomly selected element
    """
    picked = random.choice(choices)
    return picked


def most_frequent(items):
    """
    Jinja filter: find the item(s) that occur most often in ``items``.

    :param items: collection of hashable items
    :return: list of every item tied for the highest count, in order of first
             appearance; None when ``items`` is empty or otherwise falsy
    """
    if not items:
        return None

    # most_common() is sorted by descending count, so its head holds the top count.
    ranked = Counter(items).most_common()
    _, top_count = ranked[0]

    winners = []
    for item, count in ranked:
        if count == top_count:
            winners.append(item)
    return winners


# Expose the helper functions above as Jinja filters, so that template
# expressions can use them, e.g. {{ value | highlight }}.
env.filters.update(
    {
        "highlight": highlight,
        "choice": choice,
        "most_frequent": most_frequent,
    }
)


class Template(yaml.YAMLObject):
    """
    A single prompt template, (de)serialized in YAML under the ``!Template`` tag.
    """

    yaml_tag = "!Template"

    # Placeholder substituted for any "|||" found inside example fields while
    # rendering, so that data can never be mistaken for the separator.
    pipe_protector = "3ed2dface8203c4c9dfb1a5dc58e41e0"

    def __init__(self, name, jinja, reference, metadata=None, answer_choices=None):
        """
        Builds a prompt template and assigns it a freshly generated uuid4 id.

        The template body is written in Jinja and is rendered against one
        example (a dictionary) of the corresponding Hugging Face dataset. The
        separator ||| should occur exactly once, splitting the template into
        the prompt and the output. Typically the prompt carries what the model
        needs in order to behave as desired (e.g., a passage plus instructions)
        and the output is the expected response.

        :param name: unique name (per dataset) for template
        :param jinja: template expressed in Jinja
        :param reference: string describing author or paper reference for template
        :param metadata: a Metadata object with template annotations; an empty
                         Metadata is created when omitted
        :param answer_choices: Jinja expression for answer choices. Should produce
                               a ||| delimited string of choices that enumerates
                               the possible completions for templates that should
                               be evaluated as ranked completions. If None, then
                               the template is open-ended. This list is accessible
                               from within Jinja as the variable `answer_choices`.
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.jinja = jinja
        self.reference = reference
        if metadata is None:
            metadata = Template.Metadata()
        self.metadata = metadata
        self.answer_choices = answer_choices

    def get_id(self):
        """
        Accessor for the template id

        :return: unique id for template
        """
        return self.id

    def get_name(self):
        """
        Accessor for the template name

        :return: unique (per dataset) name for template
        """
        return self.name

    def get_reference(self):
        """
        Accessor for the bibliographic reference (or author) of the template

        :return: reference as a string
        """
        return self.reference

    def get_answer_choices_expr(self):
        """
        Accessor for the Jinja expression that computes the answer choices of an example.

        :return: String, or None if no answer choices
        """
        return self.answer_choices

    def get_answer_choices_list(self, example):
        """
        Renders the answer choices expression against one example.

        :param example: the dataset example (a dictionary) to render with
        :return: list of strings, or None if get_answer_choices_expr is None
        """
        expression = self.get_answer_choices_expr()
        if expression is None:
            return None

        compiled = env.from_string(expression)
        # "|||" inside the data is masked while rendering and restored per choice
        rendered = compiled.render(**self._escape_pipe(example))

        choices = []
        for raw_choice in rendered.split("|||"):
            choices.append(self._unescape_pipe(raw_choice.strip()))
        return choices

    def get_fixed_answer_choices_list(self):
        """
        Computes the answer choices without an example, when the expression
        does not refer to any undeclared variable

        :return: list of strings, or None if no static list exists
        """
        expression = self.get_answer_choices_expr()
        if expression is None:
            return None

        # Any undeclared variable means the choices depend on the example
        if meta.find_undeclared_variables(env.parse(expression)):
            return None

        rendered = env.from_string(expression).render()
        return [piece.strip() for piece in rendered.split("|||")]

    def apply(self, example, truncate=True, highlight_variables=False):
        """
        Renders this template against an example to produce a prompt

        :param example: the dataset example to create a prompt for
        :param truncate: if True, example fields will be truncated to TEXT_VAR_LENGTH chars
        :param highlight_variables: highlight the added variables
        :return: list with the stripped parts of the rendered text separated by
                 ||| (prompt and output for a template with a single separator)
        :raises ValueError: if the example has a key named 'answer_choices'
        """
        source = self.jinja

        # Truncation: pipe every expression through "string | truncate(...)"
        # by rewriting its closing braces
        if truncate:
            # Curly braces are doubled to escape them inside the f-string
            source = source.replace("}}", f" | string | truncate({TEXT_VAR_LENGTH}) }}}}")

        # Highlighting: same rewriting trick, appending the highlight filter
        if highlight_variables:
            source = source.replace("}}", " | highlight }}")

        compiled = env.from_string(source)

        context = self._escape_pipe(example)

        # The name answer_choices is reserved for the rendered choices list
        if "answer_choices" in context:
            raise ValueError("Example contains the restricted key 'answer_choices'.")
        context["answer_choices"] = self.get_answer_choices_list(example)

        rendered = compiled.render(**context)

        # Split on the separator first, then put back any separator that was
        # part of the original example
        parts = []
        for piece in rendered.split("|||"):
            parts.append(self._unescape_pipe(piece).strip())
        return parts

    @classmethod
    def _escape_pipe(cls, example):
        # Builds a copy of the example in which every "|||" found in a
        # string-valued field is swapped for the placeholder; the swap is
        # undone after splitting. Non-string values are kept as they are.
        escaped = {}
        for field, content in example.items():
            if isinstance(content, str):
                content = content.replace("|||", cls.pipe_protector)
            escaped[field] = content
        return escaped

    @classmethod
    def _unescape_pipe(cls, string):
        # Inverse of _escape_pipe for one string: placeholder -> "|||"
        restored = string.replace(cls.pipe_protector, "|||")
        return restored

    class Metadata(yaml.YAMLObject):
        """
        Annotations attached to a prompt template, (de)serialized in YAML under
        the ``!TemplateMetadata`` tag.
        """

        yaml_tag = "!TemplateMetadata"

        def __init__(
            self,
            original_task: Optional[bool] = None,
            choices_in_prompt: Optional[bool] = None,
            metrics: Optional[List[str]] = None,
        ):
            """
            Stores the template annotations.

            Below, "trivial" choices means Yes/No, True/False and the like,
            whereas "nontrivial" choices are the other kinds of choices given
            in the answer_choices field.

            :param original_task: If True, this prompt asks a model to perform the original task designed for
                this dataset.
            :param choices_in_prompt: If True, the answer choices are included in the templates such that models
                see those choices in the input. Only applicable to classification tasks.
            :param metrics: List of strings denoting metrics to use for evaluation
            """
            self.original_task = original_task
            self.choices_in_prompt = choices_in_prompt
            self.metrics = metrics


class TemplateCollection:
    """
    This helper class wraps the DatasetTemplates class
    - Initializes the DatasetTemplates for all existing template folders
    - Gives access to each DatasetTemplates
    - Provides aggregated counts over all DatasetTemplates
    """

    def __init__(self):
        # Dict of all the DatasetTemplates, key is the tuple (dataset_name, subset_name);
        # subset_name is None for a dataset without subsets.
        self.datasets_templates: Dict[Tuple[str, Optional[str]], "DatasetTemplates"] = self._collect_datasets()

    @property
    def keys(self):
        """
        List of the (dataset_name, subset_name) keys currently in the collection
        """
        return list(self.datasets_templates.keys())

    def __len__(self) -> int:
        """
        Number of (dataset_name, subset_name) entries in the collection
        """
        return len(self.datasets_templates)

    def remove(self, dataset_name: str, subset_name: Optional[str] = None) -> None:
        """
        Drops an entry from the collection (nothing is deleted on disk here)

        :param dataset_name: name of the dataset to remove
        :param subset_name: name of the subset, None if the dataset has none
        :raises KeyError: if there is no such entry
        """
        del self.datasets_templates[dataset_name, subset_name]

    def _collect_datasets(self) -> Dict[Tuple[str, Optional[str]], "DatasetTemplates"]:
        """
        Initialize a DatasetTemplates object for each templates.yaml detected in the templates folder

        Returns: a dict with key=(dataset_name, subset_name)
        """
        dataset_folders = os.listdir(TEMPLATES_FOLDER_PATH)
        dataset_folders = [folder for folder in dataset_folders if not folder.startswith(".")]

        output = {}  # format is {(dataset_name, subset_name): DatasetsTemplates}
        for dataset in dataset_folders:
            if dataset in INCLUDED_USERS:
                # A user folder holds one entry per dataset; the dataset name is "<user>/<entry>"
                for filename in os.listdir(os.path.join(TEMPLATES_FOLDER_PATH, dataset)):
                    output.update(self._collect_dataset(dataset + "/" + filename))
            else:
                output.update(self._collect_dataset(dataset))
        return output

    def _collect_dataset(self, dataset):
        """
        Initialize the DatasetTemplates found in the folder of one dataset

        :param dataset: dataset name, i.e. its folder path relative to the templates folder
        :return: a dict with key=(dataset_name, subset_name)
        """
        output = {}  # format is {(dataset_name, subset_name): DatasetsTemplates}
        dataset_path = os.path.join(TEMPLATES_FOLDER_PATH, dataset)
        for filename in os.listdir(dataset_path):
            if filename.endswith(".yaml"):
                # If there is no sub-folder, there is no subset for this dataset
                output[(dataset, None)] = DatasetTemplates(dataset)
            elif os.path.isdir(os.path.join(dataset_path, filename)):
                # This is a subfolder, and its name corresponds to the subset name
                output[(dataset, filename)] = DatasetTemplates(dataset_name=dataset, subset_name=filename)
            # Anything else is a stray non-yaml file, not a subset: ignore it
        return output

    def get_dataset(self, dataset_name: str, subset_name: Optional[str] = None) -> "DatasetTemplates":
        """
        Return the DatasetTemplates object corresponding to the dataset name

        :param dataset_name: name of the dataset to get
        :param subset_name: name of the subset
        """
        key = (dataset_name, subset_name)
        # if the dataset does not exist, we add it. The lookup must use the full
        # (dataset_name, subset_name) key: testing the bare dataset name against
        # the tuple keys never matches, which would rebuild (and overwrite) the
        # stored DatasetTemplates on every call.
        if key not in self.datasets_templates:
            self.datasets_templates[key] = DatasetTemplates(dataset_name, subset_name)

        return self.datasets_templates[key]

    def get_templates_count(self) -> Dict:
        """
        Return the overall number count over all datasets

        NB: we don't breakdown datasets into subsets for the count, i.e subsets count are included
        into the dataset count
        """

        count_dict = defaultdict(int)
        for (dataset_name, _), dataset_templates in self.datasets_templates.items():
            # Subsets count towards dataset count
            count_dict[dataset_name] += len(dataset_templates)
        # converting to regular dict
        return dict(count_dict)


class DatasetTemplates:
    """
    Class that wraps all templates for a specific dataset/subset and implements all the helper
    functions necessary to read/write to the yaml file
    """

    TEMPLATES_KEY = "templates"
    DATASET_KEY = "dataset"
    SUBSET_KEY = "subset"
    TEMPLATE_FILENAME = "templates.yaml"

    def __init__(self, dataset_name: str, subset_name: Optional[str] = None):
        """
        Loads the templates of a dataset/subset from its yaml file, if the file exists

        :param dataset_name: name of the dataset
        :param subset_name: name of the subset, None if the dataset has no subset
        """
        self.dataset_name: str = dataset_name
        self.subset_name: Optional[str] = subset_name
        # dictionary of the templates as stored in the yaml file; templates
        # added through add_template are keyed by template id.
        self.templates: Dict = self.read_from_file()

        # Mapping from template name to template id
        self.name_to_id_mapping = {}
        self.sync_mapping()

    def sync_mapping(self) -> None:
        """
        Re-compute the name_to_id_mapping to ensure it is in sync with self.templates
        """
        self.name_to_id_mapping = {template.name: template.id for template in self.templates.values()}

    @property
    def all_template_names(self) -> List[str]:
        """
        Sorted list of all templates names for this dataset
        """
        return sorted(template.name for template in self.templates.values())

    @property
    def folder_path(self) -> str:
        """
        Folder of this dataset (or of this subset, inside the dataset folder)
        """
        if self.subset_name:
            return os.path.join(TEMPLATES_FOLDER_PATH, self.dataset_name, self.subset_name)
        return os.path.join(TEMPLATES_FOLDER_PATH, self.dataset_name)

    @property
    def yaml_path(self) -> str:
        """
        Path of the yaml file holding the templates of this dataset/subset
        """
        return os.path.join(self.folder_path, self.TEMPLATE_FILENAME)

    def format_for_dump(self) -> Dict:
        """
        Create a formatted dictionary for the class attributes
        """
        formatted_dict = {self.DATASET_KEY: self.dataset_name, self.TEMPLATES_KEY: self.templates}
        if self.subset_name:
            formatted_dict[self.SUBSET_KEY] = self.subset_name
        return formatted_dict

    def read_from_file(self) -> Dict:
        """
        Reads a file containing a prompt collection.

        :return: the content stored under the templates key, or an empty dict
                 if the yaml file does not exist
        """

        if not os.path.exists(self.yaml_path):
            return {}
        # NOTE(security): yaml.FullLoader builds Python objects from tagged
        # nodes (the template classes rely on this); only load trusted files.
        # The context manager closes the file handle, even if parsing fails.
        with open(self.yaml_path, "r") as yaml_file:
            yaml_dict = yaml.load(yaml_file, Loader=yaml.FullLoader)
        return yaml_dict[self.TEMPLATES_KEY]

    def write_to_file(self) -> None:
        """
        Writes to a file with the current prompt collection.
        """
        # Sync the mapping
        self.sync_mapping()

        # Build the content before opening (and thereby truncating) the file
        content = self.format_for_dump()

        # We only create the folder if a template is written
        os.makedirs(self.folder_path, exist_ok=True)
        # The context manager flushes and closes the file once the dump is done
        with open(self.yaml_path, "w") as yaml_file:
            yaml.dump(content, yaml_file)

    def add_template(self, template: "Template") -> None:
        """
        Adds a new template for the dataset and writes the yaml file

        :param template: template
        """
        self.templates[template.get_id()] = template

        self.write_to_file()

    def remove_template(self, template_name: str) -> None:
        """
        Deletes a template

        Updates the yaml file, or removes the whole folder when no template is left.

        :param template_name: name of template to remove
        :raises ValueError: if no template has this name
        """

        # Even if we have an ID, we want to check for duplicate names
        if template_name not in self.all_template_names:
            raise ValueError(f"No template with name {template_name} for dataset {self.dataset_name} exists.")

        del self.templates[self.name_to_id_mapping[template_name]]

        if not self.templates:
            # There is no remaining template, we can remove the entire folder
            self.delete_folder()
        else:
            # We just update the file
            self.write_to_file()

    def update_template(
        self,
        current_template_name: str,
        new_template_name: str,
        jinja: str,
        reference: str,
        metadata: "Template.Metadata",
        answer_choices: str,
    ) -> None:
        """
        Updates a pre-existing template and writes changes

        :param current_template_name: current name of the template stored in self.templates
        :param new_template_name: new name for the template
        :param jinja: new jinja entry
        :param reference: new reference entry
        :param metadata: a Metadata object with template annotations
        :param answer_choices: new answer_choices string
        :raises KeyError: if current_template_name is not a known template name
        """
        template = self.templates[self.name_to_id_mapping[current_template_name]]
        template.name = new_template_name
        template.jinja = jinja
        template.reference = reference
        template.metadata = metadata
        template.answer_choices = answer_choices

        # Also re-syncs name_to_id_mapping, which the renaming made stale
        self.write_to_file()

    def delete_folder(self) -> None:
        """
        Delete the folder corresponding to self.folder_path
        """
        self.sync_mapping()

        rmtree(self.folder_path)

        # If it is a subset, we have to check whether to remove the dataset folder
        if self.subset_name:
            # have to check for other folders
            base_dataset_folder = os.path.join(TEMPLATES_FOLDER_PATH, self.dataset_name)
            if not os.listdir(base_dataset_folder):
                rmtree(base_dataset_folder)

    def __getitem__(self, template_key: str) -> "Template":
        """
        Returns the template with the given name

        :param template_key: name of the template
        :raises KeyError: if no template has this name
        """
        return self.templates[self.name_to_id_mapping[template_key]]

    def __len__(self) -> int:
        """
        Number of templates for this dataset/subset
        """
        return len(self.templates)


def get_templates_data_frame():
    """
    Collects the information of every template of every dataset/subset into a
    single Pandas DataFrame, one row per template.

    A new TemplateCollection is built on each call. Rows are ordered by
    collection key, then by template name within each dataset/subset.

    :return: Pandas DataFrame
    """
    column_names = (
        "id",
        "dataset",
        "subset",
        "name",
        "reference",
        "original_task",
        "choices_in_prompt",
        "metrics",
        "answer_choices",
        "jinja",
    )
    # One (initially empty) list of values per column
    data = {column: [] for column in column_names}

    collection = TemplateCollection()

    for dataset_name, subset_name in collection.keys:
        dataset_templates = collection.get_dataset(dataset_name, subset_name)
        for name in dataset_templates.all_template_names:
            template = dataset_templates[name]
            row = {
                "id": template.get_id(),
                "dataset": dataset_name,
                "subset": subset_name,
                "name": template.get_name(),
                "reference": template.get_reference(),
                "original_task": template.metadata.original_task,
                "choices_in_prompt": template.metadata.choices_in_prompt,
                "metrics": template.metadata.metrics,
                "answer_choices": template.get_answer_choices_expr(),
                "jinja": template.jinja,
            }
            for column, value in row.items():
                data[column].append(value)

    return pd.DataFrame(data)