Spaces:
Running
Running
from pyannote.audio import Pipeline | |
from typing import List, Dict, Any | |
import torch | |
import os | |
from config import settings | |
# HF_TOKEN hardcoded assignment removed for security | |
_diarization_pipeline = None | |
def get_diarization_pipeline(): | |
"""Diarization pipeline singleton with fallback""" | |
global _diarization_pipeline | |
if not os.getenv("HF_TOKEN"): | |
print("Warning: HF_TOKEN not set! Diarization will be disabled.") | |
return None | |
if _diarization_pipeline is None: | |
try: | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
_diarization_pipeline = Pipeline.from_pretrained( | |
settings.DIARIZATION_MODEL, | |
use_auth_token=os.environ.get("HF_TOKEN"), | |
device=device | |
) | |
except Exception as e: | |
print(f"Error loading diarization pipeline: {e}") | |
return None | |
return _diarization_pipeline | |
def rename_speakers_for_pediatrics(segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
""" | |
Konuşmacıları pediatri bağlamına göre yeniden isimlendirir | |
""" | |
# Konuşmacıları basit bir şekilde yeniden isimlendiriyoruz | |
# Gerçek bir uygulamada ses özellikleri analizi ile daha sofistike olabilir | |
renamed_segments = [] | |
speaker_mapping = {} | |
for segment in segments: | |
speaker = segment["speaker"] | |
if speaker not in speaker_mapping: | |
# İlk konuşmacıyı bölüm başkanı olarak kabul ediyoruz | |
if len(speaker_mapping) == 0: | |
speaker_mapping[speaker] = "Bölüm_Başkanı" | |
# İkinci konuşmacıyı hekim olarak kabul ediyoruz | |
elif len(speaker_mapping) == 1: | |
speaker_mapping[speaker] = "Hekim" | |
# Üçüncü konuşmacıyı asistan olarak kabul ediyoruz | |
elif len(speaker_mapping) == 2: | |
speaker_mapping[speaker] = "Asistan" | |
# Diğer konuşmacılar | |
else: | |
speaker_mapping[speaker] = f"Konuşmacı_{len(speaker_mapping) + 1}" | |
# Segment kopyası oluştur ve konuşmacı ismini güncelle | |
new_segment = segment.copy() | |
new_segment["speaker"] = speaker_mapping[speaker] | |
renamed_segments.append(new_segment) | |
return renamed_segments | |
def diarize_audio(audio_path: str) -> List[Dict[str, Any]]: | |
"""Diarize audio with fallback to single speaker""" | |
try: | |
pipeline = get_diarization_pipeline() | |
if pipeline is None: | |
# Fallback: Return single speaker for entire duration | |
return [{"speaker": "Speaker 1", "start": 0.0, "end": float("inf")}] | |
diarization = pipeline(audio_path) | |
segments = [] | |
for turn, _, speaker in diarization.itertracks(yield_label=True): | |
if turn.duration >= settings.MIN_SPEAKER_DURATION: | |
segments.append({ | |
"speaker": f"Speaker {speaker.split('_')[-1]}", | |
"start": turn.start, | |
"end": turn.end | |
}) | |
return segments | |
except Exception as e: | |
print(f"Diarization error: {e}") | |
# Fallback: Return single speaker | |
return [{"speaker": "Speaker 1", "start": 0.0, "end": float("inf")}] | |
def diarize_segments(audio_file: str, is_pediatrics: bool = True) -> List[Dict[str, Any]]: | |
""" | |
Ses dosyasındaki konuşmacıları ayırt eder | |
Args: | |
audio_file: Ses dosyasının yolu | |
Returns: | |
Konuşmacı segmentleri listesi | |
[ | |
{"speaker": "speaker_0", "start": 0.5, "end": 2.3, "text": "..."}, | |
{"speaker": "speaker_1", "start": 2.4, "end": 5.1, "text": "..."}, | |
... | |
] | |
""" | |
# Pipeline'ı al | |
pipeline = get_diarization_pipeline() | |
# Diyarizasyon gerçekleştir | |
diarization = pipeline(audio_file) | |
# Sonuçları formatlayalım | |
results = [] | |
for turn, _, speaker in diarization.itertracks(yield_label=True): | |
segment = { | |
"speaker": speaker, | |
"start": turn.start, | |
"end": turn.end, | |
"text": "" # Bu alanı transcribe işlemi sonrası dolduracağız | |
} | |
results.append(segment) | |
# Pediatri bağlamı için konuşmacı isimlerini güncelle | |
if is_pediatrics: | |
results = rename_speakers_for_pediatrics(results) | |
return results |