Spaces:
Runtime error
Runtime error
from textwrap3 import wrap | |
text = """Elon Musk has shown again he can influence the digital currency market with just his tweets. After saying that his electric vehicle-making company | |
Tesla will not accept payments in Bitcoin because of environmental concerns, he tweeted that he was working with developers of Dogecoin to improve | |
system transaction efficiency. Following the two distinct statements from him, the world's largest cryptocurrency hit a two-month low, while Dogecoin | |
rallied by about 20 percent. The SpaceX CEO has in recent months often tweeted in support of Dogecoin, but rarely for Bitcoin. In a recent tweet, | |
Musk put out a statement from Tesla that it was “concerned” about the rapidly increasing use of fossil fuels for Bitcoin (price in India) mining and | |
transaction, and hence was suspending vehicle purchases using the cryptocurrency. A day later he again tweeted saying, “To be clear, I strongly | |
believe in crypto, but it can't drive a massive increase in fossil fuel use, especially coal”. It triggered a downward spiral for Bitcoin value but | |
the cryptocurrency has stabilised since. A number of Twitter users welcomed Musk's statement. One of them said it's time people started realising | |
that Dogecoin “is here to stay” and another referred to Musk's previous assertion that crypto could become the world's future currency.""" | |
for wrp in wrap(text, 150): | |
print (wrp) | |
print ("\n") | |
"""## Example 2""" | |
import torch | |
from transformers import T5ForConditionalGeneration,T5Tokenizer | |
summary_model = T5ForConditionalGeneration.from_pretrained('t5-base') | |
summary_tokenizer = T5Tokenizer.from_pretrained('t5-base') | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
summary_model = summary_model.to(device) | |
import random | |
import numpy as np | |
def set_seed(seed: int): | |
random.seed(seed) | |
np.random.seed(seed) | |
torch.manual_seed(seed) | |
torch.cuda.manual_seed_all(seed) | |
set_seed(42) | |
import nltk | |
nltk.download('punkt') | |
nltk.download('brown') | |
nltk.download('wordnet') | |
from nltk.corpus import wordnet as wn | |
from nltk.tokenize import sent_tokenize | |
def postprocesstext (content): | |
final="" | |
for sent in sent_tokenize(content): | |
sent = sent.capitalize() | |
final = final +" "+sent | |
return final | |
def summarizer(text,model,tokenizer): | |
text = text.strip().replace("\n"," ") | |
text = "summarize: "+text | |
# print (text) | |
max_len = 512 | |
encoding = tokenizer.encode_plus(text,max_length=max_len, pad_to_max_length=False,truncation=True, return_tensors="pt").to(device) | |
input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"] | |
outs = model.generate(input_ids=input_ids, | |
attention_mask=attention_mask, | |
early_stopping=True, | |
num_beams=3, | |
num_return_sequences=1, | |
no_repeat_ngram_size=2, | |
min_length = 75, | |
max_length=300) | |
dec = [tokenizer.decode(ids,skip_special_tokens=True) for ids in outs] | |
summary = dec[0] | |
summary = postprocesstext(summary) | |
summary= summary.strip() | |
return summary | |
summarized_text = summarizer(text,summary_model,summary_tokenizer) | |
print ("\noriginal Text >>") | |
for wrp in wrap(text, 150): | |
print (wrp) | |
print ("\n") | |
print ("Summarized Text >>") | |
for wrp in wrap(summarized_text, 150): | |
print (wrp) | |
print ("\n") | |
"""# **Answer Span Extraction (Keywords and Noun Phrases)**""" | |
total = 10 | |
import nltk | |
nltk.download('stopwords') | |
from nltk.corpus import stopwords | |
import string | |
import pke | |
import traceback | |
def get_nouns_multipartite(content): | |
out=[] | |
try: | |
extractor = pke.unsupervised.MultipartiteRank() | |
extractor.load_document(input=content,language='en') | |
# not contain punctuation marks or stopwords as candidates. | |
pos = {'PROPN','NOUN'} | |
#pos = {'PROPN','NOUN'} | |
stoplist = list(string.punctuation) | |
stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-'] | |
stoplist += stopwords.words('english') | |
# extractor.candidate_selection(pos=pos, stoplist=stoplist) | |
extractor.candidate_selection(pos=pos) | |
# 4. build the Multipartite graph and rank candidates using random walk, | |
# alpha controls the weight adjustment mechanism, see TopicRank for | |
# threshold/method parameters. | |
extractor.candidate_weighting(alpha=1.1, | |
threshold=0.75, | |
method='average') | |
keyphrases = extractor.get_n_best(n=15) | |
for val in keyphrases: | |
out.append(val[0]) | |
except: | |
out = [] | |
traceback.print_exc() | |
return out | |
from flashtext import KeywordProcessor | |
def get_keywords(originaltext,summarytext,total): | |
keywords = get_nouns_multipartite(originaltext) | |
print ("keywords unsummarized: ",keywords) | |
keyword_processor = KeywordProcessor() | |
for keyword in keywords: | |
keyword_processor.add_keyword(keyword) | |
keywords_found = keyword_processor.extract_keywords(summarytext) | |
keywords_found = list(set(keywords_found)) | |
print ("keywords_found in summarized: ",keywords_found) | |
important_keywords =[] | |
for keyword in keywords: | |
if keyword in keywords_found: | |
important_keywords.append(keyword) | |
return important_keywords[:total] | |
imp_keywords = get_keywords(text,summarized_text,total) | |
print (imp_keywords) | |
"""# **Question generation with T5**""" | |
question_model = T5ForConditionalGeneration.from_pretrained('ramsrigouthamg/t5_squad_v1') | |
question_tokenizer = T5Tokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1') | |
question_model = question_model.to(device) | |
def get_question(context,answer,model,tokenizer): | |
text = "context: {} answer: {}".format(context,answer) | |
encoding = tokenizer.encode_plus(text,max_length=384, pad_to_max_length=False,truncation=True, return_tensors="pt").to(device) | |
input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"] | |
outs = model.generate(input_ids=input_ids, | |
attention_mask=attention_mask, | |
early_stopping=True, | |
num_beams=5, | |
num_return_sequences=1, | |
no_repeat_ngram_size=2, | |
max_length=72) | |
dec = [tokenizer.decode(ids,skip_special_tokens=True) for ids in outs] | |
Question = dec[0].replace("question:","") | |
Question= Question.strip() | |
return Question | |
for wrp in wrap(summarized_text, 150): | |
print (wrp) | |
print ("\n") | |
for answer in imp_keywords: | |
ques = get_question(summarized_text,answer,question_model,question_tokenizer) | |
print (ques) | |
print (answer.capitalize()) | |
print ("\n") | |
"""# **Gradio UI Visualization**""" | |
# wget https://github.com/explosion/sense2vec/releases/download/v1.0.0/s2v_reddit_2015_md.tar.gz | |
# !tar -xvf s2v_reddit_2015_md.tar.gz | |
import numpy as np | |
from sense2vec import Sense2Vec | |
s2v = Sense2Vec().from_disk('s2v_old') | |
from sentence_transformers import SentenceTransformer | |
# paraphrase-distilroberta-base-v1 | |
sentence_transformer_model = SentenceTransformer('msmarco-distilbert-base-v3') | |
from similarity.normalized_levenshtein import NormalizedLevenshtein | |
normalized_levenshtein = NormalizedLevenshtein() | |
def filter_same_sense_words(original,wordlist): | |
filtered_words=[] | |
base_sense =original.split('|')[1] | |
print (base_sense) | |
for eachword in wordlist: | |
if eachword[0].split('|')[1] == base_sense: | |
filtered_words.append(eachword[0].split('|')[0].replace("_", " ").title().strip()) | |
return filtered_words | |
def get_highest_similarity_score(wordlist,wrd): | |
score=[] | |
for each in wordlist: | |
score.append(normalized_levenshtein.similarity(each.lower(),wrd.lower())) | |
return max(score) | |
def sense2vec_get_words(word,s2v,topn,question): | |
output = [] | |
print ("word ",word) | |
try: | |
sense = s2v.get_best_sense(word, senses= ["NOUN", "PERSON","PRODUCT","LOC","ORG","EVENT","NORP","WORK OF ART","FAC","GPE","NUM","FACILITY"]) | |
most_similar = s2v.most_similar(sense, n=topn) | |
# print (most_similar) | |
output = filter_same_sense_words(sense,most_similar) | |
print ("Similar ",output) | |
except: | |
output =[] | |
threshold = 0.6 | |
final=[word] | |
checklist =question.split() | |
for x in output: | |
if get_highest_similarity_score(final,x)<threshold and x not in final and x not in checklist: | |
final.append(x) | |
return final[1:] | |
def mmr(doc_embedding, word_embeddings, words, top_n, lambda_param): | |
# Extract similarity within words, and between words and the document | |
word_doc_similarity = cosine_similarity(word_embeddings, doc_embedding) | |
word_similarity = cosine_similarity(word_embeddings) | |
# Initialize candidates and already choose best keyword/keyphrase | |
keywords_idx = [np.argmax(word_doc_similarity)] | |
candidates_idx = [i for i in range(len(words)) if i != keywords_idx[0]] | |
for _ in range(top_n - 1): | |
# Extract similarities within candidates and | |
# between candidates and selected keywords/phrases | |
candidate_similarities = word_doc_similarity[candidates_idx, :] | |
target_similarities = np.max(word_similarity[candidates_idx][:, keywords_idx], axis=1) | |
# Calculate MMR | |
mmr = (lambda_param) * candidate_similarities - (1-lambda_param) * target_similarities.reshape(-1, 1) | |
mmr_idx = candidates_idx[np.argmax(mmr)] | |
# Update keywords & candidates | |
keywords_idx.append(mmr_idx) | |
candidates_idx.remove(mmr_idx) | |
return [words[idx] for idx in keywords_idx] | |
from collections import OrderedDict | |
from sklearn.metrics.pairwise import cosine_similarity | |
def get_distractors_wordnet(word): | |
distractors=[] | |
try: | |
syn = wn.synsets(word,'n')[0] | |
word= word.lower() | |
orig_word = word | |
if len(word.split())>0: | |
word = word.replace(" ","_") | |
hypernym = syn.hypernyms() | |
if len(hypernym) == 0: | |
return distractors | |
for item in hypernym[0].hyponyms(): | |
name = item.lemmas()[0].name() | |
#print ("name ",name, " word",orig_word) | |
if name == orig_word: | |
continue | |
name = name.replace("_"," ") | |
name = " ".join(w.capitalize() for w in name.split()) | |
if name is not None and name not in distractors: | |
distractors.append(name) | |
except: | |
print ("Wordnet distractors not found") | |
return distractors | |
def get_distractors (word,origsentence,sense2vecmodel,sentencemodel,top_n,lambdaval): | |
distractors = sense2vec_get_words(word,sense2vecmodel,top_n,origsentence) | |
print ("distractors ",distractors) | |
if len(distractors) ==0: | |
return distractors | |
distractors_new = [word.capitalize()] | |
distractors_new.extend(distractors) | |
# print ("distractors_new .. ",distractors_new) | |
embedding_sentence = origsentence+ " "+word.capitalize() | |
# embedding_sentence = word | |
keyword_embedding = sentencemodel.encode([embedding_sentence]) | |
distractor_embeddings = sentencemodel.encode(distractors_new) | |
# filtered_keywords = mmr(keyword_embedding, distractor_embeddings,distractors,4,0.7) | |
max_keywords = min(len(distractors_new),5) | |
filtered_keywords = mmr(keyword_embedding, distractor_embeddings,distractors_new,max_keywords,lambdaval) | |
# filtered_keywords = filtered_keywords[1:] | |
final = [word.capitalize()] | |
for wrd in filtered_keywords: | |
if wrd.lower() !=word.lower(): | |
final.append(wrd.capitalize()) | |
final = final[1:] | |
return final | |
sent = "What cryptocurrency did Musk rarely tweet about?" | |
keyword = "Bitcoin" | |
# sent = "What did Musk say he was working with to improve system transaction efficiency?" | |
# keyword= "Dogecoin" | |
# sent = "What company did Musk say would not accept bitcoin payments?" | |
# keyword= "Tesla" | |
# sent = "What has Musk often tweeted in support of?" | |
# keyword = "Cryptocurrency" | |
print (get_distractors(keyword,sent,s2v,sentence_transformer_model,40,0.2)) | |
"""# **Gradio Visualization with MCQs**""" | |
# import mysql.connector | |
# import datetime; | |
# mydb = mysql.connector.connect( | |
# host="qtechdb-1.cexugk1h8rui.ap-northeast-1.rds.amazonaws.com", | |
# user="admin", | |
# password="F3v2vGWzb8vaniE3nqzi", | |
# database="spring_social" | |
# ) | |
import gradio as gr | |
import re | |
context = gr.Textbox(lines=10, placeholder="Enter paragraph/content here...", label="Enter your content (words input must be more than 150 words).") | |
total = gr.Slider(1,10, value=1,step=1, label="Total Number Of Questions") | |
subject = gr.Textbox(placeholder="Enter subject/title here...", label="Enter your title (title must contain 1 word)") | |
output = gr.HTML( label="Question and Answers") | |
def generate_question_text(context,subject,total): | |
words_text = len(re.findall(r'\w+', context)) | |
words_subject = len(re.findall(r'\w+', subject)) | |
if (words_text < 150): | |
raise gr.Error("Invalid Input (Words limit must be more than 150 words).") | |
# print("Number of words:", words) | |
elif (words_subject < 1): | |
raise gr.Error("Invalid Input (Title must be one or more than one word).") | |
elif (words_subject < 1): | |
raise gr.Error("Invalid Input (Title must be one or more than one word).") | |
else: | |
summary_text = summarizer(context,summary_model,summary_tokenizer) | |
for wrp in wrap(summary_text, 150): | |
print (wrp) | |
# np = getnounphrases(summary_text,sentence_transformer_model,3) | |
np = get_keywords(context,summary_text,total) | |
random.shuffle(np) | |
print ("\n\nNoun phrases",np) | |
output="<b style='color:black;'>Select/Tick the correct answer.</b><br><br>" | |
i = 1 | |
for answer in np: | |
ques = get_question(summary_text,answer,question_model,question_tokenizer) | |
distractors = get_distractors(answer.capitalize(),ques,s2v,sentence_transformer_model,40,0.2) | |
# output= output + ques + "\n" + "Ans: "+answer.capitalize() + "\n\n" | |
output = output + "<b style='color:black;'>Q"+ str(i) + ") " + ques + "</b><br/>" | |
# output = output + "<br>" | |
i += 1 | |
answerlist=[answer.capitalize()] | |
# output = output + "<br><b> ▪ " +answer.capitalize()+ "</b>" | |
for distractor in distractors[:3]: | |
answerlist.append(distractor) | |
random.shuffle(answerlist) | |
print(answerlist) | |
for answer in answerlist: | |
output = output +answer.capitalize()+ "<br/>" | |
output = output + "<br><b style='color:black;'>" + "Correct Answer Key:</b><br>" | |
i = 1 | |
for answer in np: | |
output = output + "<b style='color:green;'>Ans"+ str(i) + ") " +answer.capitalize()+ "</b>" | |
output = output + "<br>" | |
i += 1 | |
# mycursor = mydb.cursor() | |
# timedate = datetime.datetime.now() | |
# sql = "INSERT INTO mcqstexts (subject, input, output, timedate) VALUES (%s,%s, %s,%s)" | |
# val = (subject, context, output, timedate) | |
# mycursor.execute(sql, val) | |
# mydb.commit() | |
# print(mycursor.rowcount, "record inserted.") | |
return output | |
iface = gr.Interface( | |
fn=generate_question_text, | |
inputs=[context,subject,total], | |
outputs=output, | |
allow_flagging="never",flagging_options=["Save Data"]) | |
# iface.launch(debug=True, share=True) | |
def generate_question(context,subject,total): | |
summary_text = summarizer(context,summary_model,summary_tokenizer) | |
for wrp in wrap(summary_text, 150): | |
print (wrp) | |
# np = getnounphrases(summary_text,sentence_transformer_model,3) | |
np = get_keywords(context,summary_text,total) | |
random.shuffle(np) | |
print ("\n\nNoun phrases",np) | |
output="<b style='color:black;'>Select/Tick the correct answer.</b><br><br>" | |
i = 1 | |
for answer in np: | |
ques = get_question(summary_text,answer,question_model,question_tokenizer) | |
distractors = get_distractors(answer.capitalize(),ques,s2v,sentence_transformer_model,40,0.2) | |
# output= output + ques + "\n" + "Ans: "+answer.capitalize() + "\n\n" | |
output = output + "<b style='color:black;'>Q"+ str(i) + ") " + ques + "</b><br/>" | |
# output = output + "<br>" | |
i += 1 | |
answerlist=[answer.capitalize()] | |
# output = output + "<br><b> ▪ " +answer.capitalize()+ "</b>" | |
for distractor in distractors[:3]: | |
answerlist.append(distractor) | |
random.shuffle(answerlist) | |
# print(answerlist) | |
for answer in answerlist: | |
output = output +answer.capitalize()+ "<br/>" | |
output = output + "<br><b style='color:black;'>" + "Correct Answer Key:</b><br>" | |
i = 1 | |
for answer in np: | |
output = output + "<b style='color:green;'>Ans"+ str(i) + ") " +answer.capitalize()+ "</b>" | |
output = output + "<br/>" | |
i += 1 | |
# mycursor = mydb.cursor() | |
# timedate = datetime.datetime.now() | |
# sql = "INSERT INTO mcqstexts (subject, input, output, timedate) VALUES (%s,%s, %s,%s)" | |
# val = (subject, context, output, timedate) | |
# mycursor.execute(sql, val) | |
# mydb.commit() | |
# print(mycursor.rowcount, "record inserted.") | |
return output | |
import pandas as pd | |
file =None | |
def filecreate(x,subject,total): | |
with open(x.name) as fo: | |
text = fo.read() | |
# print(text) | |
words_text = len(re.findall(r'\w+', text)) | |
words_subject = len(re.findall(r'\w+', subject)) | |
if (words_text < 150): | |
raise gr.Error("Invalid Input (Words limit must be more than 150 words).") | |
# print("Number of words:", words) | |
elif (words_subject < 1): | |
raise gr.Error("Invalid Input (Title must be one or more than one word).") | |
else: | |
generated = generate_question(text,subject, total) | |
return generated | |
# filecreate(file,2) | |
import gradio as gr | |
context = gr.HTML(label="Text") | |
file = gr.File(label="Upload your *.txt file (File must contain more than 150 words).") | |
total = gr.Slider(1,10, value=1,step=1, label="Total Number Of Questions") | |
subject = gr.Textbox(placeholder="Enter subject/title here...", label="Enter your title (title must contain 1 word).") | |
fface = gr.Interface( | |
fn=filecreate, | |
inputs=[file,subject,total], | |
outputs=context, | |
# css=".gradio-container {background-image: url('file=blue.jpg')}", | |
allow_flagging="never",flagging_options=["Save Data"]) | |
# fface.launch(debug=True, share=True) | |
demo = gr.TabbedInterface([iface, fface], ["Text", "Upload File"]) | |
demo.launch(debug=True, show_api=False) |