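"""Hugging Face Space: Gradio demo for a T5-based question-answering model.

The app crawls a news article through an external API, builds a context from the
article's title, summary and main text, and answers a location-style question
chosen from a preset list or from custom questions saved to a Hugging Face
dataset repository.
"""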
import gradio as gr
from transformers import AutoTokenizer, T5ForConditionalGeneration
import requests
import time
import json
import os
from huggingface_hub import Repository
import psutil
import xml.etree.ElementTree as ET

HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_REPO_URL = "https://huggingface.co/datasets/IronOneAILabs/custom_questions_data"
DATA_FILENAME = "saved_questions.xml"
DATA_FILE = os.path.join("data", DATA_FILENAME)

# Clone the dataset repo that stores user-saved questions into ./data.
repo = Repository(
    local_dir="data", clone_from=DATASET_REPO_URL, use_auth_token=HF_TOKEN
)
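# The dataset repo is expected to hold DATA_FILENAME with the saved custom
# questions as <question> children of the document root (the root tag name is an
# assumption; only <question> elements are read and written below), e.g.:
#   <questions>
#     <question>Which street did this happen ?</question>
#   </questions>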
def get_cpu_spec():
    cpu_info = {}
    # psutil has no cpu_brand(); the CPU brand string is not reported here.
    cpu_info['Frequency (GHz)'] = psutil.cpu_freq().current / 1000  # MHz -> GHz
    cpu_info['Physical cores'] = psutil.cpu_count(logical=False)
    cpu_info['Total cores'] = psutil.cpu_count(logical=True)
    return cpu_info
def get_ram_info():
    ram = psutil.virtual_memory()
    ram_info = {
        'Total': round(ram.total / (1024 ** 3), 2),  # Convert to GB
        'Available': round(ram.available / (1024 ** 3), 2),
        'Used': round(ram.used / (1024 ** 3), 2),
        'Percentage': ram.percent
    }
    return ram_info
def print_system_info():
    cpu_spec = get_cpu_spec()
    ram_info = get_ram_info()
    print("CPU Specifications:")
    for key, value in cpu_spec.items():
        print(f"{key}: {value}")
    print("\nRAM Information:")
    for key, value in ram_info.items():
        unit = "%" if key == "Percentage" else "GB"
        print(f"{key}: {value} {unit}")

print_system_info()
# ===================================================
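# fetch_text calls an external crawl service whose endpoint is read from the URL
# environment variable. From the fields accessed below, the service is assumed to
# return a JSON payload containing "title", "description" and "maintext", e.g.:
#   {"title": "...", "description": "...", "maintext": "..."}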
def fetch_text(url, api_key):
    """Fetch the main text, description and title of a news URL via the crawl API."""
    params = {'url': url}
    headers = {'token': api_key}
    response = requests.get(os.environ["URL"], params=params, headers=headers)
    print("response", response)
    if response.status_code == 200:
        print("response.json()", response.json())
        # The service returns a JSON-encoded string, so decode it once here.
        data = json.loads(response.json())
        maintext_text = data['maintext']
        summary_text = data['description']
        title_text = data['title']
        if maintext_text is None:
            maintext_text = ""
        if summary_text is None:
            summary_text = ""
        if title_text is None:
            title_text = ""
        return maintext_text, summary_text, title_text
    else:
        print("API response code ", response.status_code)
        # Return empty strings for all three outputs so the Gradio wiring still matches.
        return "", "", ""
# ===============================================================
tree = ET.parse(DATA_FILE)
root = tree.getroot()
custom_questions_from_file = [qs.text for qs in root.findall('question')]
print("list of qs ", custom_questions_from_file)
question_list = [
    "Which building did this happen ?",
    "What is the name of the street where the event occurred ?",
    "In which area or vicinity did the event happen including the street name ?",
    "Which road/street did this happen ?",
    "Which road did this happen ?",
    "Which street did this happen ?",
    "Which point of interest(POI) did this happen ?",
    "Which city did the event described in the article occur in ?",
    "What city is mentioned as the site of the event in the news story ?",
    "Which city/state did this happen ?",
    "Which city did this happen ?",
    "Which district did this happen ?",
    "Which place did this happen ?",
    "Which location did this happen ?",
    "Which country did this happen ?",
    "What is the relevant location ?",
    "What are the relevant locations ?",
    "Where did this happen ?",
]
start_time = time.time()
model_name = "QA_model"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Model Load Elapsed time: {elapsed_time} seconds")
print("=========================================== Model Loaded ============================================")

max_input_length = 80
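# max_input_length caps maintext at 80 whitespace-separated words before it is
# concatenated with the title and summary to build the model context in submit().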
with gr.Blocks() as app:
    gr.Markdown("# QA Model testing")

    with gr.Row():
        with gr.Column():
            gr.Markdown("## Crawl Source")
            with gr.Row():
                api_token = gr.Textbox(label="token")
            with gr.Row():
                link = gr.Textbox(label="link")
                crawl_btn = gr.Button("crawl source", interactive=True)

    with gr.Row():
        with gr.Column():
            gr.Markdown("## Input Text")
            with gr.Row():
                title = gr.Textbox(label="title")
                summary = gr.Textbox(label="summary")
                maintext = gr.Textbox(label="maintext")

    with gr.Row():
        with gr.Column():
            gr.Markdown("## Preset Questions")
            with gr.Row():
                check_question_list = gr.Checkbox(label="Use preset question", value=True,
                                                  interactive=True,
                                                  )
                question_dropdown = gr.Dropdown(label="Choose a question:", choices=question_list,
                                                value=question_list[0],
                                                interactive=True,
                                                )

    with gr.Row():
        with gr.Column():
            gr.Markdown("## Custom Questions")
            with gr.Row():
                with gr.Column():
                    check_custom_question = gr.Checkbox(label="Use custom question", value=False,
                                                        interactive=True,
                                                        )
                    add_qs_btn = gr.Button("save question", interactive=True)
                    custom_question_list = gr.Dropdown(label="Custom question", choices=custom_questions_from_file,
                                                       interactive=True,
                                                       allow_custom_value=True,
                                                       )

    with gr.Row():
        submit_btn = gr.Button("submit", interactive=True)

    with gr.Row():
        with gr.Column():
            gr.Markdown("## Output")
            with gr.Row():
                output = gr.Textbox(label="output")
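    # ===================================================
    # Callbacks. The two checkboxes are mutually exclusive: ticking one enables
    # its question input and unticks the other, so exactly one question source
    # (preset dropdown or saved custom question) is used by submit().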
    def chb1_clicked(chb_q_list):
        print("chb_q_list", chb_q_list)
        if chb_q_list:
            # Enable the preset dropdown and untick the custom-question checkbox.
            return gr.update(interactive=True), gr.update(value=False)
        else:
            return gr.update(interactive=False), gr.update(value=True)

    def chb2_clicked(chb_cus_qs):
        print("chb_cus_qs", chb_cus_qs)
        if chb_cus_qs:
            # Enable the custom-question dropdown and untick the preset checkbox.
            return gr.update(interactive=True), gr.update(value=False)
        else:
            return gr.update(interactive=False), gr.update(value=True)
    def submit(title, summary, maintext, check_question_list, question_dropdown, check_custom_question,
               custom_question):
        print("title - ", title)
        print("summary - ", summary)
        print("maintext - ", maintext)
        print("check_question_list - ", check_question_list)
        print("dropdown - ", question_dropdown)
        print("check_custom_question - ", check_custom_question)
        print("custom_question - ", custom_question)
        if check_question_list:
            question = question_dropdown
        else:
            question = custom_question
        print("question - ", question)
        # Trim maintext to max_input_length whitespace-separated words.
        main_text_trimmed = maintext.split()
        main_text_trimmed = main_text_trimmed[:max_input_length]
        main_text_trimmed = ' '.join(main_text_trimmed)
        context = title + " " + summary + " " + main_text_trimmed
        # T5-style input: "<question></s><context>".
        context_question = question + "</s>" + context
        print("main_text_trimmed - ", main_text_trimmed)
        print("context - ", context)
        start_time = time.time()
        input_ids = tokenizer(context_question, return_tensors="pt").input_ids
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"Input Tokenizing Elapsed time: {elapsed_time} seconds")
        start_time = time.time()
        outputs = model.generate(input_ids, max_new_tokens=50)
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"Output Generating Elapsed time: {elapsed_time} seconds")
        start_time = time.time()
        answer = tokenizer.decode(outputs[0], skip_special_tokens=True)
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"Output Decoding Elapsed time: {elapsed_time} seconds")
        return answer
    def save_qs(custom_question):
        # Pull the latest questions file before appending, then push the update.
        repo.git_pull()
        tree = ET.ElementTree()
        tree.parse(DATA_FILE)
        root = tree.getroot()
        name_elem = ET.SubElement(root, 'question')
        name_elem.text = custom_question
        tree.write(DATA_FILE)
        commit_url = repo.push_to_hub()
        print(commit_url)
        tree = ET.parse(DATA_FILE)
        root = tree.getroot()
        custom_questions_from_file = [qs.text for qs in root.findall('question')]
        print("list of qs ", custom_questions_from_file)
        print("custom_question", custom_question)
        # Refresh the dropdown with the full list and select the newly added question.
        return gr.update(choices=custom_questions_from_file, value=custom_questions_from_file[-1])
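    # ===================================================
    # Wire the UI events to the callbacks defined above.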
    check_question_list.change(fn=chb1_clicked, inputs=check_question_list,
                               outputs=[question_dropdown, check_custom_question])
    check_custom_question.change(fn=chb2_clicked, inputs=check_custom_question,
                                 outputs=[custom_question_list, check_question_list])
    crawl_btn.click(fn=fetch_text, inputs=[link, api_token], outputs=[maintext, summary, title])
    submit_btn.click(fn=submit,
                     inputs=[title, summary, maintext, check_question_list, question_dropdown,
                             check_custom_question, custom_question_list],
                     outputs=output)
    add_qs_btn.click(fn=save_qs, inputs=custom_question_list, outputs=custom_question_list)

app.launch()