QualiLab / transcription.py
Marek4321's picture
Update transcription.py
d406562 verified
# transcription.py - Szybka poprawka importów
import os
import time
import streamlit as st
from typing import List, Dict, Optional, Union, Tuple # Dodano Tuple
from pathlib import Path
try:
from openai import OpenAI
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
st.error("❌ OpenAI library nie jest dostępna. Zainstaluj: pip install openai")
from config import MODEL_SETTINGS, USER_MESSAGES
class AudioTranscriber:
"""Klasa do transkrypcji audio używając OpenAI Whisper API"""
def __init__(self, api_key: str):
if not OPENAI_AVAILABLE:
raise Exception("OpenAI library nie jest dostępna")
self.client = OpenAI(api_key=api_key)
self.api_key = api_key
self.transcription_stats = {
'total_files': 0,
'successful': 0,
'failed': 0,
'total_duration': 0,
'total_cost_estimate': 0
}
def transcribe_files(self, file_paths: Union[str, List[str]], language: str = "pl") -> str:
"""
Transkrypcja listy plików audio lub pojedynczego pliku
Returns: Połączona transkrypcja wszystkich plików
"""
# Obsługa pojedynczego pliku
if isinstance(file_paths, str):
file_paths = [file_paths]
transcriptions = []
for i, file_path in enumerate(file_paths):
if not os.path.exists(file_path):
st.error(f"❌ Plik nie istnieje: {file_path}")
continue
try:
# Pokaż postęp
if len(file_paths) > 1:
st.info(f"🎙️ Transkrybuję część {i+1}/{len(file_paths)}")
# Transkrypcja pojedynczego pliku z retry
transcription = self.transcribe_with_retries(file_path, language)
if transcription:
transcriptions.append(transcription)
self.transcription_stats['successful'] += 1
st.success(f"✅ Część {i+1} zakończona")
else:
self.transcription_stats['failed'] += 1
st.error(f"❌ Błąd części {i+1}")
except Exception as e:
st.error(f"❌ Błąd transkrypcji części {i+1}: {str(e)}")
self.transcription_stats['failed'] += 1
# Połącz wszystkie transkrypcje
if transcriptions:
# Jeśli było więcej niż jeden plik, dodaj separatory
if len(transcriptions) > 1:
final_transcription = transcriptions[0]
for i, text in enumerate(transcriptions[1:], 1):
final_transcription += f"\n\n=== CZĘŚĆ {i+1} ===\n\n{text}"
else:
final_transcription = transcriptions[0]
return final_transcription
else:
raise Exception("Wszystkie transkrypcje zakończone błędem")
def transcribe_with_retries(self, file_path: str, language: str = "pl", max_retries: int = 3) -> Optional[str]:
"""Transkrypcja z ponawianiem przy błędach"""
for attempt in range(max_retries):
try:
# Sprawdź rozmiar pliku przed każdą próbą
file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
if file_size_mb > 25:
raise Exception(f"Plik za duży dla Whisper API: {file_size_mb:.1f}MB > 25MB")
result = self._transcribe_single_file(file_path, language)
if result:
return result
except Exception as e:
error_msg = str(e).lower()
st.warning(f"⚠️ Próba {attempt + 1}/{max_retries} nieudana: {str(e)}")
if attempt < max_retries - 1:
# Exponential backoff z różnymi strategiami
if "rate limit" in error_msg:
wait_time = 60 + (attempt * 30) # Rate limit = długa przerwa
st.info(f"⏳ Rate limit - czekam {wait_time}s...")
elif "timeout" in error_msg:
wait_time = 30 + (attempt * 15) # Timeout = średnia przerwa
st.info(f"⏳ Timeout - czekam {wait_time}s...")
else:
wait_time = 15 + (attempt * 10) # Inne błędy = krótka przerwa
st.info(f"⏳ Błąd - czekam {wait_time}s...")
time.sleep(wait_time)
else:
st.error(f"❌ Wszystkie {max_retries} prób nieudane dla {file_path}")
return None
def _transcribe_single_file(self, file_path: str, language: str = "pl") -> Optional[str]:
"""Transkrypcja pojedynczego pliku"""
try:
self.transcription_stats['total_files'] += 1
# Sprawdź rozmiar pliku
file_size = os.path.getsize(file_path)
file_size_mb = file_size / (1024 * 1024)
# OpenAI Whisper ma limit 25MB
if file_size_mb > 25:
raise Exception(f"Plik za duży dla Whisper API: {file_size_mb:.1f}MB > 25MB")
# Sprawdź czy plik nie jest pusty
if file_size == 0:
raise Exception("Plik jest pusty")
st.info(f"📤 Wysyłam do Whisper ({file_size_mb:.1f}MB)...")
# Otwórz plik i wyślij do API
with open(file_path, 'rb') as audio_file:
# Ustaw parametry transkrypcji
params = {
'model': MODEL_SETTINGS['whisper']['model'],
'file': audio_file,
'temperature': MODEL_SETTINGS['whisper']['temperature']
}
# Dodaj język tylko jeśli nie jest auto
if language != 'auto':
params['language'] = language
# Wywołaj API
transcript = self.client.audio.transcriptions.create(**params)
# Sprawdź czy otrzymaliśmy wynik
if not transcript or not hasattr(transcript, 'text') or len(transcript.text.strip()) == 0:
raise Exception("Pusty wynik transkrypcji")
# Estymacja kosztu (Whisper API: $0.006 per minute)
estimated_duration = file_size_mb * 60 # Rough estimate: 1MB ≈ 1 minute
estimated_cost = (estimated_duration / 60) * 0.006
self.transcription_stats['total_duration'] += estimated_duration
self.transcription_stats['total_cost_estimate'] += estimated_cost
st.success(f"✅ Transkrypcja otrzymana ({len(transcript.text.split())} słów)")
# Oczyść i zwróć transkrypcję
return self.clean_transcription(transcript.text)
except Exception as e:
st.error(f"❌ Błąd Whisper API: {str(e)}")
raise e
def clean_transcription(self, transcription: str) -> str:
"""Oczyszczenie i formatowanie transkrypcji"""
try:
# Usuń nadmiarowe spacje i znaki
cleaned = transcription.strip()
# Usuń nadmiarowe spacje
cleaned = ' '.join(cleaned.split())
# Podziel na akapity w rozsądnych miejscach
sentences = cleaned.split('. ')
paragraphs = []
current_paragraph = []
for sentence in sentences:
current_paragraph.append(sentence)
# Nowy akapit co 3-4 zdania
if len(current_paragraph) >= 4:
paragraphs.append('. '.join(current_paragraph) + '.')
current_paragraph = []
# Dodaj ostatni akapit
if current_paragraph:
paragraphs.append('. '.join(current_paragraph))
# Połącz akapity
formatted = '\n\n'.join(paragraphs)
return formatted
except Exception as e:
st.warning(f"⚠️ Błąd formatowania transkrypcji: {e}")
return transcription
def detect_interview_type(self, transcription: str) -> str:
"""
Automatyczne rozpoznanie typu wywiadu na podstawie treści
Returns: 'fgi', 'idi', lub 'unknown'
"""
text_lower = transcription.lower()
# Wskaźniki FGI (Focus Group)
fgi_indicators = [
'moderator', 'grupa', 'wszyscy', 'kto jeszcze', 'a państwo',
'czy zgadzacie się', 'co myślicie', 'focus group',
'uczestnicy', 'grupa fokusowa', 'dyskusja grupowa',
'co sądzicie', 'może ktoś inny', 'a jak pan/pani'
]
# Wskaźniki IDI (Individual)
idi_indicators = [
'wywiad indywidualny', 'jeden na jeden', 'prywatnie',
'osobiście', 'indywidualne', 'w cztery oczy',
'tylko między nami', 'powiedz mi', 'jak się czujesz'
]
fgi_score = sum(1 for indicator in fgi_indicators if indicator in text_lower)
idi_score = sum(1 for indicator in idi_indicators if indicator in text_lower)
# Sprawdź także liczbę różnych głosów/osób
# (FGI zwykle ma więcej przerywników, overlapping speech)
interruption_patterns = ['...', '[niewyraźnie]', '[nakładanie się głosów]', '(śmiech)', '--']
interruption_count = sum(text_lower.count(pattern) for pattern in interruption_patterns)
# Sprawdź długość - FGI są zwykle dłuższe
word_count = len(transcription.split())
# Logika decyzyjna
if fgi_score > idi_score * 1.5 and word_count > 1000:
return 'fgi'
elif idi_score > fgi_score * 1.5:
return 'idi'
elif interruption_count > 10 and word_count > 1500:
return 'fgi'
elif word_count < 800:
return 'idi'
else:
return 'unknown'
def validate_api_key(self) -> bool:
"""Sprawdź czy klucz API działa"""
try:
# Spróbuj pobrać listę modeli
models = self.client.models.list()
# Sprawdź czy whisper-1 jest dostępny
model_names = [model.id for model in models.data]
if 'whisper-1' not in model_names:
st.warning("⚠️ Model whisper-1 nie jest dostępny")
return False
return True
except Exception as e:
st.error(f"❌ Nieprawidłowy klucz API: {str(e)}")
return False
def get_transcription_stats(self) -> Dict:
"""Zwróć statystyki transkrypcji"""
stats = self.transcription_stats.copy()
# Dodaj dodatkowe metryki
if stats['total_files'] > 0:
stats['success_rate'] = (stats['successful'] / stats['total_files']) * 100
else:
stats['success_rate'] = 0
return stats
def estimate_transcription_time(self, file_paths: List[str]) -> Dict:
"""Estymuj czas i koszt transkrypcji"""
valid_files = [path for path in file_paths if os.path.exists(path)]
if not valid_files:
return {
'error': 'Brak prawidłowych plików',
'files_count': 0
}
total_size = sum(os.path.getsize(path) for path in valid_files)
total_size_mb = total_size / (1024 * 1024)
# Estymacje
estimated_duration_minutes = total_size_mb # 1MB ≈ 1 minute
estimated_api_time = estimated_duration_minutes * 0.1 # Whisper jest ~10x szybszy
estimated_cost = estimated_duration_minutes * 0.006 # $0.006 per minute
# Sprawdź limity
files_too_large = []
for path in valid_files:
file_size_mb = os.path.getsize(path) / (1024 * 1024)
if file_size_mb > 25:
files_too_large.append((path, file_size_mb))
return {
'total_size_mb': total_size_mb,
'estimated_audio_duration': estimated_duration_minutes,
'estimated_processing_time': estimated_api_time,
'estimated_cost_usd': estimated_cost,
'files_count': len(valid_files),
'files_too_large': files_too_large
}
# Funkcje pomocnicze
def validate_audio_file(file_path: str) -> Tuple[bool, str]:
"""Sprawdź czy plik audio jest prawidłowy"""
if not os.path.exists(file_path):
return False, "Plik nie istnieje"
# Sprawdź rozmiar
file_size = os.path.getsize(file_path)
file_size_mb = file_size / (1024 * 1024)
if file_size == 0:
return False, "Plik jest pusty"
if file_size_mb > 25:
return False, f"Plik za duży: {file_size_mb:.1f}MB > 25MB"
# Sprawdź rozszerzenie
valid_extensions = ['.mp3', '.wav', '.mp4', '.m4a', '.aac']
file_ext = Path(file_path).suffix.lower()
if file_ext not in valid_extensions:
return False, f"Nieobsługiwane rozszerzenie: {file_ext}"
return True, "OK"
def get_file_duration_estimate(file_path: str) -> float:
"""Estymuj długość pliku audio w minutach"""
try:
file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
# Przybliżenie: 1MB ≈ 1 minuta dla typowego MP3
return file_size_mb
except:
return 0.0
# Test modułu
if __name__ == "__main__":
print("🧪 Test AudioTranscriber")
print("✅ Import OK - wszystkie typy dostępne")