c-reviewer / main.py
ml-visoft's picture
wrong session_data bug
7e31458
raw
history blame
34.7 kB
import json
import uuid
import os
import pathlib
from uuid import uuid4
from datetime import datetime
import dataclasses
import html
import base64
import requests
import secrets
from fasthtml.common import *
from fastcore.all import typedispatch
from sqlite_minutils.db import Database
from huggingface_hub import CommitScheduler
import eval_code
import storage
# the secrets. Will be loaded from HF, or for docker --env-file or from IDE
OAUTH_CLIENT_ID = os.environ.get('OAUTH_CLIENT_ID')
OAUTH_SCOPES = os.environ.get('OAUTH_SCOPES')
OAUTH_CLIENT_SECRET = os.environ.get('OAUTH_CLIENT_SECRET')
OPENID_PROVIDER_URL = os.environ.get("OPENID_PROVIDER_URL", "https://huggingface.co")
SPACE_HOST = os.environ.get('SPACE_HOST')
HF_DATASET_AUTH_TOKEN = os.environ.get('HF_DATASET_AUTH_TOKEN', "(none)")
OPENAI_KEY = os.environ.get('OPENAI_KEY', "(none)")
try:
DEFAULT_ANONYMOUS_USER_LEVEL = int(os.environ.get('DEFAULT_ANONYMOUS_USER_LEVEL', 0))
except:
DEFAULT_ANONYMOUS_USER_LEVEL = 0
LOCAL_STORAGE_PATH = ""
datetime_now = datetime.isoformat(datetime.utcnow())
FILE_EVENTS_NAME = f"events-{datetime_now}-{uuid4()}.jsonl"
FILE_SUBMITTED_NAME = f"submitted-{datetime_now}-{uuid4()}.jsonl"
if "localhost" in SPACE_HOST:
IS_LOCALHOST = True
else:
IS_LOCALHOST = False
if IS_LOCALHOST:
DATABASE_NAME = "data/sessions_meta.db"
LOCAL_STORAGE_PATH = Path("data/persistent/")
# pathlib.Path(DATABASE_NAME).unlink(missing_ok=True)
print(f"Database {DATABASE_NAME=} exists? {pathlib.Path(DATABASE_NAME).exists()}")
else:
DATABASE_NAME = "/tmp/cache/sessions_meta.db"
LOCAL_STORAGE_PATH = Path("/tmp/cache/persistent")
LOCAL_STORAGE_PATH.mkdir(exist_ok=True, parents=True)
EVENTS_FILE_PATH = LOCAL_STORAGE_PATH / FILE_EVENTS_NAME
SUBMITTED_FILE_PATH = LOCAL_STORAGE_PATH / FILE_SUBMITTED_NAME
scheduler = None
try:
if HF_DATASET_AUTH_TOKEN != "(none)":
scheduler = CommitScheduler(
repo_id="ml-visoft/c-reviewer",
repo_type="dataset",
folder_path=LOCAL_STORAGE_PATH,
every=10,
path_in_repo="raw_data",
token=HF_DATASET_AUTH_TOKEN,
squash_history=False
)
except:
import traceback
traceback.print_exc()
global_database = Database(DATABASE_NAME)
global_database_tables = global_database.t
# We store session specific feedback from registered users.
# Will be later used to submit/storage the answer.
question_evaluation_table = global_database_tables.question_answer
if question_evaluation_table not in global_database_tables:
question_evaluation_table.create(id=int, code_text=str,
answer_eval_text=str, submitted=int, pk='id')
Question_Evaluation_cls = question_evaluation_table.dataclass()
# We store real-time session IDs and the state of the questions. Will link to question_answer
session_state_table = global_database_tables.session_state
if session_state_table not in global_database_tables:
# session_id stored in cookies
# see EVAL_STATUS_x below for state
session_state_table.create(id=int, session_id=str, state=int, submitted_date=str, evaluated_date=str,
current_qeval=int, pk='id',)
# Can't really nail the fk specs
# foreign_keys=[("current_qeval", question_evaluation_table, "id")])
Session_State_cls = session_state_table.dataclass()
EVAL_STATE_NEW=0
EVAL_STATE_QUERY=1
EVAL_STATE_TIMEDOUT=2
EVAL_STATE_ANSWER=3
EVAL_STATE_ERROR=4
# Constants to name the various HTML ids in the code
HTML_SUBMIT_CODE_AREA = "submit_code_area"
HTML_RESULTS_AREA = "prompt_response"
HTML_CLEAR_FORM = "clear_the_form"
HTML_USER_DATA = "login_user_data"
OAUTH_SECRET_SESSION_NAME = "oauth_secret"
USER_DATA_SESSION_NAME = "user_data"
def get_openid_configuration():
"""
Nicely read the HF openid endpoints and configure them.
:return:
"""
config_url = OPENID_PROVIDER_URL + "/.well-known/openid-configuration"
try:
response = requests.get(config_url)
response.raise_for_status() # Raises an HTTPError for bad responses (4xx or 5xx)
config = response.json()
token_endpoint = config.get('token_endpoint')
userinfo_endpoint = config.get('userinfo_endpoint')
return token_endpoint, userinfo_endpoint
except requests.RequestException as e:
print(f"An error occurred: {e}")
return None, None
# Use the function
HF_OAUTH_TOKEN_URL, HF_OAUTH_USERINFO = get_openid_configuration()
if HF_OAUTH_TOKEN_URL and HF_OAUTH_USERINFO:
print(f"Token Endpoint: {HF_OAUTH_TOKEN_URL}")
print(f"UserInfo Endpoint: {HF_OAUTH_USERINFO}")
else:
print("Failed to retrieve the endpoints.")
hdrs = (
HighlightJS(langs=['python', 'c', 'c++']),
Link(rel="stylesheet", href="https://cdnjs.cloudflare.com/ajax/libs/flexboxgrid/6.3.1/flexboxgrid.min.css", type="text/css"),
# Meta(name="htmx-config", content='{"defaultSwapStyle":"outerHTML"}')
)
if IS_LOCALHOST:
# Are we hacking locally?
print("Localhost detected in SPACE_HOST. App started in debug+live mode!")
app, rt = fast_app(debug=True, live=True, hdrs=hdrs)
REFRESH_TIME = 0.1
EVAL_TIMEOUT_SECONDS = 5
else:
app, rt = fast_app(debug=False, live=False, hdrs=hdrs)
REFRESH_TIME = 1
EVAL_TIMEOUT_SECONDS = 15
openai_client = eval_code.get_the_openai_client(OPENAI_KEY)
################# STORAGE
def untyped_save_to_storage(dc, filename):
def __write_to_file(_filename, _dc):
js_str = json.dumps(dataclasses.asdict(_dc)) + "\n"
with open(_filename, "a") as f:
f.write(js_str)
if scheduler is None and IS_LOCALHOST:
# this is on local system, dump without checking for backup strategy
__write_to_file(filename, dc)
return
with scheduler.lock:
__write_to_file(filename, dc)
@typedispatch
def save_to_storage(nav_event:storage.NavigationEvent):
untyped_save_to_storage(nav_event, EVENTS_FILE_PATH)
@typedispatch
def save_to_storage(eval_event:storage.CodeSubmittedEvent):
untyped_save_to_storage(eval_event, SUBMITTED_FILE_PATH)
########## EVALUATION FEEDBACK
def validate_and_get_question_evaluation_objectid(session, qe_id:int):
"""
Interrogates the tables to see the current answer AND current feedback if any
:param session:
:param qe_id:
:return:
"""
if 'session_id' not in session:
print("validate_and_get_question_evaluation_objectid bad session data")
return False, None, None
session_id = session["session_id"]
state_rows = session_state_table(limit=1, where=f"session_id == '{session_id}'", order_by="id DESC")
if len(state_rows) <= 0:
print("validate_and_get_question_evaluation_objectid there is no state")
return False, None, None
answer_id = state_rows[0].current_qeval
qa_obj_row = question_evaluation_table(limit=1, where=f"id == {answer_id}")
if len(qa_obj_row) <= 0:
print("validate_and_get_question_evaluation_objectid There is no answer recorded")
return False, None, None
qe_obj = qa_obj_row[0]
if qe_id != qe_obj.id:
print(f"validate_and_get_question_evaluation_objectid QE {qe_id} does not belong to {qe_obj.id}")
return False, None, None
return True, qe_obj, state_rows[0]
def html_create_feedback_updown_button(qe_id, ans_id, selected=0, disabled=False):
"""
Thumbs up/down button rendering.
:param qe_id:
:param ans_id:
:param selected:
:param disabled:
:return:
"""
html_target_id = f"buttons_{ans_id}"
colors = ["grey", "blue"]
up_col = colors[0]
down_col = colors[0]
if selected == 1: up_col = colors[1]
if selected == -1: down_col = colors[1]
toggle_url = f"/toggle_up_down/{qe_id}/{ans_id}"
up = Button("👍", hx_get=f"{toggle_url}/1", disabled=disabled, target_id=html_target_id,
hx_swap="outerHTML", style=f"background-color:{up_col}")
down = Button("👎", hx_get=f"{toggle_url}/-1", disabled=disabled, target_id=html_target_id,
hx_swap="outerHTML", style=f"background-color:{down_col}")
button_row = Div(up, down, id=html_target_id, _class="box col-xs-1",
style=f"flex: 0 0 auto; display: flex; gap: 5px; margin-right: 10px;")
return button_row
def html_augment_signle_evaluation_text_with_feedback(eval_html, qe_obj, ans_id, show_feedback_buttons=False):
"""
Will plot the + / - buttons for feedback.
:param eval_html:
:param qe_obj:
:param ans_id:
:return:
"""
answer_eval_js = json.loads(qe_obj.answer_eval_text)
if show_feedback_buttons:
buttons = html_create_feedback_updown_button(qe_obj.id, ans_id, answer_eval_js[ans_id]["EVAL"],
disabled=(qe_obj.submitted == 1))
else:
buttons = Div("")
final_div = Div(eval_html, buttons,
style=" background-color: #f0f0f0; display: flex; "
"width: 98%; margin: 16px; padding: 3px; align-items: center;", cls="row")
return final_div
@rt("/toggle_up_down/{qe_id}/{ans_id}/{which}")
def get(session, qe_id:int, ans_id:int, which:int):
"""
Answer to the +/- button presses
:param session:
:param qe_id:
:param ans_id:
:param which:
:return:
"""
# print(qe_id, ans_id, which)
if which not in {-1, 1}:
print(f"The {which=} is bad")
return None
# print(f"{qe_id=} {ans_id=} {which=}")
is_ok, qe_obj, session_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
if not is_ok:
print("toggle_up_down made session/object error")
return "Error"
save_to_storage(
storage.NavigationEvent(event_type="/toggle_up_down", event_session_id=session_obj.session_id,
event_params={"question_evaluation_id":qe_id, "answer_id":ans_id, "which":which}),
)
answer_eval_js = json.loads(qe_obj.answer_eval_text)
crt_selection = answer_eval_js[ans_id]["EVAL"]
input_button = which
out_selection = (input_button if crt_selection == 0 else (0 if crt_selection == input_button else input_button))
print(f"out selection: {out_selection}")
# store it back in DB
answer_eval_js[ans_id]["EVAL"] = out_selection
qe_obj.answer_eval_text = answer_eval_js
qe_obj.submitted = False # mark object as dirty
question_evaluation_table.upsert(qe_obj)
buttons= html_create_feedback_updown_button(qe_id, ans_id, selected=out_selection)
return buttons
def html_get_textual_feedback_form(qe_obj,):
if qe_obj.submitted == 1:
ph = "Thank you!"
else:
ph = "Write your general feedback here"
form = Form(Input(name="freeform_feedback", placeholder=ph),
Button("Submit", disabled=(qe_obj.submitted == 1)), hx_post=f"/submit_feedback/{qe_obj.id}",
target_id=HTML_RESULTS_AREA, hx_swap="outerHTML",)
div = Div(P("Give us a general feedback for the evaluation (optional)"), form)
return div
@rt("/submit_feedback/{qe_id}")
def post(session, qe_id:int, freeform_feedback:str):
is_ok, qe_obj, session_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
if not is_ok:
print("submit_feedback made session/object error")
return "Error"
ulevel = get_user_level(session)
if ulevel < 2:
return P("Unauthorized. Log in to submit feedback.")
# Update the object
session_id = session.get("session_id", "Not set")
save_to_storage(
storage.NavigationEvent(event_type="/submit_feedback", event_session_id=session_id,
event_params={"question_evaluation_id":qe_id})
)
if len(freeform_feedback) > 10000:
freeform_feedback = freeform_feedback[:10000]
answer_eval_js = json.loads(qe_obj.answer_eval_text)
answer_eval_js[0]["explanation"] = freeform_feedback
qe_obj.submitted = True
qe_obj.answer_eval_text = json.dumps(answer_eval_js)
question_evaluation_table.upsert(qe_obj)
user_data = session.get(USER_DATA_SESSION_NAME, {})
save_to_storage(
storage.CodeSubmittedEvent(event_session_id=session["session_id"], db_question_evaluation_id=qe_obj.id,
submitted_date=session_obj.submitted_date,
received_date=session_obj.evaluated_date,
code_to_eval=qe_obj.code_text, evaluation_response=qe_obj.answer_eval_text,
has_feedback=True, feedback_date=datetime.isoformat(datetime.utcnow()),
feedback_userdata=user_data)
)
return tl_html_results_and_feedback_area(session)
####### EVALUATE CODE
def html_format_code_review_form(qe_obj, show_feedback_buttons):
"""
Formats the code review, adding fields for feedback if it is required.
:param feedback_js:
:param c_code:
:param html_id:
:return:
"""
c_code = qe_obj.code_text
enhanced_answer = json.loads(qe_obj.answer_eval_text)
list_of_citerias = []
for caug_code, caug_txt in eval_code.CODE_AUGMENTATIONS:
crit_tag = [H3(caug_code), P(caug_txt)]
# list_of_citerias.extend(crit_tag)
# yeah, I know . . .
eval_txt_fb_list = []
for k, eval_line in enumerate(enhanced_answer):
if caug_code == eval_line["criteria"]:
eval_txt = Div(P(eval_line["explanation"]), style="flex: 1; margin-right: 10px;", cls="box col-xs-11")
eval_txt_fb = html_augment_signle_evaluation_text_with_feedback(eval_txt, qe_obj, k, show_feedback_buttons)
eval_txt_fb_list.append(eval_txt_fb)
criteria_div = Div(*crit_tag, *eval_txt_fb_list, style="border: 1px solid black; margin: 10px; padding: 10px")
list_of_citerias.append(criteria_div)
if show_feedback_buttons:
textual_feedback = html_get_textual_feedback_form(qe_obj)
else:
textual_feedback = P("Log in to leave some feedback about this evaluation")
return html_render_code_output(c_code), *list_of_citerias, textual_feedback
def check_if_query_should_timeout(session_obj):
"""
Checks if the evaluation request is timed out. Will update the database to ERROR
:param eval_request_status:
:return:
"""
submitted_dt = datetime.fromisoformat(session_obj.submitted_date)
crt_time = datetime.utcnow()
time_difference = crt_time - submitted_dt
difference_in_seconds = time_difference.total_seconds()
if difference_in_seconds > EVAL_TIMEOUT_SECONDS:
session_obj.state = EVAL_STATE_TIMEDOUT
session_state_table.upsert(session_obj)
def html_default_results():
return P("Submit a piece of C code to get a Clean-Code evaluation.")
def html_waiting_for_results():
return Div(P("Working . . ."), hx_get=f"/render_answer",
hx_trigger = f"every {REFRESH_TIME}s", hx_swap="outerHTML", target_id=HTML_RESULTS_AREA,)
def html_eval_request_timed_out():
return P("Timed out. Retry in few minutes pls!")
@rt("/render_answer")
def get(session):
"""
Endpoint to render the evaluation answer. Pooled by frontend.
:param session:
:return:
"""
if 'session_id' not in session: return "render_answer No session ID"
session_id = session["session_id"]
answer_area = tl_html_results_and_feedback_area(session)
return answer_area
def get_latest_eval_request_status(session_id):
state_rows = session_state_table(limit=1, where=f"session_id == '{session_id}'", order_by="id DESC")
if len(state_rows) <= 0:
return EVAL_STATE_NEW, None
state_obj = state_rows[0]
if state_obj.state in {EVAL_STATE_NEW, EVAL_STATE_QUERY, EVAL_STATE_TIMEDOUT, EVAL_STATE_ANSWER}:
return state_obj.state, state_obj
return EVAL_STATE_ERROR, state_obj
def html_error_results(message):
ans = P("There was an error:", P(message))
return ans
def html_render_code_output(code):
txtarea = Pre(Code(code, _class="language-c"))
return txtarea
def html_render_answer_from_db(session, show_submit_form=True):
session_id = session["session_id"]
eval_request_status, state_obj = get_latest_eval_request_status(session_id)
ulevel = get_user_level(session)
if ulevel >= 2:
show_feedback_buttons = True
else:
show_feedback_buttons = False
# state_rows = session_state_table(limit=1, where=f"session_id == '{session_id}'", order_by="id DESC")
# print(eval_request_status, state_obj)
if eval_request_status == EVAL_STATE_NEW:
return html_default_results(),
if eval_request_status == EVAL_STATE_ANSWER:
qe_obj_lst = question_evaluation_table(limit=1, where=f"id == {state_obj.current_qeval}")
if len(qe_obj_lst) < 1:
print(f"Object id {state_obj.current_qeval} can't be found in question_evaluation_table")
return (None,)
qe_obj = qe_obj_lst[0]
return (html_format_code_review_form(qe_obj, show_feedback_buttons),
tl_html_render_inputbox(session, target_html_id=HTML_RESULTS_AREA,
region_html_id=HTML_SUBMIT_CODE_AREA) if show_submit_form else None)
if eval_request_status == EVAL_STATE_TIMEDOUT:
return html_eval_request_timed_out(),
if eval_request_status == EVAL_STATE_QUERY:
check_if_query_should_timeout(state_obj)
return html_waiting_for_results(),
if eval_request_status == EVAL_STATE_ERROR:
# TODO duplicate code! fix it!
qe_obj_lst = question_evaluation_table(limit=1, where=f"id == {state_obj.current_qeval}")
if len(qe_obj_lst) < 1:
print(f"Object id {state_obj.current_qeval} can't be found in question_evaluation_table")
return (None,)
qe_obj = qe_obj_lst[0]
return html_error_results(qe_obj.answer_eval_text),
print(f"Unknown state of the code evalation request {state_obj.state}:")
return html_error_results("Some error occured."),
# How can I timeout? Well ... TBD.
@threaded
def call_gpt_and_store_result(session_obj_id, code_to_check):
"""
Threaded function that will submit code to LLM and wait for the answer.
Communication with "main" thread is through db.
All parameters must be pickable.
:param session_obj_id:
:param code_to_check:
:return:
"""
# TODO refactor considering new join!
# print("evaluatign code")
try:
# Pesky way to get a new cursor, in a thread safe way, into the db. This code runs in another thread.
# Can we do better?
local_database = Database(DATABASE_NAME)
local_sess_state = local_database.t.session_state
local_sess_state_cls = local_sess_state.dataclass()
local_sess_obj_lst = local_sess_state(limit=1, where=f"id == {session_obj_id}")
local_sess_obj = local_sess_obj_lst[0]
# Trigger the lenghtly operation
enhanced_answer = eval_code.eval_the_piece_of_c_code(openai_client=openai_client, ccode=code_to_check)
# we create a new QA entry.
qe_obj = Question_Evaluation_cls(code_text=code_to_check, answer_eval_text=enhanced_answer, submitted=0)
qe_obj = question_evaluation_table.insert(qe_obj)
local_sess_obj.current_qeval = qe_obj.id
evaluation_date = datetime.isoformat(datetime.utcnow())
# save to persistent storage
save_to_storage(
storage.CodeSubmittedEvent(event_session_id=local_sess_obj.session_id, db_question_evaluation_id=qe_obj.id,
submitted_date=local_sess_obj.submitted_date,
received_date=evaluation_date,
code_to_eval=code_to_check, evaluation_response=json.dumps(enhanced_answer))
)
# Update the session object.
if "error" in enhanced_answer:
local_sess_obj.state = EVAL_STATE_ERROR
local_sess_obj.answer = enhanced_answer["error"]
local_sess_obj.evaluated_date = evaluation_date
else:
local_sess_obj.state = EVAL_STATE_ANSWER
local_sess_obj.evaluated_date = evaluation_date
local_sess_state.update(local_sess_obj)
except:
import traceback
traceback.print_exc()
def tl_html_results_and_feedback_area(session, show_submit_form=True):
"""
Top level component that will render the code evaluation overlapped with feedback submission form.
:param session_id:
:return:
"""
results_feedback_area = html_render_answer_from_db(session, show_submit_form)
return Div(*results_feedback_area, id=HTML_RESULTS_AREA)
######## CODE INPUT FORM
def tl_html_render_inputbox(session, target_html_id, region_html_id):
txtarea = Textarea(id="ccodetoeval", name="ccodetoeval", placeholder="Paste a piece of C code here.", rows=3)
form = Form(Group(txtarea, Button("Evaluate")),
hx_post="/submit_to_eval",
hx_swap="outerHTML",
target_id=f"{target_html_id}"
)
ulevel = get_user_level(session)
if ulevel < 1:
form = P("Log in to submit code to review.", style="background-color: #fff0f0;")
out_div = Div(form, id=region_html_id, hx_swap_oob='true')
return out_div
@rt("/submit_to_eval", methods="post")
def post(session, ccodetoeval:str):
ulevel = get_user_level(session)
if ulevel < 1:
return P("Unauthorized. Log in to submit code to review.", style="background-color: #fff0f0;")
if 'session_id' not in session:
return P("submit_to_eval. Bad call. No session ID")
session_id = session["session_id"]
save_to_storage(
storage.NavigationEvent(event_type="/submit_to_eval", event_session_id=session_id, event_params={"ccodetoeval":ccodetoeval})
)
if len(ccodetoeval) > 100 and len(ccodetoeval) < 40000:
session_obj = Session_State_cls(
session_id=session_id,
state=EVAL_STATE_QUERY,
submitted_date=datetime.isoformat(datetime.utcnow()),
)
# we insert and we get the new primary key
session_obj = session_state_table.insert(session_obj)
# will be executed in another thread with magic @threaded
call_gpt_and_store_result(session_obj.id, ccodetoeval)
return tl_html_results_and_feedback_area(session), render_clear_area(session_id, HTML_CLEAR_FORM)
def tl_html_get_samples_section():
button_row = []
head = H4("Don't have any ideas? Try one of the samples below:")
no_examples = len(eval_code.CODE_EVAL_EXAMPLES)
for k in range(no_examples):
example = eval_code.get_enhanced_sample_example(k)
button = Button(example["name"], hx_get=f"/load_example/{k}", target_id=HTML_RESULTS_AREA)
button_row.append(button)
div_buttons = Div(*button_row, _class="row")
div = Div(head, div_buttons, style="border: 1px solid black;padding: 10px; margin 1px;")
return div
@rt("/load_example/{example_id}", methods="get")
def get(session, example_id:int):
session_id = session["session_id"]
save_to_storage(
storage.NavigationEvent(event_type="/submit_to_eval", event_session_id=session_id, event_params={"example_id":example_id})
)
example = eval_code.get_enhanced_sample_example(example_id)
qe_obj = Question_Evaluation_cls(code_text=example["code"], answer_eval_text=example["eval"], submitted=0)
qe_obj = question_evaluation_table.insert(qe_obj)
session_obj = Session_State_cls(
session_id=session_id,
state=EVAL_STATE_ANSWER,
submitted_date=datetime.isoformat(datetime.utcnow()),
evaluated_date=datetime.isoformat(datetime.utcnow()),
current_qeval=qe_obj.id,
)
session_state_table.insert(session_obj)
return tl_html_results_and_feedback_area(session)
########## CLEAR FORM
def html_render_clear_area_button(html_id):
button = Button("Clear form",
hx_get="/clear_area",
hx_swap="outerHTML",
target_id=HTML_RESULTS_AREA,
)
div = Div(button, id=html_id, hx_swap_oob='true')
return div
def render_clear_area(session_id, html_id):
# return html_render_clear_area_button(html_id)
eval_request_status, _ = get_latest_eval_request_status(session_id)
if eval_request_status != EVAL_STATE_NEW:
# print("clear button: render button")
return html_render_clear_area_button(html_id)
else:
# print("clear button: render empty")
return Div(P(""), id=html_id, hx_swap_oob='true')
@rt("/clear_area", methods="get")
def get(session):
if 'session_id' not in session: return P("clear_area. Bad call. No session ID")
session_id = session["session_id"]
save_to_storage(
storage.NavigationEvent(event_type="/clear_area", event_session_id=session_id)
)
# insert a row to "cancel"/reset the current request
session_obj = Session_State_cls(
session_id=session_id,
state=EVAL_STATE_NEW,
submitted_date=datetime.isoformat(datetime.utcnow()),
)
session_state_table.insert(session_obj)
# re-issue the page, basically.
input_area = tl_html_render_inputbox(session, target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
results_area = tl_html_results_and_feedback_area(session)
clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
return results_area, input_area, clear_area
########## AUTHENTICATION
def html_render_login_to_get_access_part(session):
"""
Will render the Log in to get access part and set the session oauth_secret.
The button will launch a popup window and a script that will listen for login_success event and will refresh the
main page.
:param session:
:return:
"""
content = []
content.append(H4("Log in to give feedback!"))
content.append(P("We will record your name, hugginface name, hf profile page, email address. "
"We will try HARD not to make them public but . . ."))
content.append(P("Your feedback will be made public but in an anonymized form."))
popup_script = Script("""
function openPopup(url) {
var width = 500;
var height = 730;
var left = (screen.width - width) / 2;
var top = (screen.height - height) / 2;
localStorage.removeItem('login_success');
loginPopup = window.open(url, 'LoginWindow', 'width=' + width + ',height=' + height + ',left=' + left + ',top=' + top);
loginCheckInterval = setInterval(checkPopupClosed, 500);
return false;
}
function checkPopupClosed() {
console.log("Checking popup");
if (localStorage.getItem('login_success') === 'true') {
console.log("Checking popup - local storage");
clearInterval(loginCheckInterval);
localStorage.removeItem('login_success');
// Reload the page or update UI as needed
location.reload();
}
}
"""
)
content.append(popup_script)
# build the redirect URL
global SPACE_HOST
if "localhost" in SPACE_HOST:
auth_callback_url = f"http://{SPACE_HOST}/auth_callback/"
else:
auth_callback_url = f"https://{SPACE_HOST}/auth_callback/"
secret = datetime.isoformat(datetime.utcnow()) + "-" + secrets.token_urlsafe(32)
session[OAUTH_SECRET_SESSION_NAME] = secret
encoded_scopes = html.escape(OAUTH_SCOPES)
redirect_link = (f"https://huggingface.co/oauth/authorize?redirect_uri={auth_callback_url}&scope={encoded_scopes}"
f"&client_id={OAUTH_CLIENT_ID}&state={secret}&response_type=code&prompt=consent")
login_button = A(Img(src="https://huggingface.co/datasets/huggingface/badges/resolve/main/sign-in-with-huggingface-md.svg",
alt="Sign in with Hugging Face",
# style="cursor: pointer; display: none;",
id="signin", name=None),
href=redirect_link, onclick="return openPopup(this.href); return false;")
content.append(login_button)
content.append(P(" "))
div = Div(*content, id=HTML_USER_DATA, style="background-color: #f0f0f0; padding: 10px;")
return div
def html_render_welcome_user(user_info):
"""
Extracts the user data and paints the Welcome screen
:param user_info:
:return:
"""
content = []
content.append(H4(f"Welcome {user_info['name']}!"))
# content.append(Img(src=OPENID_PROVIDER_URL + user_info["picture"], alt="Picture"))
content.append(P("Your feedback will be made public but in an anonymized form."))
content.append(P(A("Logout", href=f"/logout")))
div = Div(*content, id=HTML_USER_DATA, style="background-color: #f0f0f0; padding: 10px;")
return div
@rt("/auth_callback")
def get(session, code:str=None, state:str=None, error:str=None, error_description:str=None):
"""
Endpoint that will be called by HF once the user gives (or not) consent
:param session:
:param code:
:param state:
:param error:
:param error_description:
:return:
"""
# print(session)
close_script = Script("""
localStorage.setItem('login_success', 'true');
setTimeout(function() {
window.close();
}, 1500); // 3000 milliseconds = 3 seconds
""")
return_answer = [close_script]
if error is not None:
print(f"HF OAuth returned an error: {error} {error_description}")
ans = Div(P(f"We can't log you in. Huggingface says: {error_description}"),
P("Please close this page"))
return_answer.append(ans)
return Div(*return_answer)
# validating the secret
sess_secret = session.get(OAUTH_SECRET_SESSION_NAME, None)
if sess_secret is None:
print("No session secret")
return_answer.append(P("access denied"))
return Div(*return_answer)
if sess_secret != state:
msg = f"Mismatch session secret and HF secret: {sess_secret=} {state=}"
print(msg)
return_answer.append(P("Mismatch session secret and HF secret"))
return Div(*return_answer)
# Moving on and get the token
global SPACE_HOST
if "localhost" in SPACE_HOST:
space_host = f"http://{SPACE_HOST}/auth_callback/"
else:
space_host = f"https://{SPACE_HOST}/auth_callback/"
auth_header = base64.b64encode(f"{OAUTH_CLIENT_ID}:{OAUTH_CLIENT_SECRET}".encode()).decode()
headers = {
"Authorization": f"Basic {auth_header}",
"Content-Type": "application/x-www-form-urlencoded",
}
data = {
"client_id": OAUTH_CLIENT_ID,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": space_host,
}
token_response = requests.post(HF_OAUTH_TOKEN_URL, data=data, headers=headers)
if token_response.status_code == 200:
tokens = token_response.json()
# Here you would typically store the tokens securely and/or use them
print("Succsss in getting the token")
access_token = tokens["access_token"]
user_headers = {
"Authorization": f"Bearer {access_token}"
}
response_userinfo = requests.get(HF_OAUTH_USERINFO, headers=user_headers)
if response_userinfo.status_code == 200:
user_data = response_userinfo.json()
# Set the user data in DB
# TODO Is it mildly safe to store user data in session?
session[USER_DATA_SESSION_NAME] = user_data
session[OAUTH_SECRET_SESSION_NAME] = None
# print(user_data)
else:
print(f"Error while taking the user data: {response_userinfo.text}" )
return_answer.append(P("Error logging in"))
return Div(*return_answer)
else:
print(f"We did not get the tokens: {token_response.text}")
return_answer.append(P("Error logging in"))
return Div(*return_answer)
# print(session)
return_answer.append(P("Succes! Close this page."))
save_to_storage(
storage.NavigationEvent(event_type="/auth_callback", event_session_id=session["session_id"],
event_params={"user_data":user_data})
)
return Div(*return_answer)
@rt("/logout")
def get(session):
session[OAUTH_SECRET_SESSION_NAME] = None
save_to_storage(
storage.NavigationEvent(event_type="/logout", event_session_id=session["session_id"],
event_params={"user_data":session["user_data"]})
)
session["user_data"] = None
return RedirectResponse(url="/")
def get_user_level(session):
"""
Gets the user rights level. Basically 0 for not logged in, 2 for logged in.
The DEFAULT_ANONYMOUS_USER_LEVEL can raise this level (max operation)
:param session:
:return:
"""
ulevel = 0
if session.get(USER_DATA_SESSION_NAME, None) is not None:
ulevel = 2
ulevel = max(ulevel, DEFAULT_ANONYMOUS_USER_LEVEL)
return ulevel
########## MAIN PAGE
@rt("/")
def get(session):
if 'session_id' not in session:
session['session_id'] = str(uuid.uuid4())
session_id = session["session_id"]
if len(session_state_table(where=f"session_id=='{session_id}'")) <= 0:
# old session ID, "resetting".
session['session_id'] = str(uuid.uuid4())
save_to_storage(
storage.NavigationEvent(event_type="/", event_session_id=session_id)
)
user_data = session.get(USER_DATA_SESSION_NAME, None)
if user_data is not None:
auth_area = html_render_welcome_user(user_data)
else:
auth_area = html_render_login_to_get_access_part(session) # might set some secrets here
title = Title('C code review for students')
preamble = [
auth_area,
H1("Evaluate your C code!"),
P("Enter your code in the textbox below and wait for answers."),
P("!! The data will be saved and maybe made public !!", style="background-color: #f0fff0;"),
P("Hint: Don't evaluate more than ~ 1000 LoC."),
]
input_area = tl_html_render_inputbox(session, target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
samples_area = tl_html_get_samples_section()
results_feedback_area = tl_html_results_and_feedback_area(session, show_submit_form=False)
clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
# print(session)
return title, Main( *preamble, input_area, samples_area, results_feedback_area, clear_area)
serve()