mbosse99's picture
added assessment tool
5067887
import json
import os
import uuid
from datetime import datetime, timedelta
import requests
import streamlit as st
from azure.cosmos import CosmosClient
from dotenv import load_dotenv
from ParamClasses import Appointment, AppointmentDBItem, Attendee, CreateParams
load_dotenv()
def create_meeting(input_params) -> dict:
try:
url = f"{os.environ.get('BASE_API')}/create/zoom-meeting"
header = {"x-api-key": os.environ.get("API_KEY")}
response = requests.post(url=url, data=input_params, headers=header)
response.raise_for_status()
print(response.json())
return response.json()
except Exception as e:
print(f"Fehler beim erstellen des Termins: {str(e)}")
st.error("Something went wrong, please contact the site admin.", icon="🚨")
def adjust_datetime(original_datetime_str, offset_option):
# Konvertiere den originalen DateTime-String in ein datetime-Objekt
original_datetime = datetime.strptime(
original_datetime_str, "%Y-%m-%dT%H:%M:%S.%fZ"
)
# Überprüfe das Vorzeichen im Offset-String
if offset_option.startswith("+"):
# Wenn das Vorzeichen ein Pluszeichen ist, negiere den Offset
offset_option = "-" + offset_option[1:]
elif offset_option.startswith("-"):
offset_option = "+" + offset_option[1:]
else:
# Wenn kein Vorzeichen vorhanden ist, füge ein Minuszeichen hinzu
offset_option = "" + offset_option
# Konvertiere die Offset-Option von String zu integer
offset_hours = int(offset_option)
# Erzeuge ein timedelta-Objekt mit dem entsprechenden Offset
offset_delta = timedelta(hours=offset_hours)
# Passe das ursprüngliche datetime-Objekt an
adjusted_datetime = original_datetime + offset_delta
# Formatieren und zurückgeben
adjusted_datetime_str = adjusted_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return adjusted_datetime_str
def write_interview_db_object(interview_data):
try:
db_client: CosmosClient = st.session_state["cosmos_db"]
database = db_client.get_database_client("appointment-database")
container = database.get_container_client("appointments")
container.create_item(body=interview_data)
except Exception as e:
print(f"Fehler beim erstellen des Appointment DB items: {str(e)}")
st.error("Something went wrong, please contact the site admin.", icon="🚨")
def write_assessment_db_object(interview_data):
try:
db_client: CosmosClient = st.session_state["cosmos_db"]
database = db_client.get_database_client("assessment-database")
container = database.get_container_client("assessments")
container.create_item(body=interview_data)
except Exception as e:
print(f"Fehler beim erstellen des Assessment DB items: {str(e)}")
st.error("Something went wrong, please contact the site admin.", icon="🚨")
def create_button_handler():
with st.spinner("Creating the appointment..."):
start_date_utc_str = datetime.strptime(
str(st.session_state["date_input"])
+ " "
+ str(st.session_state["time_input"]),
"%Y-%m-%d %H:%M:%S",
).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
start_date_str = adjust_datetime(
start_date_utc_str, st.session_state["time_zone_option"]
)
end_date = datetime.strptime(
str(st.session_state["date_input"])
+ " "
+ str(st.session_state["time_input"]),
"%Y-%m-%d %H:%M:%S",
) + timedelta(minutes=st.session_state["duration_input"])
end_date_utc_str = end_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
end_date_str = adjust_datetime(
end_date_utc_str, st.session_state["time_zone_option"]
)
request_params = json.dumps(
{
"appointment": {"start_time": start_date_str, "end_time": end_date_str},
"subject": st.session_state["subject_input"],
"recruiter": {
"email": "", # st.session_state["recruiter_mail"],
"name": "", # st.session_state["recruiter_mail"]
},
"client": {
"email": "", # st.session_state["client_mail"],
"name": "", # st.session_state["client_mail"]
},
"candidate": {
"email": "", # st.session_state["candidate_mail"],
"name": "", # st.session_state["candidate_mail"]
},
}
)
appointment_response = create_meeting(request_params)
st.session_state["appointment_response"] = appointment_response
if st.session_state["assessment_toggle"]:
assessment_id = str(uuid.uuid4())
st.session_state["assessment_db_id"] = assessment_id
db_item = {
"id": assessment_id,
"zoom_meeting_id": str(appointment_response["zoom_meeting"]["id"]),
"assessment_title": st.session_state["subject_input"],
"start_time": start_date_str,
"end_time": end_date_str,
"callback_url": "http://127.0.0.1:8000/test",
"authorization_token": "asdf",
"meeting_url": appointment_response["zoom_meeting"]["start_url"],
"environment": str(["us"]),
"process_status": "assessment_scheduled",
"recruiter": {
"email": "", # str(st.session_state["recruiter_mail"]),
"name": "", # str(st.session_state["recruiter_mail"])
},
"client": {
"email": "", # str(st.session_state["client_mail"]),
"name": "", # str(st.session_state["client_mail"])
},
"candidate": {
"email": "", # str(st.session_state["candidate_mail"]),
"name": "", # str(st.session_state["candidate_mail"])
},
"interview_transcript": [],
"questions": [],
"coding_tasks": [],
"transcript_summary": "",
}
write_assessment_db_object(db_item)
else:
db_item = {
"id": str(uuid.uuid4()),
"zoom_meeting_id": str(appointment_response["zoom_meeting"]["id"]),
"process_id": str(uuid.uuid4()),
"job_title": st.session_state["subject_input"],
"start_time": start_date_str,
"end_time": end_date_str,
"meeting_url": appointment_response["zoom_meeting"]["start_url"],
"environment": str(["us"]),
"process_status": "interview_scheduled",
"recruiter": {
"email": "", # str(st.session_state["recruiter_mail"]),
"name": "", # str(st.session_state["recruiter_mail"])
},
"client": {
"email": "", # str(st.session_state["client_mail"]),
"name": "", # str(st.session_state["client_mail"])
},
"candidate": {
"email": "", # str(st.session_state["candidate_mail"]),
"name": "", # str(st.session_state["candidate_mail"])
},
"summary_recruiter": "",
"summary_client": "",
"summary_candidate": "",
}
write_interview_db_object(db_item)
if "appointment_response" not in st.session_state:
st.session_state["appointment_response"] = None
if "cosmos_db" not in st.session_state:
# Cosmos DB Client erstellen
client = CosmosClient(os.environ.get("DB_CONNECTION"), os.environ.get("DB_KEY"))
st.session_state["cosmos_db"] = client
if "assessment_db_id" not in st.session_state:
st.session_state["assessment_db_id"] = None
if "assessment_toggle" not in st.session_state:
st.session_state["assessment_toggle"] = True
col1, col2 = st.columns([2, 1])
col1.title("Appointment Tool")
col2.image(
"https://www.workgenius.com/wp-content/uploads/2023/03/WorkGenius_navy-1.svg"
)
st.write("Please enter the date, time and duration for the appointment to be created.")
st.date_input("Date of the appointment", format="DD/MM/YYYY", key="date_input")
now = datetime.now()
rounded_now = now + timedelta(minutes=30 - now.minute % 30)
st.time_input("Starting time of the appointment", value=rounded_now, key="time_input")
st.selectbox(
"Please select your time zone (based on UTC time)",
options=[
"+10",
"+9",
"+8",
"+7",
"+6",
"+5",
"+4",
"+3",
"+2",
"+1",
"0",
"-1",
"-2",
"-3",
"-4",
"-5",
"-6",
"-7",
"-8",
"-9",
"-10",
],
index=10,
key="time_zone_option",
)
st.select_slider(
"Duration of the appointment in minutes",
[15, 30, 45, 60],
value=30,
key="duration_input",
)
st.text_input("Enter the subject of the meeting", key="subject_input")
# recruiter_mail = st.text_input("Please enter the mail of the recruiter", key="recruiter_mail")
# client_mail = st.text_input("Please enter the mail of the client", key="client_mail")
# candidate_mail = st.text_input("Please enter the mail of the candidate", key="candidate_mail")
# st.toggle(
# "Activate to create an assessment appointment, otherwise a candidate interview is going to be created",
# key="assessment_toggle",
# )
st.button(
"Create appointment",
key="create_button",
disabled=False if st.session_state["subject_input"] else True,
on_click=create_button_handler,
)
if st.session_state["appointment_response"]:
st.success("The appointment was created correctly.")
st.write(
"Interviewer link: "
+ os.environ.get("BASE_API")
+ "/meeting-consent?redirect="
+ st.session_state["appointment_response"]["zoom_meeting"]["start_url"].replace(
"https://us06web.zoom.us/", ""
)
)
st.write(
"Interviewee link: "
+ os.environ.get("BASE_API")
+ "/meeting-consent?redirect="
+ st.session_state["appointment_response"]["zoom_meeting"]["join_url"].replace(
"https://us06web.zoom.us/", ""
)
)
if st.session_state["assessment_toggle"]:
st.write(
"Here is the link for the assessment tool: https://wg-assessment-app.azurewebsites.net/assessment/"
+ st.session_state["assessment_db_id"]
)
st.write(
"Here is the link for the code upload: https://wg-assessment-app.azurewebsites.net/candidate/"
+ st.session_state["assessment_db_id"]
)