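# app.py — end-to-end MCQ generator: summarize the input text with T5, extract
# keyphrases with MultipartiteRank, generate a question for each keyphrase with a
# SQuAD-finetuned T5, build distractor options via sense2vec/WordNet + MMR, and
# serve everything through a Gradio tabbed interface.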
from textwrap3 import wrap
text = """Elon Musk has shown again he can influence the digital currency market with just his tweets. After saying that his electric vehicle-making company
Tesla will not accept payments in Bitcoin because of environmental concerns, he tweeted that he was working with developers of Dogecoin to improve
system transaction efficiency. Following the two distinct statements from him, the world's largest cryptocurrency hit a two-month low, while Dogecoin
rallied by about 20 percent. The SpaceX CEO has in recent months often tweeted in support of Dogecoin, but rarely for Bitcoin. In a recent tweet,
Musk put out a statement from Tesla that it was “concerned” about the rapidly increasing use of fossil fuels for Bitcoin (price in India) mining and
transaction, and hence was suspending vehicle purchases using the cryptocurrency. A day later he again tweeted saying, “To be clear, I strongly
believe in crypto, but it can't drive a massive increase in fossil fuel use, especially coal”. It triggered a downward spiral for Bitcoin value but
the cryptocurrency has stabilised since. A number of Twitter users welcomed Musk's statement. One of them said it's time people started realising
that Dogecoin “is here to stay” and another referred to Musk's previous assertion that crypto could become the world's future currency."""
for wrp in wrap(text, 150):
    print(wrp)
print("\n")
"""## Example 2"""
import torch
from transformers import T5ForConditionalGeneration,T5Tokenizer
summary_model = T5ForConditionalGeneration.from_pretrained('t5-base')
summary_tokenizer = T5Tokenizer.from_pretrained('t5-base')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
summary_model = summary_model.to(device)
import random
import numpy as np
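# Fix every RNG seed so keyword shuffling and any stochastic model behaviour
# are reproducible across runs.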
def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
set_seed(42)
import nltk
nltk.download('punkt')
nltk.download('brown')
nltk.download('wordnet')
from nltk.corpus import wordnet as wn
from nltk.tokenize import sent_tokenize
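# Post-process generated text: capitalize the first letter of each sentence
# (note that str.capitalize() also lowercases the rest of the sentence).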
def postprocesstext(content):
    final = ""
    for sent in sent_tokenize(content):
        sent = sent.capitalize()
        final = final + " " + sent
    return final
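# Summarize the input with T5: prefix the text with "summarize: ", decode with
# beam search (3 beams, 75-300 tokens), then clean up sentence casing.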
def summarizer(text, model, tokenizer):
    text = text.strip().replace("\n", " ")
    text = "summarize: " + text
    # print (text)
    max_len = 512
    encoding = tokenizer.encode_plus(text, max_length=max_len, pad_to_max_length=False, truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=3,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          min_length=75,
                          max_length=300)
    dec = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
    summary = dec[0]
    summary = postprocesstext(summary)
    summary = summary.strip()
    return summary
summarized_text = summarizer(text,summary_model,summary_tokenizer)
print ("\noriginal Text >>")
for wrp in wrap(text, 150):
print (wrp)
print ("\n")
print ("Summarized Text >>")
for wrp in wrap(summarized_text, 150):
print (wrp)
print ("\n")
"""# **Answer Span Extraction (Keywords and Noun Phrases)**"""
total = 10
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
import string
import pke
import traceback
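# Extract candidate keyphrases (nouns and proper nouns) from the full text with
# pke's MultipartiteRank and return up to the 15 top-ranked phrases.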
def get_nouns_multipartite(content):
    out = []
    try:
        extractor = pke.unsupervised.MultipartiteRank()
        extractor.load_document(input=content, language='en')
        # Select candidates that do not contain punctuation marks or stopwords.
        pos = {'PROPN', 'NOUN'}
        stoplist = list(string.punctuation)
        stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-']
        stoplist += stopwords.words('english')
        # extractor.candidate_selection(pos=pos, stoplist=stoplist)
        extractor.candidate_selection(pos=pos)
        # Build the Multipartite graph and rank candidates using random walk;
        # alpha controls the weight adjustment mechanism, see TopicRank for
        # threshold/method parameters.
        extractor.candidate_weighting(alpha=1.1,
                                      threshold=0.75,
                                      method='average')
        keyphrases = extractor.get_n_best(n=15)
        for val in keyphrases:
            out.append(val[0])
    except Exception:
        out = []
        traceback.print_exc()
    return out
from flashtext import KeywordProcessor
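# Keep only the keyphrases from the original text that also occur in the summary
# (matched with flashtext), and return at most `total` of them.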
def get_keywords(originaltext, summarytext, total):
    keywords = get_nouns_multipartite(originaltext)
    print("keywords unsummarized: ", keywords)
    keyword_processor = KeywordProcessor()
    for keyword in keywords:
        keyword_processor.add_keyword(keyword)
    keywords_found = keyword_processor.extract_keywords(summarytext)
    keywords_found = list(set(keywords_found))
    print("keywords_found in summarized: ", keywords_found)
    important_keywords = []
    for keyword in keywords:
        if keyword in keywords_found:
            important_keywords.append(keyword)
    return important_keywords[:total]
imp_keywords = get_keywords(text,summarized_text,total)
print (imp_keywords)
"""# **Question generation with T5**"""
question_model = T5ForConditionalGeneration.from_pretrained('ramsrigouthamg/t5_squad_v1')
question_tokenizer = T5Tokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1')
question_model = question_model.to(device)
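# Generate a question for a given answer span: the SQuAD-finetuned T5 takes a
# "context: ... answer: ..." prompt and returns the question text.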
def get_question(context, answer, model, tokenizer):
    text = "context: {} answer: {}".format(context, answer)
    encoding = tokenizer.encode_plus(text, max_length=384, pad_to_max_length=False, truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=5,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          max_length=72)
    dec = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
    Question = dec[0].replace("question:", "")
    Question = Question.strip()
    return Question
for wrp in wrap(summarized_text, 150):
    print(wrp)
print("\n")
for answer in imp_keywords:
    ques = get_question(summarized_text, answer, question_model, question_tokenizer)
    print(ques)
    print(answer.capitalize())
    print("\n")
"""# **Gradio UI Visualization**"""
# !wget https://github.com/explosion/sense2vec/releases/download/v1.0.0/s2v_reddit_2015_md.tar.gz
# !tar -xvf s2v_reddit_2015_md.tar.gz
import numpy as np
from sense2vec import Sense2Vec
s2v = Sense2Vec().from_disk('s2v_old')
from sentence_transformers import SentenceTransformer
# paraphrase-distilroberta-base-v1
sentence_transformer_model = SentenceTransformer('msmarco-distilbert-base-v3')
from similarity.normalized_levenshtein import NormalizedLevenshtein
normalized_levenshtein = NormalizedLevenshtein()
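# Keep only sense2vec neighbours whose sense tag (the part after '|') matches
# the sense of the original query key.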
def filter_same_sense_words(original, wordlist):
    filtered_words = []
    base_sense = original.split('|')[1]
    print(base_sense)
    for eachword in wordlist:
        if eachword[0].split('|')[1] == base_sense:
            filtered_words.append(eachword[0].split('|')[0].replace("_", " ").title().strip())
    return filtered_words
def get_highest_similarity_score(wordlist, wrd):
    score = []
    for each in wordlist:
        score.append(normalized_levenshtein.similarity(each.lower(), wrd.lower()))
    return max(score)
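# Query sense2vec for the topn entries most similar to the word's best sense,
# then drop candidates that are near-duplicates of anything already kept
# (normalized Levenshtein similarity >= 0.6) or that already appear in the question.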
def sense2vec_get_words(word, s2v, topn, question):
    output = []
    print("word ", word)
    try:
        sense = s2v.get_best_sense(word, senses=["NOUN", "PERSON", "PRODUCT", "LOC", "ORG", "EVENT", "NORP", "WORK OF ART", "FAC", "GPE", "NUM", "FACILITY"])
        most_similar = s2v.most_similar(sense, n=topn)
        # print (most_similar)
        output = filter_same_sense_words(sense, most_similar)
        print("Similar ", output)
    except Exception:
        output = []
    threshold = 0.6
    final = [word]
    checklist = question.split()
    for x in output:
        if get_highest_similarity_score(final, x) < threshold and x not in final and x not in checklist:
            final.append(x)
    return final[1:]
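# Maximal Marginal Relevance (MMR): greedily pick candidates that are similar to
# the document embedding while penalizing similarity to candidates already
# selected; lambda_param trades off relevance against diversity.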
def mmr(doc_embedding, word_embeddings, words, top_n, lambda_param):
    # Extract similarity within words, and between words and the document
    word_doc_similarity = cosine_similarity(word_embeddings, doc_embedding)
    word_similarity = cosine_similarity(word_embeddings)
    # Initialize candidates and pick the best keyword/keyphrase first
    keywords_idx = [np.argmax(word_doc_similarity)]
    candidates_idx = [i for i in range(len(words)) if i != keywords_idx[0]]
    for _ in range(top_n - 1):
        # Extract similarities within candidates and
        # between candidates and selected keywords/phrases
        candidate_similarities = word_doc_similarity[candidates_idx, :]
        target_similarities = np.max(word_similarity[candidates_idx][:, keywords_idx], axis=1)
        # Calculate MMR
        mmr = (lambda_param) * candidate_similarities - (1 - lambda_param) * target_similarities.reshape(-1, 1)
        mmr_idx = candidates_idx[np.argmax(mmr)]
        # Update keywords & candidates
        keywords_idx.append(mmr_idx)
        candidates_idx.remove(mmr_idx)
    return [words[idx] for idx in keywords_idx]
from collections import OrderedDict
from sklearn.metrics.pairwise import cosine_similarity
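# WordNet-based fallback distractors: take the word's first noun synset, go up
# to its hypernym, and return the other hyponyms (co-hyponyms) as wrong options.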
def get_distractors_wordnet(word):
    distractors = []
    try:
        syn = wn.synsets(word, 'n')[0]
        word = word.lower()
        orig_word = word
        if len(word.split()) > 0:
            word = word.replace(" ", "_")
        hypernym = syn.hypernyms()
        if len(hypernym) == 0:
            return distractors
        for item in hypernym[0].hyponyms():
            name = item.lemmas()[0].name()
            # print ("name ",name, " word",orig_word)
            if name == orig_word:
                continue
            name = name.replace("_", " ")
            name = " ".join(w.capitalize() for w in name.split())
            if name is not None and name not in distractors:
                distractors.append(name)
    except Exception:
        print("Wordnet distractors not found")
    return distractors
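# Build the final distractor list: collect sense2vec candidates, embed them with
# the sentence transformer, and use MMR to keep options that fit the question
# context but are distinct from the answer and from each other.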
def get_distractors(word, origsentence, sense2vecmodel, sentencemodel, top_n, lambdaval):
    distractors = sense2vec_get_words(word, sense2vecmodel, top_n, origsentence)
    print("distractors ", distractors)
    if len(distractors) == 0:
        return distractors
    distractors_new = [word.capitalize()]
    distractors_new.extend(distractors)
    # print ("distractors_new .. ",distractors_new)
    embedding_sentence = origsentence + " " + word.capitalize()
    # embedding_sentence = word
    keyword_embedding = sentencemodel.encode([embedding_sentence])
    distractor_embeddings = sentencemodel.encode(distractors_new)
    # filtered_keywords = mmr(keyword_embedding, distractor_embeddings,distractors,4,0.7)
    max_keywords = min(len(distractors_new), 5)
    filtered_keywords = mmr(keyword_embedding, distractor_embeddings, distractors_new, max_keywords, lambdaval)
    # filtered_keywords = filtered_keywords[1:]
    final = [word.capitalize()]
    for wrd in filtered_keywords:
        if wrd.lower() != word.lower():
            final.append(wrd.capitalize())
    final = final[1:]
    return final
sent = "What cryptocurrency did Musk rarely tweet about?"
keyword = "Bitcoin"
# sent = "What did Musk say he was working with to improve system transaction efficiency?"
# keyword= "Dogecoin"
# sent = "What company did Musk say would not accept bitcoin payments?"
# keyword= "Tesla"
# sent = "What has Musk often tweeted in support of?"
# keyword = "Cryptocurrency"
print (get_distractors(keyword,sent,s2v,sentence_transformer_model,40,0.2))
"""# **Gradio Visualization with MCQs**"""
# import mysql.connector
# import datetime;
# mydb = mysql.connector.connect(
# host="qtechdb-1.cexugk1h8rui.ap-northeast-1.rds.amazonaws.com",
# user="admin",
# password="F3v2vGWzb8vaniE3nqzi",
# database="spring_social"
# )
import gradio as gr
import re
context = gr.Textbox(lines=10, placeholder="Enter paragraph/content here...", label="Enter your content (words input must be more than 150 words).")
total = gr.Slider(1,10, value=1,step=1, label="Total Number Of Questions")
subject = gr.Textbox(placeholder="Enter subject/title here...", label="Enter your title (title must contain 1 word)")
output = gr.HTML(label="Question and Answers")
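# Gradio callback for the "Text" tab: validate the input length and title, then
# run the summarize -> keyword -> question -> distractor pipeline and format the
# quiz as HTML.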
def generate_question_text(context, subject, total):
    words_text = len(re.findall(r'\w+', context))
    words_subject = len(re.findall(r'\w+', subject))
    if words_text < 150:
        raise gr.Error("Invalid Input (Words limit must be more than 150 words).")
        # print("Number of words:", words)
    elif words_subject < 1:
        raise gr.Error("Invalid Input (Title must be one or more than one word).")
    else:
        summary_text = summarizer(context, summary_model, summary_tokenizer)
        for wrp in wrap(summary_text, 150):
            print(wrp)
        # np = getnounphrases(summary_text,sentence_transformer_model,3)
        np = get_keywords(context, summary_text, total)
        random.shuffle(np)
        print("\n\nNoun phrases", np)
        output = "<b style='color:black;'>Select/Tick the correct answer.</b><br><br>"
        i = 1
        for answer in np:
            ques = get_question(summary_text, answer, question_model, question_tokenizer)
            distractors = get_distractors(answer.capitalize(), ques, s2v, sentence_transformer_model, 40, 0.2)
            # output= output + ques + "\n" + "Ans: "+answer.capitalize() + "\n\n"
            output = output + "<b style='color:black;'>Q" + str(i) + ") " + ques + "</b><br/>"
            # output = output + "<br>"
            i += 1
            answerlist = [answer.capitalize()]
            # output = output + "<br><b> ▪ " +answer.capitalize()+ "</b>"
            for distractor in distractors[:3]:
                answerlist.append(distractor)
            random.shuffle(answerlist)
            print(answerlist)
            for answer in answerlist:
                output = output + answer.capitalize() + "<br/>"
        output = output + "<br><b style='color:black;'>" + "Correct Answer Key:</b><br>"
        i = 1
        for answer in np:
            output = output + "<b style='color:green;'>Ans" + str(i) + ") " + answer.capitalize() + "</b>"
            output = output + "<br>"
            i += 1
        # mycursor = mydb.cursor()
        # timedate = datetime.datetime.now()
        # sql = "INSERT INTO mcqstexts (subject, input, output, timedate) VALUES (%s,%s, %s,%s)"
        # val = (subject, context, output, timedate)
        # mycursor.execute(sql, val)
        # mydb.commit()
        # print(mycursor.rowcount, "record inserted.")
        return output
iface = gr.Interface(
    fn=generate_question_text,
    inputs=[context, subject, total],
    outputs=output,
    allow_flagging="never", flagging_options=["Save Data"])
# iface.launch(debug=True, share=True)
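# Same pipeline as generate_question_text but without the length/title checks;
# used by the file-upload tab, where filecreate() does the validation.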
def generate_question(context, subject, total):
    summary_text = summarizer(context, summary_model, summary_tokenizer)
    for wrp in wrap(summary_text, 150):
        print(wrp)
    # np = getnounphrases(summary_text,sentence_transformer_model,3)
    np = get_keywords(context, summary_text, total)
    random.shuffle(np)
    print("\n\nNoun phrases", np)
    output = "<b style='color:black;'>Select/Tick the correct answer.</b><br><br>"
    i = 1
    for answer in np:
        ques = get_question(summary_text, answer, question_model, question_tokenizer)
        distractors = get_distractors(answer.capitalize(), ques, s2v, sentence_transformer_model, 40, 0.2)
        # output= output + ques + "\n" + "Ans: "+answer.capitalize() + "\n\n"
        output = output + "<b style='color:black;'>Q" + str(i) + ") " + ques + "</b><br/>"
        # output = output + "<br>"
        i += 1
        answerlist = [answer.capitalize()]
        # output = output + "<br><b> ▪ " +answer.capitalize()+ "</b>"
        for distractor in distractors[:3]:
            answerlist.append(distractor)
        random.shuffle(answerlist)
        # print(answerlist)
        for answer in answerlist:
            output = output + answer.capitalize() + "<br/>"
    output = output + "<br><b style='color:black;'>" + "Correct Answer Key:</b><br>"
    i = 1
    for answer in np:
        output = output + "<b style='color:green;'>Ans" + str(i) + ") " + answer.capitalize() + "</b>"
        output = output + "<br/>"
        i += 1
    # mycursor = mydb.cursor()
    # timedate = datetime.datetime.now()
    # sql = "INSERT INTO mcqstexts (subject, input, output, timedate) VALUES (%s,%s, %s,%s)"
    # val = (subject, context, output, timedate)
    # mycursor.execute(sql, val)
    # mydb.commit()
    # print(mycursor.rowcount, "record inserted.")
    return output
import pandas as pd
file = None
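# Callback for the "Upload File" tab: read the uploaded *.txt file, validate its
# length and the title, then delegate to generate_question.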
def filecreate(x, subject, total):
    with open(x.name) as fo:
        text = fo.read()
    # print(text)
    words_text = len(re.findall(r'\w+', text))
    words_subject = len(re.findall(r'\w+', subject))
    if words_text < 150:
        raise gr.Error("Invalid Input (Words limit must be more than 150 words).")
        # print("Number of words:", words)
    elif words_subject < 1:
        raise gr.Error("Invalid Input (Title must be one or more than one word).")
    else:
        generated = generate_question(text, subject, total)
        return generated
# filecreate(file,2)
import gradio as gr
context = gr.HTML(label="Text")
file = gr.File(label="Upload your *.txt file (File must contain more than 150 words).")
total = gr.Slider(1,10, value=1,step=1, label="Total Number Of Questions")
subject = gr.Textbox(placeholder="Enter subject/title here...", label="Enter your title (title must contain 1 word).")
fface = gr.Interface(
    fn=filecreate,
    inputs=[file, subject, total],
    outputs=context,
    # css=".gradio-container {background-image: url('file=blue.jpg')}",
    allow_flagging="never", flagging_options=["Save Data"])
# fface.launch(debug=True, share=True)
demo = gr.TabbedInterface([iface, fface], ["Text", "Upload File"])
demo.launch(debug=True, show_api=False)