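"""EvalBot interview-analysis pipeline.

Converts an uploaded interview recording to WAV, transcribes it with AssemblyAI
(with speaker labels), matches per-utterance speaker embeddings (NVIDIA TitaNet)
against a Pinecone index, classifies interviewer vs. interviewee turns, scores the
interviewee's vocal delivery, and produces a Gemini-written evaluation report as
PDF and JSON.
"""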
import os
import torch
import numpy as np
import uuid
import requests
import time
import json
from pydub import AudioSegment
import wave
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from pinecone import Pinecone, ServerlessSpec
import librosa
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import re
from typing import Dict, List, Tuple
import logging
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib import colors
import matplotlib
matplotlib.use('Agg')  # select the non-interactive backend before pyplot is imported
import matplotlib.pyplot as plt
import io
from transformers import AutoTokenizer, AutoModel
import spacy
import google.generativeai as genai
import joblib
from concurrent.futures import ThreadPoolExecutor
from reportlab.lib.enums import TA_CENTER
import subprocess
from contextlib import contextmanager
import tempfile
import multiprocessing


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("nemo_logging").setLevel(logging.ERROR)
logging.getLogger("nemo").setLevel(logging.ERROR)


AUDIO_DIR = "./Uploads"
OUTPUT_DIR = "./processed_audio"
BASE_URL = "https://norhan12-evalbot-interview-analysis.hf.space"
os.makedirs(OUTPUT_DIR, exist_ok=True)


PINECONE_KEY = os.getenv("PINECONE_KEY")
ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")


def validate_env_vars():
    required_keys = ['PINECONE_KEY', 'ASSEMBLYAI_KEY', 'GEMINI_API_KEY']
    missing = [key for key in required_keys if not os.getenv(key)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")


validate_env_vars()


def initialize_services():
    try:
        pc = Pinecone(api_key=PINECONE_KEY)
        index_name = "interview-speaker-embeddings"
        if index_name not in pc.list_indexes().names():
            pc.create_index(
                name=index_name,
                dimension=192,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
        index = pc.Index(index_name)
        genai.configure(api_key=GEMINI_API_KEY)
        gemini_model = genai.GenerativeModel('gemini-1.5-flash')
        return index, gemini_model
    except Exception as e:
        logger.error(f"Error initializing services: {str(e)}")
        raise


index, gemini_model = initialize_services()


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")


def load_speaker_model():
    try:
        torch.set_num_threads(5)
        model = EncDecSpeakerLabelModel.from_pretrained(
            "nvidia/speakerverification_en_titanet_large",
            map_location=device
        )
        model.eval()
        return model
    except Exception as e:
        logger.error(f"Model loading failed: {str(e)}")
        raise RuntimeError("Could not load speaker verification model")


def load_models():
    speaker_model = load_speaker_model()
    nlp = spacy.load("en_core_web_sm")
    tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
    llm_model = AutoModel.from_pretrained("distilbert-base-uncased").to(device)
    llm_model.eval()
    return speaker_model, nlp, tokenizer, llm_model


speaker_model, nlp, tokenizer, llm_model = load_models()


@contextmanager
def temp_audio_file():
    # Create a named temp file, close our handle so other libraries can write to it,
    # and remove the file once the caller is done with it.
    temp_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    temp_file.close()
    try:
        yield temp_file.name
    finally:
        try:
            os.remove(temp_file.name)
        except OSError as e:
            logger.warning(f"Failed to delete temp file {temp_file.name}: {e}")


def convert_to_wav(input_path: str, output_dir: str = OUTPUT_DIR) -> str:
    try:
        os.makedirs(output_dir, exist_ok=True)
        temp_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
        temp_file.close()
        output_path = temp_file.name
        command = [
            'ffmpeg', '-y',
            '-i', input_path,
            '-vn',
            '-acodec', 'pcm_s16le',
            '-ar', '16000',
            '-ac', '1',
            output_path
        ]
        subprocess.run(command, check=True, capture_output=True, text=True)
        if not os.path.exists(output_path):
            raise FileNotFoundError(f"FFmpeg failed to create WAV file: {output_path}")
        size_in_mb = os.path.getsize(output_path) / (1024 * 1024)
        logger.info(f"WAV file size: {size_in_mb:.2f} MB")
        return output_path
    except Exception as e:
        logger.error(f"Audio conversion failed: {str(e)}")
        raise


def extract_prosodic_features(audio_path: str, start_ms: int, end_ms: int) -> Dict:
    try:
        audio = AudioSegment.from_file(audio_path)
        segment = audio[start_ms:end_ms]
        with temp_audio_file() as temp_path:
            segment.export(temp_path, format="wav")
            y, sr = librosa.load(temp_path, sr=16000)
        pitches = librosa.piptrack(y=y, sr=sr)[0]
        pitches = pitches[pitches > 0]
        rms = librosa.feature.rms(y=y)[0]
        features = {
            'duration': (end_ms - start_ms) / 1000,
            'mean_pitch': float(np.mean(pitches)) if len(pitches) > 0 else 0.0,
            'min_pitch': float(np.min(pitches)) if len(pitches) > 0 else 0.0,
            'max_pitch': float(np.max(pitches)) if len(pitches) > 0 else 0.0,
            'pitch_sd': float(np.std(pitches)) if len(pitches) > 0 else 0.0,
            'intensityMean': float(np.mean(rms)),
            'intensityMin': float(np.min(rms)),
            'intensityMax': float(np.max(rms)),
            'intensitySD': float(np.std(rms)),
        }
        return features
    except Exception as e:
        logger.error(f"Feature extraction failed: {str(e)}")
        return {
            'duration': 0.0,
            'mean_pitch': 0.0,
            'min_pitch': 0.0,
            'max_pitch': 0.0,
            'pitch_sd': 0.0,
            'intensityMean': 0.0,
            'intensityMin': 0.0,
            'intensityMax': 0.0,
            'intensitySD': 0.0,
        }


def transcribe(audio_path: str) -> Dict:
    try:
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")
        logger.debug(f"Uploading audio file: {audio_path}")
        with open(audio_path, 'rb') as f:
            upload_response = requests.post(
                "https://api.assemblyai.com/v2/upload",
                headers={"authorization": ASSEMBLYAI_KEY},
                data=f
            )
        upload_response.raise_for_status()
        audio_url = upload_response.json()['upload_url']
        transcript_response = requests.post(
            "https://api.assemblyai.com/v2/transcript",
            headers={"authorization": ASSEMBLYAI_KEY},
            json={
                "audio_url": audio_url,
                "speaker_labels": True,
                "filter_profanity": True
            }
        )
        transcript_response.raise_for_status()
        transcript_id = transcript_response.json()['id']
        start_time = time.time()
        max_polling_time = 600
        while True:
            if time.time() - start_time > max_polling_time:
                raise TimeoutError("Transcription timed out after 10 minutes")
            result = requests.get(
                f"https://api.assemblyai.com/v2/transcript/{transcript_id}",
                headers={"authorization": ASSEMBLYAI_KEY}
            ).json()
            if result['status'] == 'completed':
                return result
            elif result['status'] == 'error':
                raise Exception(result['error'])
            time.sleep(5)
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        raise


def process_utterance(utterance, full_audio, wav_file):
    try:
        start = utterance['start']
        end = utterance['end']
        duration_ms = end - start
        if duration_ms < 500:
            logger.warning(f"Skipping utterance with duration {duration_ms}ms (too short): '{utterance['text'][:20]}...'")
            return {
                **utterance,
                'speaker': 'Unknown',
                'speaker_id': 'unknown',
                'embedding': None
            }
        segment = full_audio[start:end]
        with temp_audio_file() as temp_path:
            segment.export(temp_path, format="wav")
            # TitaNet computes the embedding directly from the exported WAV segment.
            with torch.no_grad():
                embedding = speaker_model.get_embedding(temp_path).cpu().numpy()
        embedding_list = embedding.flatten().tolist()
        if not any(embedding_list):
            logger.warning(f"Invalid embedding for utterance: '{utterance['text'][:20]}...'")
            return {
                **utterance,
                'speaker': 'Unknown',
                'speaker_id': 'unknown',
                'embedding': None
            }
        query_result = index.query(
            vector=embedding_list,
            top_k=1,
            include_metadata=True
        )
        if query_result['matches'] and query_result['matches'][0]['score'] > 0.7:
            speaker_id = query_result['matches'][0]['id']
            speaker_name = query_result['matches'][0]['metadata']['speaker_name']
        else:
            speaker_id = f"unknown_{uuid.uuid4().hex[:6]}"
            speaker_name = f"Speaker_{speaker_id[-4:]}"
            index.upsert([(speaker_id, embedding_list, {"speaker_name": speaker_name})])
        logger.debug(f"Processed utterance: duration={duration_ms}ms, speaker={speaker_name}, text='{utterance['text'][:20]}...'")
        return {
            **utterance,
            'speaker': speaker_name,
            'speaker_id': speaker_id,
            'embedding': embedding_list
        }
    except Exception as e:
        logger.error(f"Utterance processing failed: {str(e)}", exc_info=True)
        return {
            **utterance,
            'speaker': 'Unknown',
            'speaker_id': 'unknown',
            'embedding': None
        }


def identify_speakers(transcript: Dict, wav_file: str) -> List[Dict]:
    try:
        full_audio = AudioSegment.from_wav(wav_file)
        utterances = transcript['utterances']
        max_workers = max(1, min(len(utterances), multiprocessing.cpu_count()))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(process_utterance, utterance, full_audio, wav_file)
                for utterance in utterances
            ]
            results = [f.result() for f in futures]
        return results
    except Exception as e:
        logger.error(f"Speaker identification failed: {str(e)}")
        raise


def train_role_classifier(utterances: List[Dict]):
    try:
        texts = [u['text'] for u in utterances]
        vectorizer = TfidfVectorizer(max_features=500, ngram_range=(1, 2))
        X_text = vectorizer.fit_transform(texts)

        features = []
        labels = []

        for i, utterance in enumerate(utterances):
            prosodic = utterance['prosodic_features']
            feat = [
                prosodic['duration'],
                prosodic['mean_pitch'],
                prosodic['min_pitch'],
                prosodic['max_pitch'],
                prosodic['pitch_sd'],
                prosodic['intensityMean'],
                prosodic['intensityMin'],
                prosodic['intensityMax'],
                prosodic['intensitySD'],
            ]

            feat.extend(X_text[i].toarray()[0].tolist())

            doc = nlp(utterance['text'])
            feat.extend([
                int(utterance['text'].endswith('?')),
                len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utterance['text'].lower())),
                len(utterance['text'].split()),
                sum(1 for token in doc if token.pos_ == 'VERB'),
                sum(1 for token in doc if token.pos_ == 'NOUN')
            ])

            features.append(feat)
            # Bootstrap labels with an alternating-turn heuristic: even turns are treated
            # as the interviewer (0), odd turns as the interviewee (1).
            labels.append(0 if i % 2 == 0 else 1)

        scaler = StandardScaler()
        X = scaler.fit_transform(features)

        clf = RandomForestClassifier(
            n_estimators=150,
            max_depth=10,
            random_state=42,
            class_weight='balanced'
        )
        clf.fit(X, labels)

        joblib.dump(clf, os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
        joblib.dump(vectorizer, os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
        joblib.dump(scaler, os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))

        return clf, vectorizer, scaler
    except Exception as e:
        logger.error(f"Classifier training failed: {str(e)}")
        raise


def classify_roles(utterances: List[Dict], clf, vectorizer, scaler):
    try:
        texts = [u['text'] for u in utterances]
        X_text = vectorizer.transform(texts)

        results = []
        for i, utterance in enumerate(utterances):
            prosodic = utterance['prosodic_features']
            feat = [
                prosodic['duration'],
                prosodic['mean_pitch'],
                prosodic['min_pitch'],
                prosodic['max_pitch'],
                prosodic['pitch_sd'],
                prosodic['intensityMean'],
                prosodic['intensityMin'],
                prosodic['intensityMax'],
                prosodic['intensitySD'],
            ]

            feat.extend(X_text[i].toarray()[0].tolist())

            doc = nlp(utterance['text'])
            feat.extend([
                int(utterance['text'].endswith('?')),
                len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utterance['text'].lower())),
                len(utterance['text'].split()),
                sum(1 for token in doc if token.pos_ == 'VERB'),
                sum(1 for token in doc if token.pos_ == 'NOUN')
            ])

            X = scaler.transform([feat])
            role = 'Interviewer' if clf.predict(X)[0] == 0 else 'Interviewee'

            results.append({**utterance, 'role': role})

        return results
    except Exception as e:
        logger.error(f"Role classification failed: {str(e)}")
        raise


def analyze_interviewee_voice(audio_path: str, utterances: List[Dict]) -> Dict:
    try:
        y, sr = librosa.load(audio_path, sr=16000)
        interviewee_utterances = [u for u in utterances if u['role'] == 'Interviewee']
        if not interviewee_utterances:
            return {'error': 'No interviewee utterances found'}
        segments = []
        for u in interviewee_utterances:
            start = int(u['start'] * sr / 1000)
            end = int(u['end'] * sr / 1000)
            segments.append(y[start:end])
        total_duration = sum(u['prosodic_features']['duration'] for u in interviewee_utterances)
        total_words = sum(len(u['text'].split()) for u in interviewee_utterances)
        speaking_rate = total_words / total_duration if total_duration > 0 else 0
        filler_words = ['um', 'uh', 'like', 'you know', 'so', 'i mean']
        filler_count = sum(
            sum(u['text'].lower().count(fw) for fw in filler_words)
            for u in interviewee_utterances
        )
        filler_ratio = filler_count / total_words if total_words > 0 else 0
        all_words = ' '.join(u['text'].lower() for u in interviewee_utterances).split()
        word_counts = {}
        for i in range(len(all_words) - 1):
            bigram = (all_words[i], all_words[i + 1])
            word_counts[bigram] = word_counts.get(bigram, 0) + 1
        repetition_score = sum(1 for count in word_counts.values() if count > 1) / len(word_counts) if word_counts else 0
        pitches = []
        for segment in segments:
            f0, voiced_flag, _ = librosa.pyin(segment, fmin=80, fmax=300, sr=sr)
            pitches.extend(f0[voiced_flag])
        pitch_mean = np.mean(pitches) if len(pitches) > 0 else 0
        pitch_std = np.std(pitches) if len(pitches) > 0 else 0
        jitter = np.mean(np.abs(np.diff(pitches))) / pitch_mean if len(pitches) > 1 and pitch_mean > 0 else 0
        intensities = []
        for segment in segments:
            rms = librosa.feature.rms(y=segment)[0]
            intensities.extend(rms)
        intensity_mean = np.mean(intensities) if intensities else 0
        intensity_std = np.std(intensities) if intensities else 0
        shimmer = np.mean(np.abs(np.diff(intensities))) / intensity_mean if len(intensities) > 1 and intensity_mean > 0 else 0
        # Heuristic composites: anxiety rises with pitch variability and voice instability,
        # confidence with stable intensity and few fillers, hesitation with fillers plus repetition.
        anxiety_score = 0.6 * (pitch_std / pitch_mean) + 0.4 * (jitter + shimmer) if pitch_mean > 0 else 0
        confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio))
        hesitation_score = filler_ratio + repetition_score
        anxiety_level = 'high' if anxiety_score > 0.15 else 'moderate' if anxiety_score > 0.07 else 'low'
        confidence_level = 'high' if confidence_score > 0.7 else 'moderate' if confidence_score > 0.5 else 'low'
        fluency_level = 'fluent' if (filler_ratio < 0.05 and repetition_score < 0.1) else 'moderate' if (
            filler_ratio < 0.1 and repetition_score < 0.2) else 'disfluent'
        return {
            'speaking_rate': float(round(speaking_rate, 2)),
            'filler_ratio': float(round(filler_ratio, 4)),
            'repetition_score': float(round(repetition_score, 4)),
            'pitch_analysis': {
                'mean': float(round(pitch_mean, 2)),
                'std_dev': float(round(pitch_std, 2)),
                'jitter': float(round(jitter, 4))
            },
            'intensity_analysis': {
                'mean': float(round(intensity_mean, 2)),
                'std_dev': float(round(intensity_std, 2)),
                'shimmer': float(round(shimmer, 4))
            },
            'composite_scores': {
                'anxiety': float(round(anxiety_score, 4)),
                'confidence': float(round(confidence_score, 4)),
                'hesitation': float(round(hesitation_score, 4))
            },
            'interpretation': {
                'anxiety_level': anxiety_level,
                'confidence_level': confidence_level,
                'fluency_level': fluency_level
            }
        }
    except Exception as e:
        logger.error(f"Voice analysis failed: {str(e)}")
        return {'error': str(e)}


def generate_voice_interpretation(analysis: Dict) -> str:
    if 'error' in analysis:
        return "Voice analysis not available."
    interpretation_lines = []
    interpretation_lines.append("Voice Analysis Summary:")
    interpretation_lines.append(f"- Speaking Rate: {analysis['speaking_rate']} words/sec (average)")
    interpretation_lines.append(f"- Filler Words: {analysis['filler_ratio'] * 100:.1f}% of words")
    interpretation_lines.append(f"- Repetition Score: {analysis['repetition_score']:.3f}")
    interpretation_lines.append(
        f"- Anxiety Level: {analysis['interpretation']['anxiety_level'].upper()} (score: {analysis['composite_scores']['anxiety']:.3f})")
    interpretation_lines.append(
        f"- Confidence Level: {analysis['interpretation']['confidence_level'].upper()} (score: {analysis['composite_scores']['confidence']:.3f})")
    interpretation_lines.append(f"- Fluency: {analysis['interpretation']['fluency_level'].upper()}")
    interpretation_lines.append("")
    interpretation_lines.append("Detailed Interpretation:")
    interpretation_lines.append("1. A higher speaking rate indicates faster speech, which can suggest nervousness or enthusiasm.")
    interpretation_lines.append("2. Filler words and repetitions reduce speech clarity and professionalism.")
    interpretation_lines.append("3. Anxiety is measured through pitch variability and voice instability.")
    interpretation_lines.append("4. Confidence is assessed through voice intensity and stability.")
    interpretation_lines.append("5. Fluency combines filler words and repetition metrics.")
    return "\n".join(interpretation_lines)


def generate_anxiety_confidence_chart(composite_scores: Dict, chart_path_or_buffer):
    try:
        labels = ['Anxiety', 'Confidence']
        scores = [composite_scores.get('anxiety', 0), composite_scores.get('confidence', 0)]
        fig, ax = plt.subplots(figsize=(5, 3))
        bars = ax.bar(labels, scores, color=['#FF6B6B', '#4ECDC4'], edgecolor='black', width=0.6)
        ax.set_ylabel('Score (Normalized)', fontsize=12)
        ax.set_title('Vocal Dynamics: Anxiety vs. Confidence', fontsize=14, pad=15)
        ax.set_ylim(0, 1.2)
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width() / 2, height + 0.05, f"{height:.2f}",
                    ha='center', color='black', fontweight='bold', fontsize=11)
        ax.grid(True, axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(chart_path_or_buffer, format='png', bbox_inches='tight', dpi=200)
        plt.close(fig)
    except Exception as e:
        logger.error(f"Error generating chart: {str(e)}")


def calculate_acceptance_probability(analysis_data: Dict) -> float:
    """
    Calculate an acceptance probability (as a percentage) from the voice analysis and,
    when available, the Gemini report text. Voice confidence, anxiety, fluency, speaking
    rate, and filler/repetition metrics are combined with a content-strength signal; if
    no report text is present, a neutral content score of 0.5 is used.
    """
    voice = analysis_data.get('voice_analysis', {})

    if 'error' in voice:
        return 0.0

    # Component weights; negative weights mark penalties and are applied to the
    # complement of the corresponding score below.
    w_confidence = 0.4
    w_anxiety = -0.3
    w_fluency = 0.2
    w_speaking_rate = 0.1
    w_filler_repetition = -0.1
    w_content_strengths = 0.2

    confidence_score = voice.get('composite_scores', {}).get('confidence', 0.0)
    anxiety_score = voice.get('composite_scores', {}).get('anxiety', 0.0)
    fluency_level = voice.get('interpretation', {}).get('fluency_level', 'disfluent')
    speaking_rate = voice.get('speaking_rate', 0.0)
    filler_ratio = voice.get('filler_ratio', 0.0)
    repetition_score = voice.get('repetition_score', 0.0)

    fluency_map = {'fluent': 1.0, 'moderate': 0.5, 'disfluent': 0.0}
    fluency_val = fluency_map.get(fluency_level, 0.0)

    # Score speaking rate by its deviation from an assumed ideal of 2.5 words/sec.
    ideal_speaking_rate = 2.5
    speaking_rate_deviation = abs(speaking_rate - ideal_speaking_rate)
    speaking_rate_score = max(0, 1 - (speaking_rate_deviation / ideal_speaking_rate))

    filler_repetition_composite = (filler_ratio + repetition_score) / 2
    filler_repetition_score = max(0, 1 - filler_repetition_composite)

    gemini_report = analysis_data.get('gemini_report_text', '')
    strength_count = len(re.findall(r'Strengths?:', gemini_report, re.IGNORECASE))
    content_strength_val = min(1.0, strength_count / 5.0) if strength_count else 0.5

    raw_score = (
        confidence_score * w_confidence +
        (1 - anxiety_score) * abs(w_anxiety) +
        fluency_val * w_fluency +
        speaking_rate_score * w_speaking_rate +
        filler_repetition_score * abs(w_filler_repetition) +
        content_strength_val * w_content_strengths
    )

    max_possible_score = sum([
        w_confidence,
        abs(w_anxiety),
        w_fluency,
        w_speaking_rate,
        abs(w_filler_repetition),
        w_content_strengths
    ])
    normalized_score = (raw_score / max_possible_score) if max_possible_score > 0 else 0.5
    acceptance_probability = max(0.0, min(1.0, normalized_score))
    return round(acceptance_probability * 100, 2)


def generate_report(analysis_data: Dict) -> str:
    try:
        voice = analysis_data.get('voice_analysis', {})
        voice_interpretation = generate_voice_interpretation(voice)
        full_responses_text = "\n".join([u['text'] for u in analysis_data['transcript'] if u.get('role') == 'Interviewee']) or "- No interviewee responses available."
        acceptance_prob = analysis_data.get('acceptance_probability', 50.0)
        acceptance_line = f"\n**Suitability Score: {acceptance_prob:.2f}%**\n"
        if acceptance_prob >= 80:
            acceptance_line += "HR Verdict: Outstanding candidate, recommended for immediate advancement."
        elif acceptance_prob >= 60:
            acceptance_line += "HR Verdict: Strong candidate, suitable for further evaluation."
        elif acceptance_prob >= 40:
            acceptance_line += "HR Verdict: Moderate potential, needs additional assessment."
        else:
            acceptance_line += "HR Verdict: Limited fit, significant improvement required."
        prompt = f"""
You are EvalBot, a highly experienced senior HR analyst generating a comprehensive interview evaluation report based on both objective metrics and the full interviewee responses.

Your task:
- Analyze deeply based on the actual responses provided below. Avoid generic analysis.
- Use only insights that can be inferred from the answers or the provided metrics.
- Maintain professional, HR-standard language with clear structure and bullet points.
- Avoid redundancy or overly generic feedback.
- The responses are real interviewee answers; treat them as a high-priority source.

{acceptance_line}

### Interviewee Full Responses:
{full_responses_text}

### Metrics Summary:
- Duration: {analysis_data['text_analysis']['total_duration']:.2f} seconds
- Speaker Turns: {analysis_data['text_analysis']['speaker_turns']}
- Speaking Rate: {voice.get('speaking_rate', 'N/A')} words/sec
- Filler Words: {voice.get('filler_ratio', 0) * 100:.1f}%
- Confidence Level: {voice.get('interpretation', {}).get('confidence_level', 'N/A')}
- Anxiety Level: {voice.get('interpretation', {}).get('anxiety_level', 'N/A')}
- Fluency Level: {voice.get('interpretation', {}).get('fluency_level', 'N/A')}
- Voice Interpretation Summary: {voice_interpretation}

### Report Sections to Generate:

**1. Executive Summary**
- 3 bullets summarizing performance, key strengths, and hiring recommendation.
- Mention relevant metrics when applicable.

**2. Communication and Vocal Dynamics**
- Analyze delivery: speaking rate, filler words, confidence, anxiety, fluency.
- Provide 3-4 insightful bullets.
- Give 1 actionable improvement recommendation for workplace communication.

**3. Competency and Content**
- Identify 5-8 strengths (use HR competencies: leadership, teamwork, problem-solving, etc.).
- For each: provide a short explanation plus a concrete example inferred from the responses.
- Identify 5-10 weaknesses or development areas.
- For each weakness: provide actionable, practical feedback.

**4. Role Fit and Potential**
- Analyze role fit, cultural fit, and growth potential in 3 bullets.
- Use examples from the responses whenever possible.

**5. Recommendations**
- Provide 5 actionable recommendations categorized into:
  - Communication Skills
  - Content Delivery
  - Professional Presentation
- Each recommendation should include a short improvement strategy/example.

**Next Steps for Hiring Managers**
- Provide 5 clear next steps: next round, training, assessment, mentorship, role fit review.

Ensure each section is clearly titled exactly as requested above.
Avoid repetition between sections.
Use a professional HR tone.
Begin the full analysis now.
"""
        response = gemini_model.generate_content(prompt)
        clean_text = re.sub(r'[^\x20-\x7E\n]+', '', response.text)
        return clean_text
    except Exception as e:
        logger.error(f"Report generation failed: {str(e)}")
        return f"Error generating report: {str(e)}"


def convert_markdown_to_rml(text):
    return re.sub(r'\*\*(.*?)\*\*', r'<b>\1</b>', text)


def header_footer(canvas, doc):
    canvas.saveState()
    canvas.setFont('Helvetica', 8)
    canvas.setFillColor(colors.HexColor('#666666'))
    canvas.drawString(doc.leftMargin, 0.5 * inch, f"Page {doc.page} | EvalBot HR Interview Report | Confidential")
    canvas.drawRightString(doc.width + doc.leftMargin, 0.5 * inch, time.strftime('%B %d, %Y'))
    canvas.setStrokeColor(colors.HexColor('#0050BC'))
    canvas.setLineWidth(0.8)
    canvas.line(doc.leftMargin, doc.height + 0.9 * inch, doc.width + doc.leftMargin, doc.height + 0.9 * inch)
    canvas.setFont('Helvetica-Bold', 9)
    canvas.drawString(doc.leftMargin, doc.height + 0.95 * inch, "Candidate Interview Analysis")
    canvas.restoreState()


def create_pdf_report(analysis_data: Dict, output_path: str, gemini_report_text: str) -> bool:
    try:
        doc = SimpleDocTemplate(
            output_path,
            pagesize=letter,
            rightMargin=0.75 * inch,
            leftMargin=0.75 * inch,
            topMargin=1 * inch,
            bottomMargin=1 * inch
        )
        styles = getSampleStyleSheet()
        cover_title = ParagraphStyle(name='CoverTitle', fontSize=24, leading=28, spaceAfter=20, alignment=TA_CENTER, textColor=colors.HexColor('#003087'), fontName='Helvetica-Bold')
        h1 = ParagraphStyle(name='Heading1', fontSize=16, leading=20, spaceAfter=14, alignment=TA_CENTER, textColor=colors.HexColor('#003087'), fontName='Helvetica-Bold')
        h2 = ParagraphStyle(name='Heading2', fontSize=12, leading=15, spaceBefore=10, spaceAfter=8, textColor=colors.HexColor('#0050BC'), fontName='Helvetica-Bold')
        body_text = ParagraphStyle(name='BodyText', fontSize=10, leading=14, spaceAfter=6, fontName='Helvetica', textColor=colors.HexColor('#333333'))
        bullet_style = ParagraphStyle(name='Bullet', parent=body_text, leftIndent=20, bulletIndent=10, bulletFontName='Helvetica', bulletFontSize=10)
        story = []
        story.append(Spacer(1, 2 * inch))
        story.append(Paragraph("Candidate Interview Analysis Report", cover_title))
        story.append(Spacer(1, 0.5 * inch))
        story.append(Paragraph(f"Candidate ID: {analysis_data.get('user_id', 'N/A')}", body_text))
        story.append(Paragraph(f"Generated: {time.strftime('%B %d, %Y')}", body_text))
        story.append(Spacer(1, 0.5 * inch))
        story.append(Paragraph("Confidential", ParagraphStyle(name='Confidential', fontSize=12, alignment=TA_CENTER, textColor=colors.HexColor('#D32F2F'), fontName='Helvetica-Bold')))
        story.append(PageBreak())
        story.append(Paragraph("Interview Evaluation Summary", h1))
        story.append(Spacer(1, 0.3 * inch))
        acceptance_prob = analysis_data.get('acceptance_probability', 50.0)
        prob_color = colors.HexColor('#2E7D32') if acceptance_prob >= 80 else (colors.HexColor('#F57C00') if acceptance_prob >= 60 else colors.HexColor('#D32F2F'))
        story.append(Paragraph(f"Suitability Score: <font size=14 color='{prob_color.hexval()}'><b>{acceptance_prob:.2f}%</b></font>", h1))
        story.append(Spacer(1, 0.3 * inch))
        composite_scores = analysis_data.get('voice_analysis', {}).get('composite_scores', {})
        if composite_scores:
            chart_buffer = io.BytesIO()
            generate_anxiety_confidence_chart(composite_scores, chart_buffer)
            chart_buffer.seek(0)
            chart_img = Image(chart_buffer, width=4 * inch, height=2.5 * inch)
            story.append(Paragraph("Vocal Dynamics: Anxiety vs. Confidence", h2))
            story.append(Spacer(1, 0.2 * inch))
            story.append(chart_img)
            story.append(Spacer(1, 0.3 * inch))
        story.append(Paragraph("Full Interview Report", h2))
        story.append(Spacer(1, 0.2 * inch))
        for line in gemini_report_text.split('\n'):
            line = line.strip()
            if not line:
                continue
            if line.startswith('**') and line.endswith('**'):
                header_text = convert_markdown_to_rml(line[2:-2])
                story.append(Spacer(1, 12))
                story.append(Paragraph(header_text, h2))
                story.append(Spacer(1, 6))
            elif line.startswith('- ') or line.startswith('* '):
                content = convert_markdown_to_rml(line[2:])
                story.append(Paragraph(f'• {content}', bullet_style))
            else:
                content = convert_markdown_to_rml(line)
                story.append(Paragraph(content, body_text))
                story.append(Spacer(1, 4))
        doc.build(story, onFirstPage=header_footer, onLaterPages=header_footer)
        if not os.path.exists(output_path) or not os.access(output_path, os.W_OK):
            raise IOError(f"PDF file not accessible or created: {output_path}")
        logger.info(f"PDF report generated successfully: {output_path}")
        return True
    except Exception as e:
        logger.error(f"PDF generation failed: {str(e)}", exc_info=True)
        return False


def convert_to_serializable(obj):
    if isinstance(obj, np.generic):
        return obj.item()
    elif isinstance(obj, (np.float32, np.float64, np.int32, np.int64)):
        return obj.item()
    elif isinstance(obj, torch.Tensor):
        return obj.cpu().numpy().tolist()
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, dict):
        return {key: convert_to_serializable(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_serializable(item) for item in obj]
    else:
        try:
            json.dumps(obj)
            return obj
        except (TypeError, OverflowError):
            logger.warning(f"Non-serializable type encountered: {type(obj)}. Converting to str.")
            return str(obj)


def process_interview(audio_path: str, user_id: str = "candidate-123") -> Dict:
    try:
        logger.info(f"Starting processing for {audio_path} (User ID: {user_id})")
        wav_file = convert_to_wav(audio_path)
        logger.debug(f"Created WAV file: {wav_file}")
        logger.info("Starting transcription")
        transcript = transcribe(wav_file)
        logger.info("Transcript result: %s", transcript)
        if not transcript or 'utterances' not in transcript or not transcript['utterances']:
            logger.error("Transcription failed or returned empty utterances")
            raise ValueError("Transcription failed or returned empty utterances")
        logger.info("Extracting prosodic features")
        for utterance in transcript['utterances']:
            utterance['prosodic_features'] = extract_prosodic_features(
                wav_file,
                utterance['start'],
                utterance['end']
            )
        logger.info("Identifying speakers")
        utterances_with_speakers = identify_speakers(transcript, wav_file)
        logger.info("Classifying roles")
        if os.path.exists(os.path.join(OUTPUT_DIR, 'role_classifier.pkl')):
            clf = joblib.load(os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
            vectorizer = joblib.load(os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
            scaler = joblib.load(os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))
        else:
            clf, vectorizer, scaler = train_role_classifier(utterances_with_speakers)
        classified_utterances = classify_roles(utterances_with_speakers, clf, vectorizer, scaler)
        logger.info("Analyzing interviewee voice")
        voice_analysis = analyze_interviewee_voice(wav_file, classified_utterances)
        analysis_data = {
            'user_id': user_id,
            'transcript': classified_utterances,
            'speakers': list(set(u['speaker'] for u in classified_utterances)),
            'voice_analysis': voice_analysis,
            'text_analysis': {
                'total_duration': sum(u['prosodic_features']['duration'] for u in classified_utterances),
                'speaker_turns': len(classified_utterances)
            }
        }
        acceptance_probability = calculate_acceptance_probability(analysis_data)
        analysis_data['acceptance_probability'] = acceptance_probability
        logger.info("Generating report text using Gemini")
        gemini_report_text = generate_report(analysis_data)
        # Derive the output base name from the part of the filename after the first
        # underscore, falling back to the whole stem if no underscore is present.
        stem = os.path.splitext(os.path.basename(audio_path))[0]
        base_name = f"{user_id}_{stem.split('_', 1)[1] if '_' in stem else stem}"
        pdf_path = os.path.join(OUTPUT_DIR, f"{base_name}_report.pdf")
        if not create_pdf_report(analysis_data, pdf_path, gemini_report_text=gemini_report_text):
            logger.error(f"Failed to create PDF report: {pdf_path}")
            raise RuntimeError("PDF report generation failed")
        try:
            json_path = os.path.join(OUTPUT_DIR, f"{base_name}_analysis.json")
            with open(json_path, 'w') as f:
                logger.debug(f"Serializing analysis_data with keys: {list(analysis_data.keys())}")
                serializable_data = convert_to_serializable(analysis_data)
                json.dump(serializable_data, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to serialize analysis_data to JSON: {str(e)}", exc_info=True)
            raise
        os.remove(wav_file)
        logger.info(f"Processing completed for {audio_path} (User ID: {user_id})")
        return {
            'summary': f"User ID: {user_id}\nspeakers: {', '.join(analysis_data['speakers'])}",
            'json_path': json_path,
            'pdf_path': pdf_path
        }
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        if 'wav_file' in locals() and os.path.exists(wav_file):
            os.remove(wav_file)
        raise
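

# Minimal usage sketch (assumption: the module is run directly against an uploaded
# recording; the path and user_id below are hypothetical examples, not files shipped
# with the project). The report base name is derived from the audio filename.
if __name__ == "__main__":
    example_audio = os.path.join(AUDIO_DIR, "candidate-123_interview.mp3")  # hypothetical path
    if os.path.exists(example_audio):
        outputs = process_interview(example_audio, user_id="candidate-123")
        print(outputs['summary'])
        print(f"PDF report: {outputs['pdf_path']}")
        print(f"JSON analysis: {outputs['json_path']}")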