interviewer / tests /candidate.py
IliaLarchenko's picture
Huge refactoring
e12b285
raw
history blame
No virus
5.17 kB
import json
import os
import random
import string
import time
from collections import defaultdict
from typing import Dict, Optional, Tuple
from openai import OpenAI
from api.llm import LLMManager
from utils.config import Config
from resources.data import fixed_messages, topic_lists
from resources.prompts import prompts
from tests.testing_prompts import candidate_prompt
def complete_interview(
interview_type: str,
exp_name: str,
llm_config: Optional[Config] = None,
requirements: str = "",
difficulty: str = "",
topic: str = "",
model: str = "gpt-3.5-turbo",
pause: int = 0,
) -> Tuple[str, Dict]:
"""
Complete an interview and record the results.
:param interview_type: Type of interview to complete.
:param exp_name: Experiment name for file saving.
:param llm_config: Optional LLM configuration.
:param requirements: Additional requirements for the interview.
:param difficulty: Difficulty level for the interview.
:param topic: Topic for the interview.
:param model: Model to use for the candidate.
:param pause: Pause duration between requests to prevent rate limits.
:return: Tuple containing the file path and interview data.
"""
client = OpenAI(base_url="https://api.openai.com/v1")
config = Config()
if llm_config:
config.llm = llm_config
llm = LLMManager(config, prompts)
llm_name = config.llm.name
print(f"Starting evaluation interviewer LLM: {llm_name}, candidate LLM: {model}, interview type: {interview_type}")
# Select a random topic or difficulty if not provided
topic = topic or random.choice(topic_lists[interview_type])
difficulty = difficulty or random.choice(["easy", "medium", "hard"])
problem_statement_text = llm.get_problem_full(requirements, difficulty, topic, interview_type)
interview_data = defaultdict(
lambda: None,
{
"interviewer_llm": llm_name,
"candidate_llm": model,
"inputs": {
"interview_type": interview_type,
"difficulty": difficulty,
"topic": topic,
"requirements": requirements,
},
"problem_statement": problem_statement_text,
"transcript": [],
"feedback": None,
"average_response_time_seconds": 0,
},
)
# Initialize interviewer and candidate messages
messages_interviewer = llm.init_bot(problem_statement_text, interview_type)
chat_display = [[None, fixed_messages["start"]]]
messages_candidate = [
{"role": "system", "content": candidate_prompt},
{"role": "user", "content": f"Your problem: {problem_statement_text}"},
{"role": "user", "content": chat_display[-1][1]},
]
response_times = []
previous_code = ""
for _ in range(30):
response = client.chat.completions.create(
model=model, messages=messages_candidate, temperature=1, response_format={"type": "json_object"}
)
response_json = json.loads(response.choices[0].message.content)
code = response_json.get("code_and_notes", "")
candidate_message = response_json.get("message", "")
if not code and not candidate_message:
print("No message or code in response")
continue
messages_candidate.append({"role": "assistant", "content": response.choices[0].message.content})
if code:
interview_data["transcript"].append(f"CANDIDATE CODE AND NOTES: {code}")
elif candidate_message:
interview_data["transcript"].append(f"CANDIDATE MESSAGE: {candidate_message}")
chat_display.append([candidate_message, None])
if response_json.get("finished") and not response_json.get("question"):
break
send_time = time.time()
messages_interviewer, chat_display, previous_code = llm.send_request_full(code, previous_code, messages_interviewer, chat_display)
response_times.append(time.time() - send_time)
messages_candidate.append({"role": "user", "content": chat_display[-1][1]})
message_split = messages_interviewer[-1]["content"].split("#NOTES#")
interview_data["transcript"].append(f"INTERVIEWER MESSAGE: {message_split[0]}")
if len(message_split) > 1:
interview_data["transcript"].append(f"INTERVIEWER HIDDEN NOTE: {message_split[1]}")
time.sleep(pause) # to prevent exceeding rate limits
interview_data["feedback"] = llm.end_interview_full(problem_statement_text, messages_interviewer, interview_type)
interview_data["average_response_time_seconds"] = round(sum(response_times) / len(response_times), 2) if response_times else 0
current_time = time.strftime("%Y%m%d-%H%M%S")
random_suffix = "".join(random.choices(string.ascii_letters + string.digits, k=10))
file_path = os.path.join("records", exp_name, f"{current_time}-{random_suffix}.json")
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "w") as file:
json.dump(interview_data, file, indent=4)
return file_path, interview_data