|
|
|
|
|
import os |
|
import time |
|
import streamlit as st |
|
from typing import List, Dict, Optional, Union, Tuple |
|
from pathlib import Path |
|
|
|
try: |
|
from openai import OpenAI |
|
OPENAI_AVAILABLE = True |
|
except ImportError: |
|
OPENAI_AVAILABLE = False |
|
st.error("❌ OpenAI library nie jest dostępna. Zainstaluj: pip install openai") |
|
|
|
from config import MODEL_SETTINGS, USER_MESSAGES |
|
|
|
class AudioTranscriber:
    """Transcribe audio files with the OpenAI Whisper API.

    Progress and errors are reported through Streamlit widgets, and
    running statistics (file counts, estimated duration and cost) are
    accumulated for the lifetime of the instance.
    """

    def __init__(self, api_key: str):
        """Create a Whisper client.

        Args:
            api_key: OpenAI API key used for all requests.

        Raises:
            Exception: if the ``openai`` package is not installed.
        """
        if not OPENAI_AVAILABLE:
            raise Exception("OpenAI library nie jest dostępna")

        self.client = OpenAI(api_key=api_key)
        self.api_key = api_key
        # Session-wide counters: durations are in seconds, cost in USD.
        self.transcription_stats = {
            'total_files': 0,
            'successful': 0,
            'failed': 0,
            'total_duration': 0,
            'total_cost_estimate': 0
        }

    def transcribe_files(self, file_paths: Union[str, List[str]], language: str = "pl") -> str:
        """Transcribe a single audio file or a list of files.

        Args:
            file_paths: one path or a list of paths (e.g. chunks of a
                long recording, transcribed in order).
            language: ISO language code passed to Whisper; ``"auto"``
                enables Whisper's language auto-detection.

        Returns:
            The combined transcription; when several parts succeed they
            are separated by ``=== CZĘŚĆ n ===`` headers.

        Raises:
            Exception: when every file failed to transcribe.
        """
        if isinstance(file_paths, str):
            file_paths = [file_paths]

        transcriptions = []

        for i, file_path in enumerate(file_paths):
            if not os.path.exists(file_path):
                st.error(f"❌ Plik nie istnieje: {file_path}")
                continue

            try:
                if len(file_paths) > 1:
                    st.info(f"🎙️ Transkrybuję część {i+1}/{len(file_paths)}")

                transcription = self.transcribe_with_retries(file_path, language)

                if transcription:
                    transcriptions.append(transcription)
                    self.transcription_stats['successful'] += 1
                    st.success(f"✅ Część {i+1} zakończona")
                else:
                    self.transcription_stats['failed'] += 1
                    st.error(f"❌ Błąd części {i+1}")

            except Exception as e:
                st.error(f"❌ Błąd transkrypcji części {i+1}: {str(e)}")
                self.transcription_stats['failed'] += 1

        if not transcriptions:
            raise Exception("Wszystkie transkrypcje zakończone błędem")

        # The first part carries no header; each following part is
        # labelled so the reader can see where one chunk ends.
        final_transcription = transcriptions[0]
        for i, text in enumerate(transcriptions[1:], 1):
            final_transcription += f"\n\n=== CZĘŚĆ {i+1} ===\n\n{text}"

        return final_transcription

    def transcribe_with_retries(self, file_path: str, language: str = "pl", max_retries: int = 3) -> Optional[str]:
        """Transcribe one file, retrying transient API errors.

        Backoff grows with the attempt number and is longest for rate
        limits, shorter for timeouts and generic errors.

        Returns:
            The cleaned transcription text, or ``None`` when the file
            is invalid or all attempts failed.
        """
        # FIX: count each *file* exactly once; previously this counter
        # lived in _transcribe_single_file and grew once per attempt.
        self.transcription_stats['total_files'] += 1

        if not os.path.exists(file_path):
            st.error(f"❌ Plik nie istnieje: {file_path}")
            return None

        # FIX: an oversized file is a permanent error -- retrying (and
        # sleeping between attempts) can never succeed, so fail fast
        # instead of burning up to three backoff waits.
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        if file_size_mb > 25:
            st.error(f"❌ Plik za duży dla Whisper API: {file_size_mb:.1f}MB > 25MB")
            return None

        for attempt in range(max_retries):
            try:
                result = self._transcribe_single_file(file_path, language)
                if result:
                    return result

            except Exception as e:
                error_msg = str(e).lower()
                st.warning(f"⚠️ Próba {attempt + 1}/{max_retries} nieudana: {str(e)}")

                if attempt < max_retries - 1:
                    # Error-specific backoff, growing with attempt number.
                    if "rate limit" in error_msg:
                        wait_time = 60 + (attempt * 30)
                        st.info(f"⏳ Rate limit - czekam {wait_time}s...")
                    elif "timeout" in error_msg:
                        wait_time = 30 + (attempt * 15)
                        st.info(f"⏳ Timeout - czekam {wait_time}s...")
                    else:
                        wait_time = 15 + (attempt * 10)
                        st.info(f"⏳ Błąd - czekam {wait_time}s...")

                    time.sleep(wait_time)
                else:
                    st.error(f"❌ Wszystkie {max_retries} prób nieudane dla {file_path}")

        return None

    def _transcribe_single_file(self, file_path: str, language: str = "pl") -> Optional[str]:
        """Upload one file to Whisper and return the cleaned text.

        Raises on any validation or API failure so the caller's retry
        loop can decide whether another attempt makes sense.
        """
        try:
            file_size = os.path.getsize(file_path)
            file_size_mb = file_size / (1024 * 1024)

            # Defensive re-check of the 25 MB Whisper upload limit
            # (the retry wrapper already rejects oversized files).
            if file_size_mb > 25:
                raise Exception(f"Plik za duży dla Whisper API: {file_size_mb:.1f}MB > 25MB")

            if file_size == 0:
                raise Exception("Plik jest pusty")

            st.info(f"📤 Wysyłam do Whisper ({file_size_mb:.1f}MB)...")

            with open(file_path, 'rb') as audio_file:
                params = {
                    'model': MODEL_SETTINGS['whisper']['model'],
                    'file': audio_file,
                    'temperature': MODEL_SETTINGS['whisper']['temperature']
                }

                # Omitting 'language' lets Whisper auto-detect it.
                if language != 'auto':
                    params['language'] = language

                transcript = self.client.audio.transcriptions.create(**params)

            if not transcript or not hasattr(transcript, 'text') or len(transcript.text.strip()) == 0:
                raise Exception("Pusty wynik transkrypcji")

            # Rough heuristic: ~1 MB of compressed audio ≈ 1 minute
            # (60 s) -- TODO confirm against the actual bitrate used.
            # Whisper pricing assumed at $0.006 per audio minute.
            estimated_duration = file_size_mb * 60
            estimated_cost = (estimated_duration / 60) * 0.006
            self.transcription_stats['total_duration'] += estimated_duration
            self.transcription_stats['total_cost_estimate'] += estimated_cost

            st.success(f"✅ Transkrypcja otrzymana ({len(transcript.text.split())} słów)")

            return self.clean_transcription(transcript.text)

        except Exception as e:
            st.error(f"❌ Błąd Whisper API: {str(e)}")
            raise  # re-raise unchanged so the retry loop sees the cause

    def clean_transcription(self, transcription: str) -> str:
        """Normalize whitespace and break the text into paragraphs.

        Sentences (split on ``'. '``) are grouped four to a paragraph;
        paragraphs are separated by blank lines. On any formatting
        error the raw transcription is returned unchanged.
        """
        try:
            # Collapse all runs of whitespace (incl. newlines) to one space.
            cleaned = ' '.join(transcription.split())

            sentences = cleaned.split('. ')
            paragraphs = []
            current_paragraph = []

            for sentence in sentences:
                current_paragraph.append(sentence)

                if len(current_paragraph) >= 4:
                    chunk = '. '.join(current_paragraph)
                    # FIX: the text's final sentence keeps its own '.',
                    # so only re-add one when it is missing (previously
                    # this produced a trailing '..').
                    paragraphs.append(chunk if chunk.endswith('.') else chunk + '.')
                    current_paragraph = []

            # Flush the remaining (shorter) paragraph, if any.
            if current_paragraph:
                paragraphs.append('. '.join(current_paragraph))

            return '\n\n'.join(paragraphs)

        except Exception as e:
            st.warning(f"⚠️ Błąd formatowania transkrypcji: {e}")
            return transcription

    def detect_interview_type(self, transcription: str) -> str:
        """Heuristically classify a transcript as group or individual interview.

        Scores keyword indicators for FGI (focus group) vs IDI
        (individual in-depth interview), plus interruption markers and
        overall length.

        Returns:
            ``'fgi'``, ``'idi'`` or ``'unknown'``.
        """
        text_lower = transcription.lower()

        # Phrases typical for a moderated group discussion.
        fgi_indicators = [
            'moderator', 'grupa', 'wszyscy', 'kto jeszcze', 'a państwo',
            'czy zgadzacie się', 'co myślicie', 'focus group',
            'uczestnicy', 'grupa fokusowa', 'dyskusja grupowa',
            'co sądzicie', 'może ktoś inny', 'a jak pan/pani'
        ]

        # Phrases typical for a one-on-one interview.
        idi_indicators = [
            'wywiad indywidualny', 'jeden na jeden', 'prywatnie',
            'osobiście', 'indywidualne', 'w cztery oczy',
            'tylko między nami', 'powiedz mi', 'jak się czujesz'
        ]

        fgi_score = sum(1 for indicator in fgi_indicators if indicator in text_lower)
        idi_score = sum(1 for indicator in idi_indicators if indicator in text_lower)

        # Frequent interruptions / overlapping speech suggest a group.
        interruption_patterns = ['...', '[niewyraźnie]', '[nakładanie się głosów]', '(śmiech)', '--']
        interruption_count = sum(text_lower.count(pattern) for pattern in interruption_patterns)

        word_count = len(transcription.split())

        # Decision rules: keyword dominance first, then interruption
        # density for long texts, then a short-text default of IDI.
        if fgi_score > idi_score * 1.5 and word_count > 1000:
            return 'fgi'
        elif idi_score > fgi_score * 1.5:
            return 'idi'
        elif interruption_count > 10 and word_count > 1500:
            return 'fgi'
        elif word_count < 800:
            return 'idi'
        else:
            return 'unknown'

    def validate_api_key(self) -> bool:
        """Check that the API key works and that whisper-1 is available."""
        try:
            # Listing models both authenticates the key and exposes
            # which models this account can use.
            models = self.client.models.list()

            model_names = [model.id for model in models.data]
            if 'whisper-1' not in model_names:
                st.warning("⚠️ Model whisper-1 nie jest dostępny")
                return False

            return True

        except Exception as e:
            st.error(f"❌ Nieprawidłowy klucz API: {str(e)}")
            return False

    def get_transcription_stats(self) -> Dict:
        """Return a copy of the session statistics with a success rate added.

        ``success_rate`` is a percentage (0-100); 0 when no files have
        been processed yet.
        """
        stats = self.transcription_stats.copy()

        if stats['total_files'] > 0:
            stats['success_rate'] = (stats['successful'] / stats['total_files']) * 100
        else:
            stats['success_rate'] = 0

        return stats

    def estimate_transcription_time(self, file_paths: List[str]) -> Dict:
        """Estimate processing time and API cost for a batch of files.

        Uses the ~1 MB ≈ 1 audio minute heuristic; also reports which
        files exceed the 25 MB Whisper limit.

        Returns:
            A dict with size, duration, time and cost estimates, or an
            ``'error'`` entry when no listed file exists.
        """
        valid_files = [path for path in file_paths if os.path.exists(path)]

        if not valid_files:
            return {
                'error': 'Brak prawidłowych plików',
                'files_count': 0
            }

        total_size = sum(os.path.getsize(path) for path in valid_files)
        total_size_mb = total_size / (1024 * 1024)

        # Heuristics: 1 MB ≈ 1 min of audio; API processes ~10x faster
        # than realtime; pricing $0.006 per audio minute.
        estimated_duration_minutes = total_size_mb
        estimated_api_time = estimated_duration_minutes * 0.1
        estimated_cost = estimated_duration_minutes * 0.006

        # Flag files that the Whisper API would reject outright.
        files_too_large = []
        for path in valid_files:
            file_size_mb = os.path.getsize(path) / (1024 * 1024)
            if file_size_mb > 25:
                files_too_large.append((path, file_size_mb))

        return {
            'total_size_mb': total_size_mb,
            'estimated_audio_duration': estimated_duration_minutes,
            'estimated_processing_time': estimated_api_time,
            'estimated_cost_usd': estimated_cost,
            'files_count': len(valid_files),
            'files_too_large': files_too_large
        }
|
|
|
|
|
def validate_audio_file(file_path: str) -> Tuple[bool, str]:
    """Validate an audio file before sending it to the Whisper API.

    Checks existence, non-zero size, the 25 MB Whisper upload limit
    and the file extension.

    Args:
        file_path: path to the audio file.

    Returns:
        ``(True, "OK")`` when the file passes every check, otherwise
        ``(False, <reason>)``.
    """
    if not os.path.exists(file_path):
        return False, "Plik nie istnieje"

    file_size = os.path.getsize(file_path)
    file_size_mb = file_size / (1024 * 1024)

    if file_size == 0:
        return False, "Plik jest pusty"

    # Whisper API rejects uploads larger than 25 MB.
    if file_size_mb > 25:
        return False, f"Plik za duży: {file_size_mb:.1f}MB > 25MB"

    # Formats documented as accepted by the Whisper API, plus the
    # original entries kept for backward compatibility.
    # NOTE(review): '.aac' is not in OpenAI's documented format list --
    # confirm the API actually accepts it.
    valid_extensions = [
        '.mp3', '.wav', '.mp4', '.m4a', '.aac',
        '.mpeg', '.mpga', '.webm', '.ogg', '.flac',
    ]
    file_ext = Path(file_path).suffix.lower()

    if file_ext not in valid_extensions:
        return False, f"Nieobsługiwane rozszerzenie: {file_ext}"

    return True, "OK"
|
|
|
def get_file_duration_estimate(file_path: str) -> float:
    """Estimate the audio duration of *file_path* in minutes.

    Heuristic only: assumes ~1 MB of compressed audio per minute --
    TODO confirm against the bitrates actually produced upstream.

    Returns:
        Estimated minutes, or ``0.0`` when the file cannot be stat-ed.
    """
    try:
        return os.path.getsize(file_path) / (1024 * 1024)
    except OSError:
        # FIX: narrowed from a bare `except:` (which also swallowed
        # KeyboardInterrupt/SystemExit) to the only expected failure.
        return 0.0
|
|
|
|
|
if __name__ == "__main__":
    # Smoke test: reaching this point proves the module and all of its
    # imports load without errors.
    for banner_line in ("🧪 Test AudioTranscriber", "✅ Import OK - wszystkie typy dostępne"):
        print(banner_line)