# core/transcription_service.py
import whisperx
import gc
import time
import os
from datetime import datetime
import warnings

# Suppress a specific pyannote/pytorch warning about degrees of freedom
# (raw string, so the regex escapes are not interpreted as string escapes)
warnings.filterwarnings("ignore", message=r"std\(\): degrees of freedom is <= 0")

from src.config.settings import Config
# In recent whisperx releases, DiarizationPipeline lives in the diarize submodule
from whisperx.diarize import DiarizationPipeline


class TranscriptionService:
    def __init__(self):
        self.config = Config
        self.models_loaded = False
        self.whisper_model = None
        self.diarize_model = None
        self.batch_size = 16
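        # batch_size controls how many audio chunks Whisper transcribes in
        # parallel; lowering it (e.g. to 4 or 8) reduces GPU memory use at the
        # cost of speed.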

    def load_models(self):
        """Load the Whisper and diarization models once and cache them on the instance."""
        if not self.models_loaded:
            print("πŸ“₯ Loading transcription models...")
            # Model name, device, and compute type come from config instead of being hardcoded
            self.whisper_model = whisperx.load_model(
                self.config.WHISPER_MODEL,
                self.config.DEVICE,
                compute_type=self.config.COMPUTE_TYPE,
                language="en"  # Pin to English so Whisper skips language detection
            )
            # The pyannote models behind the diarization pipeline are gated on the
            # Hugging Face Hub, so a valid token is required here
            self.diarize_model = DiarizationPipeline(
                use_auth_token=self.config.HUGGINGFACE_TOKEN,
                device=self.config.DEVICE
            )
            self.models_loaded = True
            print("βœ… Models loaded successfully")

    def transcribe_video(self, video_file_path, progress_callback=None):
        """Transcription pipeline with no Gradio dependencies.

        The optional progress_callback(fraction, desc=...) is invoked at each
        stage so a UI layer can report progress without being imported here.
        """
        try:
            if not self.models_loaded:
                self.load_models()

            start_time = time.time()
            print(f"🎬 Processing video: {os.path.basename(video_file_path)}")

            # ======================
            # STEP 1: Load Audio from Video
            # ======================
            if progress_callback:
                progress_callback(0.1, desc="🎬 Loading audio from video...")
                # Brief pause so each progress update stays visible in the UI
                time.sleep(0.5)

            print("1️⃣ Loading audio directly from video...")
            audio = whisperx.load_audio(video_file_path)
            print(f"βœ… Audio loaded: {len(audio)} samples")

            # ======================
            # STEP 2: Transcribe with Whisper
            # ======================
            print("2️⃣ Preparing Whisper model...")
            if progress_callback:
                progress_callback(0.3, desc="πŸ€– Preparing Whisper model...")
                time.sleep(0.5)
            if progress_callback:
                progress_callback(0.4, desc="πŸ“ Transcribing audio...")
                time.sleep(0.5)

            print("3️⃣ Transcribing audio...")
            result = self.whisper_model.transcribe(audio, batch_size=self.batch_size)
            detected_language = result["language"]  # Save the language code before alignment replaces the dict
            print(f"βœ… Transcription complete ({detected_language} detected)")

            # ======================
            # STEP 3: Align Timestamps
            # ======================
            if progress_callback:
                progress_callback(0.5, desc="⏱️ Aligning timestamps...")
                time.sleep(0.5)

            print("4️⃣ Aligning word-level timestamps...")
            # Load the whisperx alignment model and its metadata for word-level timestamp alignment
            model_a, metadata = whisperx.load_align_model(
                language_code=detected_language,
                device=self.config.DEVICE
            )
            result = whisperx.align(
                result["segments"],
                model_a,
                metadata,
                audio,
                self.config.DEVICE,
                return_char_alignments=False
            )
            # The alignment model is only needed for this step, so release it
            del model_a
            gc.collect()
            # Restore the language code, which whisperx.align leaves out of its result
            result["language"] = detected_language
            print("βœ… Timestamps aligned")

            # ======================
            # STEP 4: Speaker Diarization
            # ======================
            if progress_callback:
                progress_callback(0.7, desc="πŸ‘₯ Identifying speakers...")
                time.sleep(0.5)

            print("5️⃣ Running speaker diarization...")
            diarize_segments = self.diarize_model(audio)

            # ======================
            # STEP 5: Assign speakers
            # ======================
            if progress_callback:
                progress_callback(0.9, desc="πŸ”— Assigning speakers to text...")
                time.sleep(0.5)

            print("6️⃣ Assigning speakers to transcript...")
            result = whisperx.assign_word_speakers(diarize_segments, result)
            print("βœ… Speaker assignment complete")

            if progress_callback:
                progress_callback(1.0, desc="βœ… Complete!")
                time.sleep(0.5)

            # ======================
            # STEP 6: Format results
            # ======================
            processing_time = time.time() - start_time
            transcription = self._format_results(result, video_file_path)
            timing_info = self._get_timing_info(result, processing_time, video_file_path)

            return {
                "success": True,
                "transcription": transcription,
                "timing_info": timing_info,
                "raw_data": result,  # Keep the raw whisperx output for potential storage
                "processing_time": processing_time,
                "speakers_count": len(set(seg.get("speaker", "UNKNOWN") for seg in result["segments"]))
            }

        except Exception as e:
            error_msg = f"Transcription failed: {str(e)}"
            print(f"❌ ERROR: {error_msg}")
            return {
                "success": False,
                "error": error_msg
            }

    def _format_results(self, result, video_file_path):
        """Format the transcription with speaker labels and meeting metadata."""
        if not result["segments"]:
            return "No transcription segments found"

        # Extract meeting metadata
        segments = result["segments"]
        speakers = set(segment.get("speaker", "UNKNOWN") for segment in segments)
        total_duration = segments[-1]["end"] if segments else 0
        language = result.get("language", "unknown")

        # Calculate statistics
        total_words = sum(len(seg.get("text", "").split()) for seg in segments)
        avg_segment_length = total_words / len(segments) if segments else 0

        # Build header with meeting context
        output = "# 🎯 Meeting Transcription\n\n"
        output += "## πŸ“‹ Meeting Information\n\n"
        output += f"**πŸ“ File:** `{os.path.basename(video_file_path)}`\n"
        output += f"**πŸ“… Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
        output += f"**⏱️ Duration:** {self._format_timestamp(total_duration)}\n"
        output += f"**πŸ‘₯ Speakers:** {len(speakers)}\n"
        output += f"**🌐 Language:** {language.upper()}\n"
        output += f"**πŸ€– Model:** {self.config.WHISPER_MODEL}\n\n"
        output += "---\n\n"
        output += "## πŸ’¬ Transcript\n\n"

        # Group consecutive segments under one speaker heading
        current_speaker = None
        for segment in segments:
            speaker = segment.get("speaker", "UNKNOWN")
            start_time = self._format_timestamp(segment["start"])
            if speaker != current_speaker:
                output += f"\n**πŸ‘€ {speaker}:**\n"
                current_speaker = speaker
            output += f"[{start_time}] {segment['text'].strip()}\n"

        # Add a statistics footer
        output += "\n---\n\n"
        output += "## πŸ“Š Transcript Statistics\n\n"
        output += f"**Total Segments:** {len(segments)}\n"
        output += f"**Total Words:** {total_words:,}\n"
        output += f"**Avg Words/Segment:** {avg_segment_length:.1f}\n"
        output += f"**Unique Speakers:** {len(speakers)}\n"
        output += f"**Speaker IDs:** {', '.join(sorted(speakers))}\n"

        return output

    def _get_timing_info(self, result, processing_time, video_file_path):
        """Generate processing-time statistics as markdown."""
        if not result["segments"]:
            return "No timing information available"

        total_duration = result["segments"][-1]["end"]
        speed_ratio = total_duration / processing_time if processing_time > 0 else 0
        video_name = os.path.basename(video_file_path)

        return f"""
## ⏱️ Processing Statistics

**File:** {video_name}
**Duration:** {self._format_timestamp(total_duration)}
**Processing Time:** {processing_time:.1f}s
**Speed:** {speed_ratio:.1f}x ({'Faster' if speed_ratio > 1 else 'Slower'} than real-time)
**Completed:** {datetime.now().strftime("%H:%M:%S")}
"""

    def _format_timestamp(self, seconds):
        """Convert seconds to MM:SS format."""
        if seconds is None:
            return "00:00"
        minutes = int(seconds // 60)
        seconds = int(seconds % 60)
        return f"{minutes:02d}:{seconds:02d}"