import os

import numpy as np
import torch
import whisperx
from dotenv import load_dotenv
from pyannote.audio import Pipeline

# Load the Hugging Face token used to access the gated pyannote diarization model
load_dotenv()
hf_token = os.getenv("HF_TOKEN")

# Length of each processing chunk, in seconds
CHUNK_LENGTH = 30
def preprocess_audio(audio, chunk_size=CHUNK_LENGTH * 16000):  # 30 seconds at 16 kHz
    """Split the audio into fixed-size chunks, zero-padding the final chunk."""
    chunks = []
    for i in range(0, len(audio), chunk_size):
        chunk = audio[i:i + chunk_size]
        if len(chunk) < chunk_size:
            chunk = np.pad(chunk, (0, chunk_size - len(chunk)))
        chunks.append(chunk)
    return chunks
def process_audio(audio_file):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    compute_type = "float32"

    audio = whisperx.load_audio(audio_file)
    model = whisperx.load_model("small", device, compute_type=compute_type)

    # Initialize the speaker diarization pipeline
    diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=hf_token)
    diarization_pipeline = diarization_pipeline.to(torch.device(device))

    # Perform diarization on the entire audio
    diarization_result = diarization_pipeline(
        {"waveform": torch.from_numpy(audio).unsqueeze(0), "sample_rate": 16000}
    )

    # Preprocess audio into consistent 30-second chunks
    chunks = preprocess_audio(audio)

    language_segments = []
    final_segments = []

    for i, chunk in enumerate(chunks):
        # Detect language for this chunk
        lang = model.detect_language(chunk)

        # Transcribe this chunk
        result = model.transcribe(chunk, language=lang)

        chunk_start_time = i * CHUNK_LENGTH  # each chunk is CHUNK_LENGTH seconds long

        # Adjust timestamps to be relative to the full audio and add language information
        for segment in result["segments"]:
            segment_start = chunk_start_time + segment["start"]
            segment_end = chunk_start_time + segment["end"]
            segment["start"] = segment_start
            segment["end"] = segment_end
            segment["language"] = lang

            # Assign the speaker whose diarization turns overlap this segment most often
            speakers = []
            for turn, track, speaker in diarization_result.itertracks(yield_label=True):
                if turn.start <= segment_end and turn.end >= segment_start:
                    speakers.append(speaker)
            if speakers:
                segment["speaker"] = max(set(speakers), key=speakers.count)
            else:
                segment["speaker"] = "Unknown"

            final_segments.append(segment)

        # Record which language was detected for this chunk
        language_segments.append({
            "language": lang,
            "start": chunk_start_time,
            "end": chunk_start_time + CHUNK_LENGTH
        })

    return language_segments, final_segments
def print_results(language_segments, segments):
    """Print the per-chunk detected languages and the speaker-labeled transcription."""
    print("Language segments:")
    for lang_seg in language_segments:
        print(f"  [{lang_seg['start']:.2f}s - {lang_seg['end']:.2f}s] {lang_seg['language']}")

    print("\nTranscription:")
    for segment in segments:
        print(f"[{segment['start']:.2f}s - {segment['end']:.2f}s] "
              f"({segment['language']}) Speaker {segment['speaker']}: {segment['text']}")