import random
import string
from collections import OrderedDict

import nltk
import pke
import torch
from flashtext import KeywordProcessor
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize
from similarity.normalized_levenshtein import NormalizedLevenshtein

nltk.download('brown')
nltk.download('stopwords')
nltk.download('popular')


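# Returns True when sense2vec knows a best sense for the word, i.e. MCQ distractors can be generated for it.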
def MCQs_available(word, s2v):
    word = word.replace(" ", "_")
    sense = s2v.get_best_sense(word)
    return sense is not None


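# Builds the set of one-edit variants of a word; sense2vec_get_words uses it to reject
# distractors that are near-identical to the answer.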
def edits(word):
    """All edits that are one edit away from `word`."""
    letters = 'abcdefghijklmnopqrstuvwxyz ' + string.punctuation
    splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
    deletes = [L + R[1:] for L, R in splits if R]
    transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]
    replaces = [L + c + R[1:] for L, R in splits if R for c in letters]
    inserts = [L + c + R for L, R in splits for c in letters]
    return set(deletes + transposes + replaces + inserts)


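# Collect up to 15 sense2vec neighbours of `word`, skipping candidates that were already seen,
# that contain the original word, or that are one-edit variants of it.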
def sense2vec_get_words(word, s2v):
    output = []

    word_preprocessed = word.translate(
        word.maketrans("", "", string.punctuation))
    word_preprocessed = word_preprocessed.lower()

    word_edits = edits(word_preprocessed)

    word = word.replace(" ", "_")

    sense = s2v.get_best_sense(word)
    most_similar = s2v.most_similar(sense, n=15)

    compare_list = [word_preprocessed]
    for each_word in most_similar:
        append_word = each_word[0].split("|")[0].replace("_", " ")
        append_word = append_word.strip()
        append_word_processed = append_word.lower()
        append_word_processed = append_word_processed.translate(
            append_word_processed.maketrans("", "", string.punctuation))
        if (append_word_processed not in compare_list
                and word_preprocessed not in append_word_processed
                and append_word_processed not in word_edits):
            output.append(append_word.title())
            compare_list.append(append_word_processed)

    out = list(OrderedDict.fromkeys(output))

    return out


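# Return (distractors, algorithm) for an answer; falls back to an empty list and "None"
# if the sense2vec lookup fails.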
def get_options(answer, s2v):
    distractors = []

    try:
        distractors = sense2vec_get_words(answer, s2v)
        if len(distractors) > 0:
            print(" Sense2vec_distractors successful for word : ", answer)
            return distractors, "sense2vec"
    except Exception:
        print(" Sense2vec_distractors failed for word : ", answer)

    return distractors, "None"


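# Split text into sentences and keep only those longer than 20 characters.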
def tokenize_sentences(text):
    sentences = sent_tokenize(text)
    sentences = [sentence.strip()
                 for sentence in sentences if len(sentence) > 20]
    return sentences


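# Map each keyword to the sentences containing it (longest sentences first); keywords with no
# matching sentence are dropped.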
def get_sentences_for_keyword(keywords, sentences):
    keyword_processor = KeywordProcessor()
    keyword_sentences = {}
    for word in keywords:
        word = word.strip()
        keyword_sentences[word] = []
        keyword_processor.add_keyword(word)
    for sentence in sentences:
        keywords_found = keyword_processor.extract_keywords(sentence)
        for key in keywords_found:
            keyword_sentences[key].append(sentence)

    for key in keyword_sentences.keys():
        values = keyword_sentences[key]
        values = sorted(values, key=len, reverse=True)
        keyword_sentences[key] = values

    delete_keys = []
    for k in keyword_sentences.keys():
        if len(keyword_sentences[k]) == 0:
            delete_keys.append(k)
    for del_key in delete_keys:
        del keyword_sentences[del_key]

    return keyword_sentences


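# True when `currentword` is at least `thresh` (normalized Levenshtein distance) away from
# every word already in `words_list`.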
def is_far(words_list, currentword, thresh, normalized_levenshtein):
    threshold = thresh
    score_list = []
    for word in words_list:
        score_list.append(normalized_levenshtein.distance(
            word.lower(), currentword.lower()))
    return min(score_list) >= threshold


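# Greedily keep up to `max_phrases` phrases that are sufficiently dissimilar
# (distance >= 0.7) from those already kept.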
def filter_phrases(phrase_keys, max_phrases, normalized_levenshtein):
    filtered_phrases = []
    if len(phrase_keys) > 0:
        filtered_phrases.append(phrase_keys[0])
        for ph in phrase_keys[1:]:
            if is_far(filtered_phrases, ph, 0.7, normalized_levenshtein):
                filtered_phrases.append(ph)
            if len(filtered_phrases) >= max_phrases:
                break
    return filtered_phrases


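# Extract up to 10 noun/proper-noun keyphrases from the text with pke's MultipartiteRank.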
def get_nouns_multipartite(text):
    out = []

    extractor = pke.unsupervised.MultipartiteRank()
    extractor.load_document(input=text, language='en')
    pos = {'PROPN', 'NOUN'}
    stoplist = list(string.punctuation)
    stoplist += stopwords.words('english')
    extractor.candidate_selection(pos=pos)

    try:
        extractor.candidate_weighting(alpha=1.1,
                                      threshold=0.75,
                                      method='average')
    except Exception:
        return out

    keyphrases = extractor.get_n_best(n=10)

    for key in keyphrases:
        out.append(key[0])

    return out


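# Collect multi-word noun chunks from the spaCy doc, longest first, capped at 50.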
def get_phrases(doc):
    phrases = {}
    for np in doc.noun_chunks:
        phrase = np.text
        len_phrase = len(phrase.split())
        if len_phrase > 1:
            if phrase not in phrases:
                phrases[phrase] = 1
            else:
                phrases[phrase] = phrases[phrase] + 1

    phrase_keys = list(phrases.keys())
    phrase_keys = sorted(phrase_keys, key=lambda x: len(x), reverse=True)
    phrase_keys = phrase_keys[:50]
    return phrase_keys


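# Merge MultipartiteRank keyphrases (rarest first by fdist) with noun chunks, de-duplicate,
# and keep only answers for which sense2vec distractors exist.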
def get_keywords(nlp, text, max_keywords, s2v, fdist, normalized_levenshtein, no_of_sentences):
    doc = nlp(text)
    max_keywords = int(max_keywords)

    keywords = get_nouns_multipartite(text)
    keywords = sorted(keywords, key=lambda x: fdist[x])
    keywords = filter_phrases(keywords, max_keywords, normalized_levenshtein)

    phrase_keys = get_phrases(doc)
    filtered_phrases = filter_phrases(
        phrase_keys, max_keywords, normalized_levenshtein)

    total_phrases = keywords + filtered_phrases

    total_phrases_filtered = filter_phrases(total_phrases, min(
        max_keywords, 2 * no_of_sentences), normalized_levenshtein)

    answers = []
    for answer in total_phrases_filtered:
        if answer not in answers and MCQs_available(answer, s2v):
            answers.append(answer)

    answers = answers[:max_keywords]
    return answers


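# Generate one MCQ per answer with the question-generation model, attach sense2vec options
# plus extra_options, and keep only questions that end up with at least one option.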
def generate_questions_mcq(keyword_sent_mapping, device, tokenizer, model, sense2vec, normalized_levenshtein):
    batch_text = []

    answers = keyword_sent_mapping.keys()
    for answer in answers:
        txt = keyword_sent_mapping[answer]
        txt_str = "\n".join(txt)
        context = "context: " + txt_str
        text = context + " " + "answer: " + answer + " </s>"
        batch_text.append(text)
    print(batch_text)

    encoding = tokenizer.batch_encode_plus(
        batch_text, pad_to_max_length=True, return_tensors="pt")

    print("Running model for generation")
    input_ids, attention_masks = encoding["input_ids"].to(
        device), encoding["attention_mask"].to(device)

    with torch.no_grad():
        outs = model.generate(input_ids=input_ids,
                              attention_mask=attention_masks,
                              max_length=150)

    output_array = {}
    output_array["questions"] = []

    for index, val in enumerate(answers):
        individual_question = {}
        out = outs[index, :]
        dec = tokenizer.decode(out, skip_special_tokens=True,
                               clean_up_tokenization_spaces=True)

        Question = dec.replace("question:", "")
        Question = Question.strip()
        individual_question["question_statement"] = Question
        individual_question["question_type"] = "MCQ"
        individual_question["answer"] = val
        individual_question["id"] = index + 1
        individual_question["options"], individual_question["options_algorithm"] = get_options(
            val, sense2vec)

        individual_question["options"] = filter_phrases(
            individual_question["options"], 10, normalized_levenshtein)
        num_main_options = 3
        individual_question["extra_options"] = individual_question["options"][num_main_options:]
        individual_question["options"] = individual_question["options"][:num_main_options]
        individual_question["context"] = keyword_sent_mapping[val]

        if len(individual_question["options"]) > 0:
            output_array["questions"].append(individual_question)

    return output_array


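# Generate one open-ended question per answer with the question-generation model (no options).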
def generate_normal_questions(keyword_sent_mapping, device, tokenizer, model):
    batch_text = []
    answers = keyword_sent_mapping.keys()
    for answer in answers:
        txt = keyword_sent_mapping[answer]
        context = "context: " + txt
        text = context + " " + "answer: " + answer + " </s>"
        batch_text.append(text)

    encoding = tokenizer.batch_encode_plus(
        batch_text, pad_to_max_length=True, return_tensors="pt")

    print("Running model for generation")
    input_ids, attention_masks = encoding["input_ids"].to(
        device), encoding["attention_mask"].to(device)

    with torch.no_grad():
        outs = model.generate(input_ids=input_ids,
                              attention_mask=attention_masks,
                              max_length=150)

    output_array = {}
    output_array["questions"] = []

    for index, val in enumerate(answers):
        individual_quest = {}
        out = outs[index, :]
        dec = tokenizer.decode(out, skip_special_tokens=True,
                               clean_up_tokenization_spaces=True)

        Question = dec.replace('question:', '')
        Question = Question.strip()

        individual_quest['Question'] = Question
        individual_quest['Answer'] = val
        individual_quest["id"] = index + 1
        individual_quest["context"] = keyword_sent_mapping[val]

        output_array["questions"].append(individual_quest)

    return output_array


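# Randomly return True or False.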
def random_choice():
    a = random.choice([0, 1])
    return bool(a)