|
|
|
from textwrap3 import wrap |
|
|
|
# Sample news passage used as the demo input for the summarization,
# keyword-extraction and question-generation steps further down.
text = """Elon Musk has shown again he can influence the digital currency market with just his tweets. After saying that his electric vehicle-making company
Tesla will not accept payments in Bitcoin because of environmental concerns, he tweeted that he was working with developers of Dogecoin to improve
system transaction efficiency. Following the two distinct statements from him, the world's largest cryptocurrency hit a two-month low, while Dogecoin
rallied by about 20 percent. The SpaceX CEO has in recent months often tweeted in support of Dogecoin, but rarely for Bitcoin. In a recent tweet,
Musk put out a statement from Tesla that it was “concerned” about the rapidly increasing use of fossil fuels for Bitcoin (price in India) mining and
transaction, and hence was suspending vehicle purchases using the cryptocurrency. A day later he again tweeted saying, “To be clear, I strongly
believe in crypto, but it can't drive a massive increase in fossil fuel use, especially coal”. It triggered a downward spiral for Bitcoin value but
the cryptocurrency has stabilised since. A number of Twitter users welcomed Musk's statement. One of them said it's time people started realising
that Dogecoin “is here to stay” and another referred to Musk's previous assertion that crypto could become the world's future currency."""

# Echo the passage wrapped to 150 columns, followed by a blank separator.
for wrp in wrap(text, 150):
    print (wrp)
print ("\n")
|
|
|
"""## Example 2""" |
|
|
|
import torch |
|
from transformers import T5ForConditionalGeneration,T5Tokenizer |
|
# Load the pretrained 't5-base' checkpoint and its tokenizer; both are passed
# to summarizer() later in the file.
summary_model = T5ForConditionalGeneration.from_pretrained('t5-base')
summary_tokenizer = T5Tokenizer.from_pretrained('t5-base')

# Use CUDA when torch reports it as available, otherwise the CPU, and move
# the summarization model onto that device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
summary_model = summary_model.to(device)
|
|
|
import random |
|
import numpy as np |
|
|
|
def set_seed(seed: int):
    """Seed every random number generator this script relies on.

    Applies ``seed`` to Python's ``random`` module, to NumPy's global RNG,
    and to torch via ``torch.manual_seed`` and
    ``torch.cuda.manual_seed_all``.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)
|
|
|
# Fix all RNG seeds once, up front, with the value 42.
set_seed(42)

import nltk
# Download the 'punkt', 'brown' and 'wordnet' NLTK resources at import time.
nltk.download('punkt')
nltk.download('brown')
nltk.download('wordnet')
|
from nltk.corpus import wordnet as wn |
|
from nltk.tokenize import sent_tokenize |
|
|
|
def postprocesstext(content):
    """Upper-case the first character of every sentence in ``content``.

    Sentences are located with ``sent_tokenize``. Only the first character
    of each sentence is changed; the rest keeps its original casing, so
    proper nouns and acronyms are not flattened to lower case (which is what
    ``str.capitalize`` would do).

    Returns:
        The sentences concatenated, each one preceded by a single space, so
        the result starts with a space. The caller in this file strips it.
    """
    return "".join(
        " " + sentence[:1].upper() + sentence[1:]
        for sentence in sent_tokenize(content)
    )
|
|
|
|
|
def summarizer(text,model,tokenizer):
    """Produce an abstractive summary of ``text`` with a seq2seq ``model``.

    The text is stripped, its newlines are replaced by spaces and it is
    prefixed with ``"summarize: "``. The encoded input is truncated to 512
    tokens and decoded with beam search (3 beams, no repeated bigrams,
    between 75 and 300 generated tokens).

    Args:
        text: Passage to summarize.
        model: Generation model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        The first generated sequence, decoded without special tokens, passed
        through ``postprocesstext`` and stripped.
    """
    text = "summarize: " + text.strip().replace("\n", " ")

    max_len = 512
    # `padding=False` replaces the deprecated `pad_to_max_length=False`.
    # The tensors are moved to the model's own device so the function does
    # not depend on a module-level `device` variable.
    encoding = tokenizer(
        text,
        max_length=max_len,
        padding=False,
        truncation=True,
        return_tensors="pt",
    ).to(model.device)

    outs = model.generate(
        input_ids=encoding["input_ids"],
        attention_mask=encoding["attention_mask"],
        early_stopping=True,
        num_beams=3,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        min_length=75,
        max_length=300,
    )

    # Only one sequence is requested, so decode just the first one.
    summary = tokenizer.decode(outs[0], skip_special_tokens=True)
    return postprocesstext(summary).strip()
|
|
|
|
|
# Summarize the sample passage with the summarization model and tokenizer.
summarized_text = summarizer(text,summary_model,summary_tokenizer)

# Print the passage and its summary, each wrapped to 150 columns and each
# followed by a blank separator.
print ("\noriginal Text >>")
for wrp in wrap(text, 150):
    print (wrp)
print ("\n")
print ("Summarized Text >>")
for wrp in wrap(summarized_text, 150):
    print (wrp)
print ("\n")
|
|
|
"""# **Answer Span Extraction (Keywords and Noun Phrases)**""" |
|
|
|
# Maximum number of keywords requested from get_keywords() in the demo call
# below. NOTE: the name `total` is rebound to a gr.Slider further down.
total = 10

import nltk
# Download the NLTK 'stopwords' corpus at import time.
nltk.download('stopwords')
|
from nltk.corpus import stopwords |
|
import string |
|
import pke |
|
import traceback |
|
|
|
def get_nouns_multipartite(content):
    """Extract up to 15 noun / proper-noun keyphrases from ``content``.

    Runs pke's unsupervised ``MultipartiteRank`` extractor. Candidates are
    limited to the POS tags ``PROPN`` and ``NOUN`` and weighted with
    ``alpha=1.1``, ``threshold=0.75`` and ``method='average'``.

    Returns:
        The keyphrase strings in the order ``get_n_best`` yields them, or an
        empty list (after printing the traceback) if extraction fails.
    """
    try:
        extractor = pke.unsupervised.MultipartiteRank()
        extractor.load_document(input=content, language='en')

        # NOTE(review): a punctuation/stop-word list used to be assembled at
        # this point but was never handed to the extractor, so it had no
        # effect and has been dropped. If custom stop words are wanted they
        # must be passed to pke explicitly - TODO confirm the right hook for
        # the installed pke version.
        extractor.candidate_selection(pos={'PROPN', 'NOUN'})

        extractor.candidate_weighting(alpha=1.1,
                                      threshold=0.75,
                                      method='average')
        # Each entry returned by get_n_best is indexable; element 0 is the phrase.
        return [candidate[0] for candidate in extractor.get_n_best(n=15)]
    except Exception:
        # Best-effort: report the failure and fall back to "no keyphrases",
        # without trapping KeyboardInterrupt/SystemExit as a bare except would.
        traceback.print_exc()
        return []
|
|
|
from flashtext import KeywordProcessor |
|
|
|
|
|
def get_keywords(originaltext,summarytext,total):
    """Return keyphrases of ``originaltext`` that also occur in ``summarytext``.

    Keyphrases are taken from ``get_nouns_multipartite(originaltext)``. A
    flashtext ``KeywordProcessor`` loaded with them is run over
    ``summarytext`` to find which ones appear there. Both intermediate lists
    are printed.

    Returns:
        At most ``total`` matching keyphrases, in the order they were
        extracted from the original text.
    """
    candidates = get_nouns_multipartite(originaltext)
    print ("keywords unsummarized: ",candidates)

    matcher = KeywordProcessor()
    for phrase in candidates:
        matcher.add_keyword(phrase)

    keywords_found = list(set(matcher.extract_keywords(summarytext)))
    print ("keywords_found in summarized: ",keywords_found)

    # Filter the extraction-ordered candidates rather than the deduplicated
    # matches, so the extraction order is kept.
    important_keywords = [phrase for phrase in candidates if phrase in keywords_found]
    return important_keywords[:total]
|
|
|
|
|
# Keyphrases of the sample passage that also occur in its summary; they are
# the answers used by the question-generation demo below.
imp_keywords = get_keywords(text,summarized_text,total)
print (imp_keywords)
|
|
|
"""# **Question generation with T5**""" |
|
|
|
# Load the 'ramsrigouthamg/t5_squad_v1' checkpoint and its tokenizer for
# question generation, and move the model to the same device as the
# summarization model.
question_model = T5ForConditionalGeneration.from_pretrained('ramsrigouthamg/t5_squad_v1')
question_tokenizer = T5Tokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1')
question_model = question_model.to(device)
|
|
|
def get_question(context,answer,model,tokenizer):
    """Generate one question about ``context`` whose answer is ``answer``.

    The model input is ``"context: <context> answer: <answer>"``, truncated
    to 384 tokens, and decoded with beam search (5 beams, no repeated
    bigrams, at most 72 generated tokens).

    Args:
        context: Passage the question should be about.
        answer: Answer span the question should target.
        model: Generation model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        The first generated sequence, decoded without special tokens, with
        every ``"question:"`` marker removed and surrounding whitespace
        stripped.
    """
    text = "context: {} answer: {}".format(context, answer)
    # `padding=False` replaces the deprecated `pad_to_max_length=False`.
    # The tensors follow the model's own device instead of a module-level
    # `device` variable.
    encoding = tokenizer(
        text,
        max_length=384,
        padding=False,
        truncation=True,
        return_tensors="pt",
    ).to(model.device)

    outs = model.generate(
        input_ids=encoding["input_ids"],
        attention_mask=encoding["attention_mask"],
        early_stopping=True,
        num_beams=5,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        max_length=72,
    )

    # Only one sequence is requested, so decode just the first one.
    question = tokenizer.decode(outs[0], skip_special_tokens=True)
    return question.replace("question:", "").strip()
|
|
|
|
|
|
|
# Show the summary again (wrapped to 150 columns) as context for the
# generated questions.
for wrp in wrap(summarized_text, 150):
    print (wrp)
print ("\n")

# Generate and print one question per selected keyword, followed by the
# capitalized keyword as its answer and a blank separator.
for answer in imp_keywords:
    ques = get_question(summarized_text,answer,question_model,question_tokenizer)
    print (ques)
    print (answer.capitalize())
    print ("\n")
|
|
|
"""# **Gradio UI Visualization**""" |
|
|
|
|
|
|
|
|
|
import numpy as np |
|
from sense2vec import Sense2Vec
# Load sense2vec vectors from the 's2v_old' path on disk.
s2v = Sense2Vec().from_disk('s2v_old')

from sentence_transformers import SentenceTransformer

# Sentence encoder handed to get_distractors() as `sentencemodel`.
sentence_transformer_model = SentenceTransformer('msmarco-distilbert-base-v3')

from similarity.normalized_levenshtein import NormalizedLevenshtein
# Shared string-similarity scorer read by get_highest_similarity_score().
normalized_levenshtein = NormalizedLevenshtein()
|
|
|
def filter_same_sense_words(original,wordlist):
    """Keep the entries of ``wordlist`` that share the sense tag of ``original``.

    Keys have the form ``"word|SENSE"``. ``original`` is such a key, and each
    item of ``wordlist`` is a sequence whose first element is such a key. The
    sense tag of ``original`` is printed.

    Returns:
        The word parts of the matching keys, with underscores replaced by
        spaces, title-cased and stripped, in input order.
    """
    base_sense = original.split('|')[1]
    print (base_sense)

    kept = []
    for entry in wordlist:
        pieces = entry[0].split('|')
        if pieces[1] != base_sense:
            continue
        kept.append(pieces[0].replace("_", " ").title().strip())
    return kept
|
|
|
def get_highest_similarity_score(wordlist,wrd):
    """Return the largest similarity between ``wrd`` and any entry of ``wordlist``.

    Each pair is compared in lower case with the module-level
    ``normalized_levenshtein`` scorer. ``max`` raises ``ValueError`` when
    ``wordlist`` is empty.
    """
    return max([
        normalized_levenshtein.similarity(candidate.lower(), wrd.lower())
        for candidate in wordlist
    ])
|
|
|
def sense2vec_get_words(word,s2v,topn,question):
    """Return sense2vec neighbours of ``word`` that can serve as distractors.

    Looks up the best sense of ``word`` among a fixed list of senses, takes
    its ``topn`` most similar entries and keeps those with the same sense
    tag (see ``filter_same_sense_words``). A candidate is then accepted only
    if its normalized-Levenshtein similarity to ``word`` and to every
    already accepted candidate is below 0.6, and it is not one of the
    whitespace-separated tokens of ``question``.

    Args:
        word: The correct answer.
        s2v: Object providing ``get_best_sense`` and ``most_similar``.
        topn: Number of neighbours requested from ``most_similar``.
        question: Question text; its tokens are excluded from the result.

    Returns:
        The accepted candidates in neighbour order. The list is empty when
        the lookup fails or nothing passes the filters.
    """
    print ("word ",word)
    try:
        sense = s2v.get_best_sense(word, senses= ["NOUN", "PERSON","PRODUCT","LOC","ORG","EVENT","NORP","WORK OF ART","FAC","GPE","NUM","FACILITY"])
        most_similar = s2v.most_similar(sense, n=topn)
        output = filter_same_sense_words(sense,most_similar)
        print ("Similar ",output)
    except Exception:
        # Best-effort lookup: any failure simply means "no candidates".
        # `Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        output = []

    threshold = 0.6
    # `final` starts with the answer itself so candidates are also compared
    # against it; it is sliced off before returning.
    final = [word]
    checklist = question.split()
    for candidate in output:
        if (get_highest_similarity_score(final, candidate) < threshold
                and candidate not in final
                and candidate not in checklist):
            final.append(candidate)

    return final[1:]
|
|
|
def mmr(doc_embedding, word_embeddings, words, top_n, lambda_param):
    """Select entries of ``words`` by Maximal Marginal Relevance.

    The first pick is the word whose embedding has the highest cosine
    similarity to ``doc_embedding``. Each further pick maximises
    ``lambda_param * sim(word, doc) - (1 - lambda_param) * max sim(word, picked)``.

    Args:
        doc_embedding: 2-D array for the reference text. NOTE(review): the
            first pick uses an argmax over the flattened similarity matrix,
            which is only a row index when this has exactly one row -
            confirm callers always pass a single embedding.
        word_embeddings: 2-D array with one row per entry of ``words``.
        words: Candidate strings, aligned with ``word_embeddings``.
        top_n: Number of words wanted. Values above ``len(words)`` are
            clamped instead of failing on an empty candidate set.
        lambda_param: Weight of document relevance against redundancy.

    Returns:
        The selected words in pick order, or an empty list when ``words`` is
        empty. At least one word is returned for non-empty input.
    """
    if len(words) == 0:
        return []

    word_doc_similarity = cosine_similarity(word_embeddings, doc_embedding)
    word_similarity = cosine_similarity(word_embeddings)

    keywords_idx = [int(np.argmax(word_doc_similarity))]
    candidates_idx = [i for i in range(len(words)) if i != keywords_idx[0]]

    # One word is already picked; never iterate past the remaining candidates.
    for _ in range(min(top_n, len(words)) - 1):
        candidate_similarities = word_doc_similarity[candidates_idx, :]
        target_similarities = np.max(word_similarity[candidates_idx][:, keywords_idx], axis=1)

        scores = lambda_param * candidate_similarities - (1 - lambda_param) * target_similarities.reshape(-1, 1)
        best_idx = candidates_idx[np.argmax(scores)]

        keywords_idx.append(best_idx)
        candidates_idx.remove(best_idx)

    return [words[idx] for idx in keywords_idx]
|
|
|
from collections import OrderedDict |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
|
def get_distractors_wordnet(word):
    """Return WordNet co-hyponyms of ``word`` as distractor candidates.

    Takes the first noun synset of ``word``, goes up to its first hypernym
    and collects the first lemma name of each of that hypernym's hyponyms,
    skipping the word itself and duplicates.

    The lookup key is lower-cased with spaces replaced by underscores, which
    is the form the lemma names are compared against. That same key is used
    for the synset query, so multi-word inputs are looked up and excluded
    consistently.

    Returns:
        Distractor names with underscores turned into spaces and every word
        capitalized. The list is empty when no synset or hypernym is found,
        and may be partial if a lookup error occurs midway.
    """
    distractors = []
    lookup = word.lower().replace(" ", "_")
    try:
        synsets = wn.synsets(lookup, 'n')
        if not synsets:
            print ("Wordnet distractors not found")
            return distractors

        hypernyms = synsets[0].hypernyms()
        if not hypernyms:
            return distractors

        for item in hypernyms[0].hyponyms():
            name = item.lemmas()[0].name()
            # Compare in the same normalised form as the lookup key.
            if name.lower() == lookup:
                continue
            name = " ".join(w.capitalize() for w in name.replace("_", " ").split())
            if name not in distractors:
                distractors.append(name)
    except Exception:
        # Best-effort helper: report and return whatever was collected.
        print ("Wordnet distractors not found")
    return distractors
|
|
|
def get_distractors (word,origsentence,sense2vecmodel,sentencemodel,top_n,lambdaval):
    """Pick diverse distractors for the answer ``word`` of ``origsentence``.

    Candidates come from ``sense2vec_get_words``. The capitalized answer plus
    the candidates are embedded with ``sentencemodel`` and ranked by ``mmr``
    against the embedding of ``origsentence`` followed by the capitalized
    answer. At most five items are selected.

    Returns:
        The selected items, capitalized, excluding anything equal to ``word``
        ignoring case. If sense2vec yields no candidates, that empty list is
        returned directly.
    """
    candidates = sense2vec_get_words(word,sense2vecmodel,top_n,origsentence)
    print ("distractors ",candidates)
    if len(candidates) == 0:
        return candidates

    answer_label = word.capitalize()
    pool = [answer_label, *candidates]

    query_embedding = sentencemodel.encode([origsentence + " " + answer_label])
    pool_embeddings = sentencemodel.encode(pool)

    ranked = mmr(query_embedding, pool_embeddings, pool, min(len(pool), 5), lambdaval)

    # Drop the answer itself; only true distractors are returned.
    return [item.capitalize() for item in ranked if item.lower() != word.lower()]
|
|
|
# Demo: distractors for the answer "Bitcoin" to a sample question, using up
# to 40 sense2vec neighbours and an MMR lambda of 0.2.
sent = "What cryptocurrency did Musk rarely tweet about?"
keyword = "Bitcoin"

print (get_distractors(keyword,sent,s2v,sentence_transformer_model,40,0.2))
|
|
|
"""# **Gradio Visualization with MCQs**""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
import re |
|
|
|
|
|
# Input components of the "Text" tab. NOTE: `total` rebinds the module-level
# name that earlier held the integer 10.
context = gr.Textbox(lines=10, placeholder="Enter paragraph/content here...", label="Enter your content (words input must be more than 150 words).")
total = gr.Slider(1,10, value=1,step=1, label="Total Number Of Questions")
subject = gr.Textbox(placeholder="Enter subject/title here...", label="Enter your title (title must contain 1 word)")

# Output component: receives the HTML string returned by generate_question_text().
output = gr.HTML( label="Question and Answers")
|
|
|
|
|
def generate_question_text(context,subject,total):
    """Validate pasted text and title, then build the multiple-choice quiz HTML.

    Args:
        context: Passage typed or pasted by the user.
        subject: Title entered by the user; it is only validated.
        total: Maximum number of questions to generate.

    Returns:
        The HTML string produced by ``generate_question``.

    Raises:
        gr.Error: If ``context`` has fewer than 150 words or ``subject``
            contains no word.
    """
    words_text = len(re.findall(r'\w+', context))
    words_subject = len(re.findall(r'\w+', subject))

    if words_text < 150:
        raise gr.Error("Invalid Input (Words limit must be more than 150 words).")
    if words_subject < 1:
        raise gr.Error("Invalid Input (Title must be one or more than one word).")

    # The summarize -> keywords -> questions -> distractors pipeline lives in
    # generate_question(); delegate instead of keeping a second copy here
    # (same approach as the file-upload handler).
    return generate_question(context, subject, total)
|
|
|
# "Text" tab: pasted passage, title and question count in, quiz HTML out.
# Flagging is switched off via allow_flagging="never".
iface = gr.Interface(
    fn=generate_question_text,
    inputs=[context,subject,total],
    outputs=output,
    allow_flagging="never",flagging_options=["Save Data"])
|
|
|
|
|
|
|
def generate_question(context,subject,total):
    """Build the multiple-choice quiz HTML for ``context``.

    Pipeline: summarize ``context``, pick up to ``total`` keywords that
    survive in the summary, shuffle them, generate one question per keyword
    from the summary, and attach up to three distractors to each. The
    wrapped summary and the shuffled keywords are printed.

    Args:
        context: Source passage.
        subject: Accepted for interface compatibility; not used here.
        total: Maximum number of questions.

    Returns:
        An HTML string with the numbered questions, their shuffled options
        and a final answer key. Model- and user-derived text is HTML-escaped
        before being embedded.
    """
    import html  # local import keeps this block self-contained

    summary_text = summarizer(context,summary_model,summary_tokenizer)
    for line in wrap(summary_text, 150):
        print (line)

    # Named `answers` rather than `np`, so the numpy alias is not shadowed.
    answers = get_keywords(context,summary_text,total)
    random.shuffle(answers)
    print ("\n\nNoun phrases",answers)

    parts = ["<b style='color:black;'>Select/Tick the correct answer.</b><br><br>"]
    for number, answer in enumerate(answers, start=1):
        ques = get_question(summary_text,answer,question_model,question_tokenizer)
        distractors = get_distractors(answer.capitalize(),ques,s2v,sentence_transformer_model,40,0.2)

        parts.append("<b style='color:black;'>Q" + str(number) + ") " + html.escape(ques) + "</b><br/>")

        # The correct answer plus at most three distractors, in random order.
        options = [answer.capitalize(), *distractors[:3]]
        random.shuffle(options)
        parts.extend(html.escape(option.capitalize()) + "<br/>" for option in options)

    parts.append("<br><b style='color:black;'>" + "Correct Answer Key:</b><br>")
    for number, answer in enumerate(answers, start=1):
        parts.append("<b style='color:green;'>Ans" + str(number) + ") " + html.escape(answer.capitalize()) + "</b>")
        parts.append("<br/>")

    return "".join(parts)
|
|
|
import pandas as pd |
|
|
|
# Placeholder; `file` is rebound to a gr.File component further down.
file =None
|
|
|
def filecreate(x,subject,total):
    """Read an uploaded text file, validate it and build the quiz HTML.

    Args:
        x: The uploaded file as delivered by the ``gr.File`` input: either an
            object exposing the path as ``.name`` or the path itself.
            NOTE(review): which form Gradio passes depends on the component
            configuration/version - confirm for the deployed version.
        subject: Title entered by the user; it is only validated.
        total: Maximum number of questions to generate.

    Returns:
        The HTML string produced by ``generate_question``.

    Raises:
        gr.Error: If no file was supplied, the file is not valid UTF-8, the
            text has fewer than 150 words, or ``subject`` contains no word.
    """
    if x is None:
        raise gr.Error("Invalid Input (Please upload a *.txt file).")

    path = getattr(x, "name", x)
    try:
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(path, encoding="utf-8") as fo:
            text = fo.read()
    except UnicodeDecodeError as err:
        raise gr.Error("Invalid Input (File must be UTF-8 encoded text).") from err

    words_text = len(re.findall(r'\w+', text))
    words_subject = len(re.findall(r'\w+', subject))

    if words_text < 150:
        raise gr.Error("Invalid Input (Words limit must be more than 150 words).")
    if words_subject < 1:
        raise gr.Error("Invalid Input (Title must be one or more than one word).")

    return generate_question(text, subject, total)
|
|
|
|
|
|
|
import gradio as gr |
|
|
|
# Components of the "Upload File" tab. These rebind the module-level names
# `context`, `file`, `total` and `subject`; here `context` is the HTML
# *output* component.
context = gr.HTML(label="Text")
file = gr.File(label="Upload your *.txt file (File must contain more than 150 words).")
total = gr.Slider(1,10, value=1,step=1, label="Total Number Of Questions")
subject = gr.Textbox(placeholder="Enter subject/title here...", label="Enter your title (title must contain 1 word).")

# "Upload File" tab: uploaded file, title and question count in, quiz HTML
# out. Flagging is switched off via allow_flagging="never".
fface = gr.Interface(
    fn=filecreate,
    inputs=[file,subject,total],
    outputs=context,
    allow_flagging="never",flagging_options=["Save Data"])
|
|
|
|
|
|
|
# Combine both interfaces into one tabbed app and start it. The launch runs
# at module import time, with debug=True and show_api=False.
demo = gr.TabbedInterface([iface, fface], ["Text", "Upload File"])
demo.launch(debug=True, show_api=False)