Spaces:
Running
Running
import json | |
import os | |
import spacy | |
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer | |
import gradio as gr | |
from huggingface_hub import Repository | |
from datetime import datetime | |
from spacy.cli import download | |
# Load or download spaCy model | |
try: | |
nlp = spacy.load("en_core_web_sm") | |
except OSError: | |
print("Downloading 'en_core_web_sm' model...") | |
download("en_core_web_sm") | |
nlp = spacy.load("en_core_web_sm") | |
# Load Question Generation model | |
qg_model = AutoModelForSeq2SeqLM.from_pretrained("valhalla/t5-base-qa-qg-hl") | |
qg_tokenizer = AutoTokenizer.from_pretrained("valhalla/t5-base-qa-qg-hl", use_fast=True) | |
qg_pipeline = pipeline("text2text-generation", model=qg_model, tokenizer=qg_tokenizer) | |
# Global variable to accumulate Q&A | |
batch_data = [] | |
# Utility functions | |
def extract_paragraph_facts(raw_text): | |
return [p.strip() for p in raw_text.strip().split("\n\n") if p.strip()] | |
def extract_noun_phrases(text): | |
doc = nlp(text) | |
return [np.text for np in doc.noun_chunks] | |
def auto_highlight_noun_phrase(text): | |
doc = nlp(text) | |
noun_phrases = sorted(doc.noun_chunks, key=lambda np: len(np.text), reverse=True) | |
for np in noun_phrases: | |
if len(np.text.split()) > 1 or np.root.pos_ == "NOUN": | |
return np.text | |
return text | |
def highlight_selected_phrase(fact, selected_np): | |
return fact.replace(selected_np, f"<hl>{selected_np}<hl>", 1) | |
def generate_single_qna(fact, noun_phrase, min_len, max_len, temperature, top_k, top_p): | |
hl_fact = highlight_selected_phrase(fact, noun_phrase) | |
try: | |
prompt = f"generate question: {hl_fact}" | |
output = qg_pipeline( | |
prompt, | |
min_length=min_len, | |
max_length=max_len, | |
temperature=temperature, | |
top_k=top_k, | |
top_p=top_p, | |
do_sample=True | |
)[0] | |
question = output.get("generated_text", "").strip() | |
if not question.endswith("?"): | |
question += "?" | |
except Exception as e: | |
question = f"Error generating question: {str(e)}" | |
return {"question": question, "answer": fact} | |
def generate_qna_all(input_text, selected_fact, selected_np, min_len, max_len, temperature, top_k, top_p): | |
facts = extract_paragraph_facts(input_text) | |
global batch_data | |
if selected_fact: | |
noun_phrase = selected_np if selected_np else auto_highlight_noun_phrase(selected_fact) | |
result = generate_single_qna(selected_fact, noun_phrase, min_len, max_len, temperature, top_k, top_p) | |
batch_data.append(result) | |
else: | |
for fact in facts: | |
noun_phrase = auto_highlight_noun_phrase(fact) | |
result = generate_single_qna(fact, noun_phrase, min_len, max_len, temperature, top_k, top_p) | |
batch_data.append(result) | |
return json.dumps(batch_data, indent=2, ensure_ascii=False), json.dumps(batch_data, indent=2, ensure_ascii=False) | |
def save_json_to_dataset(): | |
try: | |
if not batch_data: | |
return "β No data to save. Generate some Q&A first." | |
hf_token = os.environ.get("QandA_Generator") | |
if not hf_token: | |
return "β HF_TOKEN not found in environment." | |
repo_id = "UniversityAIChatbot/University_Inquiries_AI_Chatbot" | |
target_file = "dataset.json" | |
local_dir = "hf_repo" | |
repo = Repository( | |
local_dir=local_dir, | |
clone_from=repo_id, | |
use_auth_token=hf_token, | |
repo_type="space" | |
) | |
repo.git_pull() | |
full_path = os.path.join(local_dir, target_file) | |
if os.path.exists(full_path): | |
with open(full_path, "r", encoding="utf-8") as f: | |
existing_data = json.load(f) | |
else: | |
existing_data = [] | |
now = datetime.now() | |
for entry in batch_data: | |
entry["month"] = now.strftime("%B") | |
entry["year"] = now.year | |
updated_data = existing_data + batch_data | |
with open(full_path, "w", encoding="utf-8") as f: | |
json.dump(updated_data, f, indent=2, ensure_ascii=False) | |
repo.push_to_hub(commit_message="π₯ Add new Q&A to database.json") | |
batch_data.clear() | |
return "β Data with timestamp successfully pushed to Space!" | |
except Exception as e: | |
return f"β Error: {str(e)}" | |
# New: Preview function | |
def preview_batch_data(): | |
return json.dumps(batch_data, indent=2, ensure_ascii=False) | |
# New: Append from manual JSON editor | |
def append_json_to_batch(json_text): | |
global batch_data | |
try: | |
new_data = json.loads(json_text) | |
if isinstance(new_data, dict): | |
new_data = [new_data] | |
if not isinstance(new_data, list): | |
return "β Invalid format. Must be a list or object.", preview_batch_data() | |
batch_data.extend(new_data) | |
return "β Successfully appended to batch_data.", preview_batch_data() | |
except Exception as e: | |
return f"β Error: {str(e)}", preview_batch_data() | |
# Dropdown callbacks | |
def on_extract_facts(text): | |
facts = extract_paragraph_facts(text) | |
default_fact = facts[0] if facts else None | |
return gr.update(choices=facts, value=default_fact), gr.update(choices=[], value=None) | |
def on_select_fact(fact): | |
noun_phrases = extract_noun_phrases(fact) | |
return gr.update(choices=noun_phrases, value=noun_phrases[0] if noun_phrases else None) | |
# UI | |
def main(): | |
with gr.Blocks() as demo: | |
gr.Markdown("## Paragraph-to-Question Generator (Auto Q&A for HF Dataset)") | |
input_text = gr.Textbox(lines=10, label="Enter Data (Seperated by paragraph per question)") | |
with gr.Accordion("βοΈ Customize Question Generation", open=False): | |
extract_btn = gr.Button("Extract & Customize") | |
fact_dropdown = gr.Dropdown(label="Select a Fact", interactive=True) | |
np_dropdown = gr.Dropdown(label="Select Noun Phrase to Highlight (optional)", interactive=True) | |
extract_btn.click(fn=on_extract_facts, inputs=input_text, outputs=[fact_dropdown, np_dropdown]) | |
fact_dropdown.change(fn=on_select_fact, inputs=fact_dropdown, outputs=np_dropdown) | |
gr.Markdown("π½ **Min Length**: Minimum number of tokens in the generated question.") | |
min_len = gr.Slider(5, 50, value=10, step=1, label="Min Length") | |
gr.Markdown("πΌ **Max Length**: Maximum number of tokens in the generated question.") | |
max_len = gr.Slider(20, 100, value=64, step=1, label="Max Length") | |
gr.Markdown("π‘οΈ **Temperature**: Controls randomness. Lower = more predictable, higher = more creative.") | |
temperature = gr.Slider(0.1, 1.5, value=1.0, step=0.1, label="Temperature") | |
gr.Markdown("π― **Top-k Sampling**: Limits sampling to the top-k most likely words.") | |
top_k = gr.Slider(0, 100, value=50, step=1, label="Top-k") | |
gr.Markdown("π² **Top-p (Nucleus Sampling)**: Selects from the smallest set of words with a cumulative probability > p.") | |
top_p = gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p") | |
gr.Markdown("βοΈ You can manually edit the generated JSON here or paste your own in the same format.") | |
output_json = gr.Textbox( | |
lines=14, | |
label="Q&A JSON", | |
interactive=True, | |
placeholder='[\n{\n"question": "Your question?",\n"answer": "Your answer."\n}\n]' | |
) | |
preview_box = gr.Textbox( | |
lines=14, | |
label="π¦ Preview", | |
interactive=False | |
) | |
with gr.Row(): | |
generate_btn = gr.Button("Generate Q&A") | |
append_btn = gr.Button("β Add to Dataset") | |
send_btn = gr.Button("π€ Send to Dataset") | |
send_status = gr.Textbox(label="Save Status", interactive=False) | |
generate_btn.click( | |
fn=generate_qna_all, | |
inputs=[input_text, fact_dropdown, np_dropdown, min_len, max_len, temperature, top_k, top_p], | |
outputs=[output_json, preview_box] | |
) | |
append_btn.click( | |
fn=append_json_to_batch, | |
inputs=output_json, | |
outputs=[send_status, preview_box] | |
) | |
send_btn.click(fn=save_json_to_dataset, inputs=None, outputs=send_status) | |
demo.launch() | |
if __name__ == "__main__": | |
main() | |