evalbot-interview-analysis / process_interview.py
norhan12's picture
Update process_interview.py
8f777a3 verified
import os
import torch
import numpy as np
import uuid
import requests
import time
import json
from pydub import AudioSegment
import wave
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from pinecone import Pinecone, ServerlessSpec
import librosa
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import re
from typing import Dict, List, Tuple
import logging
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib import colors
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')
import io
from transformers import AutoTokenizer, AutoModel
import spacy
import google.generativeai as genai
import joblib
from concurrent.futures import ThreadPoolExecutor
from reportlab.lib.enums import TA_CENTER
import subprocess
from contextlib import contextmanager
import tempfile
import multiprocessing
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("nemo_logging").setLevel(logging.ERROR)
logging.getLogger("nemo").setLevel(logging.ERROR)
# Configuration
AUDIO_DIR = "./Uploads"
OUTPUT_DIR = "./processed_audio"
BASE_URL = "https://norhan12-evalbot-interview-analysis.hf.space"
os.makedirs(OUTPUT_DIR, exist_ok=True)
# API Keys
PINECONE_KEY = os.getenv("PINECONE_KEY")
ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
# Validate environment variables
def validate_env_vars():
required_keys = ['PINECONE_KEY', 'ASSEMBLYAI_KEY', 'GEMINI_API_KEY']
missing = [key for key in required_keys if not os.getenv(key)]
if missing:
raise ValueError(f"Missing environment variables: {', '.join(missing)}")
validate_env_vars()
# Initialize services
def initialize_services():
try:
pc = Pinecone(api_key=PINECONE_KEY)
index_name = "interview-speaker-embeddings"
if index_name not in pc.list_indexes().names():
pc.create_index(
name=index_name,
dimension=192,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1")
)
index = pc.Index(index_name)
genai.configure(api_key=GEMINI_API_KEY)
gemini_model = genai.GenerativeModel('gemini-1.5-flash')
return index, gemini_model
except Exception as e:
logger.error(f"Error initializing services: {str(e)}")
raise
index, gemini_model = initialize_services()
# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")
def load_speaker_model():
try:
torch.set_num_threads(5)
model = EncDecSpeakerLabelModel.from_pretrained(
"nvidia/speakerverification_en_titanet_large",
map_location=device
)
model.eval()
return model
except Exception as e:
logger.error(f"Model loading failed: {str(e)}")
raise RuntimeError("Could not load speaker verification model")
def load_models():
speaker_model = load_speaker_model()
nlp = spacy.load("en_core_web_sm")
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
llm_model = AutoModel.from_pretrained("distilbert-base-uncased").to(device)
llm_model.eval()
return speaker_model, nlp, tokenizer, llm_model
speaker_model, nlp, tokenizer, llm_model = load_models()
@contextmanager
def temp_audio_file():
temp_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
try:
yield temp_file.name
finally:
try:
os.remove(temp_file.name)
except OSError as e:
logger.warning(f"Failed to delete temp file {temp_file.name}: {e}")
def convert_to_wav(input_path: str, output_dir: str = OUTPUT_DIR) -> str:
try:
os.makedirs(output_dir, exist_ok=True)
temp_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
output_path = temp_file.name
command = [
'ffmpeg', '-y',
'-i', input_path,
'-vn',
'-acodec', 'pcm_s16le',
'-ar', '16000',
'-ac', '1',
output_path
]
subprocess.run(command, check=True, capture_output=True, text=True)
if not os.path.exists(output_path):
raise FileNotFoundError(f"FFmpeg failed to create WAV file: {output_path}")
size_in_mb = os.path.getsize(output_path) / (1024*1024)
logger.info(f"WAV file size: {size_in_mb:.2f} MB")
return output_path
except Exception as e:
logger.error(f"Audio conversion failed: {str(e)}")
raise
def extract_prosodic_features(audio_path: str, start_ms: int, end_ms: int) -> Dict:
try:
audio = AudioSegment.from_file(audio_path)
segment = audio[start_ms:end_ms]
with temp_audio_file() as temp_path:
segment.export(temp_path, format="wav")
y, sr = librosa.load(temp_path, sr=16000)
pitches = librosa.piptrack(y=y, sr=sr)[0]
pitches = pitches[pitches > 0]
features = {
'duration': (end_ms - start_ms) / 1000,
'mean_pitch': float(np.mean(pitches)) if len(pitches) > 0 else 0.0,
'min_pitch': float(np.min(pitches)) if len(pitches) > 0 else 0.0,
'max_pitch': float(np.max(pitches)) if len(pitches) > 0 else 0.0,
'pitch_sd': float(np.std(pitches)) if len(pitches) > 0 else 0.0,
'intensityMean': float(np.mean(librosa.feature.rms(y=y)[0])),
'intensityMin': float(np.min(librosa.feature.rms(y=y)[0])),
'intensityMax': float(np.max(librosa.feature.rms(y=y)[0])),
'intensitySD': float(np.std(librosa.feature.rms(y=y)[0])),
}
return features
except Exception as e:
logger.error(f"Feature extraction failed: {str(e)}")
return {
'duration': 0.0,
'mean_pitch': 0.0,
'min_pitch': 0.0,
'max_pitch': 0.0,
'pitch_sd': 0.0,
'intensityMean': 0.0,
'intensityMin': 0.0,
'intensityMax': 0.0,
'intensitySD': 0.0,
}
def transcribe(audio_path: str) -> Dict:
try:
if not os.path.exists(audio_path):
raise FileNotFoundError(f"Audio file not found: {audio_path}")
logger.debug(f"Uploading audio file: {audio_path}")
with open(audio_path, 'rb') as f:
upload_response = requests.post(
"https://api.assemblyai.com/v2/upload",
headers={"authorization": ASSEMBLYAI_KEY},
data=f
)
upload_response.raise_for_status()
audio_url = upload_response.json()['upload_url']
transcript_response = requests.post(
"https://api.assemblyai.com/v2/transcript",
headers={"authorization": ASSEMBLYAI_KEY},
json={
"audio_url": audio_url,
"speaker_labels": True,
"filter_profanity": True
}
)
transcript_response.raise_for_status()
transcript_id = transcript_response.json()['id']
start_time = time.time()
max_polling_time = 600 # 10 minutes
while True:
if time.time() - start_time > max_polling_time:
raise TimeoutError("Transcription timed out after 10 minutes")
result = requests.get(
f"https://api.assemblyai.com/v2/transcript/{transcript_id}",
headers={"authorization": ASSEMBLYAI_KEY}
).json()
if result['status'] == 'completed':
return result
elif result['status'] == 'error':
raise Exception(result['error'])
time.sleep(5)
except Exception as e:
logger.error(f"Transcription failed: {str(e)}")
raise
def process_utterance(utterance, full_audio, wav_file):
try:
start = utterance['start']
end = utterance['end']
duration_ms = end - start
if duration_ms < 500:
logger.warning(f"Skipping utterance with duration {duration_ms}ms (too short): '{utterance['text'][:20]}...'")
return {
**utterance,
'speaker': 'Unknown',
'speaker_id': 'unknown',
'embedding': None
}
segment = full_audio[start:end]
with temp_audio_file() as temp_path:
segment.export(temp_path, format="wav")
y, sr = librosa.load(temp_path, sr=16000)
with torch.no_grad():
embedding = speaker_model.get_embedding(temp_path).cpu().numpy()
embedding_list = embedding.flatten().tolist()
if not any(embedding_list):
logger.warning(f"Invalid embedding for utterance: '{utterance['text'][:20]}...'")
return {
**utterance,
'speaker': 'Unknown',
'speaker_id': 'unknown',
'embedding': None
}
query_result = index.query(
vector=embedding_list,
top_k=1,
include_metadata=True
)
if query_result['matches'] and query_result['matches'][0]['score'] > 0.7:
speaker_id = query_result['matches'][0]['id']
speaker_name = query_result['matches'][0]['metadata']['speaker_name']
else:
speaker_id = f"unknown_{uuid.uuid4().hex[:6]}"
speaker_name = f"Speaker_{speaker_id[-4:]}"
index.upsert([(speaker_id, embedding_list, {"speaker_name": speaker_name})])
logger.debug(f"Processed utterance: duration={duration_ms}ms, speaker={speaker_name}, text='{utterance['text'][:20]}...'")
return {
**utterance,
'speaker': speaker_name,
'speaker_id': speaker_id,
'embedding': embedding_list
}
except Exception as e:
logger.error(f"Utterance processing failed: {str(e)}", exc_info=True)
return {
**utterance,
'speaker': 'Unknown',
'speaker_id': 'unknown',
'embedding': None
}
def identify_speakers(transcript: Dict, wav_file: str) -> List[Dict]:
try:
full_audio = AudioSegment.from_wav(wav_file)
utterances = transcript['utterances']
max_workers = min(len(utterances), multiprocessing.cpu_count())
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(process_utterance, utterance, full_audio, wav_file)
for utterance in utterances
]
results = [f.result() for f in futures]
return results
except Exception as e:
logger.error(f"Speaker identification failed: {str(e)}")
raise
def train_role_classifier(utterances: List[Dict]):
try:
texts = [u['text'] for u in utterances] # تم حذف الـ 'u' الزائدة
vectorizer = TfidfVectorizer(max_features=500, ngram_range=(1, 2))
X_text = vectorizer.fit_transform(texts)
features = []
labels = []
for i, utterance in enumerate(utterances):
prosodic = utterance['prosodic_features']
feat = [
prosodic['duration'],
prosodic['mean_pitch'],
prosodic['min_pitch'],
prosodic['max_pitch'],
prosodic['pitch_sd'],
prosodic['intensityMean'],
prosodic['intensityMin'],
prosodic['intensityMax'],
prosodic['intensitySD'],
]
feat.extend(X_text[i].toarray()[0].tolist())
doc = nlp(utterance['text'])
feat.extend([
int(utterance['text'].endswith('?')),
len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utterance['text'].lower())),
len(utterance['text'].split()),
sum(1 for token in doc if token.pos_ == 'VERB'),
sum(1 for token in doc if token.pos_ == 'NOUN')
])
features.append(feat)
labels.append(0 if i % 2 == 0 else 1)
scaler = StandardScaler()
X = scaler.fit_transform(features)
clf = RandomForestClassifier(
n_estimators=150,
max_depth=10,
random_state=42,
class_weight='balanced'
)
clf.fit(X, labels)
joblib.dump(clf, os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
joblib.dump(vectorizer, os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
joblib.dump(scaler, os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))
return clf, vectorizer, scaler
except Exception as e:
logger.error(f"Classifier training failed: {str(e)}")
raise
def classify_roles(utterances: List[Dict], clf, vectorizer, scaler):
try:
texts = [u['text'] for u in utterances]
X_text = vectorizer.transform(texts)
results = []
for i, utterance in enumerate(utterances):
prosodic = utterance['prosodic_features']
feat = [
prosodic['duration'],
prosodic['mean_pitch'],
prosodic['min_pitch'],
prosodic['max_pitch'],
prosodic['pitch_sd'],
prosodic['intensityMean'],
prosodic['intensityMin'],
prosodic['intensityMax'],
prosodic['intensitySD'],
]
feat.extend(X_text[i].toarray()[0].tolist())
doc = nlp(utterance['text'])
feat.extend([
int(utterance['text'].endswith('?')),
len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utterance['text'].lower())),
len(utterance['text'].split()),
sum(1 for token in doc if token.pos_ == 'VERB'),
sum(1 for token in doc if token.pos_ == 'NOUN')
])
X = scaler.transform([feat])
role = 'Interviewer' if clf.predict(X)[0] == 0 else 'Interviewee'
results.append({**utterance, 'role': role})
return results
except Exception as e:
logger.error(f"Role classification failed: {str(e)}")
raise
def analyze_interviewee_voice(audio_path: str, utterances: List[Dict]) -> Dict:
try:
y, sr = librosa.load(audio_path, sr=16000)
interviewee_utterances = [u for u in utterances if u['role'] == 'Interviewee']
if not interviewee_utterances:
return {'error': 'No interviewee utterances found'}
segments = []
for u in interviewee_utterances:
start = int(u['start'] * sr / 1000)
end = int(u['end'] * sr / 1000)
segments.append(y[start:end])
combined_audio = np.concatenate(segments)
total_duration = sum(u['prosodic_features']['duration'] for u in interviewee_utterances)
total_words = sum(len(u['text'].split()) for u in interviewee_utterances)
speaking_rate = total_words / total_duration if total_duration > 0 else 0
filler_words = ['um', 'uh', 'like', 'you know', 'so', 'i mean']
filler_count = sum(
sum(u['text'].lower().count(fw) for fw in filler_words)
for u in interviewee_utterances
)
filler_ratio = filler_count / total_words if total_words > 0 else 0
all_words = ' '.join(u['text'].lower() for u in interviewee_utterances).split()
word_counts = {}
for i in range(len(all_words) - 1):
bigram = (all_words[i], all_words[i + 1])
word_counts[bigram] = word_counts.get(bigram, 0) + 1
repetition_score = sum(1 for count in word_counts.values() if count > 1) / len(word_counts) if word_counts else 0
pitches = []
for segment in segments:
f0, voiced_flag, _ = librosa.pyin(segment, fmin=80, fmax=300, sr=sr)
pitches.extend(f0[voiced_flag])
pitch_mean = np.mean(pitches) if len(pitches) > 0 else 0
pitch_std = np.std(pitches) if len(pitches) > 0 else 0
jitter = np.mean(np.abs(np.diff(pitches))) / pitch_mean if len(pitches) > 1 and pitch_mean > 0 else 0
intensities = []
for segment in segments:
rms = librosa.feature.rms(y=segment)[0]
intensities.extend(rms)
intensity_mean = np.mean(intensities) if intensities else 0
intensity_std = np.std(intensities) if intensities else 0
shimmer = np.mean(np.abs(np.diff(intensities))) / intensity_mean if len(intensities) > 1 and intensity_mean > 0 else 0
anxiety_score = 0.6 * (pitch_std / pitch_mean) + 0.4 * (jitter + shimmer) if pitch_mean > 0 else 0
confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio))
hesitation_score = filler_ratio + repetition_score
anxiety_level = 'high' if anxiety_score > 0.15 else 'moderate' if anxiety_score > 0.07 else 'low'
confidence_level = 'high' if confidence_score > 0.7 else 'moderate' if confidence_score > 0.5 else 'low'
fluency_level = 'fluent' if (filler_ratio < 0.05 and repetition_score < 0.1) else 'moderate' if (
filler_ratio < 0.1 and repetition_score < 0.2) else 'disfluent'
return {
'speaking_rate': float(round(speaking_rate, 2)),
'filler_ratio': float(round(filler_ratio, 4)),
'repetition_score': float(round(repetition_score, 4)),
'pitch_analysis': {
'mean': float(round(pitch_mean, 2)),
'std_dev': float(round(pitch_std, 2)),
'jitter': float(round(jitter, 4))
},
'intensity_analysis': {
'mean': float(round(intensity_mean, 2)),
'std_dev': float(round(intensity_std, 2)),
'shimmer': float(round(shimmer, 4))
},
'composite_scores': {
'anxiety': float(round(anxiety_score, 4)),
'confidence': float(round(confidence_score, 4)),
'hesitation': float(round(hesitation_score, 4))
},
'interpretation': {
'anxiety_level': anxiety_level,
'confidence_level': confidence_level,
'fluency_level': fluency_level
}
}
except Exception as e:
logger.error(f"Voice analysis failed: {str(e)}")
return {'error': str(e)}
def generate_voice_interpretation(analysis: Dict) -> str:
if 'error' in analysis:
return "Voice analysis not available."
interpretation_lines = []
interpretation_lines.append("Voice Analysis Summary:")
interpretation_lines.append(f"- Speaking Rate: {analysis['speaking_rate']} words/sec (average)")
interpretation_lines.append(f"- Filler Words: {analysis['filler_ratio'] * 100:.1f}% of words")
interpretation_lines.append(f"- Repetition Score: {analysis['repetition_score']:.3f}")
interpretation_lines.append(
f"- Anxiety Level: {analysis['interpretation']['anxiety_level'].upper()} (score: {analysis['composite_scores']['anxiety']:.3f})")
interpretation_lines.append(
f"- Confidence Level: {analysis['interpretation']['confidence_level'].upper()} (score: {analysis['composite_scores']['confidence']:.3f})")
interpretation_lines.append(f"- Fluency: {analysis['interpretation']['fluency_level'].upper()}")
interpretation_lines.append("")
interpretation_lines.append("Detailed Interpretation:")
interpretation_lines.append("1. A higher speaking rate indicates faster speech, which can suggest nervousness or enthusiasm.")
interpretation_lines.append("2. Filler words and repetitions reduce speech clarity and professionalism.")
interpretation_lines.append("3. Anxiety is measured through pitch variability and voice instability.")
interpretation_lines.append("4. Confidence is assessed through voice intensity and stability.")
interpretation_lines.append("5. Fluency combines filler words and repetition metrics.")
return "\n".join(interpretation_lines)
def generate_anxiety_confidence_chart(composite_scores: Dict, chart_path_or_buffer):
try:
labels = ['Anxiety', 'Confidence']
scores = [composite_scores.get('anxiety', 0), composite_scores.get('confidence', 0)]
fig, ax = plt.subplots(figsize=(5, 3))
bars = ax.bar(labels, scores, color=['#FF6B6B', '#4ECDC4'], edgecolor='black', width=0.6)
ax.set_ylabel('Score (Normalized)', fontsize=12)
ax.set_title('Vocal Dynamics: Anxiety vs. Confidence', fontsize=14, pad=15)
ax.set_ylim(0, 1.2)
for bar in bars:
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2, height + 0.05, f"{height:.2f}",
ha='center', color='black', fontweight='bold', fontsize=11)
ax.grid(True, axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.savefig(chart_path_or_buffer, format='png', bbox_inches='tight', dpi=200)
plt.close(fig)
except Exception as e:
logger.error(f"Error generating chart: {str(e)}")
import re
from typing import Dict
def calculate_acceptance_probability(analysis_data: Dict) -> float:
"""
Calculates an acceptance probability based on voice analysis and content strength.
Combines multiple voice features and analyzes Gemini report text for strengths extraction.
"""
voice = analysis_data.get('voice_analysis', {})
# Handle error case early
if 'error' in voice:
return 0.0
# Define weights
w_confidence = 0.4
w_anxiety = -0.3
w_fluency = 0.2
w_speaking_rate = 0.1
w_filler_repetition = -0.1
w_content_strengths = 0.2
# Extract voice features
confidence_score = voice.get('composite_scores', {}).get('confidence', 0.0)
anxiety_score = voice.get('composite_scores', {}).get('anxiety', 0.0)
fluency_level = voice.get('interpretation', {}).get('fluency_level', 'disfluent')
speaking_rate = voice.get('speaking_rate', 0.0)
filler_ratio = voice.get('filler_ratio', 0.0)
repetition_score = voice.get('repetition_score', 0.0)
# Map fluency level to numeric score
fluency_map = {'fluent': 1.0, 'moderate': 0.5, 'disfluent': 0.0}
fluency_val = fluency_map.get(fluency_level, 0.0)
# Speaking rate scoring
ideal_speaking_rate = 2.5 # words/sec
speaking_rate_deviation = abs(speaking_rate - ideal_speaking_rate)
speaking_rate_score = max(0, 1 - (speaking_rate_deviation / ideal_speaking_rate))
# Filler and repetition combined score
filler_repetition_composite = (filler_ratio + repetition_score) / 2
filler_repetition_score = max(0, 1 - filler_repetition_composite)
# Content strength extraction from Gemini report text
gemini_report = analysis_data.get('gemini_report_text', '')
strength_count = len(re.findall(r'Strengths?:', gemini_report, re.IGNORECASE))
content_strength_val = min(1.0, strength_count / 5.0) if strength_count else 0.5
# Calculate raw score
raw_score = (
confidence_score * w_confidence +
(1 - anxiety_score) * abs(w_anxiety) + # lower anxiety = better score
fluency_val * w_fluency +
speaking_rate_score * w_speaking_rate +
filler_repetition_score * abs(w_filler_repetition) +
content_strength_val * w_content_strengths
)
# Normalize score to 0-1
max_possible_score = sum([
w_confidence,
abs(w_anxiety),
w_fluency,
w_speaking_rate,
abs(w_filler_repetition),
w_content_strengths
])
normalized_score = (raw_score / max_possible_score) if max_possible_score > 0 else 0.5
acceptance_probability = max(0.0, min(1.0, normalized_score))
return float(f"{acceptance_probability * 100:.2f}")
def generate_report(analysis_data: Dict) -> str:
try:
voice = analysis_data.get('voice_analysis', {})
voice_interpretation = generate_voice_interpretation(voice)
interviewee_responses = [
f"- {u['text']}"
for u in analysis_data['transcript']
if u.get('role') == 'Interviewee'
] or ["- No interviewee responses available."]
full_responses_text = "\n".join([u['text'] for u in analysis_data['transcript'] if u.get('role') == 'Interviewee'])
acceptance_prob = analysis_data.get('acceptance_probability', 50.0)
acceptance_line = f"\n**Suitability Score: {acceptance_prob:.2f}%**\n"
if acceptance_prob >= 80:
acceptance_line += "HR Verdict: Outstanding candidate, recommended for immediate advancement."
elif acceptance_prob >= 60:
acceptance_line += "HR Verdict: Strong candidate, suitable for further evaluation."
elif acceptance_prob >= 40:
acceptance_line += "HR Verdict: Moderate potential, needs additional assessment."
else:
acceptance_line += "HR Verdict: Limited fit, significant improvement required."
prompt = f"""
You are EvalBot, a highly experienced senior HR analyst generating a comprehensive interview evaluation report based on both objective metrics and full interviewee responses.
Your task:
- Analyze deeply based on actual responses provided below. Avoid generic analysis.
- Use only insights that can be inferred from the answers or provided metrics.
- Maintain professional, HR-standard language with clear structure and bullet points.
- Avoid redundancy or overly generic feedback.
- The responses are real interviewee answers, treat them as high-priority source.
{acceptance_line}
### Interviewee Full Responses:
{full_responses_text}
### Metrics Summary:
- Duration: {analysis_data['text_analysis']['total_duration']:.2f} seconds
- Speaker Turns: {analysis_data['text_analysis']['speaker_turns']}
- Speaking Rate: {voice.get('speaking_rate', 'N/A')} words/sec
- Filler Words: {voice.get('filler_ratio', 0) * 100:.1f}%
- Confidence Level: {voice.get('interpretation', {}).get('confidence_level', 'N/A')}
- Anxiety Level: {voice.get('interpretation', {}).get('anxiety_level', 'N/A')}
- Fluency Level: {voice.get('interpretation', {}).get('fluency_level', 'N/A')}
- Voice Interpretation Summary: {voice_interpretation}
### Report Sections to Generate:
**1. Executive Summary**
- 3 bullets summarizing performance, key strengths, and hiring recommendation.
- Mention relevant metrics when applicable.
**2. Communication and Vocal Dynamics**
- Analyze delivery: speaking rate, filler words, confidence, anxiety, fluency.
- Provide 3-4 insightful bullets.
- Give 1 actionable improvement recommendation for workplace communication.
**3. Competency and Content**
- Identify 5-8 strengths (use HR competencies: leadership, teamwork, problem-solving, etc.).
- For each: provide short explanation + concrete example inferred from responses.
- Identify 5-10 weaknesses or development areas.
- For each weakness: provide actionable, practical feedback.
**4. Role Fit and Potential**
- Analyze role fit, cultural fit, growth potential in 3 bullets.
- Use examples from responses whenever possible.
**5. Recommendations**
- Provide 5 actionable recommendations categorized into:
- Communication Skills
- Content Delivery
- Professional Presentation
- Each recommendation should include a short improvement strategy/example.
**Next Steps for Hiring Managers**
- Provide 5 clear next steps: next round, training, assessment, mentorship, role fit review.
Ensure each section is clearly titled exactly as requested above.
Avoid repetition between sections.
Use professional HR tone.
Begin the full analysis now.
"""
response = gemini_model.generate_content(prompt)
clean_text = re.sub(r'[^\x20-\x7E\n]+', '', response.text)
return clean_text
except Exception as e:
logger.error(f"Report generation failed: {str(e)}")
return f"Error generating report: {str(e)}"
def convert_markdown_to_rml(text):
return re.sub(r'\*\*(.*?)\*\*', r'<b>\1</b>', text)
def header_footer(canvas, doc):
canvas.saveState()
canvas.setFont('Helvetica', 8)
canvas.setFillColor(colors.HexColor('#666666'))
canvas.drawString(doc.leftMargin, 0.5*inch, f"Page {doc.page} | EvalBot HR Interview Report | Confidential")
canvas.drawRightString(doc.width + doc.leftMargin, 0.5*inch, time.strftime('%B %d, %Y'))
canvas.setStrokeColor(colors.HexColor('#0050BC'))
canvas.setLineWidth(0.8)
canvas.line(doc.leftMargin, doc.height + 0.9*inch, doc.width + doc.leftMargin, doc.height + 0.9*inch)
canvas.setFont('Helvetica-Bold', 9)
canvas.drawString(doc.leftMargin, doc.height + 0.95*inch, "Candidate Interview Analysis")
canvas.restoreState()
def create_pdf_report(analysis_data: Dict, output_path: str, gemini_report_text: str) -> bool:
try:
doc = SimpleDocTemplate(
output_path,
pagesize=letter,
rightMargin=0.75 * inch,
leftMargin=0.75 * inch,
topMargin=1 * inch,
bottomMargin=1 * inch
)
styles = getSampleStyleSheet()
cover_title = ParagraphStyle(name='CoverTitle', fontSize=24, leading=28, spaceAfter=20, alignment=TA_CENTER, textColor=colors.HexColor('#003087'), fontName='Helvetica-Bold')
h1 = ParagraphStyle(name='Heading1', fontSize=16, leading=20, spaceAfter=14, alignment=TA_CENTER, textColor=colors.HexColor('#003087'), fontName='Helvetica-Bold')
h2 = ParagraphStyle(name='Heading2', fontSize=12, leading=15, spaceBefore=10, spaceAfter=8, textColor=colors.HexColor('#0050BC'), fontName='Helvetica-Bold')
body_text = ParagraphStyle(name='BodyText', fontSize=10, leading=14, spaceAfter=6, fontName='Helvetica', textColor=colors.HexColor('#333333'))
bullet_style = ParagraphStyle(name='Bullet', parent=body_text, leftIndent=20, bulletIndent=10, bulletFontName='Helvetica', bulletFontSize=10)
story = []
story.append(Spacer(1, 2 * inch))
story.append(Paragraph("Candidate Interview Analysis Report", cover_title))
story.append(Spacer(1, 0.5 * inch))
story.append(Paragraph(f"Candidate ID: {analysis_data.get('user_id', 'N/A')}", body_text))
story.append(Paragraph(f"Generated: {time.strftime('%B %d, %Y')}", body_text))
story.append(Spacer(1, 0.5 * inch))
story.append(Paragraph("Confidential", ParagraphStyle(name='Confidential', fontSize=12, alignment=TA_CENTER, textColor=colors.HexColor('#D32F2F'), fontName='Helvetica-Bold')))
story.append(PageBreak())
story.append(Paragraph("Interview Evaluation Summary", h1))
story.append(Spacer(1, 0.3 * inch))
acceptance_prob = analysis_data.get('acceptance_probability', 50.0)
prob_color = colors.HexColor('#2E7D32') if acceptance_prob >= 80 else (colors.HexColor('#F57C00') if acceptance_prob >= 60 else colors.HexColor('#D32F2F'))
story.append(Paragraph(f"Suitability Score: <font size=14 color='{prob_color.hexval()}'><b>{acceptance_prob:.2f}%</b></font>", h1))
story.append(Spacer(1, 0.3 * inch))
composite_scores = analysis_data.get('voice_analysis', {}).get('composite_scores', {})
if composite_scores:
chart_buffer = io.BytesIO()
generate_anxiety_confidence_chart(composite_scores, chart_buffer)
chart_buffer.seek(0)
chart_img = Image(chart_buffer, width=4*inch, height=2.5*inch)
story.append(Paragraph("Vocal Dynamics: Anxiety vs. Confidence", h2))
story.append(Spacer(1, 0.2 * inch))
story.append(chart_img)
story.append(Spacer(1, 0.3 * inch))
story.append(Paragraph("Full Interview Report", h2))
story.append(Spacer(1, 0.2 * inch))
for line in gemini_report_text.split('\n'):
line = line.strip()
if not line:
continue
if line.startswith('**') and line.endswith('**'):
header_text = convert_markdown_to_rml(line[2:-2])
story.append(Spacer(1, 12))
story.append(Paragraph(header_text, h2))
story.append(Spacer(1, 6))
elif line.startswith('- ') or line.startswith('* '):
content = convert_markdown_to_rml(line[2:])
story.append(Paragraph(f'• {content}', bullet_style))
else:
content = convert_markdown_to_rml(line)
story.append(Paragraph(content, body_text))
story.append(Spacer(1, 4))
doc.build(story, onFirstPage=header_footer, onLaterPages=header_footer)
if not os.access(output_path, os.W_OK) or not os.path.exists(output_path):
raise IOError(f"PDF file not accessible or created: {output_path}")
logger.info(f"PDF report generated successfully: {output_path}")
return True
except Exception as e:
logger.error(f"PDF generation failed: {str(e)}", exc_info=True)
return False
def convert_to_serializable(obj):
if isinstance(obj, np.generic):
return obj.item()
elif isinstance(obj, (np.float32, np.float64, np.int32, np.int64)):
return obj.item()
elif isinstance(obj, torch.Tensor):
return obj.cpu().numpy().tolist()
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, dict):
return {key: convert_to_serializable(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [convert_to_serializable(item) for item in obj]
else:
try:
json.dumps(obj)
return obj
except (TypeError, OverflowError):
logger.warning(f"Non-serializable type encountered: {type(obj)}. Converting to str.")
return str(obj)
def process_interview(audio_path: str, user_id: str = "candidate-123") -> Dict:
try:
logger.info(f"Starting processing for {audio_path} (User ID: {user_id})")
wav_file = convert_to_wav(audio_path)
logger.debug(f"Created WAV file: {wav_file}")
logger.info("Starting transcription")
transcript = transcribe(wav_file)
logger.info("Transcript result: %s", transcript)
if not transcript or 'utterances' not in transcript or not transcript['utterances']:
logger.error("Transcription failed or returned empty utterances")
raise ValueError("Transcription failed or returned empty utterances")
logger.info("Extracting prosodic features")
for utterance in transcript['utterances']:
utterance['prosodic_features'] = extract_prosodic_features(
wav_file,
utterance['start'],
utterance['end']
)
logger.info("Identifying speakers")
utterances_with_speakers = identify_speakers(transcript, wav_file)
logger.info("Classifying roles")
if os.path.exists(os.path.join(OUTPUT_DIR, 'role_classifier.pkl')):
clf = joblib.load(os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
vectorizer = joblib.load(os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
scaler = joblib.load(os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))
else:
clf, vectorizer, scaler = train_role_classifier(utterances_with_speakers)
classified_utterances = classify_roles(utterances_with_speakers, clf, vectorizer, scaler)
logger.info("Analyzing interviewee voice")
voice_analysis = analyze_interviewee_voice(wav_file, classified_utterances)
analysis_data = {
'user_id': user_id,
'transcript': classified_utterances,
'speakers': list(set(u['speaker'] for u in classified_utterances)),
'voice_analysis': voice_analysis,
'text_analysis': {
'total_duration': sum(u['prosodic_features']['duration'] for u in classified_utterances),
'speaker_turns': len(classified_utterances)
}
}
acceptance_probability = calculate_acceptance_probability(analysis_data)
analysis_data['acceptance_probability'] = acceptance_probability
logger.info("Generating report text using Gemini")
gemini_report_text = generate_report(analysis_data)
base_name = f"{user_id}_{os.path.splitext(os.path.basename(audio_path))[0].split('_', 1)[1]}"
pdf_path = os.path.join(OUTPUT_DIR, f"{base_name}_report.pdf")
if not create_pdf_report(analysis_data, pdf_path, gemini_report_text=gemini_report_text):
logger.error(f"Failed to create PDF report: {pdf_path}")
raise RuntimeError("PDF report generation failed")
try:
json_path = os.path.join(OUTPUT_DIR, f"{base_name}_analysis.json")
with open(json_path, 'w') as f:
logger.debug(f"Serializing analysis_data with keys: {list(analysis_data.keys())}")
serializable_data = convert_to_serializable(analysis_data)
json.dump(serializable_data, f, indent=2)
except Exception as e:
logger.error(f"Failed to serialize analysis_data to JSON: {str(e)}", exc_info=True)
raise
os.remove(wav_file)
logger.info(f"Processing completed for {audio_path} (User ID: {user_id})")
return {
'summary': f"User ID: {user_id}\nspeakers: {', '.join(analysis_data['speakers'])}",
'json_path': json_path,
'pdf_path': pdf_path
}
except Exception as e:
logger.error(f"Processing failed: {str(e)}", exc_info=True)
if 'wav_file' in locals() and os.path.exists(wav_file):
os.remove(wav_file)
raise