from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
from typing import Dict, List, Any
import itertools
from nltk import sent_tokenize
import nltk
import torch
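
# Answer-aware question generation: the same seq2seq model is used in two
# passes, first to extract candidate answers from each sentence, then to
# generate a question for each (answer, highlighted context) pair.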

class PreTrainedPipeline():

    def __init__(self, path=""):
        # IMPLEMENT_THIS
        # Preload all the elements you are going to need at inference.
        # For instance your model, processors, tokenizer that might be needed.
        # This function is only called once, so do all the heavy processing I/O here"""
        nltk.download('punkt')
        self.model = AutoModelForSeq2SeqLM.from_pretrained(path)        
        self.tokenizer = AutoTokenizer.from_pretrained(path)
        
        self.model_type="t5"
        self.device = "cuda" if torch.cuda.is_available() else "cpu"


    def __call__(self, inputs: str):
        if len(inputs) == 0:
            return []
        inputs = " ".join(inputs.split())
        sents, answers = self._extract_answers(inputs)
        flat_answers = list(itertools.chain(*answers))

        if len(flat_answers) == 0:
            return []

        qg_examples = self._prepare_inputs_for_qg_from_answers_hl(sents, answers)

        qg_inputs = [example['source_text'] for example in qg_examples]
        questions = self._generate_questions(qg_inputs)
        output = [{'answer': example['answer'], 'question': que} for example, que in zip(qg_examples, questions)]
        output = self.clean_generated_QAs(output)
        return output

    def _extract_answers(self, context):
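        # First pass: ask the model for answer candidates, one batch element
        # per sentence. Returns the sentence list plus a per-sentence list of
        # candidate answers.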
        print("_extract_answers")
        sents, inputs = self._prepare_inputs_for_ans_extraction(context)
        inputs = self._tokenize(inputs, padding=True, truncation=True)

        outs = self.model.generate(
            input_ids=inputs['input_ids'].to(self.device), 
            attention_mask=inputs['attention_mask'].to(self.device), 
            max_length=32,
        )
        
        # Decode with special tokens kept so the <sep> markers survive, split
        # each prediction on them, and drop the trailing fragment after the
        # last <sep>.
        dec = [self.tokenizer.decode(ids, skip_special_tokens=False) for ids in outs]
        answers = [item.split('<sep>') for item in dec]
        answers = [i[:-1] for i in answers]
        
        return sents, answers

    
    def _prepare_inputs_for_ans_extraction(self, text):
        print("_prepare_inputs_for_ans_extraction")
        sents = sent_tokenize(text)

        # Build one input per sentence, wrapping that sentence in <hl> markers,
        # e.g. "extract answers: <hl> Sentence one. <hl> Sentence two. </s>"
        inputs = []
        for i in range(len(sents)):
            source_text = "extract answers:"
            for j, sent in enumerate(sents):
                if i == j:
                    sent = "<hl> %s <hl>" % sent
                source_text = "%s %s" % (source_text, sent)
                source_text = source_text.strip()

            if self.model_type == "t5":
                source_text = source_text + " </s>"
            inputs.append(source_text)

        return sents, inputs
          
    def _tokenize(self,
        inputs,
        padding=True,
        truncation=True,
        add_special_tokens=True,
        max_length=512
    ):
        inputs = self.tokenizer.batch_encode_plus(
            inputs,
            max_length=max_length,
            add_special_tokens=add_special_tokens,
            truncation=truncation,
            padding="max_length" if padding else False,
            return_tensors="pt"
        )
        return inputs

    def _generate_questions(self, inputs):
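        # Second pass: beam-search decode one question per highlighted
        # source text.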
        print("_generate_questions")
        inputs = self._tokenize(inputs, padding=True, truncation=True)
        
        outs = self.model.generate(
            input_ids=inputs['input_ids'].to(self.device), 
            attention_mask=inputs['attention_mask'].to(self.device), 
            max_length=32,
            num_beams=4,
        )
        
        questions = [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
        return questions


    
    def _prepare_inputs_for_qg_from_answers_hl(self, sents, answers):
        print("_prepare_inputs_for_qg_from_answers_hl")
        inputs = []
        for i, answer in enumerate(answers):
            if len(answer) == 0:
                continue
            for answer_text in answer:
                sent = sents[i]
                sents_copy = sents[:]
                answer_text = self.remove_pad(answer_text)
                answer_text = answer_text.strip()
                print("Answer", answer)
                print("Answer text", answer_text)

                try:
                    ans_start_idx = sent.lower().index(answer_text.lower())
                except ValueError:
                    # The answer does not occur in the sentence, so skip it
                    continue

                # Highlight the answer span inside its sentence with <hl> markers
                sent = f"{sent[:ans_start_idx]} <hl> {answer_text} <hl> {sent[ans_start_idx + len(answer_text):]}"
                sents_copy[i] = sent

                source_text = " ".join(sents_copy)
                source_text = f"generate question: {source_text}"
                if self.model_type == "t5":
                    source_text = source_text + " </s>"

                inputs.append({"answer": answer_text, "source_text": source_text})

        return inputs

    def clean_generated_QAs(self, generated_QAs):
        clean_QAs = []
        answers_used = set()
        # Only allow one question per answer, keeping the first occurrence
        for qa in generated_QAs:
            if qa['answer'] in answers_used:
                continue
            answers_used.add(qa['answer'])
            clean_QAs.append(qa)
        return clean_QAs

    def remove_pad(self, text):
        # Strip any <pad> tokens the tokenizer left in a decoded string
        return text.replace("<pad>", "")
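

# Minimal usage sketch, assuming this file sits inside a model repository
# whose weights are a T5 question-generation checkpoint (the `path` below is
# illustrative; point it at your own checkpoint directory).
if __name__ == "__main__":
    pipeline = PreTrainedPipeline(path=".")
    text = (
        "Python is a high-level programming language. It was created by "
        "Guido van Rossum and first released in 1991."
    )
    for qa in pipeline(text):
        print(qa['question'], '->', qa['answer'])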