import json
import uuid
import os
import pathlib
from uuid import uuid4
from datetime import datetime
import dataclasses

from fasthtml.common import *
from fastcore.all import typedispatch
from sqlite_minutils.db import Database
from huggingface_hub import CommitScheduler

import eval_code
import storage

# The secrets. Will be loaded from HF, or for docker --env-file or from IDE.
OAUTH_CLIENT_ID = os.environ.get('OAUTH_CLIENT_ID')
OAUTH_SCOPES = os.environ.get('OAUTH_SCOPES')
OAUTH_CLIENT_SECRET = os.environ.get('OAUTH_CLIENT_SECRET')
OPENID_PROVIDER_URL = os.environ.get('OPENID_PROVIDER_URL')
SPACE_HOST = os.environ.get('SPACE_HOST')
HF_DATASET_AUTH_TOKEN = os.environ.get('HF_DATASET_AUTH_TOKEN', "none")

# Validate the configuration with explicit raises: an `assert` is stripped
# under `python -O`, which would let the app start with the "none" placeholder.
if HF_DATASET_AUTH_TOKEN == "none":
    raise RuntimeError("Please set the correct ENV variables!!! (HF_DATASET_AUTH_TOKEN is missing)")
# SPACE_HOST is tested with `"localhost" in SPACE_HOST` below; without this
# check an unset variable fails there with an opaque TypeError.
if SPACE_HOST is None:
    raise RuntimeError("Please set the correct ENV variables!!! (SPACE_HOST is missing)")

LOCAL_STORAGE_PATH = ""
# One events file per process start; the name embeds the start time and a random UUID.
FILE_EVENTS = f"events-{datetime.utcnow()}-{uuid4()}.json"

if "localhost" in SPACE_HOST:
    # Local run: keep everything under ./data and start from a fresh database.
    DATABASE_NAME = "data/sessions_meta.db"
    LOCAL_STORAGE_PATH = Path("data/persistent/")
    pathlib.Path(DATABASE_NAME).unlink(missing_ok=True)
else:
    DATABASE_NAME = "/tmp/cache/sessions_meta.db"
    LOCAL_STORAGE_PATH = Path("/tmp/cache/persistent")
# parents=True also creates the directory that will hold the database file.
LOCAL_STORAGE_PATH.mkdir(exist_ok=True, parents=True)
EVENTS_FILE_PATH = LOCAL_STORAGE_PATH / FILE_EVENTS

# Event persistence is disabled while `scheduler` is None
# (untyped_save_to_storage returns early in that case).
scheduler = None
# scheduler = CommitScheduler(
#     repo_id="ml-visoft/c-reviewer",
#     repo_type="dataset",
#     folder_path=LOCAL_STORAGE_PATH,
#     every=100,
#     path_in_repo="raw_data",
#     token=HF_DATASET_AUTH_TOKEN,
#     allow_patterns=".jslines",
#     squash_history=False
# )

global_database = Database(DATABASE_NAME)
global_database_tables = global_database.t

# We store session specific feedback from registered users.
# Will be later used to submit/storage the answer.
question_evaluation_table = global_database_tables.question_answer
if question_evaluation_table not in global_database_tables:
    question_evaluation_table.create(id=int, code_text=str, answer_eval_text=str, submitted=int, pk='id')
Question_Evaluation_cls = question_evaluation_table.dataclass()

# We store real-time session IDs and the state of the questions. Will link to question_answer
session_state_table = global_database_tables.session_state
if session_state_table not in global_database_tables:
    # session_id stored in cookies
    # see EVAL_STATE_x below for state
    # `answer` holds the error message of a failed evaluation. The error path
    # below reads and writes it; without the column the message was silently
    # dropped on update and reading it raised AttributeError.
    session_state_table.create(id=int, session_id=str, state=int, submitted=datetime,
                               completed=datetime, current_qeval=int, answer=str, pk='id',)
    # Can't really nail the fk specs
    # foreign_keys=[("current_qeval", question_evaluation_table, "id")])
Session_State_cls = session_state_table.dataclass()

EVAL_STATE_NEW = 0
EVAL_STATE_QUERY = 1
EVAL_STATE_TIMEDOUT = 2
EVAL_STATE_ANSWER = 3
EVAL_STATE_ERROR = 4

# Constants to name the various HTML ids in the code
HTML_SUBMIT_CODE_AREA = "submit_code_area"
HTML_RESULTS_AREA = "prompt_response"
HTML_CLEAR_FORM = "clear_the_form"
HTML_SUBMIT_FEEDBACK = "submit_feedback"

hdrs = (HighlightJS(langs=['python', 'javascript', 'html', 'css']),)

if "localhost" in SPACE_HOST:
    # Are we hacking locally?
    print("Localhost detected in SPACE_HOST. App started in debug+live mode!")
    app, rt = fast_app(debug=True, live=True, hdrs=hdrs)
    REFRESH_TIME = 0.1
else:
    app, rt = fast_app(debug=False, live=False, hdrs=hdrs)
    REFRESH_TIME = 1


################# STORAGE

def untyped_save_to_storage(dc, filename):
    """
    Append dataclass instance `dc` as one JSON line to `filename`.

    Does nothing while the module-level `scheduler` is None. Otherwise the
    write happens while holding `scheduler.lock`.

    :param dc: dataclass instance, serialised with `dataclasses.asdict`
    :param filename: path of the file to append to
    :return: None
    """
    if scheduler is None:
        return
    with scheduler.lock:
        js_str = json.dumps(dataclasses.asdict(dc)) + "\n"
        with open(filename, "a", encoding="utf-8") as f:
            f.write(js_str)


@typedispatch
def save_to_storage(nav_event:storage.NavigationEvent):
    """
    Persist a navigation event to the events file (EVENTS_FILE_PATH).

    :param nav_event: the event to store
    :return: None
    """
    untyped_save_to_storage(nav_event, EVENTS_FILE_PATH)


def _latest_session_state(session_id):
    """Return the newest session_state row for `session_id`, or None if there is none."""
    # Bound parameter instead of string interpolation into the SQL text.
    rows = session_state_table(limit=1, where="session_id == ?", where_args=[session_id],
                               order_by="id DESC")
    return rows[0] if rows else None


def _question_evaluation_by_id(qe_id):
    """Return the question_answer row whose primary key is `qe_id`, or None if absent."""
    rows = question_evaluation_table(limit=1, where="id == ?", where_args=[qe_id])
    return rows[0] if rows else None


def validate_and_get_question_evaluation_objectid(session, qe_id:int):
    """
    Check that `qe_id` is the evaluation attached to the session's latest state row.

    :param session: the request session; must contain 'session_id'
    :param qe_id: id of the question/evaluation row the client refers to
    :return: always a 2-tuple: (True, row) on success, (False, None) otherwise
    """
    if 'session_id' not in session:
        print("validate_and_get_question_evaluation_objectid bad session data")
        # Was a bare `return None`; callers unpack two values, so that raised
        # TypeError instead of reaching their error branch.
        return False, None
    state_obj = _latest_session_state(session["session_id"])
    if state_obj is None:
        print("validate_and_get_question_evaluation_objectid there is no state")
        return False, None
    qe_obj = _question_evaluation_by_id(state_obj.current_qeval)
    if qe_obj is None:
        print("validate_and_get_question_evaluation_objectid There is no answer recorded")
        return False, None
    if qe_id != qe_obj.id:
        # The f-prefix was missing, so the placeholders were printed literally.
        print(f"validate_and_get_question_evaluation_objectid QE {qe_id} does not belong to {qe_obj.id}")
        return False, None
    return True, qe_obj


def html_create_feedback_updown_button(qe_id, ans_id, selected=0, disabled=False):
    """
    Build the "+" / "-" feedback button pair for one evaluation line.

    Both buttons issue an HTMX PUT to /toggle_up_down/... and replace the
    enclosing Div (id `buttons_<ans_id>`) with the response.

    :param qe_id: id of the question/evaluation row
    :param ans_id: index of the evaluation line inside the stored answer
    :param selected: 1 highlights "+", -1 highlights "-", anything else highlights none
    :param disabled: forwarded to both buttons
    :return: Div containing the two buttons
    """
    html_target_id = f"buttons_{ans_id}"
    colors = ["grey", "blue"]
    up_col = colors[1] if selected == 1 else colors[0]
    down_col = colors[1] if selected == -1 else colors[0]
    toggle_url = f"/toggle_up_down/{qe_id}/{ans_id}/"
    up = Button("+", hx_put=f"{toggle_url}?which=1", disabled=disabled, hx_swap="outerHTML",
                hx_target="#" + html_target_id, style=f"background-color:{up_col}")
    down = Button("-", hx_put=f"{toggle_url}?which=-1", disabled=disabled, hx_swap="outerHTML",
                  hx_target="#" + html_target_id, style=f"background-color:{down_col}")
    button_row = Div(up, down, _id=html_target_id)
    return button_row


def html_augment_evaluation_text_with_feedback(eval_html, qe_id, ans_id, selected=0):
    """
    Will plot the + / - buttons for feedback.

    :param eval_html: element holding the evaluation text
    :param qe_id: id of the question/evaluation row
    :param ans_id: index of the evaluation line inside the stored answer
    :param selected: current vote, passed to html_create_feedback_updown_button
    :return: Div wrapping the evaluation text and its buttons
    """
    buttons = html_create_feedback_updown_button(qe_id, ans_id, selected)
    final_div = Div(eval_html, buttons, style=" background-color: #f0f0f0;")
    return final_div


@rt("/toggle_up_down/{qe_id}/{ans_id}")
def put(session, qe_id:int, ans_id:int, which:int):
    """
    Answer to the +/- button presses.

    Pressing the button that is already selected clears the vote; pressing
    the other one (or either one when nothing is selected) selects it.

    :param session: the request session
    :param qe_id: id of the question/evaluation row
    :param ans_id: index of the evaluation line being voted on
    :param which: 1 for "+", -1 for "-"
    :return: the re-rendered button pair, "Error" on a validation failure,
             None when `which` is not -1 or 1
    """
    print(qe_id, ans_id, which)
    if which not in {-1, 1}:
        print(f"The {which=} is bad")
        return None
    is_ok, qe_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
    if not is_ok:
        print("toggle_up_down made session/object error")
        return "Error"
    # save_to_storage(
    #     storage.NavigationEvent(event_type="/", event_session_id=session_id, event_params={"qe_id":qe_id})
    # )
    answer_eval_js = json.loads(qe_obj.answer_eval_text)
    # ans_id comes straight from the URL: reject indexes outside the stored
    # answer (a negative value would otherwise silently index from the end).
    if not 0 <= ans_id < len(answer_eval_js):
        print(f"toggle_up_down bad answer index {ans_id}")
        return "Error"
    crt_selection = answer_eval_js[ans_id]["EVAL"]
    out_selection = 0 if crt_selection == which else which
    print(f"out selection: {out_selection}")
    # store it back in DB
    answer_eval_js[ans_id]["EVAL"] = out_selection
    # Serialise explicitly: the column holds JSON text and every reader calls
    # json.loads on it. The parsed list used to be assigned here directly.
    qe_obj.answer_eval_text = json.dumps(answer_eval_js)
    qe_obj.submitted = False  # mark object as dirty
    question_evaluation_table.upsert(qe_obj)
    buttons = html_create_feedback_updown_button(qe_id, ans_id, selected=out_selection)
    return buttons


def html_get_textual_feedback_form(qe_obj, thank=False):
    """
    Build the optional free-form feedback form for an evaluation.

    :param qe_obj: question/evaluation row; the Submit button is disabled when
                   `qe_obj.submitted == 1`
    :param thank: when True the input placeholder thanks the user
    :return: Div with id HTML_SUBMIT_FEEDBACK containing the form
    """
    if thank:
        ph = "Thank you!"
    else:
        ph = "Write your general feedback here"
    form = Form(Input(name="freeform_feedback", placeholder=ph),
                Button("Submit", disabled=(qe_obj.submitted == 1)),
                hx_post=f"/submit_feedback/{qe_obj.id}",
                hx_target="#" + HTML_SUBMIT_FEEDBACK, hx_swap="outerHTML",)
    div = Div(P("Give us a general feedback for the evaluation (optional)"), form,
              id=HTML_SUBMIT_FEEDBACK)
    return div


@rt("/submit_feedback/{qe_id}")
def post(session, qe_id:int, freeform_feedback:str):
    """
    Mark the evaluation as submitted and re-render the feedback form.

    :param session: the request session
    :param qe_id: id of the question/evaluation row
    :param freeform_feedback: text typed by the user
    :return: the feedback form with a thank-you placeholder, or "Error"
    """
    # Update the object
    # session_id = session.get("session_id", "Not set")
    # save_to_storage(
    #     storage.NavigationEvent(event_type="/submit_feedback", event_session_id=session_id,
    #                             event_params={"qe_id":qe_id})
    # )
    is_ok, qe_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
    if not is_ok:
        print("submit_feedback made session/object error")
        return "Error"
    answer_eval_js = json.loads(qe_obj.answer_eval_text)
    # NOTE(review): the free-form text is written into the parsed copy only and
    # never stored back on qe_obj, so it is currently discarded. Persisting it
    # here would overwrite the first criterion's explanation; it probably needs
    # a dedicated column -- confirm the intended storage before changing this.
    answer_eval_js[0]["explanation"] = freeform_feedback
    qe_obj.submitted = True
    question_evaluation_table.upsert(qe_obj)
    return html_get_textual_feedback_form(qe_obj, thank=True)


def html_format_code_review_form(qe_obj, html_id=""):
    """
    Formats the code review, adding fields for feedback if it is required.

    For every (code, text) pair in eval_code.CODE_AUGMENTATIONS a heading and
    description are emitted, followed by each stored evaluation line whose
    "criteria" equals that code, together with its +/- buttons.

    :param qe_obj: question/evaluation row (code_text, answer_eval_text, id)
    :param html_id: id given to the returned Div
    :return: Div with the submitted code, the criteria and the feedback form
    """
    c_code = qe_obj.code_text
    enhanced_answer = json.loads(qe_obj.answer_eval_text)
    list_of_citerias = []
    for caug_code, caug_txt in eval_code.CODE_AUGMENTATIONS:
        list_of_citerias.extend([H3(caug_code), P(caug_txt)])
        # yeah, I know . . . (nested scan; k must stay the index in the stored answer)
        for k, eval_line in enumerate(enhanced_answer):
            if caug_code == eval_line["criteria"]:
                eval_txt = P(eval_line["explanation"])
                # Pass the stored vote so a re-render shows the user's earlier
                # +/- choice; it used to be reset to "nothing selected".
                eval_txt_fb = html_augment_evaluation_text_with_feedback(
                    eval_txt, qe_obj.id, k, selected=eval_line.get("EVAL", 0))
                list_of_citerias.append(eval_txt_fb)
    textual_feedback = html_get_textual_feedback_form(qe_obj)
    return Div(html_render_code_output(c_code), *list_of_citerias, textual_feedback, _id=html_id)


def html_default_results(html_id):
    """Return the placeholder results Div shown before any code was evaluated."""
    return Div(P("This is where criterias will show up once the code is evaluated"), _id=html_id)


def html_waiting_for_results(html_id):
    """Return a "Working" Div that polls /render_answer every REFRESH_TIME seconds and replaces itself."""
    return Div(P("Working . . ."), _id=html_id,
               hx_get="/render_answer",
               hx_trigger=f"every {REFRESH_TIME}s",
               hx_swap="outerHTML",
               )


def get_latest_eval_request_status(session_id):
    """
    Look up the newest state row of a session.

    :param session_id: session identifier
    :return: (EVAL_STATE_NEW, None) when the session has no rows; (state, row)
             for NEW / QUERY / ANSWER; (EVAL_STATE_ERROR, row) for any other state
    """
    state_obj = _latest_session_state(session_id)
    if state_obj is None:
        return EVAL_STATE_NEW, None
    if state_obj.state in {EVAL_STATE_NEW, EVAL_STATE_QUERY, EVAL_STATE_ANSWER}:
        return state_obj.state, state_obj
    return EVAL_STATE_ERROR, state_obj


def html_error_results(message, html_id):
    """Return a Div with id `html_id` that displays `message` as an error."""
    div = Div(P("There was an error:", P(message)), _id=html_id)
    return div


def html_render_answer_from_db(session_id, html_id):
    """
    Render the results area according to the session's latest request state.

    :param session_id: session identifier
    :param html_id: id for the results element
    :return: a tuple of elements (callers unpack it with `*`); `(None,)` when
             the evaluation row referenced by the state cannot be found
    """
    eval_request_status, state_obj = get_latest_eval_request_status(session_id)
    # print(eval_request_status, state_obj)
    if eval_request_status == EVAL_STATE_NEW:
        return html_default_results(html_id),
        #, html_render_inputbox(HTML_RESULTS_AREA, HTML_SUBMIT_CODE_AREA)
    if eval_request_status == EVAL_STATE_ANSWER:
        qe_obj = _question_evaluation_by_id(state_obj.current_qeval)
        if qe_obj is None:
            print(f"Object id {state_obj.current_qeval} can't be found in question_evaluation_table")
            return (None,)
        return (html_format_code_review_form(qe_obj, html_id),
                html_render_inputbox(target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA))
        #, html_render_code_output(HTML_SUBMIT_CODE_AREA, state_obj.code)
    if eval_request_status == EVAL_STATE_QUERY:
        return html_waiting_for_results(html_id),
    # Error-like state. getattr keeps this working against a database file that
    # was created before the `answer` column existed.
    message = getattr(state_obj, "answer", None) or "Unknown error"
    return html_error_results(message, html_id),
    #, html_render_code_output(HTML_SUBMIT_CODE_AREA, state_obj.code)


# How can I timeout? Well ... TBD.
@threaded
def call_gpt_and_store_result(session_obj_id, code_to_check):
    """
    Threaded function that will submit code to LLM and wait for the answer.

    Communication with "main" thread is through db. All parameters must be pickable.
    Failures are printed and swallowed.

    :param session_obj_id: primary key of the session_state row to update
    :param code_to_check: the C source submitted by the user
    :return: None
    """
    # TODO refactor considering new join!
    # NOTE(review): if anything below raises, the row stays in EVAL_STATE_QUERY
    # and the client keeps polling -- the timeout is still TBD.
    try:
        # Pesky way to get a new cursor, in a thread safe way, into the db. This code runs in another thread.
        # Can we do better?
        local_database = Database(DATABASE_NAME)
        local_sess_state = local_database.t.session_state
        # Called as in the original code; the returned class itself is not needed here.
        local_sess_state.dataclass()
        local_sess_obj_lst = local_sess_state(limit=1, where="id == ?", where_args=[session_obj_id])
        local_sess_obj = local_sess_obj_lst[0]

        # Trigger the lengthy operation
        enhanced_answer = eval_code.eval_the_piece_of_c_code(openai_client=None, ccode=code_to_check)

        # we create a new QA entry.
        # NOTE(review): this insert goes through the module-level table, not the
        # thread-local database opened above -- confirm that is intended.
        qa_obj = Question_Evaluation_cls(code_text=code_to_check,
                                         answer_eval_text=enhanced_answer, submitted=0)
        qa_obj = question_evaluation_table.insert(qa_obj)
        local_sess_obj.current_qeval = qa_obj.id
        # TODO save the outcome in a table, where it will be backed-up later. SqlLite is volatile.

        if "error" in enhanced_answer:
            local_sess_obj.state = EVAL_STATE_ERROR
            local_sess_obj.answer = str(enhanced_answer["error"])
        else:
            local_sess_obj.state = EVAL_STATE_ANSWER
        local_sess_obj.completed = datetime.utcnow()
        local_sess_state.update(local_sess_obj)
    except Exception:
        # Was a bare `except:`, which also intercepted SystemExit/KeyboardInterrupt.
        import traceback
        traceback.print_exc()


def html_render_inputbox(target_html_id, region_html_id):
    """
    Build the code-entry form.

    :param target_html_id: id of the element that receives the /submit_to_eval response
    :param region_html_id: id of the returned Div (sent as an out-of-band swap)
    :return: Div wrapping the form
    """
    txtarea = Textarea(id="ccodetoeval", name="ccodetoeval", placeholder="Enter a piece of C code", rows=3)
    form = Form(Group(txtarea, Button("Evaluate")),
                hx_post="/submit_to_eval",
                hx_swap="outerHTML",
                target_id=target_html_id
                )
    return Div(form, _id=region_html_id, hx_swap_oob='true')


def html_render_code_output(code):
    """Return `code` wrapped as Pre(Code(...))."""
    txtarea = Pre(Code(code))
    return txtarea


def render_conditional_inputbox_results(session_id):
    """Placeholder: looks up the latest request status and currently does nothing with it."""
    eval_request_status, _ = get_latest_eval_request_status(session_id)
    pass


def html_render_clear_area_button(html_id):
    """Return the "Clear form" button inside a Div with id `html_id` (out-of-band swap)."""
    button = Button("Clear form",
                    hx_get="/clear_area",
                    hx_swap="outerHTML",
                    target_id=HTML_RESULTS_AREA,
                    )
    div = Div(button, _id=html_id, hx_swap_oob='true')
    return div


def render_clear_area(session_id, html_id):
    """
    Render the clear button, or an empty placeholder while the session's state is NEW.

    :param session_id: session identifier
    :param html_id: id of the returned Div
    :return: Div sent as an out-of-band swap
    """
    eval_request_status, _ = get_latest_eval_request_status(session_id)
    if eval_request_status != EVAL_STATE_NEW:
        return html_render_clear_area_button(html_id)
    return Div(P(""), _id=html_id, hx_swap_oob='true')


@rt("/")
def get(session):
    """
    Main page: creates a session id when the session has none, then renders the
    input form, the results area and the clear button.

    :param session: the request session
    :return: (Title, Main(...))
    """
    print(session)
    if 'session_id' not in session:
        session['session_id'] = str(uuid.uuid4())
    session_id = session["session_id"]
    save_to_storage(
        storage.NavigationEvent(event_type="/", event_session_id=session_id)
    )
    title = Title('C code review for students')
    preamble = [H1("Evaluate your C code!"),
                P("Enter your code in the textbox below and wait for answers."),
                P("!! The data will be saved and maybe made public !!"),
                ]
    # ############
    # # !!! FOR DEBUGGING PURPOSES !!!
    # #
    # # insert an answer in db.
    # code = """
    # #include
    # int main() {
    #     // printf() displays the string inside quotation
    #     printf("Hello, World!");
    #     return 0;
    # }
    # """
    # answer = eval_code.eval_the_piece_of_c_code(None, None)
    # enhanced_answer = eval_code.add_evaluation_fields_on_js_answer(answer, CODE_AUGMENTATIONS)
    # session_obj = Session_State_cls(
    #     session_id=session_id,
    #     state=EVAL_STATE_ANSWER,
    #     submitted=datetime.utcnow(),
    #     completed=datetime.utcnow(),
    # )
    #
    # # we create a new QA entry.
    # qa_obj = Question_Evaluation_cls(code_text=code,
    #                                  answer_eval_text=enhanced_answer, submitted=0)
    # qa_obj = question_evaluation_table.insert(qa_obj)
    # session_obj.current_qeval = qa_obj.id
    # session_obj = session_state_table.insert(session_obj)
    # #
    # ############
    input_area = html_render_inputbox(target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
    results_area = html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
    clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
    return title, Main(*preamble, input_area, results_area, clear_area)


@rt("/render_answer")
def get(session):
    """
    Polling endpoint used by the "Working" Div.

    :param session: the request session
    :return: the rendered results tuple, or an error string without a session id
    """
    if 'session_id' not in session:
        return "render_answer No session ID"
    session_id = session["session_id"]
    answer_area = html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
    return answer_area


@rt("/submit_to_eval", methods="post")
def post(session, ccodetoeval:str):
    """
    Record a new evaluation request and start the background evaluation.

    :param session: the request session
    :param ccodetoeval: the submitted C source
    :return: the results area followed by the clear-button area
    """
    print(session)
    if 'session_id' not in session:
        return P("submit_to_eval. Bad call. No session ID")
    session_id = session["session_id"]
    session_obj = Session_State_cls(
        session_id=session_id,
        state=EVAL_STATE_QUERY,
        submitted=datetime.utcnow(),
    )
    # we insert and we get the new primary key
    session_obj = session_state_table.insert(session_obj)
    # will be executed in another thread with magic @threaded
    call_gpt_and_store_result(session_obj.id, ccodetoeval)
    return (*html_render_answer_from_db(session_id, HTML_RESULTS_AREA),
            render_clear_area(session_id, HTML_CLEAR_FORM)
            )


@rt("/clear_area", methods="get")
def get(session):
    """
    Reset the session to the NEW state and re-issue the forms.

    :param session: the request session
    :return: results area, input area and clear-button area
    """
    if 'session_id' not in session:
        return P("clear_area. Bad call. No session ID")
    session_id = session["session_id"]
    save_to_storage(
        storage.NavigationEvent(event_type="/clear_area", event_session_id=session_id)
    )
    # insert a row to "cancel"/reset the current request
    session_obj = Session_State_cls(
        session_id=session_id,
        state=EVAL_STATE_NEW,
        submitted=datetime.utcnow(),
    )
    session_state_table.insert(session_obj)
    # re-issue forms
    input_area = html_render_inputbox(target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
    results_area = html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
    clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
    print(results_area)
    return *results_area, input_area, clear_area


## This code is for reference, for OAuth. BIG PITA, will be added later.
# # js_hf_imports = \ # """ # { # "imports": { # "@huggingface/hub": "https://cdn.jsdelivr.net/npm/@huggingface/hub@0.13.0/+esm" # } # } # """ # # js_block_hf_auth = \ # """ # import { oauthLoginUrl, oauthHandleRedirectIfPresent } from "@huggingface/hub"; # console.log("huggingface env", window.huggingface); # let oauthResult = localStorage.getItem("oauth"); # if (oauthResult) { # try { # oauthResult = JSON.parse(oauthResult); # } catch { # oauthResult = null; # } # } # # oauthResult ||= await oauthHandleRedirectIfPresent(); # if (oauthResult) { # document.querySelector("pre").textContent = JSON.stringify(oauthResult, null, 2); # localStorage.setItem("oauth", JSON.stringify(oauthResult)); # document.getElementById("signout").style.removeProperty("display"); # document.getElementById("signout").onclick = async function() { # localStorage.removeItem("oauth"); # window.location.href = window.location.href.replace(/\?.*$/, ''); # window.location.reload(); # } # } else { # document.getElementById("signin").style.removeProperty("display"); # document.getElementById("signin").onclick = async function() { # // prompt=consent to re-trigger the consent screen instead of silently redirecting # window.location.href = (await oauthLoginUrl({scopes: window.huggingface.variables.OAUTH_SCOPES})) + "&prompt=consent"; # } # } # """ # # js_block_hf_auth2 = \ # """ # import { oauthLoginUrl, oauthHandleRedirectIfPresent } from "@huggingface/hub"; # let oauthResult = localStorage.getItem("oauth"); # if (oauthResult) { # try { # oauthResult = JSON.parse(oauthResult); # } catch { # oauthResult = null; # } # } # # console.log("OAuth result", oauthResult); # oauthResult ||= await oauthHandleRedirectIfPresent(); # # if (oauthResult) { # document.querySelector("pre").textContent = JSON.stringify(oauthResult, null, 2); # localStorage.setItem("oauth", JSON.stringify(oauthResult)); # console.log("There is an oauth result", oauthResult); # 
document.getElementById("signout").style.removeProperty("display"); # document.getElementById("signout").onclick = async function() { # localStorage.removeItem("oauth"); # window.location.href = window.location.href.replace(/\?.*$/, ''); # window.location.reload(); # } # # } else { # console.log("No OAuth result. Setting the loging button event"); # document.getElementById("signin").style.removeProperty("display"); # document.getElementById("signin").onclick = async function() { # // prompt=consent to re-trigger the consent screen instead of silently redirecting # window.location.href = window.hf_redirect_url; # } # } # """ # # # def get_hf_user_data(): # pass # @rt('/', methods="get") # def get(): # global SPACE_HOST # if "localhost" in SPACE_HOST: # space_host = f"htpp://{SPACE_HOST}/" # else: # space_host = f"https://{SPACE_HOST}/" # hf_redirect_url_func = ( # f"import {{ oauthLoginUrl, }} from '@huggingface/hub'; \n" # f"window.hf_redirect_url = await oauthLoginUrl({{clientId:'{OAUTH_CLIENT_ID}',redirectUrl:'{space_host}'," # f"scopes:'{OAUTH_SCOPES}'}}) + '&prompt=consent';\n" # f"console.log(window.hf_redirect_url);") # # header = Head(Title("C code reviewing"), # Script(src="https://unpkg.com/es-module-shims@1.7.0/dist/es-module-shims.js"), # Script(js_hf_imports, type="importmap"), # ) # content = Body(Div(P("Some content!")), # A(f"Space host: {space_host}", href=space_host), # Pre(""), # Script(hf_redirect_url_func, type="module"), # Img(src="https://huggingface.co/datasets/huggingface/badges/resolve/main/sign-in-with-huggingface-xl-dark.svg", # alt="Sign in with Hugging Face", # style="cursor: pointer; display: none;", # _id="signin", name=None), # Button("Sign out", _id="signout", style="display: none", name=None), # A("Authenticate!", href="authorized"), # Script(js_block_hf_auth2, type="module")) # full_page = Html(header, content) # return full_page # @rt('/change') # def get(): return P('Nice to be here!') # @rt('/authorized', methods="get") # def 
# authorized():
#     content = Body(P("Do we have authenticated user?"))
#     return content

# Application entry point. `serve` is not defined in this file; presumably it
# comes from the `fasthtml.common` star import -- TODO confirm.
serve()