Spaces:
Sleeping
Sleeping
| import openai | |
| import json | |
| import requests | |
| import base64 | |
| import os | |
| import tempfile | |
| import asyncio | |
| import edge_tts | |
| import time | |
| import hashlib | |
| import shutil | |
| from typing import List, Dict, Any, Optional | |
class VirtualInterviewer:
    """AI-powered mock interviewer.

    Generates job-specific interview questions (optionally with ideal
    answers) through the OpenAI chat API, synthesizes question audio with
    edge-tts, records the candidate's answers, and scores the finished
    interview against the ideal answers.
    """

    # Chat model used for question generation, ideal answers, and scoring.
    MODEL = "gpt-4o"

    def __init__(self, api_key: str):
        """Initialize the virtual interviewer with the OpenAI API key."""
        self.api_key = api_key
        self.questions_asked: List[str] = []
        self.user_answers: List[Dict[str, str]] = []
        self.conversation_history: List[Dict[str, str]] = []
        self.ideal_answers: Dict[str, str] = {}
        self.question_audio_paths: Dict[str, str] = {}
        # Create audio directory and purge leftovers from previous sessions.
        self.audio_dir = self._create_audio_directory()
        self._cleanup_audio_files()
        # Initialize OpenAI client
        try:
            self.client = openai.OpenAI(api_key=api_key)
        except Exception as e:
            raise Exception(f"Failed to initialize OpenAI client: {str(e)}")

    def _create_audio_directory(self) -> str:
        """Create (if needed) and return the directory that stores TTS audio files."""
        audio_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "audio_files")
        os.makedirs(audio_dir, exist_ok=True)
        return audio_dir

    def _cleanup_audio_files(self):
        """Delete all temporary audio files from previous sessions.

        Best-effort: failures are reported but never raised, so startup
        cannot be blocked by a stale or locked file.
        """
        try:
            if os.path.exists(self.audio_dir):
                # Delete all files in the directory (subdirectories are left alone).
                for filename in os.listdir(self.audio_dir):
                    file_path = os.path.join(self.audio_dir, filename)
                    if os.path.isfile(file_path):
                        os.remove(file_path)
                print(f"Cleaned up audio files in {self.audio_dir}")
        except Exception as e:
            print(f"Error cleaning up audio files: {str(e)}")

    def generate_interview_questions(
        self,
        job_description: str,
        interview_type: str,
        difficulty_level: str,
        key_topics: str,
        num_questions: int,
        generate_ideal_answers: bool = True
    ) -> List[str]:
        """Generate interview questions based on the job description and other parameters.

        When ``generate_ideal_answers`` is True the model is asked for a JSON
        payload containing both questions and ideal answers; otherwise a plain
        numbered list is requested. The generated questions are stored on
        ``self.questions_asked`` and returned.

        Raises:
            Exception: if the API call or response handling fails entirely.
        """
        try:
            # Construct the system prompt based on whether we want ideal answers or not
            if generate_ideal_answers:
                system_prompt = f"""You are an expert interviewer for {interview_type} interviews.
Generate {num_questions} {difficulty_level.lower()} difficulty interview questions for a {interview_type.lower()} interview based on the following job description:
Job Description:
{job_description}
Key Topics to Focus on:
{key_topics if key_topics else "No specific topics provided."}
Please provide the questions and ideal answers in the following JSON format:
{{
"questions": [
{{
"question": "Question 1",
"ideal_answer": "Ideal answer for question 1"
}},
...
]
}}
Make sure the questions are challenging but appropriate for the {difficulty_level.lower()} difficulty level.
The ideal answers should be comprehensive and demonstrate expertise in the subject matter.
"""
            else:
                system_prompt = f"""You are an expert interviewer for {interview_type} interviews.
Generate {num_questions} {difficulty_level.lower()} difficulty interview questions for a {interview_type.lower()} interview based on the following job description:
Job Description:
{job_description}
Key Topics to Focus on:
{key_topics if key_topics else "No specific topics provided."}
Please provide the questions in a numbered list format.
Make sure the questions are challenging but appropriate for the {difficulty_level.lower()} difficulty level.
"""
            # Make the API call to generate questions
            response = self.client.chat.completions.create(
                model=self.MODEL,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"Generate {num_questions} {interview_type.lower()} interview questions for a {difficulty_level.lower()} difficulty level."}
                ],
                temperature=0.7,
                max_tokens=2000
            )
            # Extract the response content
            response_content = response.choices[0].message.content
            # Process the response based on whether we're expecting JSON or a simple list
            if generate_ideal_answers:
                try:
                    # Try to parse as JSON
                    json_response = self._extract_json(response_content)
                    # Extract questions and ideal answers
                    questions = []
                    for item in json_response.get("questions", []):
                        question = item.get("question", "")
                        ideal_answer = item.get("ideal_answer", "")
                        if question:
                            questions.append(question)
                            if ideal_answer:
                                self.ideal_answers[question] = ideal_answer
                    # If we couldn't extract questions from JSON, fall back to parsing as text
                    if not questions:
                        questions = self._parse_questions(response_content, num_questions)
                        # Generate ideal answers separately
                        self._generate_ideal_answers(questions, job_description, interview_type, difficulty_level)
                except Exception:
                    # If JSON parsing fails, fall back to text parsing
                    questions = self._parse_questions(response_content, num_questions)
                    # Generate ideal answers separately
                    self._generate_ideal_answers(questions, job_description, interview_type, difficulty_level)
            else:
                # Parse as simple text
                questions = self._parse_questions(response_content, num_questions)
            # Store the generated questions
            self.questions_asked = questions
            return questions
        except Exception as e:
            raise Exception(f"Failed to generate interview questions: {str(e)}")

    def generate_question_audio(self, question: str, voice_type: str) -> str:
        """Generate audio for a question using edge-tts.

        Results are cached per question text; on any failure an empty
        string is returned instead of raising.
        """
        try:
            # Check if we already have audio for this question
            if question in self.question_audio_paths and os.path.exists(self.question_audio_paths[question]):
                print(f"Using existing audio for question: {question[:30]}...")
                return self.question_audio_paths[question]
            # Create a unique filename based on the question content and timestamp
            question_hash = hashlib.md5(question.encode()).hexdigest()
            timestamp = int(time.time())
            filename = f"question_{question_hash}_{timestamp}.mp3"
            output_path = os.path.join(self.audio_dir, filename)
            # Map voice type to edge-tts voice
            voice_mapping = {
                "male_casual": "en-US-GuyNeural",
                "male_formal": "en-US-ChristopherNeural",
                "male_british": "en-GB-RyanNeural",
                "female_casual": "en-US-JennyNeural",
                "female_formal": "en-US-AriaNeural",
                "female_british": "en-GB-SoniaNeural"
            }
            # Get the voice name from the mapping, default to female casual
            voice = voice_mapping.get(voice_type, "en-US-JennyNeural")

            # Generate audio using edge-tts.
            # NOTE(review): asyncio.run() raises RuntimeError when called from a
            # thread that already runs an event loop; in that case the except
            # below returns "" (original behavior). Confirm the host framework
            # calls this from a plain synchronous context.
            async def generate_audio():
                communicate = edge_tts.Communicate(question, voice)
                await communicate.save(output_path)

            # Run the async function
            asyncio.run(generate_audio())
            print(f"Generated audio for question: {question[:30]}... at {output_path}")
            # Store the audio path for this question
            self.question_audio_paths[question] = output_path
            return output_path
        except Exception as e:
            print(f"Error generating audio: {str(e)}")
            return ""

    def get_question_audio_path(self, question: str) -> str:
        """Return the cached audio path for a question, or "" if none exists on disk."""
        # Check if we have an audio path for this question
        if question in self.question_audio_paths:
            # Verify the file exists
            if os.path.exists(self.question_audio_paths[question]):
                return self.question_audio_paths[question]
            # File doesn't exist, remove the stale cache entry.
            del self.question_audio_paths[question]
        return ""

    def _extract_json(self, text: str) -> Dict[str, Any]:
        """Extract a JSON object from model output.

        Tries, in order: the whole text, a ```json fenced block, and the
        widest brace-delimited span. Returns an empty dict when nothing parses.
        """
        try:
            # Try to parse the entire text as JSON
            return json.loads(text)
        except json.JSONDecodeError:
            # If that fails, try to extract JSON from the text
            import re
            json_match = re.search(r'```json\n(.*?)\n```', text, re.DOTALL)
            if json_match:
                try:
                    return json.loads(json_match.group(1))
                except json.JSONDecodeError:
                    pass
            # Try to find JSON between curly braces
            json_match = re.search(r'({.*})', text, re.DOTALL)
            if json_match:
                try:
                    return json.loads(json_match.group(1))
                except json.JSONDecodeError:
                    pass
            # If all else fails, return an empty dict
            return {}

    def _generate_ideal_answers(self, questions: List[str], job_description: str, interview_type: str, difficulty_level: str):
        """Generate ideal answers for the questions and store them in ``self.ideal_answers``.

        Never raises; on failure each question gets a fallback placeholder answer.
        """
        try:
            # Prepare the prompt for generating ideal answers
            prompt = f"""You are an expert in {interview_type} interviews.
For each of the following interview questions, provide an ideal answer that would impress the interviewer.
The answers should be comprehensive, demonstrate expertise, and be appropriate for a {difficulty_level.lower()} difficulty level interview.
Job Description:
{job_description}
Questions:
{json.dumps(questions)}
Please provide the answers in the following JSON format:
{{
"answers": [
{{
"question": "Question 1",
"ideal_answer": "Ideal answer for question 1"
}},
...
]
}}
"""
            # Make the API call to generate ideal answers
            response = self.client.chat.completions.create(
                model=self.MODEL,
                messages=[
                    {"role": "system", "content": "You are an expert interviewer providing ideal answers to interview questions."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=2000
            )
            # Extract the response content
            response_content = response.choices[0].message.content
            try:
                # Try to parse as JSON
                json_response = self._extract_json(response_content)
                # Extract ideal answers
                for item in json_response.get("answers", []):
                    question = item.get("question", "")
                    ideal_answer = item.get("ideal_answer", "")
                    if question and ideal_answer:
                        # Find the matching question in our list; the model may
                        # paraphrase, so match on substring in either direction.
                        for q in questions:
                            if question.lower() in q.lower() or q.lower() in question.lower():
                                self.ideal_answers[q] = ideal_answer
                                break
            except Exception as e:
                # If batch processing fails, fall back to placeholder answers
                for question in questions:
                    if question not in self.ideal_answers:
                        self.ideal_answers[question] = f"Unable to generate ideal answer: {str(e)}"
        except Exception as e:
            # Handle any errors in the overall ideal answer generation process
            print(f"Error generating ideal answers: {str(e)}")
            # Ensure all questions have a fallback ideal answer
            for question in questions:
                if question not in self.ideal_answers:
                    self.ideal_answers[question] = "Unable to generate ideal answer due to an error."

    def _parse_questions(self, questions_text: str, expected_count: int) -> List[str]:
        """Parse questions from a numbered or bulleted text response.

        Only the list marker ("1.", "2)", "- ", "* ") is removed, so questions
        that themselves start with a digit (e.g. "2D arrays...") are kept intact
        — the previous lstrip-based cleanup would have eaten those characters.
        """
        import re
        marker = re.compile(r'^\s*(?:\d+\s*[.):-]?\s*|[-*]\s+)')
        lines = questions_text.strip().split('\n')
        questions = []
        for line in lines:
            line = line.strip()
            if line and (line[0].isdigit() or line.startswith('- ') or line.startswith('* ')):
                # Remove numbering or bullet points (marker only, not content)
                cleaned_line = marker.sub('', line, count=1).strip()
                if cleaned_line:
                    questions.append(cleaned_line)
        # If we couldn't parse the expected number of questions, try a simpler approach
        if len(questions) != expected_count:
            questions = [line.strip() for line in lines if line.strip()][:expected_count]
        return questions[:expected_count]  # Ensure we return exactly the expected number

    def get_next_question(self, question_index: int) -> str:
        """Get the question at ``question_index``, or a sentinel message if out of range."""
        if 0 <= question_index < len(self.questions_asked):
            return self.questions_asked[question_index]
        return "No more questions available."

    def store_user_answer(self, question: str, answer: str):
        """Store the user's answer to a question and append both turns to the history."""
        self.user_answers.append({"question": question, "answer": answer})
        self.conversation_history.append({"role": "assistant", "content": question})
        self.conversation_history.append({"role": "user", "content": answer})

    def get_ideal_answer(self, question: str) -> str:
        """Get the ideal answer for a question, with a fallback message if unknown."""
        return self.ideal_answers.get(question, "No ideal answer available for this question.")

    def score_interview(self, job_description: str, interview_type: str, difficulty_level: str) -> Dict[str, Any]:
        """Score the interview based on the user's answers.

        Returns a dict with ``overall_score``, ``overall_feedback`` and
        ``individual_scores``; the same shape is returned (with score 0)
        on any failure, so callers never need to handle exceptions.
        """
        # Robustness: nothing to score — avoid a pointless API call.
        if not self.user_answers:
            return {
                "overall_score": 0,
                "overall_feedback": "No answers were recorded for this interview.",
                "individual_scores": []
            }
        try:
            # Prepare the data for scoring
            questions_and_answers = []
            for qa in self.user_answers:
                question = qa["question"]
                answer = qa["answer"]
                ideal_answer = self.get_ideal_answer(question)
                questions_and_answers.append({
                    "question": question,
                    "answer": answer,
                    "ideal_answer": ideal_answer
                })
            # Prepare the prompt for scoring
            prompt = f"""You are an expert interviewer for {interview_type} interviews.
Score the following interview answers based on the job description and difficulty level.
Job Description:
{job_description}
Difficulty Level: {difficulty_level}
For each question and answer, provide:
1. A score from 0 to 5 (where 5 is excellent)
2. Feedback on the answer
3. Include the ideal answer for comparison. The ideal answer should be a comprehensive and detailed answer that would impress the interviewer with bullet points.
Questions and Answers:
{json.dumps(questions_and_answers)}
Please provide the scores in the following JSON format:
{{
"overall_score": 4.5,
"overall_feedback": "Overall feedback on the interview performance",
"individual_scores": [
{{
"question": "Question 1",
"answer": "User's answer to question 1",
"ideal_answer": "Ideal answer to question 1",
"score": 4,
"feedback": "Feedback on the answer to question 1"
}},
...
]
}}
"""
            # Make the API call to score the interview
            response = self.client.chat.completions.create(
                model=self.MODEL,
                messages=[
                    {"role": "system", "content": "You are an expert interviewer scoring interview answers."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=2000
            )
            # Extract the response content
            response_content = response.choices[0].message.content
            try:
                # Try to parse as JSON
                return self._extract_json(response_content)
            except Exception as e:
                # If JSON parsing fails, return an error
                return {
                    "overall_score": 0,
                    "overall_feedback": f"Failed to score the interview: {str(e)}",
                    "individual_scores": []
                }
        except Exception as e:
            # If scoring fails, return an error
            return {
                "overall_score": 0,
                "overall_feedback": f"Failed to score the interview: {str(e)}",
                "individual_scores": []
            }