Spaces:
Running
Running
import streamlit as st | |
from transformers import T5ForConditionalGeneration, T5Tokenizer | |
import spacy | |
import nltk | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from rake_nltk import Rake | |
import pandas as pd | |
from fpdf import FPDF | |
import wikipediaapi | |
from functools import lru_cache | |
nltk.download('punkt') | |
nltk.download('stopwords') | |
nltk.download('brown') | |
from nltk.tokenize import sent_tokenize | |
nltk.download('wordnet') | |
from nltk.corpus import wordnet | |
import random | |
import sense2vec | |
from wordcloud import WordCloud | |
import matplotlib.pyplot as plt | |
import json | |
import os | |
from sentence_transformers import SentenceTransformer, util | |
import textstat | |
from spellchecker import SpellChecker | |
from transformers import pipeline | |
import re | |
import pymupdf | |
import uuid | |
# Debug marker written to stdout when this script starts executing.
print("***************************************************************")
# Page-level Streamlit settings: browser tab title, sidebar state and the
# text shown under the "About" menu entry.
# NOTE(review): Streamlit documents that set_page_config should be the first
# Streamlit call in a script - keep this above any other st.* usage.
st.set_page_config(
    page_title="Question Generator",
    initial_sidebar_state="auto",
    menu_items={
        "About" : "#Hi this our project."
    }
)
# Initialize Wikipedia API with a user agent
user_agent = 'QGen/1.0 (channingfisher7@gmail.com)'
# Shared English-language Wikipedia client; entity_linking() looks pages up through it.
wiki_wiki = wikipediaapi.Wikipedia(user_agent= user_agent,language='en')
def get_session_id():
    """Return the ID stored in Streamlit session state, creating it if absent.

    A fresh UUID4 string is generated and stored under ``session_id`` the
    first time this is called for a session; later calls return that value.
    """
    if 'session_id' in st.session_state:
        return st.session_state.session_id
    new_id = str(uuid.uuid4())
    st.session_state.session_id = new_id
    return st.session_state.session_id
def initialize_state(session_id):
    """Return the state dict for *session_id*, creating containers as needed.

    ``st.session_state.session_states`` maps session IDs to per-session
    dicts. A new per-session dict starts with an empty
    ``generated_questions`` list.
    """
    if 'session_states' not in st.session_state:
        st.session_state.session_states = {}
    if session_id in st.session_state.session_states:
        return st.session_state.session_states[session_id]
    st.session_state.session_states[session_id] = {
        'generated_questions': [],
        # add other state variables as needed
    }
    return st.session_state.session_states[session_id]
def get_state(session_id):
    """Return the stored state dict for *session_id*.

    Raises whatever the underlying lookup raises if the session has not been
    set up via ``initialize_state`` first.
    """
    all_states = st.session_state.session_states
    return all_states[session_id]
def set_state(session_id, key, value):
    """Store *value* under *key* in the state dict of *session_id*."""
    session_state = st.session_state.session_states[session_id]
    session_state[key] = value
@st.cache_resource
def load_model():
    """Load the T5 question-generation model and its tokenizer.

    Streamlit re-executes the whole script on every widget interaction, so
    without caching the checkpoint would be re-read on each rerun.
    ``st.cache_resource`` keeps the loaded objects and returns them on later
    calls.

    Returns:
        tuple: ``(model, tokenizer)`` for the ``DevBM/t5-large-squad`` checkpoint.
    """
    model_name = "DevBM/t5-large-squad"
    model = T5ForConditionalGeneration.from_pretrained(model_name)
    tokenizer = T5Tokenizer.from_pretrained(model_name)
    return model, tokenizer
# Load Spacy Model
@st.cache_resource
def load_nlp_models():
    """Load the spaCy pipeline and the sense2vec vectors.

    Cached with ``st.cache_resource`` so that Streamlit reruns reuse the
    already-loaded objects instead of reading them from disk again.

    Returns:
        tuple: ``(nlp, s2v)`` - the ``en_core_web_md`` spaCy pipeline and the
        ``Sense2Vec`` object loaded from the local ``s2v_old`` directory.
    """
    nlp = spacy.load("en_core_web_md")
    s2v = sense2vec.Sense2Vec().from_disk('s2v_old')
    return nlp, s2v
# Load Quality Assurance Models
@st.cache_resource
def load_qa_models():
    """Load the helpers used for question quality assessment.

    Cached with ``st.cache_resource`` so that Streamlit reruns reuse the
    already-loaded objects.

    Returns:
        tuple: ``(similarity_model, spell)`` - the ``all-MiniLM-L6-v2``
        ``SentenceTransformer`` and a ``SpellChecker`` instance.
    """
    # Sentence-embedding model used for similarity scoring
    similarity_model = SentenceTransformer('all-MiniLM-L6-v2')
    spell = SpellChecker()
    return similarity_model, spell
# Load the models when the script runs; the functions below read these
# module-level names (nlp, s2v, model, tokenizer, spell, context_model).
nlp, s2v = load_nlp_models()
model, tokenizer = load_model()
similarity_model, spell = load_qa_models()
# generate_options() encodes text through context_model, which is the same
# object as the similarity model.
context_model = similarity_model
def get_pdf_text(pdf_file):
    """Extract the text of every page of an uploaded PDF.

    Args:
        pdf_file: A file-like object whose ``read()`` returns the PDF bytes
            (main() passes the object returned by ``st.file_uploader``).

    Returns:
        str: The page texts concatenated in page order.
    """
    # The context manager closes the document even if a page fails to load;
    # the original left it open.
    with pymupdf.open(stream=pdf_file.read(), filetype="pdf") as doc:
        page_texts = [
            doc.load_page(page_num).get_text()
            for page_num in range(doc.page_count)
        ]
    return "".join(page_texts)
def save_feedback(question, answer, rating, feedback_file='question_feedback.json'):
    """Append one feedback record to the JSON feedback log.

    The log is a JSON list of ``{'question', 'answer', 'rating'}`` objects.
    The whole list is read, extended and written back.

    Args:
        question: The question text being rated.
        answer: The answer associated with the question.
        rating: The rating given by the user.
        feedback_file: Path of the JSON log. Defaults to the file name the
            app has always used, so existing callers are unaffected.

    Raises:
        json.JSONDecodeError: If the existing file has non-empty content that
            is not valid JSON. It is not overwritten in that case, so no
            feedback is silently lost.
    """
    feedback_data = []
    if os.path.exists(feedback_file):
        with open(feedback_file, 'r', encoding='utf-8') as f:
            raw = f.read()
        # An empty file is treated as "no feedback yet" instead of crashing
        # inside the JSON parser.
        if raw.strip():
            feedback_data = json.loads(raw)
    feedback_data.append({
        'question': question,
        'answer': answer,
        'rating': rating,
    })
    with open(feedback_file, 'w', encoding='utf-8') as f:
        json.dump(feedback_data, f)
# Function to clean text
def clean_text(text):
    """Return *text* with every non-ASCII character replaced by one space."""
    non_ascii = re.compile(r"[^\x00-\x7F]")
    return non_ascii.sub(" ", text)
# Function to create text chunks
def segment_text(text, max_segment_length=1000):
    """Segment the text into chunks of whole sentences.

    Sentences are accumulated until adding the next one would push the chunk
    past ``max_segment_length`` characters; that sentence then starts a new
    chunk. A single sentence longer than the limit becomes its own
    (oversized) chunk.

    Args:
        text: The text to split.
        max_segment_length: Target maximum number of characters per chunk.

    Returns:
        list[str]: The non-empty chunks in document order.
    """
    segments = []
    current_segment = ""
    for sentence in sent_tokenize(text):
        if len(current_segment) + len(sentence) <= max_segment_length:
            current_segment += sentence + " "
            continue
        # Only flush when something has accumulated. The original appended
        # an empty string here whenever the first sentence exceeded the limit.
        if current_segment:
            segments.append(current_segment.strip())
        current_segment = sentence + " "
    if current_segment:
        segments.append(current_segment.strip())
    print(f"\n\nSegement Chunks: {segments}\n\n")
    return segments
# Function to extract keywords using combined techniques
def extract_keywords(text, extract_all):
    """Extract candidate answer keywords from *text*.

    Args:
        text: The passage to analyse.
        extract_all: When falsy, only spaCy named entities are returned.
            When truthy, the entities are combined with RAKE phrases, spaCy
            noun/proper-noun/verb/adjective tokens and TF-IDF vocabulary terms.

    Returns:
        list[str]: The unique keywords, in no particular order.
    """
    doc = nlp(text)
    spacy_entities = {ent.text for ent in doc.ents}
    print(f"\n\nSpacy Entities: {spacy_entities} \n\n")
    # Use Only Spacy Entities
    if not extract_all:
        return list(spacy_entities)

    # Use RAKE
    rake = Rake()
    rake.extract_keywords_from_text(text)
    rake_keywords = set(rake.get_ranked_phrases())
    print(f"\n\nRake Keywords: {rake_keywords} \n\n")

    # Use spaCy for NER and POS tagging
    spacy_keywords = set(spacy_entities)
    spacy_keywords.update(
        token.text for token in doc
        if token.pos_ in ["NOUN", "PROPN", "VERB", "ADJ"]
    )
    print(f"\n\nSpacy Keywords: {spacy_keywords} \n\n")

    # Use TF-IDF. Only the fitted vocabulary is used, so no matrix is built.
    vectorizer = TfidfVectorizer(stop_words='english')
    try:
        vectorizer.fit([text])
    except ValueError:
        # scikit-learn raises when no term survives tokenisation and
        # stop-word removal; treat that as "no TF-IDF keywords".
        tfidf_keywords = set()
    else:
        tfidf_keywords = set(vectorizer.get_feature_names_out())
    print(f"\n\nTFIDF Entities: {tfidf_keywords} \n\n")

    # Combine all keywords
    combined_keywords = rake_keywords.union(spacy_keywords).union(tfidf_keywords)
    return list(combined_keywords)
def get_similar_words_sense2vec(word, n=3):
    """Return up to *n* sense2vec neighbours of *word* with sense tags removed.

    The ``word|NOUN`` key is tried first, then the bare word. An empty list
    is returned when neither key is present in the vectors.
    """
    for key in (word + "|NOUN", word):
        if key in s2v:
            neighbours = s2v.most_similar(key, n=n)
            # Keys look like "text|TAG"; keep only the text part.
            return [neighbour.split("|")[0] for neighbour, _ in neighbours]
    return []
def get_synonyms(word, n=3):
    """Collect WordNet lemma names for *word*, stopping once *n* are found.

    Lemma names equal to *word* and names already collected are skipped.
    """
    found = []
    for synset in wordnet.synsets(word):
        for lemma in synset.lemmas():
            name = lemma.name()
            if name == word or name in found:
                continue
            found.append(name)
            if len(found) == n:
                return found
    return found
def generate_options(answer, context, n=3):
    """Build a shuffled list of answer options: the answer plus up to *n* distractors.

    Distractor sources are tried in this order until ``n + 1`` unique options
    exist:

    1. alphabetic words from *context*, ranked by embedding similarity to
       the answer,
    2. sense2vec neighbours of the answer,
    3. WordNet synonyms of the answer,
    4. other named entities found in *context*.

    Options are de-duplicated as they are added. The original de-duplicated
    only at the end, so a context repeating the same word could yield fewer
    than ``n + 1`` options. The original's final "random context words" step
    is dropped: every unique context word has already been offered in step 1.

    Args:
        answer: The correct answer, always included in the result.
        context: The passage the question was generated from.
        n: Number of distractors wanted.

    Returns:
        list[str]: At most ``n + 1`` unique options in random order.
    """
    wanted = n + 1
    options = [answer]
    seen = {answer}

    def add_candidates(candidates):
        # Append unseen candidates until the quota is met.
        for candidate in candidates:
            if len(options) >= wanted:
                return
            if candidate not in seen:
                seen.add(candidate)
                options.append(candidate)

    answer_lower = answer.lower()
    context_doc = nlp(context)

    # Unique context words in first-seen order. This also avoids encoding
    # the same word more than once.
    context_words = list(dict.fromkeys(
        token.text for token in context_doc
        if token.is_alpha and token.text.lower() != answer_lower
    ))
    answer_embedding = context_model.encode(answer)
    scored_words = sorted(
        (
            (util.pytorch_cos_sim(context_model.encode(word), answer_embedding).item(), word)
            for word in context_words
        ),
        reverse=True,
    )
    add_candidates(word for _, word in scored_words)

    # Try to get similar words based on sense2vec
    if len(options) < wanted:
        add_candidates(get_similar_words_sense2vec(answer, n))
    # If we don't have enough options, try synonyms
    if len(options) < wanted:
        add_candidates(get_synonyms(answer, n))
    # If we still don't have enough options, use other entities from the context
    if len(options) < wanted:
        add_candidates(
            ent.text for ent in context_doc.ents
            if ent.text.lower() != answer_lower
        )

    print(f"\n\nAll Possible Options: {options}\n\n")
    # Shuffle so the correct answer is not always listed first
    random.shuffle(options)
    return options
# Function to map keywords to sentences with customizable context window size
def map_keywords_to_sentences(text, keywords, context_window_size):
    """Map each keyword to the sentences surrounding its occurrences.

    For every sentence containing the keyword (substring match), the
    sentence plus ``context_window_size`` sentences on each side are joined
    with spaces. Windows from several matching sentences are concatenated.

    Returns:
        dict[str, str]: keyword -> context text. Keywords that match no
        sentence are absent.
    """
    sentences = sent_tokenize(text)
    print(f"\n\nSentences: {sentences}\n\n")
    sentence_count = len(sentences)
    mapping = {}
    for keyword in keywords:
        for idx, sentence in enumerate(sentences):
            if keyword not in sentence:
                continue
            # Window of neighbouring sentences, clamped to the text bounds
            first = max(0, idx - context_window_size)
            last = min(sentence_count, idx + context_window_size + 1)
            window = ' '.join(sentences[first:last])
            if keyword in mapping:
                mapping[keyword] += ' ' + window
            else:
                mapping[keyword] = window
    return mapping
# Function to perform entity linking using Wikipedia API
@lru_cache(maxsize=256)
def entity_linking(keyword):
    """Return the Wikipedia URL for *keyword*, or ``None`` if no page exists.

    Results are memoised. The display loop calls this once per question on
    every Streamlit rerun, which previously meant one network lookup per
    question per interaction. Calls that raise are not cached, so a failed
    lookup is retried next time.

    Args:
        keyword: Page title to look up (must be hashable, e.g. ``str``).
    """
    page = wiki_wiki.page(keyword)
    if page.exists():
        return page.fullurl
    return None
# Function to generate questions using beam search
def generate_question(context, answer, num_beams):
    """Generate a question for *answer* given *context* using the T5 model.

    The prompt is ``"<context> {context} <answer> {answer}"``. The first
    generated sequence is decoded with special tokens skipped.
    """
    prompt = f"<context> {context} <answer> {answer}"
    prompt_ids = tokenizer.encode(prompt, return_tensors='pt')
    generated = model.generate(prompt_ids, num_beams=num_beams, early_stopping=True)
    first_sequence = generated[0]
    return tokenizer.decode(first_sequence, skip_special_tokens=True)
# Function to export questions to CSV
def export_to_csv(data):
    """Serialise *data* to CSV text.

    *data* is anything ``pandas.DataFrame`` accepts (main() passes a list of
    question dicts). The index column is omitted.
    """
    frame = pd.DataFrame(data)
    return frame.to_csv(index=False)
# Function to export questions to PDF
def export_to_pdf(data):
    """Render the generated questions into a PDF and return its bytes.

    Args:
        data: Iterable of question dicts with the keys ``context``,
            ``question``, ``answer``, ``options`` (list of str) and
            ``overall_score`` (number).

    Returns:
        bytes: The PDF document.
    """
    def latin1_safe(value):
        # The core "Arial" font only covers Latin-1. Characters outside it
        # are replaced so the export does not fail on them.
        return str(value).encode('latin-1', 'replace').decode('latin-1')

    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    for item in data:
        pdf.multi_cell(0, 10, latin1_safe(f"Context: {item['context']}"))
        pdf.multi_cell(0, 10, latin1_safe(f"Question: {item['question']}"))
        pdf.multi_cell(0, 10, latin1_safe(f"Answer: {item['answer']}"))
        pdf.multi_cell(0, 10, latin1_safe(f"Options: {', '.join(item['options'])}"))
        pdf.multi_cell(0, 10, latin1_safe(f"Overall Score: {item['overall_score']:.2f}"))
        pdf.ln(10)

    rendered = pdf.output(dest='S')
    # Older PyFPDF returns a str here, while fpdf2 returns a bytes-like
    # object that has no .encode(); handle both.
    if isinstance(rendered, str):
        return rendered.encode('latin-1')
    return bytes(rendered)
def display_word_cloud(generated_questions):
    """Render a word cloud of the words used in the generated questions.

    Args:
        generated_questions: Iterable of question strings. Words are split
            on whitespace and counted case-sensitively.

    Nothing is rendered when there are no words, because WordCloud raises on
    an empty frequency table.
    """
    from collections import Counter

    word_frequency = Counter()
    for question in generated_questions:
        word_frequency.update(question.split())
    if not word_frequency:
        return

    wordcloud = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(word_frequency)
    # Pass an explicit figure to Streamlit instead of relying on the global
    # pyplot figure, and close it so figures do not accumulate across reruns.
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.imshow(wordcloud, interpolation='bilinear')
    ax.axis('off')
    st.pyplot(fig)
    plt.close(fig)
def assess_question_quality(context, question, answer):
    """Score a generated question on relevance, complexity and spelling.

    Args:
        context: The passage the question was generated from.
        question: The generated question text.
        answer: The expected answer (currently unused; kept for the existing
            call signature).

    Returns:
        tuple: ``(overall_score, relevance_score, complexity_score,
        spelling_correctness)``. The overall score is
        ``0.4 * relevance + 0.4 * complexity + 0.2 * spelling``. A blank
        question scores ``0.0`` on every component.
    """
    words = question.split()
    if not words:
        # Nothing to assess. This also avoids the division by zero the
        # spelling ratio would hit below.
        return 0.0, 0.0, 0.0, 0.0

    # Assess relevance using spaCy document similarity
    context_doc = nlp(context)
    question_doc = nlp(question)
    relevance_score = context_doc.similarity(question_doc)

    # Assess complexity using token length (as a simple metric)
    complexity_score = min(len(question_doc) / 20, 1)  # Normalize to 0-1

    # Assess spelling correctness
    # NOTE(review): tokens keep attached punctuation (e.g. "Trump?"), which the
    # spell checker may count as unknown - confirm whether that is intended.
    misspelled = spell.unknown(words)
    spelling_correctness = 1 - (len(misspelled) / len(words))  # Normalize to 0-1

    # Calculate overall score (weights can be adjusted as needed)
    overall_score = (
        0.4 * relevance_score +
        0.4 * complexity_score +
        0.2 * spelling_correctness
    )
    return overall_score, relevance_score, complexity_score, spelling_correctness
def _collect_questions(segments, extract_all_keywords, context_window_size, num_beams, num_questions):
    """Generate at most *num_questions* accepted questions, best score first."""
    questions = []
    for segment in segments:
        if len(questions) >= num_questions:
            break
        keywords = extract_keywords(segment, extract_all_keywords)
        print(f"\n\nFinal Keywords in Main Function: {keywords}\n\n")
        keyword_sentence_mapping = map_keywords_to_sentences(segment, keywords, context_window_size)
        for keyword, context in keyword_sentence_mapping.items():
            # The limit is on accepted questions across all segments. The
            # original applied it per segment and counted rejected ones too.
            if len(questions) >= num_questions:
                break
            question = generate_question(context, keyword, num_beams=num_beams)
            overall_score, relevance_score, complexity_score, spelling_correctness = assess_question_quality(context, question, keyword)
            if overall_score < 0.5:
                continue
            # Options are only built for questions that pass the quality filter.
            options = generate_options(keyword, context)
            questions.append({
                "question": question,
                "context": context,
                "answer": keyword,
                "options": options,
                "overall_score": overall_score,
                "relevance_score": relevance_score,
                "complexity_score": complexity_score,
                "spelling_correctness": spelling_correctness,
            })
    # Sort questions by their quality score, highest first
    return sorted(questions, key=lambda q: q['overall_score'], reverse=True)


def _render_feedback_statistics(feedback_file='question_feedback.json'):
    """Show the average rating, rating distribution and top-rated questions."""
    if not os.path.exists(feedback_file):
        st.write("No feedback data available yet.")
        return
    try:
        with open(feedback_file, 'r', encoding='utf-8') as f:
            feedback_data = json.load(f)
    except json.JSONDecodeError:
        # An unreadable log must not take the whole page down.
        st.warning("Feedback data could not be read.")
        return

    st.subheader("Feedback Statistics")
    # Calculate average rating
    ratings = [feedback['rating'] for feedback in feedback_data]
    avg_rating = sum(ratings) / len(ratings) if ratings else 0
    st.write(f"Average Question Rating: {avg_rating:.2f}")
    # Show distribution of ratings
    rating_counts = {i: ratings.count(i) for i in range(1, 6)}
    st.bar_chart(rating_counts)
    # Show some highly rated questions
    st.subheader("Highly Rated Questions")
    sorted_feedback = sorted(feedback_data, key=lambda x: x['rating'], reverse=True)
    for feedback in sorted_feedback[:5]:
        st.write(f"Question: {feedback['question']}")
        st.write(f"Answer: {feedback['answer']}")
        st.write(f"Rating: {feedback['rating']}")
        st.write("---")


def main():
    """Run the Streamlit page: collect input, generate questions, show and export them.

    Generated questions live in the per-session state dict returned by
    ``initialize_state``. Display and export read from that same dict. The
    original read ``st.session_state.generated_questions``, which was never
    filled, so nothing was shown or exported.
    """
    # Streamlit interface
    st.title(":blue[Question Generator System]")
    session_id = get_session_id()
    state = initialize_state(session_id)

    with st.sidebar:
        st.subheader("Customization Options")
        # Customization options
        input_type = st.radio("Select Input Preference", ("Text Input", "Upload PDF"))
        num_beams = st.slider("Select number of beams for question generation", min_value=1, max_value=10, value=5)
        context_window_size = st.slider("Select context window size (number of sentences before and after)", min_value=1, max_value=5, value=1)
        num_questions = st.slider("Select number of questions to generate", min_value=1, max_value=1000, value=5)
        with st.expander("Choose the Additional Elements to show"):
            show_context = st.checkbox("Context", True)
            show_answer = st.checkbox("Answer", True)
            show_options = st.checkbox("Options", False)
            show_entity_link = st.checkbox("Entity Link For Wikipedia", True)
            show_qa_scores = st.checkbox("QA Score", False)
        col1, col2 = st.columns(2)
        with col1:
            extract_all_keywords = st.toggle("Extract Max Keywords", value=False)
        with col2:
            enable_feedback_mode = st.toggle("Enable Feedback Mode", False)

    text = None
    if input_type == "Text Input":
        text = st.text_area("Enter text here:", value="Joe Biden, the current US president is on a weak wicket going in for his reelection later this November against former President Donald Trump.")
    elif input_type == "Upload PDF":
        file = st.file_uploader("Upload PDF Files")
        if file is not None:
            text = get_pdf_text(file)

    segments = []
    if text:
        text = clean_text(text)
        segments = segment_text(text)

    generate_questions_button = st.button("Generate Questions")
    if generate_questions_button and text:
        generated = _collect_questions(
            segments,
            extract_all_keywords,
            context_window_size,
            num_beams,
            num_questions,
        )
        set_state(session_id, 'generated_questions', generated)

    questions = state['generated_questions']

    # Display generated questions
    if questions:
        st.header("Generated Questions:", divider='blue')
        for i, q in enumerate(questions):
            st.subheader(body=f":orange[Q{i+1}:] {q['question']}")
            if show_context:
                st.write(f"**Context:** {q['context']}")
            if show_answer:
                st.write(f"**Answer:** {q['answer']}")
            if show_options:
                st.write("**Options:**")
                for j, option in enumerate(q['options']):
                    st.write(f"{chr(65+j)}. {option}")
            if show_entity_link:
                linked_entity = entity_linking(q['answer'])
                if linked_entity:
                    st.write(f"**Entity Link:** {linked_entity}")
            if show_qa_scores:
                m1, m2, m3, m4 = st.columns([1.7, 1, 1, 1])
                m1.metric("Overall Quality Score", value=f"{q['overall_score']:,.2f}")
                m2.metric("Relevance Score", value=f"{q['relevance_score']:,.2f}")
                m3.metric("Complexity Score", value=f"{q['complexity_score']:,.2f}")
                m4.metric("Spelling Correctness", value=f"{q['spelling_correctness']:,.2f}")
            if enable_feedback_mode:
                # Edits and ratings are written back onto the stored question dict.
                q['question'] = st.text_input(f"Edit Question {i+1}:", value=q['question'], key=f"question_{i}")
                q['rating'] = st.selectbox("Rate this question (1-5)", options=[1, 2, 3, 4, 5], key=f"rating_{i}")
                if st.button(f"Submit Feedback for Question {i+1}", key=f"submit_{i}"):
                    save_feedback(q['question'], q['answer'], q['rating'])
                    st.success(f"Feedback submitted for Question {i+1}")
            st.write("---")

    # Export buttons
    if questions:
        with st.sidebar:
            csv_data = export_to_csv(questions)
            st.download_button(label="Download CSV", data=csv_data, file_name='questions.csv', mime='text/csv')
            pdf_data = export_to_pdf(questions)
            st.download_button(label="Download PDF", data=pdf_data, file_name='questions.pdf', mime='application/pdf')

    # View Feedback Statistics
    with st.expander("View Feedback Statistics"):
        _render_feedback_statistics()

    print("********************************************************************************")
# Run the app only when this file is executed as the main script.
if __name__ == '__main__':
    main()