import abc
import logging
from typing import Dict, Optional

import torch
from datasets import Dataset
from pie_modules.document.processing import tokenize_document
from pie_modules.documents import TokenDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions
from pytorch_ie.annotations import Span
from pytorch_ie.documents import TextBasedDocument
from torch import FloatTensor, Tensor
from torch.utils.data import DataLoader
from transformers import AutoModel, AutoTokenizer

logger = logging.getLogger(__name__)


class EmbeddingModel(abc.ABC):
    @abc.abstractmethod
    def __call__(
        self, document: TextBasedDocument, span_layer_name: str
    ) -> Dict[Span, FloatTensor]:
        """Embed text annotations from a document.

        Args:
            document: The document to embed.
            span_layer_name: The name of the annotation layer in the document that contains the
                text span annotations to embed.

        Returns:
            A dictionary mapping text annotations to their embeddings.
        """
        pass


class HuggingfaceEmbeddingModel(EmbeddingModel):
    def __init__(
        self,
        model_name_or_path: str,
        revision: Optional[str] = None,
        device: str = "cpu",
        max_length: int = 512,
        batch_size: int = 16,
    ):
        self.load(model_name_or_path, revision, device)
        self.max_length = max_length
        self.batch_size = batch_size

    def load(
        self, model_name_or_path: str, revision: Optional[str] = None, device: str = "cpu"
    ) -> None:
        self._model = AutoModel.from_pretrained(model_name_or_path, revision=revision).to(device)
        self._tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, revision=revision)

    def __call__(
        self, document: TextBasedDocument, span_layer_name: str
    ) -> Dict[Span, FloatTensor]:
        # to not modify the original document
        document = document.copy()
        # tokenize_document does not yet consider predictions, so we need to add them manually
        document[span_layer_name].extend(document[span_layer_name].predictions.clear())
        added_annotations = []
        tokenizer_kwargs = {
            "max_length": self.max_length,
            "stride": self.max_length // 8,
            "truncation": True,
            "padding": True,
            "return_overflowing_tokens": True,
        }
        # tokenize once to get the tokenized documents with mapped annotations
        tokenized_documents = tokenize_document(
            document,
            tokenizer=self._tokenizer,
            result_document_type=TokenDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions,
            partition_layer="labeled_partitions",
            added_annotations=added_annotations,
            strict_span_conversion=False,
            **tokenizer_kwargs,
        )

        # just tokenize again to get tensors in the correct format for the model
        dataset = Dataset.from_dict({"text": [document.text]})

        def tokenize_function(examples):
            return self._tokenizer(examples["text"], **tokenizer_kwargs)

        # Tokenize the texts. Note that we remove the text column directly in the map call,
        # otherwise the map would fail because we may produce multiple new rows
        # (tokenization results) for each input row (text).
        tokenized_dataset = dataset.map(tokenize_function, batched=True, remove_columns=["text"])
        # remove the overflow_to_sample_mapping column
        tokenized_dataset = tokenized_dataset.remove_columns(["overflow_to_sample_mapping"])
        tokenized_dataset.set_format(type="torch")

        dataloader = DataLoader(tokenized_dataset, batch_size=self.batch_size)

        embeddings = {}
        example_idx = 0
        for batch in dataloader:
            batch_at_device = {
                k: v.to(self._model.device) if isinstance(v, Tensor) else v
                for k, v in batch.items()
            }
            with torch.no_grad():
                model_output = self._model(**batch_at_device)

            for last_hidden_state in model_output.last_hidden_state:
                text2tok_ann = added_annotations[example_idx][span_layer_name]
                tok2text_ann = {v: k for k, v in text2tok_ann.items()}
                for tok_ann in tokenized_documents[example_idx].labeled_spans:
                    # skip "empty" annotations
                    if tok_ann.start == tok_ann.end:
                        continue
                    # use the max pooling strategy to get a single embedding for the annotation text
                    embedding = (
                        last_hidden_state[tok_ann.start : tok_ann.end].max(dim=0)[0].detach().cpu()
                    )
                    text_ann = tok2text_ann[tok_ann]

                    if text_ann in embeddings:
                        logger.warning(
                            f"Overwriting embedding for annotation '{text_ann}' (do you use striding?)"
                        )
                    embeddings[text_ann] = embedding
                example_idx += 1

        return embeddings
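

if __name__ == "__main__":
    # A minimal usage sketch, not part of the module's public API. It assumes a text
    # document type with "labeled_spans" and "labeled_partitions" layers
    # (TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions from
    # pytorch_ie.documents); the checkpoint "bert-base-uncased", the example text,
    # and the span offsets are only illustrative choices.
    from pytorch_ie.annotations import LabeledSpan
    from pytorch_ie.documents import (
        TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions,
    )

    doc = TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions(
        text="Complex claims require strong evidence."
    )
    # tokenization is partitioned by "labeled_partitions", so add one partition
    # covering the whole text
    doc.labeled_partitions.append(LabeledSpan(start=0, end=len(doc.text), label="paragraph"))
    # the span whose embedding we want ("Complex claims")
    doc.labeled_spans.append(LabeledSpan(start=0, end=14, label="claim"))

    embedding_model = HuggingfaceEmbeddingModel(
        "bert-base-uncased", device="cpu", max_length=512, batch_size=16
    )
    span2embedding = embedding_model(doc, span_layer_name="labeled_spans")
    for span, embedding in span2embedding.items():
        # each embedding is a 1-d FloatTensor of size hidden_size (max-pooled over tokens)
        print(span, tuple(embedding.shape))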