ASR_gradio / audio_processing.py
Kr08's picture
Update audio_processing.py
745e5b6 verified
raw
history blame
3.31 kB
import whisperx
import torch
import numpy as np
from scipy.signal import resample
import numpy as np
import whisperx
from pyannote.audio import Pipeline
import os
from dotenv import load_dotenv
load_dotenv()
hf_token = os.getenv("HF_TOKEN")
import whisperx
import torch
import numpy as np
import whisperx
import torch
import numpy as np
import whisperx
import torch
import numpy as np
CHUNK_LENGTH= 30
import whisperx
import torch
import numpy as np
def preprocess_audio(audio, chunk_size=CHUNK_LENGTH*16000): # 30 seconds at 16kHz
chunks = []
for i in range(0, len(audio), chunk_size):
chunk = audio[i:i+chunk_size]
if len(chunk) < chunk_size:
chunk = np.pad(chunk, (0, chunk_size - len(chunk)))
chunks.append(chunk)
return chunks
def process_audio(audio_file):
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float32"
audio = whisperx.load_audio(audio_file)
model = whisperx.load_model("small", device, compute_type=compute_type)
# Initialize speaker diarization pipeline
diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=hf_token)
diarization_pipeline = diarization_pipeline.to(torch.device(device))
# Perform diarization on the entire audio
diarization_result = diarization_pipeline({"waveform": torch.from_numpy(audio).unsqueeze(0), "sample_rate": 16000})
# Preprocess audio into consistent chunks
chunks = preprocess_audio(audio)
language_segments = []
final_segments = []
for i, chunk in enumerate(chunks):
# Detect language for this chunk
lang = model.detect_language(chunk)
# Transcribe this chunk
result = model.transcribe(chunk, language=lang)
chunk_start_time = i * 5 # Each chunk is 30 seconds
# Adjust timestamps and add language information
for segment in result["segments"]:
segment_start = chunk_start_time + segment["start"]
segment_end = chunk_start_time + segment["end"]
segment["start"] = segment_start
segment["end"] = segment_end
segment["language"] = lang
speakers = []
for turn, track, speaker in diarization_result.itertracks(yield_label=True):
if turn.start <= segment_end and turn.end >= segment_start:
speakers.append(speaker)
if speakers:
segment["speaker"] = max(set(speakers), key=speakers.count)
else:
segment["speaker"] = "Unknown"
final_segments.append(segment)
# Add language segment
language_segments.append({
"language": lang,
"start": chunk_start_time,
"end": chunk_start_time + 5
})
return language_segments, final_segments
def print_results(language, language_probs, segments):
print(f"Detected Language: {language}")
print("Language Probabilities:")
for lang, prob in language_probs.items():
print(f" {lang}: {prob:.4f}")
print("\nTranscription:")
for segment in segments:
print(f"[{segment['start']:.2f}s - {segment['end']:.2f}s] Speaker {segment['speaker']}: {segment['text']}")