import os

import numpy as np
import torch
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, Text2TextGenerationPipeline

class TestEnsembleQAPipeline(Text2TextGenerationPipeline):
    def __init__(self, model=None, tokenizer=None, framework="pt", **kwargs):
        super().__init__(model=model, tokenizer=tokenizer, framework=framework, **kwargs)
        self.quiz_bowl_model = QuizBowlModel()  # ensemble model used alongside this pipeline

    def preprocess(self, text, **kwargs):
        """Prepare the text inputs for processing."""
        prompt = "Please provide a concise answer to the following question:"
        input_text = f"{prompt} {text}"
        return self.tokenizer(input_text, return_tensors=self.framework, padding=True, truncation=True)

    def _forward(self, model_inputs, **generate_kwargs):
        """Forward pass to generate outputs from the model."""
        if self.framework == "pt":
            model_outputs = self.model.generate(**model_inputs, **generate_kwargs, return_dict_in_generate=True, output_scores=True)
        else:
            raise NotImplementedError("TensorFlow framework is not supported in this pipeline.")
        return model_outputs

    def postprocess(self, model_outputs):
        """Process model outputs to extract answers and confidence scores."""
        results = []
        for batch_idx, sequence in enumerate(model_outputs.sequences):
            decoded_text = self.tokenizer.decode(sequence, skip_special_tokens=True)
            confidence = self.calculate_confidence(model_outputs.scores, sequence, batch_idx)
            results.append({'guess': decoded_text, 'confidence': confidence})
        return results

    def calculate_confidence(self, scores, sequence, batch_idx=0):
        """Confidence as the exponentiated mean log-probability (i.e. the
        geometric mean probability) of the generated tokens."""
        if not scores:
            return None
        log_probs = [torch.nn.functional.log_softmax(score, dim=-1) for score in scores]
        # sequence[0] is the decoder start token; token i + 1 was generated
        # from the distribution in scores[i].
        token_log_probs = [
            log_probs[i][batch_idx, sequence[i + 1]].item()
            for i in range(min(len(scores), len(sequence) - 1))
        ]
        if not token_log_probs:
            return None
        return float(np.exp(np.mean(token_log_probs)))
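
# Example usage of the pipeline (a minimal sketch, assuming the public
# "google/flan-t5-small" checkpoint; any seq2seq model compatible with
# AutoModelForSeq2SeqLM should work):
#
#   tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-small")
#   model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-small")
#   pipe = TestEnsembleQAPipeline(model=model, tokenizer=tokenizer)
#   print(pipe("What is the capital of France?"))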


class QuizBowlModel:
    def __init__(self):
        self.load_models()

    def load_models(self):
        """Load all models"""
        # model_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'models', 't5-model-params')
        # self.load_seq2seq_model(model_dir)
        self.load_flan_models('google/flan-t5-large', 'google/flan-t5-small')

    def load_seq2seq_model(self, model_dir):
        """Load saved models"""
        self.test_tokenizer = AutoTokenizer.from_pretrained(model_dir)
        self.test_model = AutoModelForSeq2SeqLM.from_pretrained(model_dir)
        self.test_model.eval()

    def load_flan_models(self, large_model_id, small_model_id):
        """Load the Hugging Face FLAN-T5 checkpoints (large and small)."""
        self.tokenizer_flan_t5 = AutoTokenizer.from_pretrained(large_model_id)
        self.model_flan_t5 = AutoModelForSeq2SeqLM.from_pretrained(large_model_id)
        self.tokenizer_t5 = AutoTokenizer.from_pretrained(small_model_id)
        self.model_t5 = AutoModelForSeq2SeqLM.from_pretrained(small_model_id)

    def guess_and_buzz(self, question_texts):
        """Generate answers from all models for the given questions and vote."""
        total_answers = self.generate_answers(question_texts)
        # Uncomment to inspect each model's guesses before voting:
        # for question, model_answers in zip(question_texts, total_answers):
        #     print(f"{question}\nModel Guesses: {model_answers}\n\n")
        return self.ensemble_tfidf_voting(total_answers)

    def generate_answers(self, question_texts):
        """Generate answers from each model."""
        # Tokenize and generate answers using each model
        return [(self.decode_answer(self.model_flan_t5, self.tokenizer_flan_t5, question),
                 self.decode_answer(self.model_t5, self.tokenizer_t5, question))
                for question in question_texts]

    def decode_answer(self, model, tokenizer, input_text):
        """Generate an answer with a single model; return (text, confidence)."""
        inputs = tokenizer(input_text, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            outputs = model.generate(**inputs, max_new_tokens=5, output_scores=True, return_dict_in_generate=True)

        decoded_text = tokenizer.decode(outputs.sequences[0], skip_special_tokens=True)

        if outputs.scores:
            log_probs = [torch.nn.functional.log_softmax(score, dim=-1) for score in outputs.scores]
            # sequences[0][0] is the decoder start token; token i + 1 was
            # generated from the distribution in scores[i].
            scores = []
            for i in range(min(len(outputs.scores), len(outputs.sequences[0]) - 1)):
                selected_log_prob = log_probs[i][0, outputs.sequences[0][i + 1]].item()
                scores.append(selected_log_prob)
            confidence_score = np.exp(np.mean(scores)) if scores else None
        else:
            confidence_score = None

        return decoded_text, confidence_score
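
    # Confidence intuition (illustrative numbers, not from a real run):
    # per-token log-probs of [-0.1, -0.3, -0.2] have mean -0.2, and
    # exp(-0.2) is roughly 0.82, i.e. the geometric mean of the per-token
    # probabilities of the generated answer.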

    def ensemble_tfidf_voting(self, all_answers):
        """For each question, yield the model answer with the highest
        confidence. (A TF-IDF similarity-voting variant is kept below as a
        commented-out alternative.)"""
        for answers in all_answers:
            highest_confidence_answer = max(answers, key=lambda x: x[1])
            yield {'guess': highest_confidence_answer[0], 'confidence': highest_confidence_answer[1]}

        # for answers in all_answers:
        #     texts = [answer[0] for answer in answers]
            
        #     vectorizer = TfidfVectorizer()
        #     tfidf_matrix = vectorizer.fit_transform(texts)
        #     cosine_scores = cosine_similarity(tfidf_matrix)
        #     most_similar_index = np.argmax(np.mean(cosine_scores, axis=0))
        #     yield {'guess': answers[most_similar_index][0], 'confidence': answers[most_similar_index][1]}
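
if __name__ == "__main__":
    # A minimal smoke test (a sketch: assumes network access to download the
    # two FLAN-T5 checkpoints; the question below is illustrative only).
    qb_model = QuizBowlModel()
    questions = ["Which element has the chemical symbol Fe?"]
    for result in qb_model.guess_and_buzz(questions):
        print(result)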