c-reviewer / main.py
ml-visoft's picture
WIP login w HF
f8637f2
raw
history blame
30.2 kB
import json
import uuid
import os
import pathlib
from uuid import uuid4
from datetime import datetime
import dataclasses
import html
import base64
import requests
import secrets
from fasthtml.common import *
from fastcore.all import typedispatch
from sqlite_minutils.db import Database
from huggingface_hub import CommitScheduler
import eval_code
import storage
# the secrets. Will be loaded from HF, or for docker --env-file or from IDE
OAUTH_CLIENT_ID = os.environ.get('OAUTH_CLIENT_ID')
OAUTH_SCOPES = os.environ.get('OAUTH_SCOPES')
OAUTH_CLIENT_SECRET = os.environ.get('OAUTH_CLIENT_SECRET')
OPENID_PROVIDER_URL = os.environ.get("OPENID_PROVIDER_URL", "https://huggingface.co")
SPACE_HOST = os.environ.get('SPACE_HOST')
HF_DATASET_AUTH_TOKEN = os.environ.get('HF_DATASET_AUTH_TOKEN', "none")
LOCAL_STORAGE_PATH = ""
datetime_now = datetime.isoformat(datetime.utcnow())
FILE_EVENTS_NAME = f"events-{datetime_now}-{uuid4()}.jsonl"
FILE_SUBMITTED_NAME = f"submitted-{datetime_now}-{uuid4()}.jsonl"
if "localhost" in SPACE_HOST:
IS_LOCALHOST = True
else:
IS_LOCALHOST = False
if IS_LOCALHOST:
DATABASE_NAME = "data/sessions_meta.db"
LOCAL_STORAGE_PATH = Path("data/persistent/")
pathlib.Path(DATABASE_NAME).unlink(missing_ok=True)
print(f"Database {DATABASE_NAME=} exists? {pathlib.Path(DATABASE_NAME).exists()}")
else:
DATABASE_NAME = "/tmp/cache/sessions_meta.db"
LOCAL_STORAGE_PATH = Path("/tmp/cache/persistent")
LOCAL_STORAGE_PATH.mkdir(exist_ok=True, parents=True)
EVENTS_FILE_PATH = LOCAL_STORAGE_PATH / FILE_EVENTS_NAME
SUBMITTED_FILE_PATH = LOCAL_STORAGE_PATH / FILE_SUBMITTED_NAME
scheduler = None
try:
scheduler = CommitScheduler(
repo_id="ml-visoft/c-reviewer",
repo_type="dataset",
folder_path=LOCAL_STORAGE_PATH,
every=10,
path_in_repo="raw_data",
token=HF_DATASET_AUTH_TOKEN,
squash_history=False
)
except:
import traceback
traceback.print_exc()
global_database = Database(DATABASE_NAME)
global_database_tables = global_database.t
# We store session specific feedback from registered users.
# Will be later used to submit/storage the answer.
question_evaluation_table = global_database_tables.question_answer
if question_evaluation_table not in global_database_tables:
question_evaluation_table.create(id=int, code_text=str,
answer_eval_text=str, submitted=int, pk='id')
Question_Evaluation_cls = question_evaluation_table.dataclass()
# We store real-time session IDs and the state of the questions. Will link to question_answer
session_state_table = global_database_tables.session_state
if session_state_table not in global_database_tables:
# session_id stored in cookies
# see EVAL_STATUS_x below for state
session_state_table.create(id=int, session_id=str, state=int, submitted_date=str, evaluated_date=str,
current_qeval=int, pk='id',)
# Can't really nail the fk specs
# foreign_keys=[("current_qeval", question_evaluation_table, "id")])
Session_State_cls = session_state_table.dataclass()
EVAL_STATE_NEW=0
EVAL_STATE_QUERY=1
EVAL_STATE_TIMEDOUT=2
EVAL_STATE_ANSWER=3
EVAL_STATE_ERROR=4
# Constants to name the various HTML ids in the code
HTML_SUBMIT_CODE_AREA = "submit_code_area"
HTML_RESULTS_AREA = "prompt_response"
HTML_CLEAR_FORM = "clear_the_form"
HTML_SUBMIT_FEEDBACK = "submit_feedback"
HTML_USER_DATA = "login_user_data"
def get_openid_configuration():
config_url = OPENID_PROVIDER_URL + "/.well-known/openid-configuration"
try:
response = requests.get(config_url)
response.raise_for_status() # Raises an HTTPError for bad responses (4xx or 5xx)
config = response.json()
token_endpoint = config.get('token_endpoint')
userinfo_endpoint = config.get('userinfo_endpoint')
return token_endpoint, userinfo_endpoint
except requests.RequestException as e:
print(f"An error occurred: {e}")
return None, None
# Use the function
HF_OAUTH_TOKEN_URL, HF_OAUTH_USERINFO = get_openid_configuration()
if HF_OAUTH_TOKEN_URL and HF_OAUTH_USERINFO:
print(f"Token Endpoint: {HF_OAUTH_TOKEN_URL}")
print(f"UserInfo Endpoint: {HF_OAUTH_USERINFO}")
else:
print("Failed to retrieve the endpoints.")
hdrs = (
HighlightJS(langs=['python', 'javascript', 'html', 'css']),
)
if IS_LOCALHOST:
# Are we hacking locally?
print("Localhost detected in SPACE_HOST. App started in debug+live mode!")
app, rt = fast_app(debug=True, live=True, hdrs=hdrs)
REFRESH_TIME = 0.1
else:
app, rt = fast_app(debug=False, live=False, hdrs=hdrs)
REFRESH_TIME = 1
################# STORAGE
def untyped_save_to_storage(dc, filename):
if scheduler is None and IS_LOCALHOST:
# this is on local system, dump without checking for backup strategy
# TODO duplicate code, refactor!
js_str = json.dumps(dataclasses.asdict(dc)) + "\n"
with open(filename, "a") as f:
f.write(js_str)
return
with scheduler.lock:
js_str = json.dumps(dataclasses.asdict(dc)) + "\n"
with open(filename, "a") as f:
f.write(js_str)
@typedispatch
def save_to_storage(nav_event:storage.NavigationEvent):
untyped_save_to_storage(nav_event, EVENTS_FILE_PATH)
@typedispatch
def save_to_storage(eval_event:storage.CodeSubmittedEvent):
untyped_save_to_storage(eval_event, SUBMITTED_FILE_PATH)
########## EVALUATION FEEDBACK
def validate_and_get_question_evaluation_objectid(session, qe_id:int):
if 'session_id' not in session:
print("validate_and_get_question_evaluation_objectid bad session data")
return False, None, None
session_id = session["session_id"]
state_rows = session_state_table(limit=1, where=f"session_id == '{session_id}'", order_by="id DESC")
if len(state_rows) <= 0:
print("validate_and_get_question_evaluation_objectid there is no state")
return False, None, None
answer_id = state_rows[0].current_qeval
qa_obj_row = question_evaluation_table(limit=1, where=f"id == {answer_id}")
if len(qa_obj_row) <= 0:
print("validate_and_get_question_evaluation_objectid There is no answer recorded")
return False, None, None
qe_obj = qa_obj_row[0]
if qe_id != qe_obj.id:
print("validate_and_get_question_evaluation_objectid QE {qe_id} does not belong to {qe_obj.id}")
return False, None, None
return True, qe_obj, state_rows[0]
def html_create_feedback_updown_button(qe_id, ans_id, selected=0, disabled=False):
html_target_id = f"buttons_{ans_id}"
colors = ["grey", "blue"]
up_col = colors[0]
down_col = colors[0]
if selected == 1: up_col = colors[1]
if selected == -1: down_col = colors[1]
toggle_url = f"/toggle_up_down/{qe_id}/{ans_id}"
up = Button("+", hx_get=f"{toggle_url}/1", hx_swap="outerHTML",
target_id=f"{html_target_id}", style=f"background-color:{up_col}")
down = Button("-", hx_get=f"{toggle_url}/-1", disabled=disabled, hx_swap="outerHTML",
hx_preserve=True, hx_trigger="click", hx_push_url="false",
target_id=f"{html_target_id}", style=f"background-color:{down_col}")
button_row = Div(up, down, id=html_target_id)
return button_row
def html_augment_evaluation_text_with_feedback(eval_html, qe_id, ans_id, selected=0):
"""
Will plot the + / - buttons for feedback.
:param eval_html:
:param qe_id:
:param ans_id:
:return:
"""
buttons = html_create_feedback_updown_button(qe_id, ans_id, selected)
final_div = Div(eval_html, buttons, style=" background-color: #f0f0f0;")
return final_div
@rt("/toggle_up_down/{qe_id}/{ans_id}/{which}")
def get(session, qe_id:int, ans_id:int, which:int):
"""
Answer to the +/- button presses
:param session:
:param qe_id:
:param ans_id:
:param which:
:return:
"""
print(qe_id, ans_id, which)
if which not in {-1, 1}:
print(f"The {which=} is bad")
return None
print(f"{qe_id=} {ans_id=} {which=}")
is_ok, qe_obj, session_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
if not is_ok:
print("toggle_up_down made session/object error")
return "Error"
# save_to_storage(
# storage.NavigationEvent(event_type="/", event_session_id=session_id, event_params={"qe_id":qe_id})
# )
answer_eval_js = json.loads(qe_obj.answer_eval_text)
crt_selection = answer_eval_js[ans_id]["EVAL"]
input_button = which
out_selection = (input_button if crt_selection == 0 else (0 if crt_selection == input_button else input_button))
print(f"out selection: {out_selection}")
# store it back in DB
answer_eval_js[ans_id]["EVAL"] = out_selection
qe_obj.answer_eval_text = answer_eval_js
qe_obj.submitted = False # mark object as dirty
question_evaluation_table.upsert(qe_obj)
buttons= html_create_feedback_updown_button(qe_id, ans_id, selected=out_selection)
return buttons
def html_get_textual_feedback_form(qe_obj, thank=False):
if thank:
ph = "Thank you!"
else:
ph = "Write your general feedback here"
form = Form(Input(name="freeform_feedback", placeholder=ph),
Button("Submit", disabled=(qe_obj.submitted == 1)), hx_post=f"/submit_feedback/{qe_obj.id}",
target_id=HTML_SUBMIT_FEEDBACK, hx_swap="outerHTML",)
div = Div(P("Give us a general feedback for the evaluation (optional)"), form, id=HTML_SUBMIT_FEEDBACK)
return div
@rt("/submit_feedback/{qe_id}")
def post(session, qe_id:int, freeform_feedback:str):
is_ok, qe_obj, session_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
if not is_ok:
print("submit_feedback made session/object error")
return "Error"
# Update the object
session_id = session.get("session_id", "Not set")
save_to_storage(
storage.NavigationEvent(event_type="/submit_feedback", event_session_id=session_id,
event_params={"qe_id":qe_id})
)
answer_eval_js = json.loads(qe_obj.answer_eval_text)
answer_eval_js[0]["explanation"] = freeform_feedback
qe_obj.submitted = True
qe_obj.answer_eval_text = json.dumps(answer_eval_js)
question_evaluation_table.upsert(qe_obj)
save_to_storage(
storage.CodeSubmittedEvent(event_session_id=session["session_id"], db_question_evaluation_id=qe_obj.id,
submitted_date=session_obj.submitted_date,
received_date=session_obj.evaluated_date,
code_to_eval=qe_obj.code_text, evaluation_response=qe_obj.answer_eval_text,
has_feedback=True, feedback_date=datetime.isoformat(datetime.utcnow()))
)
return html_get_textual_feedback_form(qe_obj, thank=True)
####### EVALUATE CODE
def html_format_code_review_form(qe_obj, html_id=""):
"""
Formats the code review, adding fields for feedback if it is required.
:param feedback_js:
:param c_code:
:param html_id:
:return:
"""
c_code = qe_obj.code_text
enhanced_answer = json.loads(qe_obj.answer_eval_text)
list_of_citerias = []
for caug_code, caug_txt in eval_code.CODE_AUGMENTATIONS:
crit_tag = [H3(caug_code), P(caug_txt)]
list_of_citerias.extend(crit_tag)
# yeah, I know . . .
for k, eval_line in enumerate(enhanced_answer):
if caug_code == eval_line["criteria"]:
eval_txt = P(eval_line["explanation"])
eval_txt_fb = html_augment_evaluation_text_with_feedback(eval_txt, qe_obj.id, k)
list_of_citerias.append(eval_txt_fb)
textual_feedback = html_get_textual_feedback_form(qe_obj)
return Div(html_render_code_output(c_code), *list_of_citerias, textual_feedback, id=html_id)
def html_default_results(html_id):
return Div(P("This is where criterias will show up once the code is evaluated"), id=html_id)
def html_waiting_for_results(html_id):
return Div(P("Working . . ."), id=html_id,
hx_get=f"/render_answer",
hx_trigger = f"every {REFRESH_TIME}s",
hx_swap = "outerHTML",
)
def get_latest_eval_request_status(session_id):
state_rows = session_state_table(limit=1, where=f"session_id == '{session_id}'", order_by="id DESC")
if len(state_rows) <= 0:
return EVAL_STATE_NEW, None
state_obj = state_rows[0]
if state_obj.state in {EVAL_STATE_NEW, EVAL_STATE_QUERY, EVAL_STATE_ANSWER}:
return state_obj.state, state_obj
return EVAL_STATE_ERROR, state_obj
def html_error_results(message, html_id):
div = Div(P("There was an error:", P(message)), id=html_id)
return div
def html_render_code_output(code):
txtarea = Pre(Code(code))
return txtarea
def html_render_answer_from_db(session_id, html_id):
eval_request_status, state_obj = get_latest_eval_request_status(session_id)
# state_rows = session_state_table(limit=1, where=f"session_id == '{session_id}'", order_by="id DESC")
# print(eval_request_status, state_obj)
if eval_request_status == EVAL_STATE_NEW:
return html_default_results(html_id), #, html_render_inputbox(HTML_RESULTS_AREA, HTML_SUBMIT_CODE_AREA)
if eval_request_status == EVAL_STATE_ANSWER:
qe_obj_lst = question_evaluation_table(limit=1, where=f"id == {state_obj.current_qeval}")
if len(qe_obj_lst) < 1:
print(f"Object id {state_obj.current_qeval} can't be found in question_evaluation_table")
return (None,)
qe_obj = qe_obj_lst[0]
return (html_format_code_review_form(qe_obj, html_id),
html_render_inputbox(target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)) #, html_render_code_output(HTML_SUBMIT_CODE_AREA, state_obj.code)
if eval_request_status == EVAL_STATE_QUERY:
return html_waiting_for_results(html_id),
return html_error_results(state_obj.answer, html_id), #, html_render_code_output(HTML_SUBMIT_CODE_AREA, state_obj.code)
# How can I timeout? Well ... TBD.
@threaded
def call_gpt_and_store_result(session_obj_id, code_to_check):
"""
Threaded function that will submit code to LLM and wait for the answer.
Communication with "main" thread is through db.
All parameters must be pickable.
:param session_obj_id:
:param code_to_check:
:return:
"""
# TODO refactor considering new join!
# print("evaluatign code")
try:
# Pesky way to get a new cursor, in a thread safe way, into the db. This code runs in another thread.
# Can we do better?
local_database = Database(DATABASE_NAME)
local_sess_state = local_database.t.session_state
local_sess_state_cls = local_sess_state.dataclass()
local_sess_obj_lst = local_sess_state(limit=1, where=f"id == {session_obj_id}")
local_sess_obj = local_sess_obj_lst[0]
# Trigger the lenghtly operation
enhanced_answer = eval_code.eval_the_piece_of_c_code(openai_client=None, ccode=code_to_check)
# we create a new QA entry.
qe_obj = Question_Evaluation_cls(code_text=code_to_check, answer_eval_text=enhanced_answer, submitted=0)
qe_obj = question_evaluation_table.insert(qe_obj)
local_sess_obj.current_qeval = qe_obj.id
evaluation_date = datetime.isoformat(datetime.utcnow())
# save to persistent storage
save_to_storage(
storage.CodeSubmittedEvent(event_session_id=local_sess_obj.session_id, db_question_evaluation_id=qe_obj.id,
submitted_date=local_sess_obj.submitted_date,
received_date=evaluation_date,
code_to_eval=code_to_check, evaluation_response=json.dumps(enhanced_answer))
)
# Update the session object.
if "error" in enhanced_answer:
local_sess_obj.state = EVAL_STATE_ERROR
local_sess_obj.answer = enhanced_answer["error"]
local_sess_obj.evaluated_date = evaluation_date
else:
local_sess_obj.state = EVAL_STATE_ANSWER
local_sess_obj.evaluated_date = evaluation_date
local_sess_state.update(local_sess_obj)
except:
import traceback
traceback.print_exc()
######## CODE INPUT FORM
def html_render_inputbox(target_html_id, region_html_id):
txtarea = Textarea(id="ccodetoeval", name="ccodetoeval", placeholder="Enter a piece of C code", rows=3)
form = Form(Group(txtarea, Button("Evaluate")),
hx_post="/submit_to_eval",
hx_swap="outerHTML",
target_id=f"{target_html_id}"
)
return Div(form, id=region_html_id, hx_swap_oob='true')
########## CLEAR FORM
def html_render_clear_area_button(html_id):
button = Button("Clear form",
hx_get="/clear_area",
hx_swap="outerHTML",
target_id=HTML_RESULTS_AREA,
)
div = Div(button, id=html_id, hx_swap_oob='true')
return div
def render_clear_area(session_id, html_id):
# return html_render_clear_area_button(html_id)
eval_request_status, _ = get_latest_eval_request_status(session_id)
if eval_request_status != EVAL_STATE_NEW:
# print("clear button: render button")
return html_render_clear_area_button(html_id)
else:
# print("clear button: render empty")
return Div(P(""), id=html_id, hx_swap_oob='true')
########## AUTHENTICATION
def html_render_login_to_get_access_part(session):
"""
Will render the Log in to get access part and set the session oauth_secret.
:param session:
:return:
"""
content = []
content.append(H3("Log in to give feedback!"))
content.append(P("We will record your name, hugginface name, hf profile page, email address. "
"We will try HARD not to make them public but . . ."))
content.append(P("Your feedback will be made public but in an anonymized form."))
# build the redirect URL
global SPACE_HOST
if "localhost" in SPACE_HOST:
space_host = f"http://{SPACE_HOST}/auth_callback/"
else:
space_host = f"https://{SPACE_HOST}/auth_callback/"
secret = secrets.token_urlsafe(32)
session["oauth_secret"] = secret
encoded_scopes = html.escape(OAUTH_SCOPES)
redirect_link = (f"https://huggingface.co/oauth/authorize?redirect_uri={space_host}&scope={encoded_scopes}"
f"&client_id={OAUTH_CLIENT_ID}&state={secret}&response_type=code&prompt=consent")
login_button = A(Img(src="https://huggingface.co/datasets/huggingface/badges/resolve/main/sign-in-with-huggingface-md.svg",
alt="Sign in with Hugging Face",
# style="cursor: pointer; display: none;",
id="signin", name=None),
href=redirect_link, target="_blank")
content.append(login_button)
content.append(P(" "))
div = Div(*content, id=HTML_USER_DATA)
return div
def html_render_welcome_user(user_info):
"""
Extracts the user data and paints the Welcome screen
:param user_info:
:return:
"""
content = []
content.append(H4(f"Welcome {user_info['name']}!"))
# content.append(Img(src=OPENID_PROVIDER_URL + user_info["picture"], alt="Picture"))
content.append(P("Your feedback will be made public but in an anonymized form."))
div = Div(*content, id=HTML_USER_DATA)
return div
@rt("/auth_callback")
def get(session, code:str=None, state:str=None, error:str=None, error_description:str=None):
"""
Endpoint that will be called by HF once the user gives (or not) consent
:param session:
:param code:
:param state:
:param error:
:param error_description:
:return:
"""
ans = None
if error is not None:
print(f"HF OAuth returned an error: {error} {error_description}")
ans = Div(P(f"We can't log you in. Huggingface says: {error_description}"),
P("Please close this page"))
return ans
print(f"OAuth returned a code")
# validating the secret
sess_secret = session.get("oauth_secret", None)
if sess_secret is None:
print("No session secret")
return P("access denied")
if sess_secret != state:
print(f"Mistmatch session secret and HF secret: {sess_secret=} {state=}")
# Moving on and get the token
global SPACE_HOST
if "localhost" in SPACE_HOST:
space_host = f"http://{SPACE_HOST}/auth_callback/"
else:
space_host = f"https://{SPACE_HOST}/auth_callback/"
auth_header = base64.b64encode(f"{OAUTH_CLIENT_ID}:{OAUTH_CLIENT_SECRET}".encode()).decode()
headers = {
"Authorization": f"Basic {auth_header}",
"Content-Type": "application/x-www-form-urlencoded",
}
data = {
"client_id": OAUTH_CLIENT_ID,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": space_host,
}
token_response = requests.post(HF_OAUTH_TOKEN_URL, data=data, headers=headers)
if token_response.status_code == 200:
tokens = token_response.json()
# Here you would typically store the tokens securely and/or use them
print("Succsss in getting the token")
access_token = tokens["access_token"]
user_headers = {
"Authorization": f"Bearer {access_token}"
}
response_userinfo = requests.get(HF_OAUTH_USERINFO, headers=user_headers)
if response_userinfo.status_code == 200:
user_data = response_userinfo.json()
# Set the user data in DB
# TODO set the user data in db For now, set it in session
session["user_data"] = user_data
session["oauth_secret"] = None
print(user_data)
else:
print(f"Error while taking the user data: {response_userinfo.text}" )
return P("Error logging in")
else:
print(f"We did not get the tokens: {token_response.text}")
return P("Error logging in")
print(session)
return P("Succes! Close this page.")
########## MAIN PAGE
@rt("/")
def get(session):
print(session)
if 'session_id' not in session:
session['session_id'] = str(uuid.uuid4())
session_id = session["session_id"]
if len(session_state_table(where=f"session_id=='{session_id}'")) <= 0:
# old session ID, "resetting".
session['session_id'] = str(uuid.uuid4())
save_to_storage(
storage.NavigationEvent(event_type="/", event_session_id=session_id)
)
user_data = session.get("user_data", None)
if user_data is not None:
auth_area = html_render_welcome_user(user_data)
else:
auth_area = html_render_login_to_get_access_part(session) # might set some secrets here
title = Title('C code review for students')
preamble = [
auth_area,
H1("Evaluate your C code!"),
P("Enter your code in the textbox below and wait for answers."),
P("!! The data will be saved and maybe made public !!"),
]
input_area = html_render_inputbox(target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
results_area = html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
return title, Main(*preamble, input_area, results_area, clear_area)
@rt("/render_answer")
def get(session):
if 'session_id' not in session: return "render_answer No session ID"
session_id = session["session_id"]
answer_area = html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
return answer_area
@rt("/submit_to_eval", methods="post")
def post(session, ccodetoeval:str):
print(session)
if 'session_id' not in session:
return P("submit_to_eval. Bad call. No session ID")
session_id = session["session_id"]
session_obj = Session_State_cls(
session_id=session_id,
state=EVAL_STATE_QUERY,
submitted_date=datetime.isoformat(datetime.utcnow()),
)
# we insert and we get the new primary key
session_obj = session_state_table.insert(session_obj)
# will be executed in another thread with magic @threaded
call_gpt_and_store_result(session_obj.id, ccodetoeval)
return (*html_render_answer_from_db(session_id, HTML_RESULTS_AREA),
render_clear_area(session_id, HTML_CLEAR_FORM)
)
@rt("/clear_area", methods="get")
def get(session):
if 'session_id' not in session: return P("clear_area. Bad call. No session ID")
session_id = session["session_id"]
save_to_storage(
storage.NavigationEvent(event_type="/clear_area", event_session_id=session_id)
)
# insert a row to "cancel"/reset the current request
session_obj = Session_State_cls(
session_id=session_id,
state=EVAL_STATE_NEW,
submitted_date=datetime.isoformat(datetime.utcnow()),
)
session_state_table.insert(session_obj)
# re-issue forms
input_area = html_render_inputbox(target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
results_area = html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
print(results_area)
return *results_area, input_area, clear_area
## This code is for reference, for OAuth. BIG PITA, will be added later.
#
# js_hf_imports = \
# """
# {
# "imports": {
# "@huggingface/hub": "https://cdn.jsdelivr.net/npm/@huggingface/hub@0.13.0/+esm"
# }
# }
# """
#
# js_block_hf_auth = \
# """
# import { oauthLoginUrl, oauthHandleRedirectIfPresent } from "@huggingface/hub";
# console.log("huggingface env", window.huggingface);
# let oauthResult = localStorage.getItem("oauth");
# if (oauthResult) {
# try {
# oauthResult = JSON.parse(oauthResult);
# } catch {
# oauthResult = null;
# }
# }
#
# oauthResult ||= await oauthHandleRedirectIfPresent();
# if (oauthResult) {
# document.querySelector("pre").textContent = JSON.stringify(oauthResult, null, 2);
# localStorage.setItem("oauth", JSON.stringify(oauthResult));
# document.getElementById("signout").style.removeProperty("display");
# document.getElementById("signout").onclick = async function() {
# localStorage.removeItem("oauth");
# window.location.href = window.location.href.replace(/\?.*$/, '');
# window.location.reload();
# }
# } else {
# document.getElementById("signin").style.removeProperty("display");
# document.getElementById("signin").onclick = async function() {
# // prompt=consent to re-trigger the consent screen instead of silently redirecting
# window.location.href = (await oauthLoginUrl({scopes: window.huggingface.variables.OAUTH_SCOPES})) + "&prompt=consent";
# }
# }
# """
#
# js_block_hf_auth2 = \
# """
# import { oauthLoginUrl, oauthHandleRedirectIfPresent } from "@huggingface/hub";
# let oauthResult = localStorage.getItem("oauth");
# if (oauthResult) {
# try {
# oauthResult = JSON.parse(oauthResult);
# } catch {
# oauthResult = null;
# }
# }
#
# console.log("OAuth result", oauthResult);
# oauthResult ||= await oauthHandleRedirectIfPresent();
#
# if (oauthResult) {
# document.querySelector("pre").textContent = JSON.stringify(oauthResult, null, 2);
# localStorage.setItem("oauth", JSON.stringify(oauthResult));
# console.log("There is an oauth result", oauthResult);
# document.getElementById("signout").style.removeProperty("display");
# document.getElementById("signout").onclick = async function() {
# localStorage.removeItem("oauth");
# window.location.href = window.location.href.replace(/\?.*$/, '');
# window.location.reload();
# }
#
# } else {
# console.log("No OAuth result. Setting the loging button event");
# document.getElementById("signin").style.removeProperty("display");
# document.getElementById("signin").onclick = async function() {
# // prompt=consent to re-trigger the consent screen instead of silently redirecting
# window.location.href = window.hf_redirect_url;
# }
# }
# """
#
#
# def get_hf_user_data():
# pass
# @rt('/', methods="get")
# def get():
# global SPACE_HOST
# if "localhost" in SPACE_HOST:
# space_host = f"htpp://{SPACE_HOST}/"
# else:
# space_host = f"https://{SPACE_HOST}/"
# hf_redirect_url_func = (
# f"import {{ oauthLoginUrl, }} from '@huggingface/hub'; \n"
# f"window.hf_redirect_url = await oauthLoginUrl({{clientId:'{OAUTH_CLIENT_ID}',redirectUrl:'{space_host}',"
# f"scopes:'{OAUTH_SCOPES}'}}) + '&prompt=consent';\n"
# f"console.log(window.hf_redirect_url);")
#
# header = Head(Title("C code reviewing"),
# Script(src="https://unpkg.com/es-module-shims@1.7.0/dist/es-module-shims.js"),
# Script(js_hf_imports, type="importmap"),
# )
# content = Body(Div(P("Some content!")),
# A(f"Space host: {space_host}", href=space_host),
# Pre(""),
# Script(hf_redirect_url_func, type="module"),
# Img(src="https://huggingface.co/datasets/huggingface/badges/resolve/main/sign-in-with-huggingface-xl-dark.svg",
# alt="Sign in with Hugging Face",
# style="cursor: pointer; display: none;",
# id="signin", name=None),
# Button("Sign out", id="signout", style="display: none", name=None),
# A("Authenticate!", href="authorized"),
# Script(js_block_hf_auth2, type="module"))
# full_page = Html(header, content)
# return full_page
# @rt('/change')
# def get(): return P('Nice to be here!')
# @rt('/authorized', methods="get")
# def authorized():
# content = Body(P("Do we have authenticated user?"))
# return content
serve()