"""FastHTML application that reviews C code with an LLM and collects user feedback.

Flow: a user submits C code, a background thread asks the LLM for an evaluation
and stores it in a local SQLite database, the frontend polls ``/render_answer``
until the answer is available, and logged-in users can rate each remark and
leave free-form feedback. Navigation events and submissions are appended to
JSONL files that a ``CommitScheduler`` (when configured) pushes to a HF dataset.
"""
import json
import uuid
import os
import pathlib
from uuid import uuid4
from datetime import datetime
import dataclasses
import html
import base64
import requests
import secrets
import traceback
import urllib.parse

from fasthtml.common import *
from fastcore.all import typedispatch
from sqlite_minutils.db import Database
from huggingface_hub import CommitScheduler

import eval_code
import storage

# The secrets. Loaded from HF, from `docker --env-file`, or from the IDE run configuration.
OAUTH_CLIENT_ID = os.environ.get('OAUTH_CLIENT_ID')
OAUTH_SCOPES = os.environ.get('OAUTH_SCOPES')
OAUTH_CLIENT_SECRET = os.environ.get('OAUTH_CLIENT_SECRET')
OPENID_PROVIDER_URL = os.environ.get("OPENID_PROVIDER_URL", "https://huggingface.co")
SPACE_HOST = os.environ.get('SPACE_HOST')
HF_DATASET_AUTH_TOKEN = os.environ.get('HF_DATASET_AUTH_TOKEN', "(none)")
OPENAI_KEY = os.environ.get('OPENAI_KEY', "(none)")
try:
    DEFAULT_ANONYMOUS_USER_LEVEL = int(os.environ.get('DEFAULT_ANONYMOUS_USER_LEVEL', 0))
except ValueError:
    # A non-numeric value falls back to "anonymous users have no rights".
    DEFAULT_ANONYMOUS_USER_LEVEL = 0

# Timeout (seconds) for the outgoing HTTP calls to the OpenID provider.
HTTP_TIMEOUT_SECONDS = 10


def _utcnow_iso():
    """Return the current UTC time as a naive ISO-8601 string.

    Kept naive (no tz offset) on purpose: stored timestamps are parsed back with
    ``datetime.fromisoformat`` and compared against ``datetime.utcnow()``.
    """
    return datetime.isoformat(datetime.utcnow())


LOCAL_STORAGE_PATH = ""

datetime_now = _utcnow_iso()
FILE_EVENTS_NAME = f"events-{datetime_now}-{uuid4()}.jsonl"
FILE_SUBMITTED_NAME = f"submitted-{datetime_now}-{uuid4()}.jsonl"

if SPACE_HOST is None:
    print("SPACE_HOST is not set. Assuming a non-localhost deployment.")
# An unset SPACE_HOST must not crash the import; it is treated as "not localhost".
IS_LOCALHOST = "localhost" in (SPACE_HOST or "")

if IS_LOCALHOST:
    DATABASE_NAME = "data/sessions_meta.db"
    LOCAL_STORAGE_PATH = Path("data/persistent/")
    # pathlib.Path(DATABASE_NAME).unlink(missing_ok=True)
    print(f"Database {DATABASE_NAME=} exists? {pathlib.Path(DATABASE_NAME).exists()}")
else:
    DATABASE_NAME = "/tmp/cache/sessions_meta.db"
    LOCAL_STORAGE_PATH = Path("/tmp/cache/persistent")
LOCAL_STORAGE_PATH.mkdir(exist_ok=True, parents=True)
EVENTS_FILE_PATH = LOCAL_STORAGE_PATH / FILE_EVENTS_NAME
SUBMITTED_FILE_PATH = LOCAL_STORAGE_PATH / FILE_SUBMITTED_NAME

# Periodically pushes LOCAL_STORAGE_PATH to the HF dataset; stays None without a token.
scheduler = None
try:
    if HF_DATASET_AUTH_TOKEN != "(none)":
        scheduler = CommitScheduler(
            repo_id="ml-visoft/c-reviewer",
            repo_type="dataset",
            folder_path=LOCAL_STORAGE_PATH,
            every=10,
            path_in_repo="raw_data",
            token=HF_DATASET_AUTH_TOKEN,
            squash_history=False
        )
except Exception:
    # Best effort: the app keeps running (without remote backup) if the scheduler fails.
    traceback.print_exc()

global_database = Database(DATABASE_NAME)
global_database_tables = global_database.t

# We store session specific feedback from registered users.
# Will be later used to submit/store the answer.
question_evaluation_table = global_database_tables.question_answer
if question_evaluation_table not in global_database_tables:
    question_evaluation_table.create(id=int, code_text=str, answer_eval_text=str, submitted=int, pk='id')
Question_Evaluation_cls = question_evaluation_table.dataclass()

# We store real-time session IDs and the state of the questions. Will link to question_answer
session_state_table = global_database_tables.session_state
if session_state_table not in global_database_tables:
    # session_id stored in cookies
    # see EVAL_STATE_x below for state
    session_state_table.create(id=int, session_id=str, state=int, submitted_date=str,
                               evaluated_date=str, current_qeval=int, pk='id',)
    # Can't really nail the fk specs
    # foreign_keys=[("current_qeval", question_evaluation_table, "id")])
Session_State_cls = session_state_table.dataclass()

EVAL_STATE_NEW = 0
EVAL_STATE_QUERY = 1
EVAL_STATE_TIMEDOUT = 2
EVAL_STATE_ANSWER = 3
EVAL_STATE_ERROR = 4

# Constants to name the various HTML ids in the code
HTML_SUBMIT_CODE_AREA = "submit_code_area"
HTML_RESULTS_AREA = "prompt_response"
HTML_CLEAR_FORM = "clear_the_form"
HTML_USER_DATA = "login_user_data"

OAUTH_SECRET_SESSION_NAME = "oauth_secret"
USER_DATA_SESSION_NAME = "user_data"


def get_openid_configuration():
    """
    Read the OpenID provider discovery document and extract the endpoints we need.

    :return: ``(token_endpoint, userinfo_endpoint)``; ``(None, None)`` if the
        discovery document cannot be fetched or parsed.
    """
    config_url = OPENID_PROVIDER_URL + "/.well-known/openid-configuration"
    try:
        response = requests.get(config_url, timeout=HTTP_TIMEOUT_SECONDS)
        response.raise_for_status()  # Raises an HTTPError for bad responses (4xx or 5xx)
        config = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on older `requests` versions.
        print(f"An error occurred: {e}")
        return None, None
    return config.get('token_endpoint'), config.get('userinfo_endpoint')


# Use the function
HF_OAUTH_TOKEN_URL, HF_OAUTH_USERINFO = get_openid_configuration()

if HF_OAUTH_TOKEN_URL and HF_OAUTH_USERINFO:
    print(f"Token Endpoint: {HF_OAUTH_TOKEN_URL}")
    print(f"UserInfo Endpoint: {HF_OAUTH_USERINFO}")
else:
    print("Failed to retrieve the endpoints.")

hdrs = (
    HighlightJS(langs=['python', 'c', 'c++']),
    Link(rel="stylesheet", href="https://cdnjs.cloudflare.com/ajax/libs/flexboxgrid/6.3.1/flexboxgrid.min.css",
         type="text/css"),
    # Meta(name="htmx-config", content='{"defaultSwapStyle":"outerHTML"}')
)

if IS_LOCALHOST:
    # Are we hacking locally?
    print("Localhost detected in SPACE_HOST. App started in debug+live mode!")
    app, rt = fast_app(debug=True, live=True, hdrs=hdrs)
    REFRESH_TIME = 0.1
    EVAL_TIMEOUT_SECONDS = 5
else:
    app, rt = fast_app(debug=False, live=False, hdrs=hdrs)
    REFRESH_TIME = 1
    EVAL_TIMEOUT_SECONDS = 15

openai_client = eval_code.get_the_openai_client(OPENAI_KEY)


################# STORAGE

def untyped_save_to_storage(dc, filename):
    """
    Append a dataclass instance as one JSON line to ``filename``.

    When a commit scheduler is configured the write happens under its lock, so
    a file is never uploaded half-written. Without a scheduler (local runs, or
    a deployment with no dataset token) the line is written directly.

    :param dc: dataclass instance to serialize with ``dataclasses.asdict``
    :param filename: path of the JSONL file to append to
    """
    def _append_json_line(_filename, _dc):
        js_str = json.dumps(dataclasses.asdict(_dc)) + "\n"
        with open(_filename, "a", encoding="utf-8") as f:
            f.write(js_str)

    if scheduler is None:
        # No backup strategy available: dump without locking instead of crashing.
        _append_json_line(filename, dc)
        return
    with scheduler.lock:
        _append_json_line(filename, dc)


@typedispatch
def save_to_storage(nav_event:storage.NavigationEvent):
    """Persist a navigation event to the events JSONL file."""
    untyped_save_to_storage(nav_event, EVENTS_FILE_PATH)


@typedispatch
def save_to_storage(eval_event:storage.CodeSubmittedEvent):
    """Persist a code submission / feedback event to the submissions JSONL file."""
    untyped_save_to_storage(eval_event, SUBMITTED_FILE_PATH)


################# DB HELPERS

def _latest_session_state_rows(session_id):
    """Return a list holding the newest state row of ``session_id`` (empty if none)."""
    return session_state_table(limit=1, where="session_id == ?", where_args=[session_id],
                               order_by="id DESC")


def _get_question_evaluation(qe_id):
    """Return the question/evaluation row with primary key ``qe_id``, or None."""
    if qe_id is None:
        # State rows created before an answer exists have no linked evaluation.
        return None
    rows = question_evaluation_table(limit=1, where="id == ?", where_args=[qe_id])
    return rows[0] if rows else None


########## EVALUATION FEEDBACK

def validate_and_get_question_evaluation_objectid(session, qe_id:int):
    """
    Interrogates the tables to see the current answer AND current feedback if any.

    The request is valid only if ``qe_id`` is the evaluation linked to the
    latest state row of the caller's session.

    :param session: the request session (must hold ``session_id``)
    :param qe_id: primary key of the evaluation the request refers to
    :return: ``(True, evaluation_row, state_row)`` on success,
        ``(False, None, None)`` otherwise
    """
    if 'session_id' not in session:
        print("validate_and_get_question_evaluation_objectid bad session data")
        return False, None, None
    state_rows = _latest_session_state_rows(session["session_id"])
    if len(state_rows) <= 0:
        print("validate_and_get_question_evaluation_objectid there is no state")
        return False, None, None
    qe_obj = _get_question_evaluation(state_rows[0].current_qeval)
    if qe_obj is None:
        print("validate_and_get_question_evaluation_objectid There is no answer recorded")
        return False, None, None
    if qe_id != qe_obj.id:
        print(f"validate_and_get_question_evaluation_objectid QE {qe_id} does not belong to {qe_obj.id}")
        return False, None, None
    return True, qe_obj, state_rows[0]


def html_create_feedback_updown_button(qe_id, ans_id, selected=0, disabled=False):
    """
    Thumbs up/down button rendering.

    :param qe_id: primary key of the evaluation the buttons belong to
    :param ans_id: index of the remark inside the evaluation
    :param selected: 1 highlights thumbs-up, -1 thumbs-down, anything else none
    :param disabled: render both buttons disabled
    :return: a Div holding the two buttons; it is also the HTMX swap target
    """
    html_target_id = f"buttons_{ans_id}"
    idle_color, active_color = "grey", "blue"
    up_col = active_color if selected == 1 else idle_color
    down_col = active_color if selected == -1 else idle_color
    toggle_url = f"/toggle_up_down/{qe_id}/{ans_id}"
    up = Button("👍", hx_get=f"{toggle_url}/1", disabled=disabled, target_id=html_target_id,
                hx_swap="outerHTML", style=f"background-color:{up_col}")
    down = Button("👎", hx_get=f"{toggle_url}/-1", disabled=disabled, target_id=html_target_id,
                  hx_swap="outerHTML", style=f"background-color:{down_col}")
    button_row = Div(up, down, id=html_target_id, _class="box col-xs-1",
                     style=f"flex: 0 0 auto; display: flex; gap: 5px; margin-right: 10px;")
    return button_row


def html_augment_signle_evaluation_text_with_feedback(eval_html, qe_obj, ans_id, show_feedback_buttons=False):
    """
    Will plot the + / - buttons for feedback next to one evaluation remark.

    :param eval_html: already rendered remark
    :param qe_obj: evaluation row the remark belongs to
    :param ans_id: index of the remark inside the evaluation
    :param show_feedback_buttons: render the thumbs buttons (else an empty Div)
    :return: a flex row with the remark and the buttons
    """
    if show_feedback_buttons:
        # The stored evaluation is only parsed when the current vote is needed.
        answer_eval_js = json.loads(qe_obj.answer_eval_text)
        buttons = html_create_feedback_updown_button(qe_obj.id, ans_id, answer_eval_js[ans_id]["EVAL"],
                                                     disabled=(qe_obj.submitted == 1))
    else:
        buttons = Div("")
    final_div = Div(eval_html, buttons,
                    style=" background-color: #f0f0f0; display: flex; "
                          "width: 98%; margin: 16px; padding: 3px; align-items: center;",
                    cls="row")
    return final_div


@rt("/toggle_up_down/{qe_id}/{ans_id}/{which}")
def get(session, qe_id:int, ans_id:int, which:int):
    """
    Answer to the +/- button presses.

    Pressing the already selected button clears the vote, pressing the other
    one switches it.

    :param session: the request session
    :param qe_id: primary key of the evaluation
    :param ans_id: index of the remark inside the evaluation
    :param which: 1 for thumbs-up, -1 for thumbs-down
    :return: the re-rendered button pair, or an error marker
    """
    if which not in {-1, 1}:
        print(f"The {which=} is bad")
        return None
    is_ok, qe_obj, session_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
    if not is_ok:
        print("toggle_up_down made session/object error")
        return "Error"
    # Same rule as /submit_feedback: the buttons are only rendered for level >= 2.
    if get_user_level(session) < 2:
        return P("Unauthorized. Log in to submit feedback.")
    save_to_storage(
        storage.NavigationEvent(event_type="/toggle_up_down", event_session_id=session_obj.session_id,
                                event_params={"question_evaluation_id":qe_id, "answer_id":ans_id, "which":which}),
    )
    answer_eval_js = json.loads(qe_obj.answer_eval_text)
    if not 0 <= ans_id < len(answer_eval_js):
        print(f"toggle_up_down {ans_id=} is out of range")
        return "Error"
    crt_selection = answer_eval_js[ans_id]["EVAL"]
    out_selection = 0 if crt_selection == which else which
    print(f"out selection: {out_selection}")
    # store it back in DB, as JSON text like /submit_feedback does
    answer_eval_js[ans_id]["EVAL"] = out_selection
    qe_obj.answer_eval_text = json.dumps(answer_eval_js)
    qe_obj.submitted = False  # mark object as dirty
    question_evaluation_table.upsert(qe_obj)
    return html_create_feedback_updown_button(qe_id, ans_id, selected=out_selection)


def html_get_textual_feedback_form(qe_obj,):
    """Render the free-form feedback form; it is disabled once feedback was submitted."""
    already_submitted = qe_obj.submitted == 1
    ph = "Thank you!" if already_submitted else "Write your general feedback here"
    form = Form(Input(name="freeform_feedback", placeholder=ph),
                Button("Submit", disabled=already_submitted),
                hx_post=f"/submit_feedback/{qe_obj.id}",
                target_id=HTML_RESULTS_AREA, hx_swap="outerHTML",)
    div = Div(P("Give us a general feedback for the evaluation (optional)"), form)
    return div


@rt("/submit_feedback/{qe_id}")
def post(session, qe_id:int, freeform_feedback:str):
    """
    Store the free-form feedback, mark the evaluation as submitted and persist it.

    :param session: the request session
    :param qe_id: primary key of the evaluation the feedback is about
    :param freeform_feedback: user text, truncated to 10000 characters
    :return: the re-rendered results area, or an error marker
    """
    is_ok, qe_obj, session_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
    if not is_ok:
        print("submit_feedback made session/object error")
        return "Error"
    ulevel = get_user_level(session)
    if ulevel < 2:
        return P("Unauthorized. Log in to submit feedback.")
    # Update the object
    session_id = session.get("session_id", "Not set")
    save_to_storage(
        storage.NavigationEvent(event_type="/submit_feedback", event_session_id=session_id,
                                event_params={"question_evaluation_id":qe_id})
    )
    freeform_feedback = freeform_feedback[:10000]
    answer_eval_js = json.loads(qe_obj.answer_eval_text)
    # NOTE(review): entry 0 looks like the slot reserved for the general feedback — confirm in eval_code.
    answer_eval_js[0]["explanation"] = freeform_feedback
    qe_obj.submitted = True
    qe_obj.answer_eval_text = json.dumps(answer_eval_js)
    question_evaluation_table.upsert(qe_obj)
    # After a logout the key exists but holds None, hence `or {}`.
    user_data = session.get(USER_DATA_SESSION_NAME) or {}
    save_to_storage(
        storage.CodeSubmittedEvent(event_session_id=session["session_id"], db_question_evaluation_id=qe_obj.id,
                                   submitted_date=session_obj.submitted_date,
                                   received_date=session_obj.evaluated_date,
                                   code_to_eval=qe_obj.code_text, evaluation_response=qe_obj.answer_eval_text,
                                   has_feedback=True, feedback_date=_utcnow_iso(),
                                   feedback_userdata=user_data)
    )
    return tl_html_results_and_feedback_area(session)


####### EVALUATE CODE

def html_format_code_review_form(qe_obj, show_feedback_buttons):
    """
    Formats the code review, adding fields for feedback if it is required.

    Remarks are grouped under the criteria listed in ``eval_code.CODE_AUGMENTATIONS``.

    :param qe_obj: evaluation row holding the code and the JSON evaluation
    :param show_feedback_buttons: render thumbs buttons and the feedback form
    :return: tuple of components: the code, one Div per criterion, the feedback part
    """
    c_code = qe_obj.code_text
    enhanced_answer = json.loads(qe_obj.answer_eval_text)
    list_of_citerias = []
    for caug_code, caug_txt in eval_code.CODE_AUGMENTATIONS:
        crit_tag = [H3(caug_code), P(caug_txt)]
        eval_txt_fb_list = []
        # The index k identifies the remark in the feedback endpoints, so keep enumerate over the full list.
        for k, eval_line in enumerate(enhanced_answer):
            if caug_code != eval_line["criteria"]:
                continue
            eval_txt = Div(P(eval_line["explanation"]), style="flex: 1; margin-right: 10px;",
                           cls="box col-xs-11")
            eval_txt_fb_list.append(
                html_augment_signle_evaluation_text_with_feedback(eval_txt, qe_obj, k, show_feedback_buttons))
        criteria_div = Div(*crit_tag, *eval_txt_fb_list,
                           style="border: 1px solid black; margin: 10px; padding: 10px")
        list_of_citerias.append(criteria_div)
    if show_feedback_buttons:
        textual_feedback = html_get_textual_feedback_form(qe_obj)
    else:
        textual_feedback = P("Log in to leave some feedback about this evaluation")
    return html_render_code_output(c_code), *list_of_citerias, textual_feedback


def check_if_query_should_timeout(session_obj):
    """
    Checks if the evaluation request is timed out. Will update the database to TIMEDOUT.

    :param session_obj: state row of a request that is still in the QUERY state
    """
    submitted_dt = datetime.fromisoformat(session_obj.submitted_date)
    elapsed_seconds = (datetime.utcnow() - submitted_dt).total_seconds()
    if elapsed_seconds > EVAL_TIMEOUT_SECONDS:
        session_obj.state = EVAL_STATE_TIMEDOUT
        session_state_table.upsert(session_obj)


def html_default_results():
    """Placeholder shown before anything was submitted."""
    return P("Submit a piece of C code to get a Clean-Code evaluation.")


def html_waiting_for_results():
    """Placeholder that makes the frontend poll ``/render_answer`` every REFRESH_TIME seconds."""
    return Div(P("Working . . ."),
               hx_get=f"/render_answer",
               hx_trigger = f"every {REFRESH_TIME}s",
               hx_swap="outerHTML",
               target_id=HTML_RESULTS_AREA,)


def html_eval_request_timed_out():
    """Message shown when the evaluation did not arrive in time."""
    return P("Timed out. Retry in few minutes pls!")


@rt("/render_answer")
def get(session):
    """
    Endpoint to render the evaluation answer. Polled by frontend.

    :param session: the request session
    :return: the results area, or an error string when there is no session id
    """
    if 'session_id' not in session:
        return "render_answer No session ID"
    return tl_html_results_and_feedback_area(session)


def get_latest_eval_request_status(session_id):
    """
    Return ``(state, state_row)`` for the newest request of the session.

    ``(EVAL_STATE_NEW, None)`` when the session has no rows; unknown state
    values are reported as ``EVAL_STATE_ERROR``.
    """
    state_rows = _latest_session_state_rows(session_id)
    if len(state_rows) <= 0:
        return EVAL_STATE_NEW, None
    state_obj = state_rows[0]
    if state_obj.state in {EVAL_STATE_NEW, EVAL_STATE_QUERY, EVAL_STATE_TIMEDOUT, EVAL_STATE_ANSWER}:
        return state_obj.state, state_obj
    return EVAL_STATE_ERROR, state_obj


def html_error_results(message):
    """Render an error message."""
    ans = P("There was an error:", P(message))
    return ans


def html_render_code_output(code):
    """Render the submitted code as a highlighted C code block."""
    txtarea = Pre(Code(code, _class="language-c"))
    return txtarea


def html_render_answer_from_db(session, show_submit_form=True):
    """
    Build the content of the results area from the latest request state of the session.

    :param session: the request session (must hold ``session_id``)
    :param show_submit_form: also re-render the code input box next to an answer
    :return: tuple of components (possibly holding None) to be placed in the results area
    """
    session_id = session["session_id"]
    eval_request_status, state_obj = get_latest_eval_request_status(session_id)
    show_feedback_buttons = get_user_level(session) >= 2

    if eval_request_status == EVAL_STATE_NEW:
        return html_default_results(),
    if eval_request_status in (EVAL_STATE_ANSWER, EVAL_STATE_ERROR):
        qe_obj = _get_question_evaluation(state_obj.current_qeval)
        if qe_obj is None:
            print(f"Object id {state_obj.current_qeval} can't be found in question_evaluation_table")
            return (None,)
        if eval_request_status == EVAL_STATE_ERROR:
            return html_error_results(qe_obj.answer_eval_text),
        return (html_format_code_review_form(qe_obj, show_feedback_buttons),
                tl_html_render_inputbox(session, target_html_id=HTML_RESULTS_AREA,
                                        region_html_id=HTML_SUBMIT_CODE_AREA) if show_submit_form else None)
    if eval_request_status == EVAL_STATE_TIMEDOUT:
        return html_eval_request_timed_out(),
    if eval_request_status == EVAL_STATE_QUERY:
        # The timeout is recorded now and shown on the next poll.
        check_if_query_should_timeout(state_obj)
        return html_waiting_for_results(),
    print(f"Unknown state of the code evalation request {state_obj.state}:")
    return html_error_results("Some error occured."),


# How can I timeout? Well ... TBD.
@threaded
def call_gpt_and_store_result(session_obj_id, code_to_check):
    """
    Threaded function that will submit code to LLM and wait for the answer.
    Communication with "main" thread is through db.
    All parameters must be picklable.

    :param session_obj_id: primary key of the session_state row of this request
    :param code_to_check: the C code to evaluate
    """
    # TODO refactor considering new join!
    try:
        # Pesky way to get a new cursor, in a thread safe way, into the db. This code runs in another thread.
        # Can we do better?
        local_database = Database(DATABASE_NAME)
        local_sess_state = local_database.t.session_state
        # Called for its side effect: registers the row class so queries return objects, not dicts.
        local_sess_state.dataclass()
        local_sess_obj_lst = local_sess_state(limit=1, where="id == ?", where_args=[session_obj_id])
        local_sess_obj = local_sess_obj_lst[0]

        # Trigger the lengthy operation
        enhanced_answer = eval_code.eval_the_piece_of_c_code(openai_client=openai_client, ccode=code_to_check)

        # we create a new QA entry.
        # NOTE(review): this insert goes through the module-level connection, not the thread-local one.
        qe_obj = Question_Evaluation_cls(code_text=code_to_check,
                                         answer_eval_text=enhanced_answer, submitted=0)
        qe_obj = question_evaluation_table.insert(qe_obj)
        local_sess_obj.current_qeval = qe_obj.id
        evaluation_date = _utcnow_iso()

        # save to persistent storage
        save_to_storage(
            storage.CodeSubmittedEvent(event_session_id=local_sess_obj.session_id,
                                       db_question_evaluation_id=qe_obj.id,
                                       submitted_date=local_sess_obj.submitted_date,
                                       received_date=evaluation_date,
                                       code_to_eval=code_to_check,
                                       evaluation_response=json.dumps(enhanced_answer))
        )

        # Update the session object. The error text itself lives in the QA entry created above;
        # session_state has no column for it.
        if "error" in enhanced_answer:
            local_sess_obj.state = EVAL_STATE_ERROR
        else:
            local_sess_obj.state = EVAL_STATE_ANSWER
        local_sess_obj.evaluated_date = evaluation_date
        local_sess_state.update(local_sess_obj)
    except Exception:
        # The request stays in QUERY state and is eventually reported as timed out.
        traceback.print_exc()


def tl_html_results_and_feedback_area(session, show_submit_form=True):
    """
    Top level component that will render the code evaluation overlapped with feedback submission form.

    :param session: the request session
    :param show_submit_form: forwarded to ``html_render_answer_from_db``
    :return: the Div with id HTML_RESULTS_AREA
    """
    results_feedback_area = html_render_answer_from_db(session, show_submit_form)
    return Div(*results_feedback_area, id=HTML_RESULTS_AREA)


######## CODE INPUT FORM

def tl_html_render_inputbox(session, target_html_id, region_html_id):
    """
    Render the code submission form, or a "log in" hint when the user may not submit.

    :param session: the request session
    :param target_html_id: id of the element the submit response is swapped into
    :param region_html_id: id of the returned (out-of-band swapped) wrapper Div
    """
    if get_user_level(session) < 1:
        form = P("Log in to submit code to review.", style="background-color: #fff0f0;")
    else:
        txtarea = Textarea(id="ccodetoeval", name="ccodetoeval",
                           placeholder="Paste a standalone C program here. "
                                       "Larger than a 'Hello world', smaller than 1000 LoC.",
                           rows=3)
        form = Form(Group(txtarea, Button("Evaluate")),
                    hx_post="/submit_to_eval",
                    hx_swap="outerHTML",
                    target_id=f"{target_html_id}")
    out_div = Div(form, id=region_html_id, hx_swap_oob='true')
    return out_div


@rt("/submit_to_eval", methods="post")
def post(session, ccodetoeval:str):
    """
    Accept a piece of C code and start its evaluation in a background thread.

    Code that is not between 100 and 40000 characters (exclusive) is ignored
    and the current results area is simply re-rendered.
    """
    ulevel = get_user_level(session)
    if ulevel < 1:
        return P("Unauthorized. Log in to submit code to review.", style="background-color: #fff0f0;")
    if 'session_id' not in session:
        return P("submit_to_eval. Bad call. No session ID")
    session_id = session["session_id"]
    save_to_storage(
        storage.NavigationEvent(event_type="/submit_to_eval", event_session_id=session_id,
                                event_params={"ccodetoeval":ccodetoeval})
    )
    if 100 < len(ccodetoeval) < 40000:
        session_obj = Session_State_cls(
            session_id=session_id,
            state=EVAL_STATE_QUERY,
            submitted_date=_utcnow_iso(),
        )
        # we insert and we get the new primary key
        session_obj = session_state_table.insert(session_obj)
        # will be executed in another thread with magic @threaded
        call_gpt_and_store_result(session_obj.id, ccodetoeval)
    return tl_html_results_and_feedback_area(session), render_clear_area(session_id, HTML_CLEAR_FORM)


def tl_html_get_samples_section():
    """Render one button per pre-evaluated sample from ``eval_code.CODE_EVAL_EXAMPLES``."""
    head = H4("Don't have any ideas? Try one of the samples below:")
    button_row = [
        Button(eval_code.get_enhanced_sample_example(k)["name"],
               hx_get=f"/load_example/{k}", target_id=HTML_RESULTS_AREA)
        for k in range(len(eval_code.CODE_EVAL_EXAMPLES))
    ]
    div_buttons = Div(*button_row, _class="row")
    div = Div(head, div_buttons, style="border: 1px solid black;padding: 10px; margin: 1px;")
    return div


@rt("/load_example/{example_id}", methods="get")
def get(session, example_id:int):
    """
    Load a pre-evaluated sample as if it had just been evaluated for this session.

    :param session: the request session
    :param example_id: index into ``eval_code.CODE_EVAL_EXAMPLES``
    """
    if 'session_id' not in session:
        return P("load_example. Bad call. No session ID")
    session_id = session["session_id"]
    save_to_storage(
        storage.NavigationEvent(event_type="/load_example", event_session_id=session_id,
                                event_params={"example_id":example_id})
    )
    if not 0 <= example_id < len(eval_code.CODE_EVAL_EXAMPLES):
        return html_error_results("Unknown example.")
    example = eval_code.get_enhanced_sample_example(example_id)
    qe_obj = Question_Evaluation_cls(code_text=example["code"],
                                     answer_eval_text=example["eval"], submitted=0)
    qe_obj = question_evaluation_table.insert(qe_obj)
    now = _utcnow_iso()
    session_obj = Session_State_cls(
        session_id=session_id,
        state=EVAL_STATE_ANSWER,
        submitted_date=now,
        evaluated_date=now,
        current_qeval=qe_obj.id,
    )
    session_state_table.insert(session_obj)
    return tl_html_results_and_feedback_area(session)


########## CLEAR FORM

def html_render_clear_area_button(html_id):
    """Render the "Clear form" button inside an out-of-band swapped Div."""
    button = Button("Clear form",
                    hx_get="/clear_area",
                    hx_swap="outerHTML",
                    target_id=HTML_RESULTS_AREA,
                    )
    div = Div(button, id=html_id, hx_swap_oob='true')
    return div


def render_clear_area(session_id, html_id):
    """Render the clear button when the session has something to clear, else an empty Div."""
    eval_request_status, _ = get_latest_eval_request_status(session_id)
    if eval_request_status != EVAL_STATE_NEW:
        return html_render_clear_area_button(html_id)
    return Div(P(""), id=html_id, hx_swap_oob='true')


@rt("/clear_area", methods="get")
def get(session):
    """Reset the session to the NEW state and re-issue the input, results and clear areas."""
    if 'session_id' not in session:
        return P("clear_area. Bad call. No session ID")
    session_id = session["session_id"]
    save_to_storage(
        storage.NavigationEvent(event_type="/clear_area", event_session_id=session_id)
    )
    # insert a row to "cancel"/reset the current request
    session_obj = Session_State_cls(
        session_id=session_id,
        state=EVAL_STATE_NEW,
        submitted_date=_utcnow_iso(),
    )
    session_state_table.insert(session_obj)
    # re-issue the page, basically.
    input_area = tl_html_render_inputbox(session, target_html_id=HTML_RESULTS_AREA,
                                         region_html_id=HTML_SUBMIT_CODE_AREA)
    results_area = tl_html_results_and_feedback_area(session)
    clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
    return results_area, input_area, clear_area


########## AUTHENTICATION

def _auth_callback_url():
    """Return the OAuth redirect URI of this deployment (plain http only on localhost)."""
    scheme = "http" if IS_LOCALHOST else "https"
    return f"{scheme}://{SPACE_HOST}/auth_callback/"


def html_render_login_to_get_access_part(session):
    """
    Will render the Log in to get access part and set the session oauth_secret.

    The button will launch a popup window and a script that will listen for
    login_success event and will refresh the main page.

    :param session: the request session; receives a fresh OAuth state secret
    :return: the Div with id HTML_USER_DATA
    """
    content = []
    content.append(H4("Log in to give feedback!"))
    content.append(P("We will record your name, hugginface name, hf profile page, email address. "
                     "We will try HARD not to make them public but . . ."))
    content.append(P("Your feedback will be made public but in an anonymized form."))
    popup_script = Script("""
    function openPopup(url) {
        var width = 500;
        var height = 730;
        var left = (screen.width - width) / 2;
        var top = (screen.height - height) / 2;
        localStorage.removeItem('login_success');
        loginPopup = window.open(url, 'LoginWindow', 'width=' + width + ',height=' + height + ',left=' + left + ',top=' + top);
        loginCheckInterval = setInterval(checkPopupClosed, 500);
        return false;
    }

    function checkPopupClosed() {
        console.log("Checking popup");
        if (localStorage.getItem('login_success') === 'true') {
            console.log("Checking popup - local storage");
            clearInterval(loginCheckInterval);
            localStorage.removeItem('login_success');
            // Reload the page or update UI as needed
            location.reload();
        }
    }
    """
    )
    content.append(popup_script)

    # The state secret is checked again in /auth_callback.
    secret = _utcnow_iso() + "-" + secrets.token_urlsafe(32)
    session[OAUTH_SECRET_SESSION_NAME] = secret
    # Query parameters are percent-encoded; a missing OAUTH_SCOPES yields an empty scope.
    query = urllib.parse.urlencode(
        {
            "redirect_uri": _auth_callback_url(),
            "scope": OAUTH_SCOPES or "",
            "client_id": OAUTH_CLIENT_ID or "",
            "state": secret,
            "response_type": "code",
            "prompt": "consent",
        },
        quote_via=urllib.parse.quote,
    )
    redirect_link = f"https://huggingface.co/oauth/authorize?{query}"
    login_button = A(Img(src="https://huggingface.co/datasets/huggingface/badges/resolve/main/sign-in-with-huggingface-md.svg",
                         alt="Sign in with Hugging Face",
                         # style="cursor: pointer; display: none;",
                         id="signin", name=None),
                     href=redirect_link, onclick="return openPopup(this.href); return false;")
    content.append(login_button)
    content.append(P(" "))
    div = Div(*content, id=HTML_USER_DATA, style="background-color: #f0f0f0; padding: 10px;")
    return div


def html_render_welcome_user(user_info):
    """
    Extracts the user data and paints the Welcome screen.

    :param user_info: user info dict stored in the session (needs the ``name`` key)
    :return: the Div with id HTML_USER_DATA
    """
    content = []
    content.append(H4(f"Welcome {user_info['name']}!"))
    # content.append(Img(src=OPENID_PROVIDER_URL + user_info["picture"], alt="Picture"))
    content.append(P("Your feedback will be made public but in an anonymized form."))
    content.append(P(A("Logout", href=f"/logout")))
    div = Div(*content, id=HTML_USER_DATA, style="background-color: #f0f0f0; padding: 10px;")
    return div


@rt("/auth_callback")
def get(session, code:str=None, state:str=None, error:str=None, error_description:str=None):
    """
    Endpoint that will be called by HF once the user gives (or not) consent.

    Validates the state secret, exchanges ``code`` for an access token, fetches
    the user info and stores it in the session. The returned page always
    carries a script that notifies the opener window and closes the popup.

    :param session: the request session
    :param code: authorization code sent by the provider
    :param state: state secret echoed back by the provider
    :param error: provider error code, if consent failed
    :param error_description: provider error text, if consent failed
    :return: a Div with the closing script and a status message
    """
    close_script = Script("""
        localStorage.setItem('login_success', 'true');
        setTimeout(function() {
            window.close();
        }, 1500); // 3000 milliseconds = 3 seconds
    """)
    return_answer = [close_script]
    if error is not None:
        print(f"HF OAuth returned an error: {error} {error_description}")
        ans = Div(P(f"We can't log you in. Huggingface says: {error_description}"), P("Please close this page"))
        return_answer.append(ans)
        return Div(*return_answer)

    # validating the secret
    sess_secret = session.get(OAUTH_SECRET_SESSION_NAME, None)
    if sess_secret is None:
        print("No session secret")
        return_answer.append(P("access denied"))
        return Div(*return_answer)
    # Constant-time comparison; the secret values are deliberately not logged.
    if state is None or not secrets.compare_digest(sess_secret.encode(), state.encode()):
        print("Mismatch session secret and HF secret")
        return_answer.append(P("Mismatch session secret and HF secret"))
        return Div(*return_answer)

    if not (code and HF_OAUTH_TOKEN_URL and HF_OAUTH_USERINFO):
        print("Missing authorization code or OpenID endpoints")
        return_answer.append(P("Error logging in"))
        return Div(*return_answer)

    # Moving on and get the token
    auth_header = base64.b64encode(f"{OAUTH_CLIENT_ID}:{OAUTH_CLIENT_SECRET}".encode()).decode()
    headers = {
        "Authorization": f"Basic {auth_header}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    data = {
        "client_id": OAUTH_CLIENT_ID,
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": _auth_callback_url(),
    }
    try:
        token_response = requests.post(HF_OAUTH_TOKEN_URL, data=data, headers=headers,
                                       timeout=HTTP_TIMEOUT_SECONDS)
        if token_response.status_code != 200:
            print(f"We did not get the tokens: {token_response.text}")
            return_answer.append(P("Error logging in"))
            return Div(*return_answer)
        tokens = token_response.json()
        print("Succsss in getting the token")
        user_headers = {
            "Authorization": f"Bearer {tokens['access_token']}"
        }
        response_userinfo = requests.get(HF_OAUTH_USERINFO, headers=user_headers,
                                         timeout=HTTP_TIMEOUT_SECONDS)
        if response_userinfo.status_code != 200:
            print(f"Error while taking the user data: {response_userinfo.text}")
            return_answer.append(P("Error logging in"))
            return Div(*return_answer)
        user_data = response_userinfo.json()
    except (requests.RequestException, ValueError, KeyError) as e:
        # Network failure, non-JSON body or a token response without access_token.
        print(f"Error while talking to the OpenID provider: {e}")
        return_answer.append(P("Error logging in"))
        return Div(*return_answer)

    # Set the user data in the session
    # TODO Is it mildly safe to store user data in session?
    session[USER_DATA_SESSION_NAME] = user_data
    session[OAUTH_SECRET_SESSION_NAME] = None
    return_answer.append(P("Succes! Close this page."))
    save_to_storage(
        storage.NavigationEvent(event_type="/auth_callback", event_session_id=session.get("session_id", "Not set"),
                                event_params={"user_data":user_data})
    )
    return Div(*return_answer)


@rt("/logout")
def get(session):
    """Forget the logged-in user and redirect to the main page."""
    session[OAUTH_SECRET_SESSION_NAME] = None
    save_to_storage(
        storage.NavigationEvent(event_type="/logout", event_session_id=session.get("session_id", "Not set"),
                                event_params={"user_data":session.get(USER_DATA_SESSION_NAME)})
    )
    session[USER_DATA_SESSION_NAME] = None
    return RedirectResponse(url="/")


def get_user_level(session):
    """
    Gets the user rights level. Basically 0 for not logged in, 2 for logged in.
    The DEFAULT_ANONYMOUS_USER_LEVEL can raise this level (max operation).

    :param session: the request session
    :return: the effective rights level
    """
    ulevel = 2 if session.get(USER_DATA_SESSION_NAME, None) is not None else 0
    return max(ulevel, DEFAULT_ANONYMOUS_USER_LEVEL)


########## MAIN PAGE

@rt("/")
def get(session):
    """Render the main page: login area, code input, samples, results and clear button."""
    if 'session_id' not in session:
        session['session_id'] = str(uuid.uuid4())
    if not _latest_session_state_rows(session["session_id"]):
        # old session ID, "resetting".
        session['session_id'] = str(uuid.uuid4())
    # Read the id only after the possible reset so logging and rendering use the live one.
    session_id = session["session_id"]
    save_to_storage(
        storage.NavigationEvent(event_type="/", event_session_id=session_id)
    )
    user_data = session.get(USER_DATA_SESSION_NAME, None)
    if user_data is not None:
        auth_area = html_render_welcome_user(user_data)
    else:
        auth_area = html_render_login_to_get_access_part(session)  # might set some secrets here
    title = Title('C code review for students')
    preamble = [
        auth_area,
        H1("Evaluate your C code!"),
        P("Enter your code in the textbox below and wait for answers."),
        P("!! The data will be saved and maybe made public !!", style="background-color: #f0fff0;"),
        P("Hint: Don't evaluate more than ~ 1000 LoC."),
    ]
    input_area = tl_html_render_inputbox(session, target_html_id=HTML_RESULTS_AREA,
                                         region_html_id=HTML_SUBMIT_CODE_AREA)
    samples_area = tl_html_get_samples_section()
    results_feedback_area = tl_html_results_and_feedback_area(session, show_submit_form=False)
    clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
    return title, Main(
        *preamble,
        input_area,
        samples_area,
        results_feedback_area,
        clear_area)


serve()