# Spaces: Sleeping Sleeping  (web-page status residue captured with this file; not code)
import asyncio
import base64
import concurrent.futures
import hashlib
import json
import os
import re
import shutil
import tempfile
import time
from typing import List, Dict, Any, Optional

import edge_tts
import openai
import requests
class VirtualInterviewer:
    """Run a mock interview backed by the OpenAI chat API and edge-tts.

    An instance generates interview questions (optionally with ideal
    answers), renders question text to MP3 files, records the candidate's
    answers and asks the model to score them.

    Attributes:
        api_key: The OpenAI API key the client was created with.
        questions_asked: Questions produced by the last generation call.
        user_answers: ``{"question", "answer"}`` dicts in the order stored.
        conversation_history: Alternating assistant/user chat messages.
        ideal_answers: Mapping of question text to its ideal answer.
        question_audio_paths: Mapping of question text to its MP3 path.
        audio_dir: Directory that holds the generated MP3 files.
        client: The ``openai.OpenAI`` client used for every completion.
    """

    # Completion settings shared by every request this class makes.
    _MODEL = "gpt-4o"
    _MAX_TOKENS = 2000

    # Voice used when ``voice_type`` is not a key of ``_VOICE_MAPPING``.
    _DEFAULT_VOICE = "en-US-JennyNeural"
    _VOICE_MAPPING = {
        "male_casual": "en-US-GuyNeural",
        "male_formal": "en-US-ChristopherNeural",
        "male_british": "en-GB-RyanNeural",
        "female_casual": "en-US-JennyNeural",
        "female_formal": "en-US-AriaNeural",
        "female_british": "en-GB-SoniaNeural",
    }

    # Leading list marker: "1.", "1)", "1:", a bare number followed by
    # whitespace, or a "-", "*", "•" bullet followed by whitespace.
    _LIST_MARKER = re.compile(r"^(?:\d+\s*[.):]\s*|\d+\s+|[-*•]\s+)")
    # A Markdown code fence, with or without a "json" language tag.
    _FENCED_JSON = re.compile(r"```(?:json)?\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE)
    # Widest "{ ... }" span in the text.
    _BRACED_JSON = re.compile(r"\{.*\}", re.DOTALL)

    def __init__(self, api_key: str):
        """Initialize the virtual interviewer with the OpenAI API key.

        Creates the audio directory, deletes every file already in it and
        builds the OpenAI client.

        Args:
            api_key: OpenAI API key passed to ``openai.OpenAI``.

        Raises:
            Exception: If the OpenAI client cannot be constructed; the
                original error is chained as ``__cause__``.
        """
        self.api_key = api_key
        self.questions_asked: List[str] = []
        self.user_answers: List[Dict[str, str]] = []
        self.conversation_history: List[Dict[str, str]] = []
        self.ideal_answers: Dict[str, str] = {}
        self.question_audio_paths: Dict[str, str] = {}
        # Create audio directory
        self.audio_dir = self._create_audio_directory()
        # Clean up any existing audio files
        self._cleanup_audio_files()
        # Initialize OpenAI client
        try:
            self.client = openai.OpenAI(api_key=api_key)
        except Exception as e:
            raise Exception(f"Failed to initialize OpenAI client: {str(e)}") from e

    def _create_audio_directory(self) -> str:
        """Create (if needed) and return the ``audio_files`` directory next to this module."""
        module_dir = os.path.dirname(os.path.abspath(__file__))
        audio_dir = os.path.join(module_dir, "audio_files")
        os.makedirs(audio_dir, exist_ok=True)
        return audio_dir

    def _cleanup_audio_files(self):
        """Delete every regular file in ``self.audio_dir``.

        Cleanup is best effort: a file that cannot be removed is reported
        and skipped so that the remaining files are still deleted.
        """
        if not os.path.exists(self.audio_dir):
            return
        try:
            filenames = os.listdir(self.audio_dir)
        except OSError as e:
            print(f"Error cleaning up audio files: {str(e)}")
            return
        for filename in filenames:
            file_path = os.path.join(self.audio_dir, filename)
            if not os.path.isfile(file_path):
                continue
            try:
                os.remove(file_path)
            except OSError as e:
                print(f"Error cleaning up audio files: {str(e)}")
        print(f"Cleaned up audio files in {self.audio_dir}")

    def _chat(self, messages: List[Dict[str, str]], temperature: float) -> str:
        """Send ``messages`` to the chat completions API and return the reply text.

        Returns an empty string when the first choice carries no content.
        """
        response = self.client.chat.completions.create(
            model=self._MODEL,
            messages=messages,
            temperature=temperature,
            max_tokens=self._MAX_TOKENS,
        )
        return response.choices[0].message.content or ""

    @staticmethod
    def _build_question_prompt(
        job_description: str,
        interview_type: str,
        difficulty_level: str,
        key_topics: str,
        num_questions: int,
        generate_ideal_answers: bool,
    ) -> str:
        """Build the system prompt for question generation.

        The header is shared; the tail asks either for a JSON document with
        ideal answers or for a plain numbered list.
        """
        difficulty = difficulty_level.lower()
        lines = [
            f"You are an expert interviewer for {interview_type} interviews.",
            f"Generate {num_questions} {difficulty} difficulty interview questions for a "
            f"{interview_type.lower()} interview based on the following job description:",
            "Job Description:",
            f"{job_description}",
            "Key Topics to Focus on:",
            f"{key_topics if key_topics else 'No specific topics provided.'}",
        ]
        if generate_ideal_answers:
            lines += [
                "Please provide the questions and ideal answers in the following JSON format:",
                "{",
                '"questions": [',
                "{",
                '"question": "Question 1",',
                '"ideal_answer": "Ideal answer for question 1"',
                "},",
                "...",
                "]",
                "}",
                f"Make sure the questions are challenging but appropriate for the {difficulty} difficulty level.",
                "The ideal answers should be comprehensive and demonstrate expertise in the subject matter.",
            ]
        else:
            lines += [
                "Please provide the questions in a numbered list format.",
                f"Make sure the questions are challenging but appropriate for the {difficulty} difficulty level.",
            ]
        return "\n".join(lines) + "\n"

    def _questions_from_json(self, response_content: str) -> List[str]:
        """Collect questions from a JSON reply and record their ideal answers.

        Malformed entries (non-dict items, missing or non-string question
        text) are skipped. Returns an empty list when nothing usable is
        found, which tells the caller to fall back to text parsing.
        """
        items = self._extract_json(response_content).get("questions", [])
        if not isinstance(items, list):
            return []
        questions = []
        for item in items:
            if not isinstance(item, dict):
                continue
            question = item.get("question", "")
            if not isinstance(question, str) or not question:
                continue
            questions.append(question)
            ideal_answer = item.get("ideal_answer", "")
            if ideal_answer:
                self.ideal_answers[question] = ideal_answer
        return questions

    def generate_interview_questions(
        self,
        job_description: str,
        interview_type: str,
        difficulty_level: str,
        key_topics: str,
        num_questions: int,
        generate_ideal_answers: bool = True
    ) -> List[str]:
        """Generate interview questions based on the job description and other parameters.

        Args:
            job_description: Text of the job description.
            interview_type: Kind of interview, interpolated into the prompt.
            difficulty_level: Difficulty label, lower-cased in the prompt.
            key_topics: Topics to focus on; may be empty.
            num_questions: Number of questions requested.
            generate_ideal_answers: When true, ask for a JSON reply that also
                carries an ideal answer per question. If that reply yields
                no questions, the text is parsed as a list and the ideal
                answers are requested in a second call.

        Returns:
            The generated questions, also stored in ``self.questions_asked``.

        Raises:
            Exception: On any failure (API error, empty reply, ...); the
                original error is chained as ``__cause__``.
        """
        try:
            system_prompt = self._build_question_prompt(
                job_description,
                interview_type,
                difficulty_level,
                key_topics,
                num_questions,
                generate_ideal_answers,
            )
            user_prompt = (
                f"Generate {num_questions} {interview_type.lower()} interview questions "
                f"for a {difficulty_level.lower()} difficulty level."
            )
            response_content = self._chat(
                [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=0.7,
            )
            if not response_content:
                raise ValueError("the model returned an empty response")

            if generate_ideal_answers:
                questions = self._questions_from_json(response_content)
                if not questions:
                    # The reply was not usable JSON: parse it as text and
                    # request the ideal answers separately.
                    questions = self._parse_questions(response_content, num_questions)
                    self._generate_ideal_answers(
                        questions, job_description, interview_type, difficulty_level
                    )
            else:
                questions = self._parse_questions(response_content, num_questions)

            # Store the generated questions
            self.questions_asked = questions
            return questions
        except Exception as e:
            raise Exception(f"Failed to generate interview questions: {str(e)}") from e

    @staticmethod
    def _run_coroutine_blocking(coro_factory) -> None:
        """Run the coroutine returned by ``coro_factory()`` to completion.

        ``asyncio.run`` cannot be called from a thread that already runs an
        event loop, so in that case the coroutine is executed with its own
        loop on a short-lived worker thread while this thread waits.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread: the plain call is safe.
            asyncio.run(coro_factory())
            return
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            pool.submit(lambda: asyncio.run(coro_factory())).result()

    def generate_question_audio(self, question: str, voice_type: str) -> str:
        """Generate audio for a question using edge-tts.

        A previously generated file for the same question text is reused
        while it still exists on disk, regardless of ``voice_type``.

        Args:
            question: Text to synthesize.
            voice_type: Key of the voice mapping (e.g. ``"male_formal"``);
                unknown values fall back to the default voice.

        Returns:
            Path of the MP3 file, or ``""`` when the question is blank or
            synthesis failed. This method never raises.
        """
        output_path = None
        try:
            if not question or not question.strip():
                print("Error generating audio: question text is empty")
                return ""

            # Check if we already have audio for this question
            cached_path = self.question_audio_paths.get(question)
            if cached_path and os.path.exists(cached_path):
                print(f"Using existing audio for question: {question[:30]}...")
                return cached_path

            # Create a unique filename based on the question content and timestamp
            question_hash = hashlib.md5(question.encode()).hexdigest()
            timestamp = int(time.time())
            output_path = os.path.join(
                self.audio_dir, f"question_{question_hash}_{timestamp}.mp3"
            )
            voice = self._VOICE_MAPPING.get(voice_type, self._DEFAULT_VOICE)

            async def synthesize():
                communicate = edge_tts.Communicate(question, voice)
                await communicate.save(output_path)

            self._run_coroutine_blocking(synthesize)
            print(f"Generated audio for question: {question[:30]}... at {output_path}")
            # Store the audio path for this question
            self.question_audio_paths[question] = output_path
            return output_path
        except Exception as e:
            print(f"Error generating audio: {str(e)}")
            # Do not leave a truncated file behind after a failed synthesis.
            if output_path and os.path.exists(output_path):
                try:
                    os.remove(output_path)
                except OSError as cleanup_error:
                    print(f"Error cleaning up audio files: {str(cleanup_error)}")
            return ""

    def get_question_audio_path(self, question: str) -> str:
        """Get the audio path for a question.

        Returns ``""`` when no audio is registered or the registered file no
        longer exists; a stale entry is removed from the mapping.
        """
        if question not in self.question_audio_paths:
            return ""
        audio_path = self.question_audio_paths[question]
        if os.path.exists(audio_path):
            return audio_path
        # File doesn't exist, remove from dictionary
        del self.question_audio_paths[question]
        return ""

    def _extract_json(self, text: str) -> Dict[str, Any]:
        """Extract a JSON object from text.

        Tries, in order: the whole text, the contents of a Markdown code
        fence, and the widest ``{...}`` span. Only a JSON *object* counts;
        any other outcome (including non-string input) yields ``{}``.
        """
        if not isinstance(text, str):
            return {}

        candidates = [text]
        fenced = self._FENCED_JSON.search(text)
        if fenced:
            candidates.append(fenced.group(1))
        braced = self._BRACED_JSON.search(text)
        if braced:
            candidates.append(braced.group(0))

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed
        # If all else fails, return an empty dict
        return {}

    def _generate_ideal_answers(self, questions: List[str], job_description: str, interview_type: str, difficulty_level: str):
        """Generate ideal answers for the questions and store them in ``self.ideal_answers``.

        Each answer in the reply is matched back to one of ``questions``:
        an exact case-insensitive match is preferred, otherwise the first
        question that contains, or is contained in, the returned text. On
        any error every question without an answer receives a placeholder.
        """
        if not questions:
            return
        try:
            prompt = "\n".join([
                f"You are an expert in {interview_type} interviews.",
                "For each of the following interview questions, provide an ideal answer that would impress the interviewer.",
                "The answers should be comprehensive, demonstrate expertise, and be appropriate for a "
                f"{difficulty_level.lower()} difficulty level interview.",
                "Job Description:",
                f"{job_description}",
                "Questions:",
                json.dumps(questions),
                "Please provide the answers in the following JSON format:",
                "{",
                '"answers": [',
                "{",
                '"question": "Question 1",',
                '"ideal_answer": "Ideal answer for question 1"',
                "},",
                "...",
                "]",
                "}",
            ]) + "\n"
            response_content = self._chat(
                [
                    {"role": "system", "content": "You are an expert interviewer providing ideal answers to interview questions."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.7,
            )

            answers = self._extract_json(response_content).get("answers", [])
            if not isinstance(answers, list):
                answers = []
            for item in answers:
                if not isinstance(item, dict):
                    continue
                question = item.get("question", "")
                ideal_answer = item.get("ideal_answer", "")
                if not (isinstance(question, str) and question and ideal_answer):
                    continue
                needle = question.lower()
                match = next((q for q in questions if q.lower() == needle), None)
                if match is None:
                    match = next(
                        (q for q in questions if needle in q.lower() or q.lower() in needle),
                        None,
                    )
                if match is not None:
                    self.ideal_answers[match] = ideal_answer
        except Exception as e:
            # Handle any errors in the overall ideal answer generation process
            print(f"Error generating ideal answers: {str(e)}")
            # Ensure all questions have a fallback ideal answer
            for question in questions:
                if question not in self.ideal_answers:
                    self.ideal_answers[question] = "Unable to generate ideal answer due to an error."

    def _parse_questions(self, questions_text: str, expected_count: int) -> List[str]:
        """Parse the questions from a plain-text response.

        Lines that start with a list marker ("1.", "2)", "-", "*", ...) are
        taken as questions with only the marker removed, so a question that
        itself begins with a digit is kept intact. If no line carries a
        marker, every non-empty line is treated as a question.

        Returns:
            At most ``expected_count`` questions.
        """
        if not questions_text:
            return []
        lines = [line.strip() for line in questions_text.strip().split('\n')]

        questions = []
        for line in lines:
            marker = self._LIST_MARKER.match(line)
            if marker is None:
                continue
            cleaned_line = line[marker.end():].strip()
            if cleaned_line:
                questions.append(cleaned_line)

        if not questions:
            # Unformatted reply: fall back to one question per non-empty line.
            questions = [line for line in lines if line]
        return questions[:expected_count]

    def get_next_question(self, question_index: int) -> str:
        """Return the question at ``question_index``, or a sentinel message when out of range."""
        if 0 <= question_index < len(self.questions_asked):
            return self.questions_asked[question_index]
        return "No more questions available."

    def store_user_answer(self, question: str, answer: str):
        """Record the user's answer and append the exchange to the conversation history."""
        self.user_answers.append({"question": question, "answer": answer})
        self.conversation_history.append({"role": "assistant", "content": question})
        self.conversation_history.append({"role": "user", "content": answer})

    def get_ideal_answer(self, question: str) -> str:
        """Return the stored ideal answer for ``question``, or a default message."""
        return self.ideal_answers.get(question, "No ideal answer available for this question.")

    @staticmethod
    def _scoring_failure(reason: str) -> Dict[str, Any]:
        """Build the result returned when scoring could not be completed."""
        return {
            "overall_score": 0,
            "overall_feedback": f"Failed to score the interview: {reason}",
            "individual_scores": [],
        }

    def score_interview(self, job_description: str, interview_type: str, difficulty_level: str) -> Dict[str, Any]:
        """Score the interview based on the user's answers.

        Args:
            job_description: Text of the job description.
            interview_type: Kind of interview, interpolated into the prompt.
            difficulty_level: Difficulty label, interpolated into the prompt.

        Returns:
            The model's JSON result. The keys ``overall_score``,
            ``overall_feedback`` and ``individual_scores`` are always
            present: on an API error or an unparsable reply a zero-score
            failure result is returned instead of raising.
        """
        try:
            questions_and_answers = [
                {
                    "question": qa["question"],
                    "answer": qa["answer"],
                    "ideal_answer": self.get_ideal_answer(qa["question"]),
                }
                for qa in self.user_answers
            ]
            prompt = "\n".join([
                f"You are an expert interviewer for {interview_type} interviews.",
                "Score the following interview answers based on the job description and difficulty level.",
                "Job Description:",
                f"{job_description}",
                f"Difficulty Level: {difficulty_level}",
                "For each question and answer, provide:",
                "1. A score from 0 to 5 (where 5 is excellent)",
                "2. Feedback on the answer",
                "3. Include the ideal answer for comparison. The ideal answer should be a comprehensive and detailed answer that would impress the interviewer with bullet points.",
                "Questions and Answers:",
                json.dumps(questions_and_answers),
                "Please provide the scores in the following JSON format:",
                "{",
                '"overall_score": 4.5,',
                '"overall_feedback": "Overall feedback on the interview performance",',
                '"individual_scores": [',
                "{",
                '"question": "Question 1",',
                '"answer": "User\'s answer to question 1",',
                '"ideal_answer": "Ideal answer to question 1",',
                '"score": 4,',
                '"feedback": "Feedback on the answer to question 1"',
                "},",
                "...",
                "]",
                "}",
            ]) + "\n"
            response_content = self._chat(
                [
                    {"role": "system", "content": "You are an expert interviewer scoring interview answers."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.3,
            )
            result = self._extract_json(response_content)
        except Exception as e:
            # If scoring fails, return an error
            return self._scoring_failure(str(e))

        if not result:
            # An unparsable reply yields {}; report it rather than handing
            # callers a dict without the keys they read.
            return self._scoring_failure("the model response did not contain a JSON object")
        result.setdefault("overall_score", 0)
        result.setdefault("overall_feedback", "")
        result.setdefault("individual_scores", [])
        return result