import json import os import random import string import time from collections import defaultdict from typing import Dict, Optional, Tuple from openai import OpenAI from api.llm import LLMManager from utils.config import Config from resources.data import fixed_messages, topic_lists from resources.prompts import prompts from tests.testing_prompts import candidate_prompt def complete_interview( interview_type: str, exp_name: str, llm_config: Optional[Config] = None, requirements: str = "", difficulty: str = "", topic: str = "", model: str = "gpt-3.5-turbo", pause: int = 0, ) -> Tuple[str, Dict]: """ Complete an interview and record the results. :param interview_type: Type of interview to complete. :param exp_name: Experiment name for file saving. :param llm_config: Optional LLM configuration. :param requirements: Additional requirements for the interview. :param difficulty: Difficulty level for the interview. :param topic: Topic for the interview. :param model: Model to use for the candidate. :param pause: Pause duration between requests to prevent rate limits. :return: Tuple containing the file path and interview data. """ client = OpenAI(base_url="https://api.openai.com/v1") config = Config() if llm_config: config.llm = llm_config llm = LLMManager(config, prompts) llm_name = config.llm.name print(f"Starting evaluation interviewer LLM: {llm_name}, candidate LLM: {model}, interview type: {interview_type}") # Select a random topic or difficulty if not provided topic = topic or random.choice(topic_lists[interview_type]) difficulty = difficulty or random.choice(["easy", "medium", "hard"]) problem_statement_text = llm.get_problem_full(requirements, difficulty, topic, interview_type) interview_data = defaultdict( lambda: None, { "interviewer_llm": llm_name, "candidate_llm": model, "inputs": { "interview_type": interview_type, "difficulty": difficulty, "topic": topic, "requirements": requirements, }, "problem_statement": problem_statement_text, "transcript": [], "feedback": None, "average_response_time_seconds": 0, }, ) # Initialize interviewer and candidate messages messages_interviewer = llm.init_bot(problem_statement_text, interview_type) chat_display = [[None, fixed_messages["start"]]] messages_candidate = [ {"role": "system", "content": candidate_prompt}, {"role": "user", "content": f"Your problem: {problem_statement_text}"}, {"role": "user", "content": chat_display[-1][1]}, ] response_times = [] previous_code = "" for _ in range(30): response = client.chat.completions.create( model=model, messages=messages_candidate, temperature=1, response_format={"type": "json_object"} ) response_json = json.loads(response.choices[0].message.content) code = response_json.get("code_and_notes", "") candidate_message = response_json.get("message", "") if not code and not candidate_message: print("No message or code in response") continue messages_candidate.append({"role": "assistant", "content": response.choices[0].message.content}) if code: interview_data["transcript"].append(f"CANDIDATE CODE AND NOTES: {code}") elif candidate_message: interview_data["transcript"].append(f"CANDIDATE MESSAGE: {candidate_message}") chat_display.append([candidate_message, None]) if response_json.get("finished") and not response_json.get("question"): break send_time = time.time() messages_interviewer, chat_display, previous_code = llm.send_request_full(code, previous_code, messages_interviewer, chat_display) response_times.append(time.time() - send_time) messages_candidate.append({"role": "user", "content": chat_display[-1][1]}) message_split = messages_interviewer[-1]["content"].split("#NOTES#") interview_data["transcript"].append(f"INTERVIEWER MESSAGE: {message_split[0]}") if len(message_split) > 1: interview_data["transcript"].append(f"INTERVIEWER HIDDEN NOTE: {message_split[1]}") time.sleep(pause) # to prevent exceeding rate limits interview_data["feedback"] = llm.end_interview_full(problem_statement_text, messages_interviewer, interview_type) interview_data["average_response_time_seconds"] = round(sum(response_times) / len(response_times), 2) if response_times else 0 current_time = time.strftime("%Y%m%d-%H%M%S") random_suffix = "".join(random.choices(string.ascii_letters + string.digits, k=10)) file_path = os.path.join("records", exp_name, f"{current_time}-{random_suffix}.json") os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, "w") as file: json.dump(interview_data, file, indent=4) return file_path, interview_data