|
from typing import Dict, List, Any |
|
from scipy.special import softmax |
|
import numpy as np |
|
import weakref |
|
import re |
|
import nltk |
|
from nltk.corpus import stopwords |
|
nltk.download('stopwords') |
|
|
|
from utils import clean_str, clean_str_nopunct |
|
import torch |
|
from utils import MultiHeadModel, BertInputBuilder, get_num_words, MATH_PREFIXES, MATH_WORDS |
|
|
|
import transformers |
|
from transformers import BertTokenizer, BertForSequenceClassification |
|
from transformers.utils import logging |
|
|
|
transformers.logging.set_verbosity_debug() |
|
|
|
UPTAKE_MODEL = 'ddemszky/uptake-model' |
|
REASONING_MODEL = 'ddemszky/student-reasoning' |
|
QUESTION_MODEL = 'ddemszky/question-detection' |
|
FOCUSING_QUESTION_MODEL = 'ddemszky/focusing-questions' |
|
|
|
|
|
class Utterance: |
|
def __init__(self, speaker, text, uid=None, |
|
transcript=None, starttime=None, endtime=None, **kwargs): |
|
self.speaker = speaker |
|
self.text = text |
|
self.uid = uid |
|
self.starttime = starttime |
|
self.endtime = endtime |
|
self.transcript = weakref.ref(transcript) if transcript else None |
|
self.props = kwargs |
|
self.role = None |
|
self.word_count = self.get_num_words() |
|
self.timestamp = [starttime, endtime] |
|
if starttime is not None and endtime is not None: |
|
self.unit_measure = endtime - starttime |
|
else: |
|
self.unit_measure = None |
|
self.aggregate_unit_measure = endtime |
|
self.num_math_terms = None |
|
self.math_terms = None |
|
|
|
|
|
self.uptake = None |
|
self.reasoning = None |
|
self.question = None |
|
self.focusing_question = None |
|
|
|
def get_clean_text(self, remove_punct=False): |
|
if remove_punct: |
|
return clean_str_nopunct(self.text) |
|
return clean_str(self.text) |
|
|
|
def get_num_words(self): |
|
return get_num_words(self.text) |
|
|
|
def to_dict(self): |
|
return { |
|
'speaker': self.speaker, |
|
'text': self.text, |
|
'uid': self.uid, |
|
'starttime': self.starttime, |
|
'endtime': self.endtime, |
|
'uptake': self.uptake, |
|
'reasoning': self.reasoning, |
|
'question': self.question, |
|
'focusingQuestion': self.focusing_question, |
|
'numMathTerms': self.num_math_terms, |
|
'mathTerms': self.math_terms, |
|
**self.props |
|
} |
|
|
|
def to_talk_timeline_dict(self): |
|
return{ |
|
'speaker': self.speaker, |
|
'text': self.text, |
|
'uid': self.uid, |
|
'role': self.role, |
|
'timestamp': self.timestamp, |
|
'moments': {'reasoning': True if self.reasoning else False, 'questioning': True if self.question else False, 'uptake': True if self.uptake else False, 'focusingQuestion': True if self.focusing_question else False}, |
|
'unitMeasure': self.unit_measure, |
|
'aggregateUnitMeasure': self.aggregate_unit_measure, |
|
'wordCount': self.word_count, |
|
'numMathTerms': self.num_math_terms, |
|
'mathTerms': self.math_terms, |
|
} |
|
|
|
def __repr__(self): |
|
return f"Utterance(speaker='{self.speaker}'," \ |
|
f"text='{self.text}', uid={self.uid}," \ |
|
f"starttime={self.starttime}, endtime={self.endtime}, props={self.props})" |
|
|
|
|
|
class Transcript: |
|
def __init__(self, **kwargs): |
|
self.utterances = [] |
|
self.params = kwargs |
|
|
|
def add_utterance(self, utterance): |
|
utterance.transcript = weakref.ref(self) |
|
self.utterances.append(utterance) |
|
|
|
def get_idx(self, idx): |
|
if idx >= len(self.utterances): |
|
return None |
|
return self.utterances[idx] |
|
|
|
def get_uid(self, uid): |
|
for utt in self.utterances: |
|
if utt.uid == uid: |
|
return utt |
|
return None |
|
|
|
def length(self): |
|
return len(self.utterances) |
|
|
|
def update_utterance_roles(self, uptake_speaker): |
|
for utt in self.utterances: |
|
if (utt.speaker == uptake_speaker): |
|
utt.role = 'teacher' |
|
else: |
|
utt.role = 'student' |
|
|
|
def get_talk_distribution_and_length(self, uptake_speaker): |
|
if ((uptake_speaker is None)): |
|
return None |
|
teacher_words = 0 |
|
teacher_utt_count = 0 |
|
student_words = 0 |
|
student_utt_count = 0 |
|
for utt in self.utterances: |
|
if (utt.speaker == uptake_speaker): |
|
utt.role = 'teacher' |
|
teacher_words += utt.get_num_words() |
|
teacher_utt_count += 1 |
|
else: |
|
utt.role = 'student' |
|
student_words += utt.get_num_words() |
|
student_utt_count += 1 |
|
if teacher_words + student_words > 0: |
|
teacher_percentage = round( |
|
(teacher_words / (teacher_words + student_words)) * 100) |
|
student_percentage = 100 - teacher_percentage |
|
else: |
|
teacher_percentage = student_percentage = 0 |
|
avg_teacher_length = teacher_words / teacher_utt_count if teacher_utt_count > 0 else 0 |
|
avg_student_length = student_words / student_utt_count if student_utt_count > 0 else 0 |
|
return {'teacher': teacher_percentage, 'student': student_percentage}, {'teacher': avg_teacher_length, 'student': avg_student_length} |
|
|
|
def get_word_clouds(self): |
|
teacher_dict = {} |
|
student_dict = {} |
|
uptake_teacher_dict = {} |
|
stop_words = stopwords.words('english') |
|
for utt in self.utterances: |
|
words = (utt.get_clean_text(remove_punct=True)).split(' ') |
|
for word in words: |
|
if word in stop_words or word in ['inaudible', 'crosstalk']: continue |
|
|
|
if utt.role == 'teacher': |
|
if utt.uptake == 1: |
|
if word not in uptake_teacher_dict: |
|
uptake_teacher_dict[word] = 0 |
|
uptake_teacher_dict[word] += 1 |
|
|
|
if any(math_word in word for math_word in utt.math_terms): continue |
|
if utt.role == 'teacher': |
|
if word not in teacher_dict: |
|
teacher_dict[word] = 0 |
|
teacher_dict[word] += 1 |
|
|
|
else: |
|
if word not in student_dict: |
|
student_dict[word] = 0 |
|
student_dict[word] += 1 |
|
dict_list = [] |
|
uptake_dict_list = [] |
|
teacher_dict_list = [] |
|
student_dict_list = [] |
|
for word in uptake_teacher_dict.keys(): |
|
uptake_dict_list.append({'text': word, 'value': uptake_teacher_dict[word], 'category': 'teacher'}) |
|
for word in teacher_dict.keys(): |
|
teacher_dict_list.append( |
|
{'text': word, 'value': teacher_dict[word], 'category': 'general'}) |
|
dict_list.append({'text': word, 'value': teacher_dict[word], 'category': 'general'}) |
|
for word in student_dict.keys(): |
|
student_dict_list.append( |
|
{'text': word, 'value': student_dict[word], 'category': 'general'}) |
|
dict_list.append({'text': word, 'value': student_dict[word], 'category': 'general'}) |
|
sorted_dict_list = sorted(dict_list, key=lambda x: x['value'], reverse=True) |
|
sorted_uptake_dict_list = sorted(uptake_dict_list, key=lambda x: x['value'], reverse=True) |
|
sorted_teacher_dict_list = sorted(teacher_dict_list, key=lambda x: x['value'], reverse=True) |
|
sorted_student_dict_list = sorted(student_dict_list, key=lambda x: x['value'], reverse=True) |
|
return sorted_dict_list[:50], sorted_uptake_dict_list[:50], sorted_teacher_dict_list[:50], sorted_student_dict_list[:50] |
|
|
|
def get_talk_timeline(self): |
|
return [utterance.to_talk_timeline_dict() for utterance in self.utterances] |
|
|
|
def calculate_aggregate_word_count(self): |
|
unit_measures = [utt.unit_measure for utt in self.utterances] |
|
if None in unit_measures: |
|
aggregate_word_count = 0 |
|
for utt in self.utterances: |
|
aggregate_word_count += utt.get_num_words() |
|
utt.unit_measure = utt.get_num_words() |
|
utt.aggregate_unit_measure = aggregate_word_count |
|
|
|
|
|
def to_dict(self): |
|
return { |
|
'utterances': [utterance.to_dict() for utterance in self.utterances], |
|
**self.params |
|
} |
|
|
|
def __repr__(self): |
|
return f"Transcript(utterances={self.utterances}, custom_params={self.params})" |
|
|
|
|
|
class QuestionModel: |
|
def __init__(self, device, tokenizer, input_builder, max_length=300, path=QUESTION_MODEL): |
|
print("Loading models...") |
|
self.device = device |
|
self.tokenizer = tokenizer |
|
self.input_builder = input_builder |
|
self.max_length = max_length |
|
self.model = MultiHeadModel.from_pretrained( |
|
path, head2size={"is_question": 2}) |
|
self.model.to(self.device) |
|
|
|
def run_inference(self, transcript): |
|
self.model.eval() |
|
with torch.no_grad(): |
|
for i, utt in enumerate(transcript.utterances): |
|
if "?" in utt.text: |
|
utt.question = 1 |
|
else: |
|
text = utt.get_clean_text(remove_punct=True) |
|
instance = self.input_builder.build_inputs([], text, |
|
max_length=self.max_length, |
|
input_str=True) |
|
output = self.get_prediction(instance) |
|
|
|
utt.question = np.argmax( |
|
output["is_question_logits"][0].tolist()) |
|
|
|
def get_prediction(self, instance): |
|
instance["attention_mask"] = [[1] * len(instance["input_ids"])] |
|
for key in ["input_ids", "token_type_ids", "attention_mask"]: |
|
instance[key] = torch.tensor( |
|
instance[key]).unsqueeze(0) |
|
instance[key].to(self.device) |
|
|
|
output = self.model(input_ids=instance["input_ids"], |
|
attention_mask=instance["attention_mask"], |
|
token_type_ids=instance["token_type_ids"], |
|
return_pooler_output=False) |
|
return output |
|
|
|
|
|
class ReasoningModel: |
|
def __init__(self, device, tokenizer, input_builder, max_length=128, path=REASONING_MODEL): |
|
print("Loading models...") |
|
self.device = device |
|
self.tokenizer = tokenizer |
|
self.input_builder = input_builder |
|
self.max_length = max_length |
|
self.model = BertForSequenceClassification.from_pretrained(path) |
|
self.model.to(self.device) |
|
|
|
def run_inference(self, transcript, min_num_words=8, uptake_speaker=None): |
|
self.model.eval() |
|
with torch.no_grad(): |
|
for i, utt in enumerate(transcript.utterances): |
|
if utt.get_num_words() >= min_num_words and utt.speaker != uptake_speaker: |
|
instance = self.input_builder.build_inputs([], utt.text, |
|
max_length=self.max_length, |
|
input_str=True) |
|
output = self.get_prediction(instance) |
|
utt.reasoning = np.argmax(output["logits"][0].tolist()) |
|
|
|
def get_prediction(self, instance): |
|
instance["attention_mask"] = [[1] * len(instance["input_ids"])] |
|
for key in ["input_ids", "token_type_ids", "attention_mask"]: |
|
instance[key] = torch.tensor( |
|
instance[key]).unsqueeze(0) |
|
instance[key].to(self.device) |
|
|
|
output = self.model(input_ids=instance["input_ids"], |
|
attention_mask=instance["attention_mask"], |
|
token_type_ids=instance["token_type_ids"]) |
|
return output |
|
|
|
|
|
class UptakeModel: |
|
def __init__(self, device, tokenizer, input_builder, max_length=120, path=UPTAKE_MODEL): |
|
print("Loading models...") |
|
self.device = device |
|
self.tokenizer = tokenizer |
|
self.input_builder = input_builder |
|
self.max_length = max_length |
|
self.model = MultiHeadModel.from_pretrained(path, head2size={"nsp": 2}) |
|
self.model.to(self.device) |
|
|
|
def run_inference(self, transcript, min_prev_words, uptake_speaker=None): |
|
self.model.eval() |
|
prev_num_words = 0 |
|
prev_utt = None |
|
with torch.no_grad(): |
|
for i, utt in enumerate(transcript.utterances): |
|
if ((uptake_speaker is None) or (utt.speaker == uptake_speaker)) and (prev_num_words >= min_prev_words): |
|
textA = prev_utt.get_clean_text(remove_punct=False) |
|
textB = utt.get_clean_text(remove_punct=False) |
|
instance = self.input_builder.build_inputs([textA], textB, |
|
max_length=self.max_length, |
|
input_str=True) |
|
output = self.get_prediction(instance) |
|
|
|
utt.uptake = int( |
|
softmax(output["nsp_logits"][0].tolist())[1] > .8) |
|
prev_num_words = utt.get_num_words() |
|
prev_utt = utt |
|
|
|
def get_prediction(self, instance): |
|
instance["attention_mask"] = [[1] * len(instance["input_ids"])] |
|
for key in ["input_ids", "token_type_ids", "attention_mask"]: |
|
instance[key] = torch.tensor( |
|
instance[key]).unsqueeze(0) |
|
instance[key].to(self.device) |
|
|
|
output = self.model(input_ids=instance["input_ids"], |
|
attention_mask=instance["attention_mask"], |
|
token_type_ids=instance["token_type_ids"], |
|
return_pooler_output=False) |
|
return output |
|
|
|
class FocusingQuestionModel: |
|
def __init__(self, device, tokenizer, input_builder, max_length=128, path=FOCUSING_QUESTION_MODEL): |
|
print("Loading models...") |
|
self.device = device |
|
self.tokenizer = tokenizer |
|
self.input_builder = input_builder |
|
self.model = BertForSequenceClassification.from_pretrained(path) |
|
self.model.to(self.device) |
|
self.max_length = max_length |
|
|
|
def run_inference(self, transcript, min_focusing_words=0, uptake_speaker=None): |
|
self.model.eval() |
|
with torch.no_grad(): |
|
for i, utt in enumerate(transcript.utterances): |
|
if utt.speaker != uptake_speaker or uptake_speaker is None: |
|
utt.focusing_question = None |
|
continue |
|
if utt.get_num_words() < min_focusing_words: |
|
utt.focusing_question = None |
|
continue |
|
instance = self.input_builder.build_inputs([], utt.text, max_length=self.max_length, input_str=True) |
|
output = self.get_prediction(instance) |
|
utt.focusing_question = np.argmax(output["logits"][0].tolist()) |
|
|
|
def get_prediction(self, instance): |
|
instance["attention_mask"] = [[1] * len(instance["input_ids"])] |
|
for key in ["input_ids", "token_type_ids", "attention_mask"]: |
|
instance[key] = torch.tensor( |
|
instance[key]).unsqueeze(0) |
|
instance[key].to(self.device) |
|
|
|
output = self.model(input_ids=instance["input_ids"], |
|
attention_mask=instance["attention_mask"], |
|
token_type_ids=instance["token_type_ids"]) |
|
return output |
|
|
|
def load_math_terms(): |
|
math_terms = [] |
|
math_terms_dict = {} |
|
for term in MATH_WORDS: |
|
if term in MATH_PREFIXES: |
|
math_terms_dict[f"(^|[^a-zA-Z]){term}(s|es)?([^a-zA-Z]|$)"] = term |
|
math_terms.append(f"(^|[^a-zA-Z]){term}(s|es)?([^a-zA-Z]|$)") |
|
else: |
|
math_terms.append(term) |
|
math_terms_dict[term] = term |
|
return math_terms, math_terms_dict |
|
|
|
def run_math_density(transcript): |
|
math_terms, math_terms_dict = load_math_terms() |
|
sorted_terms = sorted(math_terms, key=len, reverse=True) |
|
teacher_math_word_cloud = {} |
|
student_math_word_cloud = {} |
|
for i, utt in enumerate(transcript.utterances): |
|
text = utt.get_clean_text(remove_punct=True) |
|
num_matches = 0 |
|
matched_positions = set() |
|
match_list = [] |
|
for term in sorted_terms: |
|
matches = list(re.finditer(term, text, re.IGNORECASE)) |
|
|
|
matches = [match for match in matches if not any(match.start() in range(existing[0], existing[1]) for existing in matched_positions)] |
|
|
|
if len(matches) > 0: |
|
if utt.role == "teacher": |
|
if math_terms_dict[term] not in teacher_math_word_cloud: |
|
teacher_math_word_cloud[math_terms_dict[term]] = 0 |
|
teacher_math_word_cloud[math_terms_dict[term]] += len(matches) |
|
else: |
|
if math_terms_dict[term] not in student_math_word_cloud: |
|
student_math_word_cloud[math_terms_dict[term]] = 0 |
|
student_math_word_cloud[math_terms_dict[term]] += len(matches) |
|
match_list.append(math_terms_dict[term]) |
|
|
|
matched_positions.update((match.start(), match.end()) for match in matches) |
|
num_matches += len(matches) |
|
|
|
utt.num_math_terms = num_matches |
|
utt.math_terms = match_list |
|
|
|
|
|
teacher_dict_list = [] |
|
student_dict_list = [] |
|
dict_list = [] |
|
for word in teacher_math_word_cloud.keys(): |
|
teacher_dict_list.append( |
|
{'text': word, 'value': teacher_math_word_cloud[word], 'category': "math"}) |
|
dict_list.append({'text': word, 'value': teacher_math_word_cloud[word], 'category': "math"}) |
|
for word in student_math_word_cloud.keys(): |
|
student_dict_list.append( |
|
{'text': word, 'value': student_math_word_cloud[word], 'category': "math"}) |
|
dict_list.append({'text': word, 'value': student_math_word_cloud[word], 'category': "math"}) |
|
sorted_dict_list = sorted(dict_list, key=lambda x: x['value'], reverse=True) |
|
sorted_teacher_dict_list = sorted(teacher_dict_list, key=lambda x: x['value'], reverse=True) |
|
sorted_student_dict_list = sorted(student_dict_list, key=lambda x: x['value'], reverse=True) |
|
|
|
return sorted_dict_list[:50], sorted_teacher_dict_list[:50], sorted_student_dict_list[:50] |
|
|
|
class EndpointHandler(): |
|
def __init__(self, path="."): |
|
print("Loading models...") |
|
self.device = "cuda" if torch.cuda.is_available() else "cpu" |
|
self.tokenizer = BertTokenizer.from_pretrained("bert-base-uncased") |
|
self.input_builder = BertInputBuilder(tokenizer=self.tokenizer) |
|
|
|
def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: |
|
""" |
|
data args: |
|
inputs (:obj: `list`): |
|
List of dicts, where each dict represents an utterance; each utterance object must have a `speaker`, |
|
`text` and `uid`and can include list of custom properties |
|
parameters (:obj: `dict`) |
|
Return: |
|
A :obj:`list` | `dict`: will be serialized and returned |
|
""" |
|
|
|
utterances = data.pop("inputs", data) |
|
params = data.pop("parameters", None) |
|
|
|
transcript = Transcript(filename=params.pop("filename", None)) |
|
for utt in utterances: |
|
transcript.add_utterance(Utterance(**utt)) |
|
|
|
print("Running inference on %d examples..." % transcript.length()) |
|
logging.set_verbosity_info() |
|
|
|
uptake_model = UptakeModel( |
|
self.device, self.tokenizer, self.input_builder) |
|
uptake_speaker = params.pop("uptake_speaker", None) |
|
uptake_model.run_inference(transcript, min_prev_words=params['uptake_min_num_words'], |
|
uptake_speaker=uptake_speaker) |
|
del uptake_model |
|
|
|
|
|
reasoning_model = ReasoningModel( |
|
self.device, self.tokenizer, self.input_builder) |
|
reasoning_model.run_inference(transcript, uptake_speaker=uptake_speaker) |
|
del reasoning_model |
|
|
|
|
|
question_model = QuestionModel( |
|
self.device, self.tokenizer, self.input_builder) |
|
question_model.run_inference(transcript) |
|
del question_model |
|
|
|
|
|
focusing_question_model = FocusingQuestionModel( |
|
self.device, self.tokenizer, self.input_builder) |
|
focusing_question_model.run_inference(transcript, uptake_speaker=uptake_speaker) |
|
del focusing_question_model |
|
|
|
transcript.update_utterance_roles(uptake_speaker) |
|
sorted_math_cloud, teacher_math_cloud, student_math_cloud = run_math_density(transcript) |
|
transcript.calculate_aggregate_word_count() |
|
return_dict = {'talkDistribution': None, 'talkLength': None, 'talkMoments': None, 'studentTopWords': None, 'teacherTopWords': None} |
|
talk_dist, talk_len = transcript.get_talk_distribution_and_length(uptake_speaker) |
|
return_dict['talkDistribution'] = talk_dist |
|
return_dict['talkLength'] = talk_len |
|
talk_moments = transcript.get_talk_timeline() |
|
return_dict['talkMoments'] = talk_moments |
|
word_cloud, uptake_word_cloud, teacher_general_cloud, student_general_cloud = transcript.get_word_clouds() |
|
teacher_cloud = teacher_math_cloud + teacher_general_cloud |
|
student_cloud = student_math_cloud + student_general_cloud |
|
|
|
|
|
return_dict['teacherTopWords'] = teacher_cloud |
|
return_dict['studentTopWords'] = student_cloud |
|
|
|
return return_dict |
|
|