File size: 14,077 Bytes
d406562 7889259 d406562 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b d406562 97cca2b d406562 97cca2b 7889259 d406562 7889259 97cca2b d406562 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 97cca2b 7889259 d406562 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 |
# transcription.py - Szybka poprawka importów
import os
import time
import streamlit as st
from typing import List, Dict, Optional, Union, Tuple # Dodano Tuple
from pathlib import Path
try:
from openai import OpenAI
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
st.error("❌ OpenAI library nie jest dostępna. Zainstaluj: pip install openai")
from config import MODEL_SETTINGS, USER_MESSAGES
class AudioTranscriber:
"""Klasa do transkrypcji audio używając OpenAI Whisper API"""
def __init__(self, api_key: str):
if not OPENAI_AVAILABLE:
raise Exception("OpenAI library nie jest dostępna")
self.client = OpenAI(api_key=api_key)
self.api_key = api_key
self.transcription_stats = {
'total_files': 0,
'successful': 0,
'failed': 0,
'total_duration': 0,
'total_cost_estimate': 0
}
def transcribe_files(self, file_paths: Union[str, List[str]], language: str = "pl") -> str:
"""
Transkrypcja listy plików audio lub pojedynczego pliku
Returns: Połączona transkrypcja wszystkich plików
"""
# Obsługa pojedynczego pliku
if isinstance(file_paths, str):
file_paths = [file_paths]
transcriptions = []
for i, file_path in enumerate(file_paths):
if not os.path.exists(file_path):
st.error(f"❌ Plik nie istnieje: {file_path}")
continue
try:
# Pokaż postęp
if len(file_paths) > 1:
st.info(f"🎙️ Transkrybuję część {i+1}/{len(file_paths)}")
# Transkrypcja pojedynczego pliku z retry
transcription = self.transcribe_with_retries(file_path, language)
if transcription:
transcriptions.append(transcription)
self.transcription_stats['successful'] += 1
st.success(f"✅ Część {i+1} zakończona")
else:
self.transcription_stats['failed'] += 1
st.error(f"❌ Błąd części {i+1}")
except Exception as e:
st.error(f"❌ Błąd transkrypcji części {i+1}: {str(e)}")
self.transcription_stats['failed'] += 1
# Połącz wszystkie transkrypcje
if transcriptions:
# Jeśli było więcej niż jeden plik, dodaj separatory
if len(transcriptions) > 1:
final_transcription = transcriptions[0]
for i, text in enumerate(transcriptions[1:], 1):
final_transcription += f"\n\n=== CZĘŚĆ {i+1} ===\n\n{text}"
else:
final_transcription = transcriptions[0]
return final_transcription
else:
raise Exception("Wszystkie transkrypcje zakończone błędem")
def transcribe_with_retries(self, file_path: str, language: str = "pl", max_retries: int = 3) -> Optional[str]:
"""Transkrypcja z ponawianiem przy błędach"""
for attempt in range(max_retries):
try:
# Sprawdź rozmiar pliku przed każdą próbą
file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
if file_size_mb > 25:
raise Exception(f"Plik za duży dla Whisper API: {file_size_mb:.1f}MB > 25MB")
result = self._transcribe_single_file(file_path, language)
if result:
return result
except Exception as e:
error_msg = str(e).lower()
st.warning(f"⚠️ Próba {attempt + 1}/{max_retries} nieudana: {str(e)}")
if attempt < max_retries - 1:
# Exponential backoff z różnymi strategiami
if "rate limit" in error_msg:
wait_time = 60 + (attempt * 30) # Rate limit = długa przerwa
st.info(f"⏳ Rate limit - czekam {wait_time}s...")
elif "timeout" in error_msg:
wait_time = 30 + (attempt * 15) # Timeout = średnia przerwa
st.info(f"⏳ Timeout - czekam {wait_time}s...")
else:
wait_time = 15 + (attempt * 10) # Inne błędy = krótka przerwa
st.info(f"⏳ Błąd - czekam {wait_time}s...")
time.sleep(wait_time)
else:
st.error(f"❌ Wszystkie {max_retries} prób nieudane dla {file_path}")
return None
def _transcribe_single_file(self, file_path: str, language: str = "pl") -> Optional[str]:
"""Transkrypcja pojedynczego pliku"""
try:
self.transcription_stats['total_files'] += 1
# Sprawdź rozmiar pliku
file_size = os.path.getsize(file_path)
file_size_mb = file_size / (1024 * 1024)
# OpenAI Whisper ma limit 25MB
if file_size_mb > 25:
raise Exception(f"Plik za duży dla Whisper API: {file_size_mb:.1f}MB > 25MB")
# Sprawdź czy plik nie jest pusty
if file_size == 0:
raise Exception("Plik jest pusty")
st.info(f"📤 Wysyłam do Whisper ({file_size_mb:.1f}MB)...")
# Otwórz plik i wyślij do API
with open(file_path, 'rb') as audio_file:
# Ustaw parametry transkrypcji
params = {
'model': MODEL_SETTINGS['whisper']['model'],
'file': audio_file,
'temperature': MODEL_SETTINGS['whisper']['temperature']
}
# Dodaj język tylko jeśli nie jest auto
if language != 'auto':
params['language'] = language
# Wywołaj API
transcript = self.client.audio.transcriptions.create(**params)
# Sprawdź czy otrzymaliśmy wynik
if not transcript or not hasattr(transcript, 'text') or len(transcript.text.strip()) == 0:
raise Exception("Pusty wynik transkrypcji")
# Estymacja kosztu (Whisper API: $0.006 per minute)
estimated_duration = file_size_mb * 60 # Rough estimate: 1MB ≈ 1 minute
estimated_cost = (estimated_duration / 60) * 0.006
self.transcription_stats['total_duration'] += estimated_duration
self.transcription_stats['total_cost_estimate'] += estimated_cost
st.success(f"✅ Transkrypcja otrzymana ({len(transcript.text.split())} słów)")
# Oczyść i zwróć transkrypcję
return self.clean_transcription(transcript.text)
except Exception as e:
st.error(f"❌ Błąd Whisper API: {str(e)}")
raise e
def clean_transcription(self, transcription: str) -> str:
"""Oczyszczenie i formatowanie transkrypcji"""
try:
# Usuń nadmiarowe spacje i znaki
cleaned = transcription.strip()
# Usuń nadmiarowe spacje
cleaned = ' '.join(cleaned.split())
# Podziel na akapity w rozsądnych miejscach
sentences = cleaned.split('. ')
paragraphs = []
current_paragraph = []
for sentence in sentences:
current_paragraph.append(sentence)
# Nowy akapit co 3-4 zdania
if len(current_paragraph) >= 4:
paragraphs.append('. '.join(current_paragraph) + '.')
current_paragraph = []
# Dodaj ostatni akapit
if current_paragraph:
paragraphs.append('. '.join(current_paragraph))
# Połącz akapity
formatted = '\n\n'.join(paragraphs)
return formatted
except Exception as e:
st.warning(f"⚠️ Błąd formatowania transkrypcji: {e}")
return transcription
def detect_interview_type(self, transcription: str) -> str:
"""
Automatyczne rozpoznanie typu wywiadu na podstawie treści
Returns: 'fgi', 'idi', lub 'unknown'
"""
text_lower = transcription.lower()
# Wskaźniki FGI (Focus Group)
fgi_indicators = [
'moderator', 'grupa', 'wszyscy', 'kto jeszcze', 'a państwo',
'czy zgadzacie się', 'co myślicie', 'focus group',
'uczestnicy', 'grupa fokusowa', 'dyskusja grupowa',
'co sądzicie', 'może ktoś inny', 'a jak pan/pani'
]
# Wskaźniki IDI (Individual)
idi_indicators = [
'wywiad indywidualny', 'jeden na jeden', 'prywatnie',
'osobiście', 'indywidualne', 'w cztery oczy',
'tylko między nami', 'powiedz mi', 'jak się czujesz'
]
fgi_score = sum(1 for indicator in fgi_indicators if indicator in text_lower)
idi_score = sum(1 for indicator in idi_indicators if indicator in text_lower)
# Sprawdź także liczbę różnych głosów/osób
# (FGI zwykle ma więcej przerywników, overlapping speech)
interruption_patterns = ['...', '[niewyraźnie]', '[nakładanie się głosów]', '(śmiech)', '--']
interruption_count = sum(text_lower.count(pattern) for pattern in interruption_patterns)
# Sprawdź długość - FGI są zwykle dłuższe
word_count = len(transcription.split())
# Logika decyzyjna
if fgi_score > idi_score * 1.5 and word_count > 1000:
return 'fgi'
elif idi_score > fgi_score * 1.5:
return 'idi'
elif interruption_count > 10 and word_count > 1500:
return 'fgi'
elif word_count < 800:
return 'idi'
else:
return 'unknown'
def validate_api_key(self) -> bool:
"""Sprawdź czy klucz API działa"""
try:
# Spróbuj pobrać listę modeli
models = self.client.models.list()
# Sprawdź czy whisper-1 jest dostępny
model_names = [model.id for model in models.data]
if 'whisper-1' not in model_names:
st.warning("⚠️ Model whisper-1 nie jest dostępny")
return False
return True
except Exception as e:
st.error(f"❌ Nieprawidłowy klucz API: {str(e)}")
return False
def get_transcription_stats(self) -> Dict:
"""Zwróć statystyki transkrypcji"""
stats = self.transcription_stats.copy()
# Dodaj dodatkowe metryki
if stats['total_files'] > 0:
stats['success_rate'] = (stats['successful'] / stats['total_files']) * 100
else:
stats['success_rate'] = 0
return stats
def estimate_transcription_time(self, file_paths: List[str]) -> Dict:
"""Estymuj czas i koszt transkrypcji"""
valid_files = [path for path in file_paths if os.path.exists(path)]
if not valid_files:
return {
'error': 'Brak prawidłowych plików',
'files_count': 0
}
total_size = sum(os.path.getsize(path) for path in valid_files)
total_size_mb = total_size / (1024 * 1024)
# Estymacje
estimated_duration_minutes = total_size_mb # 1MB ≈ 1 minute
estimated_api_time = estimated_duration_minutes * 0.1 # Whisper jest ~10x szybszy
estimated_cost = estimated_duration_minutes * 0.006 # $0.006 per minute
# Sprawdź limity
files_too_large = []
for path in valid_files:
file_size_mb = os.path.getsize(path) / (1024 * 1024)
if file_size_mb > 25:
files_too_large.append((path, file_size_mb))
return {
'total_size_mb': total_size_mb,
'estimated_audio_duration': estimated_duration_minutes,
'estimated_processing_time': estimated_api_time,
'estimated_cost_usd': estimated_cost,
'files_count': len(valid_files),
'files_too_large': files_too_large
}
# Funkcje pomocnicze
def validate_audio_file(file_path: str) -> Tuple[bool, str]:
"""Sprawdź czy plik audio jest prawidłowy"""
if not os.path.exists(file_path):
return False, "Plik nie istnieje"
# Sprawdź rozmiar
file_size = os.path.getsize(file_path)
file_size_mb = file_size / (1024 * 1024)
if file_size == 0:
return False, "Plik jest pusty"
if file_size_mb > 25:
return False, f"Plik za duży: {file_size_mb:.1f}MB > 25MB"
# Sprawdź rozszerzenie
valid_extensions = ['.mp3', '.wav', '.mp4', '.m4a', '.aac']
file_ext = Path(file_path).suffix.lower()
if file_ext not in valid_extensions:
return False, f"Nieobsługiwane rozszerzenie: {file_ext}"
return True, "OK"
def get_file_duration_estimate(file_path: str) -> float:
"""Estymuj długość pliku audio w minutach"""
try:
file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
# Przybliżenie: 1MB ≈ 1 minuta dla typowego MP3
return file_size_mb
except:
return 0.0
# Test modułu
if __name__ == "__main__":
print("🧪 Test AudioTranscriber")
print("✅ Import OK - wszystkie typy dostępne") |