# file_handler.py - Poprawiony handler plików dla Whisper API import os import tempfile import math from io import BytesIO from typing import List, Dict, Tuple, Union import streamlit as st try: from pydub import AudioSegment PYDUB_AVAILABLE = True except ImportError: PYDUB_AVAILABLE = False st.warning("⚠️ Pydub nie jest dostępny. Funkcje kompresji ograniczone.") try: import librosa import soundfile as sf LIBROSA_AVAILABLE = True except ImportError: LIBROSA_AVAILABLE = False from config import FILE_PROCESSING, USER_MESSAGES class FileHandler: """Klasa do obsługi plików audio/video - zoptymalizowana dla Whisper API (max 25MB)""" def __init__(self): self.temp_files = [] self.processing_stats = {} # Whisper API limits self.WHISPER_MAX_SIZE_MB = 25 self.SAFE_CHUNK_SIZE_MB = 20 # Bezpieczny rozmiar chunka def process_file(self, uploaded_file, max_chunk_size_mb: int = 20, auto_compress: bool = True) -> List[str]: """ Główna funkcja przetwarzania pliku dla Whisper API Returns: Lista ścieżek do plików gotowych do transkrypcji (każdy <25MB) """ try: file_size_mb = uploaded_file.size / (1024 * 1024) st.info(f"🔄 Przetwarzam {uploaded_file.name} ({file_size_mb:.1f}MB)") # Sprawdź czy plik mieści się w limicie Whisper if file_size_mb <= self.WHISPER_MAX_SIZE_MB: # Plik OK - zapisz bezpośrednio temp_path = self._save_temp_file(uploaded_file) if temp_path: st.success(f"✅ Plik gotowy do transkrypcji ({file_size_mb:.1f}MB)") return [temp_path] else: return [] # Plik za duży - wymaga przetwarzania if file_size_mb > 100: st.error(f"❌ Plik zbyt duży ({file_size_mb:.1f}MB). Maksymalnie 100MB.") return [] # Strategia 1: Kompresja if auto_compress and file_size_mb > self.WHISPER_MAX_SIZE_MB: compressed_file = self._compress_audio_for_whisper(uploaded_file) if compressed_file: compressed_size_mb = len(compressed_file.getvalue()) / (1024 * 1024) if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB: temp_path = self._save_bytesio_to_temp(compressed_file, uploaded_file.name) if temp_path: st.success(f"✅ Skompresowano: {file_size_mb:.1f}MB → {compressed_size_mb:.1f}MB") return [temp_path] # Strategia 2: Podział na części return self._split_audio_for_whisper(uploaded_file, max_chunk_size_mb) except Exception as e: st.error(f"❌ Błąd przetwarzania {uploaded_file.name}: {str(e)}") return [] def _compress_audio_for_whisper(self, uploaded_file) -> Union[BytesIO, None]: """Agresywna kompresja audio dla Whisper API""" if not PYDUB_AVAILABLE: st.warning("Pydub niedostępny - pomijam kompresję") return None try: st.info("🗜️ Kompresuję audio...") # Załaduj audio audio_data = uploaded_file.read() uploaded_file.seek(0) # Reset dla dalszego użycia audio = AudioSegment.from_file(BytesIO(audio_data)) # Agresywna kompresja dla Whisper (jakość mowy) compressed = audio.set_channels(1) # Mono compressed = compressed.set_frame_rate(16000) # 16kHz (wystarczy dla mowy) # Jeszcze więcej kompresji jeśli potrzeba original_size_mb = uploaded_file.size / (1024 * 1024) if original_size_mb > 50: # Bardzo duży plik - maksymalna kompresja bitrate = "32k" elif original_size_mb > 35: # Duży plik - silna kompresja bitrate = "48k" else: # Średni plik - umiarkowana kompresja bitrate = "64k" # Export do BytesIO output = BytesIO() compressed.export( output, format="mp3", bitrate=bitrate, parameters=["-ac", "1", "-ar", "16000"] ) output.seek(0) # Sprawdź rozmiar wyniku compressed_size_mb = len(output.getvalue()) / (1024 * 1024) if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB: return output else: st.warning(f"⚠️ Kompresja niewystarczająca ({compressed_size_mb:.1f}MB). Przechodzę do dzielenia.") return None except Exception as e: st.warning(f"Kompresja nieudana: {str(e)}") return None def _split_audio_for_whisper(self, uploaded_file, max_chunk_size_mb: int) -> List[str]: """Dzieli plik audio na części <25MB dla Whisper""" try: if not PYDUB_AVAILABLE: st.error("❌ Pydub wymagany do dzielenia plików. Zainstaluj: pip install pydub") return [] st.info("✂️ Dzielę plik na części...") # Załaduj audio audio_data = uploaded_file.read() audio = AudioSegment.from_file(BytesIO(audio_data)) # Oblicz parametry dzielenia total_duration_ms = len(audio) file_size_mb = uploaded_file.size / (1024 * 1024) # Bezpieczny rozmiar chunka (mniejszy niż limit Whisper) safe_chunk_size_mb = min(max_chunk_size_mb, self.SAFE_CHUNK_SIZE_MB) # Estymacja liczby części estimated_parts = math.ceil(file_size_mb / safe_chunk_size_mb) chunk_duration_ms = total_duration_ms // estimated_parts # Overlap między częściami (10 sekund) overlap_ms = 10 * 1000 st.info(f"📂 Dzielę na {estimated_parts} części (~{chunk_duration_ms//60000:.1f} min każda)") parts = [] base_name = os.path.splitext(uploaded_file.name)[0] for i in range(estimated_parts): start_ms = max(0, i * chunk_duration_ms - (overlap_ms if i > 0 else 0)) end_ms = min(total_duration_ms, (i + 1) * chunk_duration_ms + overlap_ms) # Wytnij część chunk = audio[start_ms:end_ms] # Lekka kompresja części chunk = chunk.set_channels(1) # Mono chunk = chunk.set_frame_rate(22050) # Dobra jakość ale kompaktowa # Zapisz do pliku tymczasowego temp_fd, temp_path = tempfile.mkstemp( suffix=f"_part{i+1:02d}.mp3", prefix=f"{base_name}_" ) os.close(temp_fd) chunk.export(temp_path, format="mp3", bitrate="96k") # Sprawdź rozmiar części part_size_mb = os.path.getsize(temp_path) / (1024 * 1024) if part_size_mb > self.WHISPER_MAX_SIZE_MB: st.error(f"❌ Część {i+1} nadal za duża ({part_size_mb:.1f}MB)") os.remove(temp_path) continue parts.append(temp_path) self.temp_files.append(temp_path) st.success(f"✅ Część {i+1}/{estimated_parts}: {part_size_mb:.1f}MB, {(end_ms-start_ms)//60000:.1f} min") if not parts: st.error("❌ Nie udało się utworzyć żadnej prawidłowej części") return parts except Exception as e: st.error(f"❌ Błąd dzielenia pliku: {str(e)}") return [] def _save_temp_file(self, uploaded_file) -> str: """Zapisuje uploaded file do pliku tymczasowego""" try: suffix = f".{uploaded_file.name.split('.')[-1]}" temp_fd, temp_path = tempfile.mkstemp(suffix=suffix) # Zapisz dane with os.fdopen(temp_fd, 'wb') as tmp_file: content = uploaded_file.read() tmp_file.write(content) # Reset pozycji dla dalszego użycia uploaded_file.seek(0) self.temp_files.append(temp_path) return temp_path except Exception as e: st.error(f"❌ Błąd zapisu tymczasowego: {str(e)}") return "" def _save_bytesio_to_temp(self, bytes_io: BytesIO, original_name: str) -> str: """Zapisz BytesIO do pliku tymczasowego""" try: suffix = f"_compressed.mp3" base_name = os.path.splitext(original_name)[0] temp_fd, temp_path = tempfile.mkstemp( suffix=suffix, prefix=f"{base_name}_" ) with os.fdopen(temp_fd, 'wb') as tmp_file: tmp_file.write(bytes_io.getvalue()) self.temp_files.append(temp_path) return temp_path except Exception as e: st.error(f"❌ Błąd zapisu skompresowanego: {str(e)}") return "" def validate_file_for_whisper(self, uploaded_file) -> Tuple[bool, str]: """Walidacja pliku dla Whisper API""" try: # Sprawdź rozmiar file_size_mb = uploaded_file.size / (1024 * 1024) if file_size_mb == 0: return False, "Plik jest pusty" if file_size_mb > 100: # Rozumny limit dla przetwarzania return False, f"Plik za duży: {file_size_mb:.1f}MB > 100MB" # Sprawdź rozszerzenie file_ext = uploaded_file.name.split('.')[-1].lower() supported_formats = ['mp3', 'wav', 'mp4', 'm4a', 'aac', 'mov', 'avi'] if file_ext not in supported_formats: return False, f"Nieobsługiwany format: .{file_ext}" # Ostrzeżenie dla dużych plików if file_size_mb > self.WHISPER_MAX_SIZE_MB: return True, f"Plik wymaga przetwarzania ({file_size_mb:.1f}MB > {self.WHISPER_MAX_SIZE_MB}MB)" return True, "OK" except Exception as e: return False, f"Błąd walidacji: {str(e)}" def get_audio_duration(self, file_path: str) -> float: """Pobierz długość pliku audio w sekundach""" try: if LIBROSA_AVAILABLE: duration = librosa.get_duration(filename=file_path) return duration elif PYDUB_AVAILABLE: audio = AudioSegment.from_file(file_path) return len(audio) / 1000.0 else: # Fallback - estymacja na podstawie rozmiaru file_size = os.path.getsize(file_path) return file_size / (1024 * 1024) * 60 except: file_size = os.path.getsize(file_path) return file_size / (1024 * 1024) * 60 def estimate_processing_time(self, uploaded_files: List) -> Dict: """Estymuj czas przetwarzania""" total_size_mb = sum(f.size for f in uploaded_files) / (1024 * 1024) total_duration_est = total_size_mb * 60 # Czas przetwarzania (kompresja/dzielenie) processing_time = 0 for f in uploaded_files: file_size_mb = f.size / (1024 * 1024) if file_size_mb > self.WHISPER_MAX_SIZE_MB: processing_time += file_size_mb * 2 # ~2s per MB for processing # Estymacja czasu transkrypcji (Whisper ~1:10 ratio) transcription_time = total_duration_est * 0.1 # Estymacja czasu generowania raportu report_time = len(uploaded_files) * 30 return { 'total_size_mb': total_size_mb, 'estimated_audio_duration': total_duration_est, 'estimated_processing_time': processing_time, 'estimated_transcription_time': transcription_time, 'estimated_report_time': report_time, 'total_estimated_time': processing_time + transcription_time + report_time, 'files_needing_processing': sum(1 for f in uploaded_files if f.size / (1024 * 1024) > self.WHISPER_MAX_SIZE_MB) } def get_file_info(self, uploaded_file) -> Dict: """Pobierz szczegółowe informacje o pliku""" file_size_mb = uploaded_file.size / (1024 * 1024) file_ext = uploaded_file.name.split('.')[-1].lower() return { 'name': uploaded_file.name, 'size_mb': file_size_mb, 'format': file_ext, 'whisper_ready': file_size_mb <= self.WHISPER_MAX_SIZE_MB, 'needs_compression': file_size_mb > self.WHISPER_MAX_SIZE_MB and file_size_mb <= 50, 'needs_splitting': file_size_mb > 50, 'too_large': file_size_mb > 100, 'estimated_duration': file_size_mb * 60, 'estimated_processing_time': max(0, file_size_mb - self.WHISPER_MAX_SIZE_MB) * 2 } def cleanup_temp_files(self): """Wyczyść pliki tymczasowe""" cleaned = 0 errors = 0 for temp_file in self.temp_files: try: if os.path.exists(temp_file): os.remove(temp_file) cleaned += 1 except Exception as e: errors += 1 st.warning(f"Nie można usunąć {temp_file}: {e}") self.temp_files = [] if cleaned > 0: st.success(f"🧹 Wyczyszczono {cleaned} plików tymczasowych") if errors > 0: st.warning(f"⚠️ {errors} plików nie udało się usunąć") def get_processing_stats(self) -> Dict: """Zwróć statystyki przetwarzania""" return { 'temp_files_count': len(self.temp_files), 'whisper_max_size_mb': self.WHISPER_MAX_SIZE_MB, 'safe_chunk_size_mb': self.SAFE_CHUNK_SIZE_MB, 'processing_stats': self.processing_stats, 'libraries_available': { 'pydub': PYDUB_AVAILABLE, 'librosa': LIBROSA_AVAILABLE } } def analyze_upload_batch(self, uploaded_files: List) -> Dict: """Analizuj całą paczkę plików""" analysis = { 'total_files': len(uploaded_files), 'total_size_mb': 0, 'whisper_ready': 0, 'need_compression': 0, 'need_splitting': 0, 'too_large': 0, 'estimated_parts': 0, 'file_details': [] } for file in uploaded_files: info = self.get_file_info(file) analysis['file_details'].append(info) analysis['total_size_mb'] += info['size_mb'] if info['whisper_ready']: analysis['whisper_ready'] += 1 elif info['needs_compression']: analysis['need_compression'] += 1 elif info['needs_splitting']: analysis['need_splitting'] += 1 # Estymacja liczby części parts = math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB) analysis['estimated_parts'] += parts elif info['too_large']: analysis['too_large'] += 1 return analysis def create_processing_plan(self, uploaded_files: List) -> str: """Stwórz plan przetwarzania dla użytkownika""" analysis = self.analyze_upload_batch(uploaded_files) plan = f""" 📋 **PLAN PRZETWARZANIA** 📊 **Podsumowanie:** - Plików: {analysis['total_files']} ({analysis['total_size_mb']:.1f}MB) - Gotowych do transkrypcji: {analysis['whisper_ready']} - Wymagających kompresji: {analysis['need_compression']} - Wymagających dzielenia: {analysis['need_splitting']} - Za dużych: {analysis['too_large']} """ if analysis['estimated_parts'] > 0: plan += f"- Szacowana liczba części: {analysis['estimated_parts']}\n" if analysis['too_large'] > 0: plan += f"\n❌ **PLIKI ZA DUŻE (>100MB):**\n" for info in analysis['file_details']: if info['too_large']: plan += f"- {info['name']}: {info['size_mb']:.1f}MB\n" if analysis['need_splitting'] > 0: plan += f"\n✂️ **PLIKI DO PODZIELENIA:**\n" for info in analysis['file_details']: if info['needs_splitting']: parts = math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB) plan += f"- {info['name']}: {info['size_mb']:.1f}MB → ~{parts} części\n" if analysis['need_compression'] > 0: plan += f"\n🗜️ **PLIKI DO KOMPRESJI:**\n" for info in analysis['file_details']: if info['needs_compression']: plan += f"- {info['name']}: {info['size_mb']:.1f}MB\n" # Estymacja czasów times = self.estimate_processing_time(uploaded_files) plan += f""" ⏱️ **ESTYMACJA CZASÓW:** - Przetwarzanie plików: ~{times['estimated_processing_time']:.1f}s - Transkrypcja: ~{times['estimated_transcription_time']:.1f}s - Generowanie raportu: ~{times['estimated_report_time']:.1f}s - **ŁĄCZNIE: ~{times['total_estimated_time']:.1f}s ({times['total_estimated_time']/60:.1f} min)** """ return plan # Funkcje pomocnicze def check_file_size_for_whisper(file_path: str) -> Tuple[bool, float]: """Sprawdź czy plik mieści się w limicie Whisper""" try: size_mb = os.path.getsize(file_path) / (1024 * 1024) return size_mb <= 25, size_mb except: return False, 0 def estimate_compression_ratio(file_ext: str) -> float: """Estymuj współczynnik kompresji dla różnych formatów""" ratios = { 'wav': 0.1, # WAV kompresuje się bardzo dobrze 'aac': 0.7, # AAC już skompresowany 'mp3': 0.8, # MP3 już skompresowany 'm4a': 0.7, # M4A już skompresowany 'mp4': 0.5, # Video można mocno skompresować audio 'mov': 0.5, 'avi': 0.4 } return ratios.get(file_ext.lower(), 0.6) # Test modułu if __name__ == "__main__": print("🧪 Test FileHandler") handler = FileHandler() stats = handler.get_processing_stats() print(f"📊 Biblioteki: {stats['libraries_available']}") print(f"🎯 Limit Whisper: {stats['whisper_max_size_mb']}MB") print(f"🔒 Bezpieczny chunk: {stats['safe_chunk_size_mb']}MB") print("✅ FileHandler gotowy do użycia")