# c-reviewer / main.py
# ml-visoft's picture
# Trying to activate backup
# 15040e6
# raw
# history blame
# 24 kB
import json
import uuid
import os
import pathlib
from uuid import uuid4
from datetime import datetime
import dataclasses
from fasthtml.common import *
from fastcore.all import typedispatch
from sqlite_minutils.db import Database
from huggingface_hub import CommitScheduler
import eval_code
import storage
# The secrets. They are loaded from the HF Space settings, from docker --env-file, or from the IDE.
OAUTH_CLIENT_ID = os.environ.get('OAUTH_CLIENT_ID')
OAUTH_SCOPES = os.environ.get('OAUTH_SCOPES')
OAUTH_CLIENT_SECRET = os.environ.get('OAUTH_CLIENT_SECRET')
OPENID_PROVIDER_URL = os.environ.get('OPENID_PROVIDER_URL')
# Default to "" so the `"localhost" in SPACE_HOST` checks do not raise TypeError when the variable is unset;
# an unset host is then handled like a non-local deployment.
SPACE_HOST = os.environ.get('SPACE_HOST', "")
HF_DATASET_AUTH_TOKEN = os.environ.get('HF_DATASET_AUTH_TOKEN', "none")
if HF_DATASET_AUTH_TOKEN == "none":
    # Explicit raise instead of `assert`: asserts are stripped when Python runs with -O.
    raise AssertionError("Please set the corrent ENV variables!!!")

LOCAL_STORAGE_PATH = ""
# One events file per process start; the timestamp and the uuid make the name unique.
FILE_EVENTS = f"events-{datetime.utcnow()}-{uuid4()}.jsonl"

if "localhost" in SPACE_HOST:
    DATABASE_NAME = "data/sessions_meta.db"
    LOCAL_STORAGE_PATH = Path("data/persistent/")
    # Local runs always start from an empty sessions database.
    pathlib.Path(DATABASE_NAME).unlink(missing_ok=True)
else:
    DATABASE_NAME = "/tmp/cache/sessions_meta.db"
    LOCAL_STORAGE_PATH = Path("/tmp/cache/persistent")
LOCAL_STORAGE_PATH.mkdir(exist_ok=True, parents=True)
EVENTS_FILE_PATH = LOCAL_STORAGE_PATH / FILE_EVENTS

# Periodically commits LOCAL_STORAGE_PATH to the HF dataset repository.
# Stays None when the scheduler cannot be created; the storage helpers then skip writing.
scheduler = None
try:
    scheduler = CommitScheduler(
        repo_id="ml-visoft/c-reviewer",
        repo_type="dataset",
        folder_path=LOCAL_STORAGE_PATH,
        every=5,
        path_in_repo="raw_data",
        token=HF_DATASET_AUTH_TOKEN,
        squash_history=False
    )
except Exception:
    # Best effort: the app must still start without the backup, but do not swallow
    # KeyboardInterrupt/SystemExit the way a bare `except:` would.
    import traceback
    traceback.print_exc()
# SQLite database that holds the per-session bookkeeping for the running app.
global_database = Database(DATABASE_NAME)
global_database_tables = global_database.t
# We store session specific feedback from registered users.
# Will be later used to submit/store the answer.
question_evaluation_table = global_database_tables.question_answer
if question_evaluation_table not in global_database_tables:
    # code_text: the code sent for evaluation
    # answer_eval_text: the evaluation; it is read back with json.loads elsewhere in this file
    # submitted: 0/1 flag, set once the textual feedback was submitted
    question_evaluation_table.create(id=int, code_text=str,
                                     answer_eval_text=str, submitted=int, pk='id')
# Row class used to build and read question_answer records
Question_Evaluation_cls = question_evaluation_table.dataclass()
# We store real-time session IDs and the state of the questions. Will link to question_answer
session_state_table = global_database_tables.session_state
if session_state_table not in global_database_tables:
    # session_id stored in cookies
    # see EVAL_STATE_x below for state
    session_state_table.create(id=int, session_id=str, state=int, submitted=datetime,
                               completed=datetime, current_qeval=int, pk='id',)
    # Can't really nail the fk specs
    # foreign_keys=[("current_qeval", question_evaluation_table, "id")])
# Row class used to build and read session_state records
Session_State_cls = session_state_table.dataclass()
# Values of session_state.state.
# NEW: nothing submitted yet, or the form was cleared.
EVAL_STATE_NEW=0
# QUERY: code was submitted and the evaluation is still running.
EVAL_STATE_QUERY=1
# TIMEDOUT: not set anywhere in this file; it is reported as an error when read back.
EVAL_STATE_TIMEDOUT=2
# ANSWER: the evaluation finished and current_qeval points to the result.
EVAL_STATE_ANSWER=3
# ERROR: the evaluation reported an error.
EVAL_STATE_ERROR=4
# Constants to name the various HTML ids in the code
HTML_SUBMIT_CODE_AREA = "submit_code_area"
HTML_RESULTS_AREA = "prompt_response"
HTML_CLEAR_FORM = "clear_the_form"
HTML_SUBMIT_FEEDBACK = "submit_feedback"
# NOTE(review): the pages display C code, but "c" is not in the highlighted languages - confirm this is intended.
hdrs = (HighlightJS(langs=['python', 'javascript', 'html', 'css']),)
if "localhost" in SPACE_HOST:
    # Are we hacking locally?
    print("Localhost detected in SPACE_HOST. App started in debug+live mode!")
    app, rt = fast_app(debug=True, live=True, hdrs=hdrs)
    # Polling period, in seconds, of the "Working . . ." placeholder
    REFRESH_TIME = 0.1
else:
    app, rt = fast_app(debug=False, live=False, hdrs=hdrs)
    # Polling period, in seconds, of the "Working . . ." placeholder
    REFRESH_TIME = 1
################# STORAGE
def untyped_save_to_storage(dc, filename):
    """
    Append a dataclass instance to a file as one JSON line.

    Nothing is written when no commit scheduler is available. The write
    happens while holding the scheduler lock.
    :param dc: dataclass instance to serialize
    :param filename: path of the file to append to
    :return: None
    """
    if scheduler is None:
        return
    with scheduler.lock:
        record = dataclasses.asdict(dc)
        line = json.dumps(record) + "\n"
        with open(filename, "a") as out:
            out.write(line)
@typedispatch
def save_to_storage(nav_event:storage.NavigationEvent):
    """
    Append a navigation event to the events file (EVENTS_FILE_PATH).

    The function is registered with typedispatch on the type of its argument.
    :param nav_event: the event to store
    """
    untyped_save_to_storage(nav_event, EVENTS_FILE_PATH)
def validate_and_get_question_evaluation_objectid(session, qe_id:int):
    """
    Check that `qe_id` is the evaluation attached to the latest state row of this session.

    :param session: the session mapping; it must hold a 'session_id' entry
    :param qe_id: id of the question_answer row the client wants to act on
    :return: (True, question_answer row object) on success, (False, None) on any failure
    """
    if 'session_id' not in session:
        print("validate_and_get_question_evaluation_objectid bad session data")
        # Always return a pair: every caller unpacks two values.
        return False, None
    session_id = session["session_id"]
    # Bound parameters instead of string-built SQL: no quoting problems, and a None id simply matches nothing.
    state_rows = session_state_table(limit=1, where="session_id == ?", where_args=[session_id],
                                     order_by="id DESC")
    if not state_rows:
        print("validate_and_get_question_evaluation_objectid there is no state")
        return False, None
    answer_id = state_rows[0].current_qeval
    qa_obj_row = question_evaluation_table(limit=1, where="id == ?", where_args=[answer_id])
    if not qa_obj_row:
        print("validate_and_get_question_evaluation_objectid There is no answer recorded")
        return False, None
    qe_obj = qa_obj_row[0]
    if qe_id != qe_obj.id:
        print(f"validate_and_get_question_evaluation_objectid QE {qe_id} does not belong to {qe_obj.id}")
        return False, None
    return True, qe_obj
def html_create_feedback_updown_button(qe_id, ans_id, selected=0, disabled=False):
    """
    Build the "+" / "-" feedback buttons of one evaluation entry.

    :param qe_id: id of the question_answer row
    :param ans_id: position of the entry inside the evaluation
    :param selected: 1 highlights "+", -1 highlights "-", anything else highlights none
    :param disabled: passed to both buttons
    :return: a Div with id "buttons_<ans_id>" holding the two buttons
    """
    container_id = f"buttons_{ans_id}"
    endpoint = f"/toggle_up_down/{qe_id}/{ans_id}/"
    idle_color, active_color = "grey", "blue"

    def vote_button(label, vote):
        # The button of the currently selected vote is shown in the active color.
        color = active_color if selected == vote else idle_color
        return Button(label, hx_post=f"{endpoint}?which={vote}", disabled=disabled, hx_swap="outerHTML",
                      hx_target="#" + container_id, style=f"background-color:{color}")

    return Div(vote_button("+", 1), vote_button("-", -1), _id=container_id)
def html_augment_evaluation_text_with_feedback(eval_html, qe_id, ans_id, selected=0):
    """
    Wrap an evaluation text together with its + / - feedback buttons.

    :param eval_html: the already rendered evaluation text
    :param qe_id: id of the question_answer row
    :param ans_id: position of the entry inside the evaluation
    :param selected: current vote, forwarded to the buttons
    :return: a Div holding the text followed by the buttons
    """
    return Div(
        eval_html,
        html_create_feedback_updown_button(qe_id, ans_id, selected),
        style=" background-color: #f0f0f0;",
    )
@rt("/toggle_up_down/{qe_id}/{ans_id}")
def post(session, qe_id:int, ans_id:int, which:int):
    """
    Answer to the +/- button presses.

    Pressing the button that is already selected clears the vote; any other press selects that button.
    The new vote is stored in the "EVAL" field of the entry and the row is marked as not submitted.
    :param session: the session; it must own the evaluation `qe_id`
    :param qe_id: id of the question_answer row
    :param ans_id: position of the entry inside the stored evaluation
    :param which: 1 for "+", -1 for "-"
    :return: the re-rendered buttons, None for a bad `which`, "Error" for a bad session/object/entry
    """
    print(f"{qe_id=} {ans_id=} {which=}")
    if which not in {-1, 1}:
        print(f"The {which=} is bad")
        return None
    is_ok, qe_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
    if not is_ok:
        print("toggle_up_down made session/object error")
        return "Error"
    answer_eval_js = json.loads(qe_obj.answer_eval_text)
    # ans_id comes from the URL: reject anything that is not a valid position (negative values included).
    if not isinstance(answer_eval_js, list) or not 0 <= ans_id < len(answer_eval_js):
        print(f"toggle_up_down {ans_id=} is out of range")
        return "Error"
    crt_selection = answer_eval_js[ans_id].get("EVAL", 0)
    # Same button pressed again -> clear the vote; otherwise the pressed button wins.
    out_selection = 0 if crt_selection == which else which
    print(f"out selection: {out_selection}")
    # store it back in DB, serialized the same way it is read (json.loads above)
    answer_eval_js[ans_id]["EVAL"] = out_selection
    qe_obj.answer_eval_text = json.dumps(answer_eval_js)
    qe_obj.submitted = False  # mark object as dirty
    question_evaluation_table.upsert(qe_obj)
    return html_create_feedback_updown_button(qe_id, ans_id, selected=out_selection)
def html_get_textual_feedback_form(qe_obj, thank=False):
    """
    Build the free-text feedback form of an evaluation.

    :param qe_obj: question_answer row; its id goes in the post URL and the Submit button is
        disabled when `submitted` equals 1
    :param thank: when true the input placeholder thanks the user
    :return: a Div with id HTML_SUBMIT_FEEDBACK holding the prompt and the form
    """
    placeholder = "Thank you!" if thank else "Write your general feedback here"
    feedback_input = Input(name="freeform_feedback", placeholder=placeholder)
    submit_button = Button("Submit", disabled=(qe_obj.submitted == 1))
    form = Form(feedback_input, submit_button,
                hx_post=f"/submit_feedback/{qe_obj.id}",
                hx_target="#" + HTML_SUBMIT_FEEDBACK, hx_swap="outerHTML")
    prompt = P("Give us a general feedback for the evaluation (optional)")
    return Div(prompt, form, id=HTML_SUBMIT_FEEDBACK)
@rt("/submit_feedback/{qe_id}")
def post(session, qe_id:int, freeform_feedback:str):
    """
    Store the free-text feedback of an evaluation and mark the evaluation as submitted.

    :param session: the session; it must own the evaluation `qe_id`
    :param qe_id: id of the question_answer row
    :param freeform_feedback: the text typed by the user
    :return: the feedback form with a "Thank you!" placeholder, or "Error"
    """
    is_ok, qe_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
    if not is_ok:
        print("submit_feedback made session/object error")
        return "Error"
    answer_eval_js = json.loads(qe_obj.answer_eval_text)
    if not isinstance(answer_eval_js, list) or not answer_eval_js:
        # Nothing to attach the feedback to; avoid an IndexError below.
        print("submit_feedback found no evaluation entry to hold the feedback")
        return "Error"
    # NOTE(review): this overwrites the "explanation" of entry 0 - confirm that entry 0 is the
    # slot reserved for the general feedback in the answer layout produced by eval_code.
    answer_eval_js[0]["explanation"] = freeform_feedback
    # Write the modified evaluation back on the row; without this the feedback never reaches the DB.
    qe_obj.answer_eval_text = json.dumps(answer_eval_js)
    qe_obj.submitted = True
    question_evaluation_table.upsert(qe_obj)
    return html_get_textual_feedback_form(qe_obj, thank=True)
def html_format_code_review_form(qe_obj, html_id=""):
    """
    Formats the code review, adding fields for feedback.

    For every (code, text) pair of eval_code.CODE_AUGMENTATIONS a heading and a description are
    emitted, followed by the stored evaluation entries of that criteria, each with its +/- buttons
    reflecting the vote already stored in the entry.
    :param qe_obj: question_answer row (code_text, answer_eval_text as JSON, id, submitted)
    :param html_id: id given to the returned Div
    :return: a Div with the code, the criteria and the textual feedback form
    """
    c_code = qe_obj.code_text
    enhanced_answer = json.loads(qe_obj.answer_eval_text)
    # Group the entries per criteria in one pass. The position of an entry is kept because it is
    # the ans_id the +/- endpoint uses to find the entry again.
    lines_by_criteria = {}
    for k, eval_line in enumerate(enhanced_answer):
        lines_by_criteria.setdefault(eval_line["criteria"], []).append((k, eval_line))
    list_of_citerias = []
    for caug_code, caug_txt in eval_code.CODE_AUGMENTATIONS:
        list_of_citerias.extend([H3(caug_code), P(caug_txt)])
        for k, eval_line in lines_by_criteria.get(caug_code, []):
            eval_txt = P(eval_line["explanation"])
            # Show the stored vote, so a page reload does not display every button as unselected.
            eval_txt_fb = html_augment_evaluation_text_with_feedback(
                eval_txt, qe_obj.id, k, selected=eval_line.get("EVAL", 0))
            list_of_citerias.append(eval_txt_fb)
    textual_feedback = html_get_textual_feedback_form(qe_obj)
    return Div(html_render_code_output(c_code), *list_of_citerias, textual_feedback, _id=html_id)
def html_default_results(html_id):
    """
    Build the placeholder shown in the results area before any evaluation exists.

    :param html_id: id given to the returned Div
    :return: a Div holding the placeholder paragraph
    """
    placeholder = P("This is where criterias will show up once the code is evaluated")
    return Div(placeholder, _id=html_id)
def html_waiting_for_results(html_id):
    """
    Build the "Working . . ." placeholder that polls /render_answer.

    The Div asks for /render_answer every REFRESH_TIME seconds and is replaced by the response.
    :param html_id: id given to the returned Div
    :return: the self-refreshing Div
    """
    polling_trigger = f"every {REFRESH_TIME}s"
    return Div(
        P("Working . . ."),
        _id=html_id,
        hx_get="/render_answer",
        hx_trigger=polling_trigger,
        hx_swap="outerHTML",
    )
def get_latest_eval_request_status(session_id):
    """
    Read the most recent state row of a session.

    :param session_id: the session identifier
    :return: (EVAL_STATE_NEW, None) when the session has no row; otherwise (status, row), where
        status is the stored state for NEW/QUERY/ANSWER and EVAL_STATE_ERROR for anything else
    """
    rows = session_state_table(limit=1, where=f"session_id == '{session_id}'", order_by="id DESC")
    if not rows:
        return EVAL_STATE_NEW, None
    latest = rows[0]
    passthrough_states = (EVAL_STATE_NEW, EVAL_STATE_QUERY, EVAL_STATE_ANSWER)
    status = latest.state if latest.state in passthrough_states else EVAL_STATE_ERROR
    return status, latest
def html_error_results(message, html_id):
    """
    Build the results area for a failed evaluation.

    :param message: the error text to show
    :param html_id: id given to the returned Div
    :return: a Div with a heading paragraph followed by the message paragraph
    """
    # Two sibling paragraphs: a <p> nested inside another <p> is not valid HTML.
    return Div(P("There was an error:"), P(message), _id=html_id)
def _lookup_error_message(state_obj):
    """Best-effort retrieval of the error text that belongs to a failed evaluation request."""
    # Use the attribute when the row object carries one; the session_state table created in this
    # file has no such column, so this is normally absent.
    message = getattr(state_obj, "answer", None)
    if message:
        return message
    # Otherwise look into the evaluation stored for the request: the worker stores the whole
    # answer, including its "error" entry, in question_answer.answer_eval_text.
    qeval_id = getattr(state_obj, "current_qeval", None)
    if qeval_id is not None:
        rows = question_evaluation_table(limit=1, where="id == ?", where_args=[qeval_id])
        if rows:
            try:
                stored = json.loads(rows[0].answer_eval_text)
            except (TypeError, ValueError):
                stored = None
            if isinstance(stored, dict) and stored.get("error"):
                return stored["error"]
    return "The evaluation could not be completed."


def html_render_answer_from_db(session_id, html_id):
    """
    Render the results area according to the latest evaluation request of the session.

    :param session_id: the session identifier
    :param html_id: id given to the results element
    :return: a tuple of elements: the placeholder (NEW), the polling "Working" element (QUERY),
        the review form plus a fresh input box (ANSWER), or an error element
    """
    eval_request_status, state_obj = get_latest_eval_request_status(session_id)
    if eval_request_status == EVAL_STATE_NEW:
        return html_default_results(html_id),
    if eval_request_status == EVAL_STATE_QUERY:
        return html_waiting_for_results(html_id),
    if eval_request_status == EVAL_STATE_ANSWER:
        qe_obj_lst = question_evaluation_table(limit=1, where="id == ?",
                                               where_args=[state_obj.current_qeval])
        if len(qe_obj_lst) < 1:
            print(f"Object id {state_obj.current_qeval} can't be found in question_evaluation_table")
            # Keep an element with html_id on the page, so later requests still find their target.
            return html_error_results("The evaluation result could not be found.", html_id),
        qe_obj = qe_obj_lst[0]
        return (html_format_code_review_form(qe_obj, html_id),
                html_render_inputbox(target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA))
    # Every other state is an error. Do not read state_obj.answer directly: the row has no such field.
    return html_error_results(_lookup_error_message(state_obj), html_id),
# How can I timeout? Well ... TBD.
@threaded
def call_gpt_and_store_result(session_obj_id, code_to_check):
    """
    Threaded function that will submit code to LLM and wait for the answer.
    Communication with "main" thread is through db.
    All parameters must be picklable.

    On success the session_state row gets the id of the new question_answer row and the state
    ANSWER (or ERROR when the answer contains "error"). When anything fails, the row is marked as
    ERROR on a best-effort basis, so the page stops polling for a result that will never come.
    :param session_obj_id: id of the session_state row created for this request
    :param code_to_check: the code to evaluate
    :return: None
    """
    import traceback
    # TODO refactor considering new join!
    local_sess_state = None
    local_sess_obj = None
    try:
        # Pesky way to get a new cursor, in a thread safe way, into the db. This code runs in another thread.
        # Can we do better?
        local_database = Database(DATABASE_NAME)
        local_sess_state = local_database.t.session_state
        # The returned class is not used, but the call is kept: presumably it is what makes this
        # table hand back row objects (attribute access is used below) - confirm before removing.
        local_sess_state.dataclass()
        local_sess_obj_lst = local_sess_state(limit=1, where=f"id == {session_obj_id}")
        local_sess_obj = local_sess_obj_lst[0]
        # Trigger the lengthy operation
        enhanced_answer = eval_code.eval_the_piece_of_c_code(openai_client=None, ccode=code_to_check)
        # we create a new QA entry.
        qa_obj = Question_Evaluation_cls(code_text=code_to_check, answer_eval_text=enhanced_answer, submitted=0)
        qa_obj = question_evaluation_table.insert(qa_obj)
        local_sess_obj.current_qeval = qa_obj.id
        # TODO save the outcome in a table, where it will be backed-up later. SqlLite is volatile.
        # The error text is not copied on the session row: session_state has no column for it,
        # and the text is already part of the answer stored in the question_answer row above.
        local_sess_obj.state = EVAL_STATE_ERROR if "error" in enhanced_answer else EVAL_STATE_ANSWER
        local_sess_obj.completed = datetime.utcnow()
        local_sess_state.update(local_sess_obj)
    except Exception:
        traceback.print_exc()
        if local_sess_obj is not None:
            # Best effort: leave the QUERY state, otherwise the client keeps polling forever.
            try:
                local_sess_obj.state = EVAL_STATE_ERROR
                local_sess_obj.completed = datetime.utcnow()
                local_sess_state.update(local_sess_obj)
            except Exception:
                traceback.print_exc()
def html_render_inputbox(target_html_id, region_html_id):
    """
    Build the code input form.

    The form posts to /submit_to_eval and the response replaces the element `target_html_id`.
    :param target_html_id: id of the element the response is swapped into
    :param region_html_id: id given to the returned Div (sent as an out-of-band swap)
    :return: the Div holding the form
    """
    code_box = Textarea(id="ccodetoeval", name="ccodetoeval", placeholder="Enter a piece of C code", rows=3)
    controls = Group(code_box, Button("Evaluate"))
    form = Form(controls, hx_post="/submit_to_eval", hx_swap="outerHTML", target_id=target_html_id)
    return Div(form, _id=region_html_id, hx_swap_oob='true')
def html_render_code_output(code):
    """
    Wrap a piece of code in Pre/Code elements for display.

    :param code: the code text
    :return: the Pre element
    """
    return Pre(Code(code))
def render_conditional_inputbox_results(session_id):
    """
    Placeholder: reads the latest request status of the session and renders nothing yet.

    :param session_id: the session identifier
    :return: None
    """
    _status, _state_row = get_latest_eval_request_status(session_id)
    return None
def html_render_clear_area_button(html_id):
    """
    Build the "Clear form" button.

    The button calls /clear_area and the response replaces the results area.
    :param html_id: id given to the returned Div (sent as an out-of-band swap)
    :return: the Div holding the button
    """
    clear_button = Button("Clear form", hx_get="/clear_area", hx_swap="outerHTML",
                          target_id=HTML_RESULTS_AREA)
    return Div(clear_button, _id=html_id, hx_swap_oob='true')
def render_clear_area(session_id, html_id):
    """
    Render the clear area: empty while the session has nothing to clear, the button otherwise.

    :param session_id: the session identifier
    :param html_id: id given to the returned Div
    :return: an out-of-band Div, either empty or holding the "Clear form" button
    """
    status, _ = get_latest_eval_request_status(session_id)
    if status == EVAL_STATE_NEW:
        # Nothing was submitted (or it was just cleared): no button.
        return Div(P(""), _id=html_id, hx_swap_oob='true')
    return html_render_clear_area_button(html_id)
@rt("/")
def get(session):
    """
    Main page: creates the session id when missing, logs the visit and renders the whole page.

    :param session: the session mapping
    :return: the page title and the Main element (preamble, input box, results area, clear area)
    """
    print(session)
    if 'session_id' not in session:
        session['session_id'] = str(uuid.uuid4())
    session_id = session["session_id"]
    visit = storage.NavigationEvent(event_type="/", event_session_id=session_id)
    save_to_storage(visit)

    title = Title('C code review for students')
    heading = H1("Evaluate your C code!")
    instructions = P("Enter your code in the textbox below and wait for answers.")
    warning = P("!! The data will be saved and maybe made public !!")

    input_area = html_render_inputbox(target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
    results_area = html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
    clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
    page = Main(heading, instructions, warning, input_area, results_area, clear_area)
    return title, page
@rt("/render_answer")
def get(session):
    """
    Polling endpoint: renders the results area for the current state of the session.

    :param session: the session mapping
    :return: the rendered results, or a text message when the session has no id
    """
    try:
        session_id = session["session_id"]
    except KeyError:
        return "render_answer No session ID"
    return html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
@rt("/submit_to_eval", methods="post")
def post(session, ccodetoeval:str):
    """
    Register a new evaluation request and start the evaluation in the background.

    :param session: the session mapping; it must hold a 'session_id'
    :param ccodetoeval: the code typed in the form
    :return: the rendered results area followed by the clear area
    """
    print(session)
    if 'session_id' not in session:
        return P("submit_to_eval. Bad call. No session ID")
    session_id = session["session_id"]
    # we insert and we get the new primary key
    new_request = session_state_table.insert(
        Session_State_cls(
            session_id=session_id,
            state=EVAL_STATE_QUERY,
            submitted=datetime.utcnow(),
        )
    )
    # will be executed in another thread with magic @threaded
    call_gpt_and_store_result(new_request.id, ccodetoeval)
    answer_parts = html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
    clear_part = render_clear_area(session_id, HTML_CLEAR_FORM)
    return (*answer_parts, clear_part)
@rt("/clear_area", methods="get")
def get(session):
    """
    Reset the current request of the session and re-issue the forms.

    :param session: the session mapping; it must hold a 'session_id'
    :return: the results area, the input box and the clear area
    """
    if 'session_id' not in session:
        return P("clear_area. Bad call. No session ID")
    session_id = session["session_id"]
    event = storage.NavigationEvent(event_type="/clear_area", event_session_id=session_id)
    save_to_storage(event)
    # a new NEW row "cancels"/resets the current request, since only the latest row is read
    reset_row = Session_State_cls(
        session_id=session_id,
        state=EVAL_STATE_NEW,
        submitted=datetime.utcnow(),
    )
    session_state_table.insert(reset_row)
    # re-issue forms
    input_area = html_render_inputbox(target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
    results_area = html_render_answer_from_db(session_id, HTML_RESULTS_AREA)
    clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
    print(results_area)
    return (*results_area, input_area, clear_area)
## This code is for reference, for OAuth. BIG PITA, will be added later.
#
# js_hf_imports = \
# """
# {
# "imports": {
# "@huggingface/hub": "https://cdn.jsdelivr.net/npm/@huggingface/hub@0.13.0/+esm"
# }
# }
# """
#
# js_block_hf_auth = \
# """
# import { oauthLoginUrl, oauthHandleRedirectIfPresent } from "@huggingface/hub";
# console.log("huggingface env", window.huggingface);
# let oauthResult = localStorage.getItem("oauth");
# if (oauthResult) {
# try {
# oauthResult = JSON.parse(oauthResult);
# } catch {
# oauthResult = null;
# }
# }
#
# oauthResult ||= await oauthHandleRedirectIfPresent();
# if (oauthResult) {
# document.querySelector("pre").textContent = JSON.stringify(oauthResult, null, 2);
# localStorage.setItem("oauth", JSON.stringify(oauthResult));
# document.getElementById("signout").style.removeProperty("display");
# document.getElementById("signout").onclick = async function() {
# localStorage.removeItem("oauth");
# window.location.href = window.location.href.replace(/\?.*$/, '');
# window.location.reload();
# }
# } else {
# document.getElementById("signin").style.removeProperty("display");
# document.getElementById("signin").onclick = async function() {
# // prompt=consent to re-trigger the consent screen instead of silently redirecting
# window.location.href = (await oauthLoginUrl({scopes: window.huggingface.variables.OAUTH_SCOPES})) + "&prompt=consent";
# }
# }
# """
#
# js_block_hf_auth2 = \
# """
# import { oauthLoginUrl, oauthHandleRedirectIfPresent } from "@huggingface/hub";
# let oauthResult = localStorage.getItem("oauth");
# if (oauthResult) {
# try {
# oauthResult = JSON.parse(oauthResult);
# } catch {
# oauthResult = null;
# }
# }
#
# console.log("OAuth result", oauthResult);
# oauthResult ||= await oauthHandleRedirectIfPresent();
#
# if (oauthResult) {
# document.querySelector("pre").textContent = JSON.stringify(oauthResult, null, 2);
# localStorage.setItem("oauth", JSON.stringify(oauthResult));
# console.log("There is an oauth result", oauthResult);
# document.getElementById("signout").style.removeProperty("display");
# document.getElementById("signout").onclick = async function() {
# localStorage.removeItem("oauth");
# window.location.href = window.location.href.replace(/\?.*$/, '');
# window.location.reload();
# }
#
# } else {
# console.log("No OAuth result. Setting the loging button event");
# document.getElementById("signin").style.removeProperty("display");
# document.getElementById("signin").onclick = async function() {
# // prompt=consent to re-trigger the consent screen instead of silently redirecting
# window.location.href = window.hf_redirect_url;
# }
# }
# """
#
#
# def get_hf_user_data():
# pass
# @rt('/', methods="get")
# def get():
# global SPACE_HOST
# if "localhost" in SPACE_HOST:
# space_host = f"http://{SPACE_HOST}/"
# else:
# space_host = f"https://{SPACE_HOST}/"
# hf_redirect_url_func = (
# f"import {{ oauthLoginUrl, }} from '@huggingface/hub'; \n"
# f"window.hf_redirect_url = await oauthLoginUrl({{clientId:'{OAUTH_CLIENT_ID}',redirectUrl:'{space_host}',"
# f"scopes:'{OAUTH_SCOPES}'}}) + '&prompt=consent';\n"
# f"console.log(window.hf_redirect_url);")
#
# header = Head(Title("C code reviewing"),
# Script(src="https://unpkg.com/es-module-shims@1.7.0/dist/es-module-shims.js"),
# Script(js_hf_imports, type="importmap"),
# )
# content = Body(Div(P("Some content!")),
# A(f"Space host: {space_host}", href=space_host),
# Pre(""),
# Script(hf_redirect_url_func, type="module"),
# Img(src="https://huggingface.co/datasets/huggingface/badges/resolve/main/sign-in-with-huggingface-xl-dark.svg",
# alt="Sign in with Hugging Face",
# style="cursor: pointer; display: none;",
# _id="signin", name=None),
# Button("Sign out", _id="signout", style="display: none", name=None),
# A("Authenticate!", href="authorized"),
# Script(js_block_hf_auth2, type="module"))
# full_page = Html(header, content)
# return full_page
# @rt('/change')
# def get(): return P('Nice to be here!')
# @rt('/authorized', methods="get")
# def authorized():
# content = Body(P("Do we have authenticated user?"))
# return content
# Start serving the app (presumably fasthtml's `serve` helper, available through the star import).
serve()