Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, Query, HTTPException | |
| from moviepy.editor import VideoFileClip | |
| import tempfile | |
| import requests | |
| import os | |
| import shutil | |
| from groq import Groq | |
| from audio_separator.separator import Separator | |
| from google import genai | |
| from google.genai import types | |
| router = APIRouter() | |
| def download_file(url: str, suffix: str) -> str: | |
| """Download genérico para arquivos de áudio e vídeo""" | |
| print(f"Tentando baixar arquivo de: {url}") | |
| headers = { | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', | |
| 'Accept': '*/*', | |
| 'Accept-Language': 'en-US,en;q=0.5', | |
| 'Accept-Encoding': 'gzip, deflate', | |
| 'Connection': 'keep-alive', | |
| 'Upgrade-Insecure-Requests': '1', | |
| } | |
| try: | |
| response = requests.get(url, headers=headers, stream=True, timeout=30) | |
| print(f"Status da resposta: {response.status_code}") | |
| response.raise_for_status() | |
| except requests.exceptions.RequestException as e: | |
| print(f"Erro na requisição: {e}") | |
| raise HTTPException(status_code=400, detail=f"Não foi possível baixar o arquivo: {str(e)}") | |
| if response.status_code != 200: | |
| raise HTTPException(status_code=400, detail=f"Erro ao baixar arquivo. Status: {response.status_code}") | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) | |
| try: | |
| total_size = 0 | |
| for chunk in response.iter_content(chunk_size=8192): | |
| if chunk: | |
| tmp.write(chunk) | |
| total_size += len(chunk) | |
| tmp.close() | |
| print(f"Arquivo baixado com sucesso. Tamanho: {total_size} bytes") | |
| return tmp.name | |
| except Exception as e: | |
| tmp.close() | |
| if os.path.exists(tmp.name): | |
| os.unlink(tmp.name) | |
| print(f"Erro ao salvar arquivo: {e}") | |
| raise HTTPException(status_code=400, detail=f"Erro ao salvar arquivo: {str(e)}") | |
| def extract_audio_from_video(video_path: str) -> str: | |
| """Extrai áudio de um arquivo de vídeo e salva como WAV""" | |
| print(f"Extraindo áudio do vídeo: {video_path}") | |
| audio_tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") | |
| audio_path = audio_tmp.name | |
| audio_tmp.close() | |
| try: | |
| video = VideoFileClip(video_path) | |
| audio = video.audio | |
| audio.write_audiofile(audio_path, verbose=False, logger=None) | |
| audio.close() | |
| video.close() | |
| print(f"Áudio extraído com sucesso: {audio_path}") | |
| return audio_path | |
| except Exception as e: | |
| if os.path.exists(audio_path): | |
| os.unlink(audio_path) | |
| print(f"Erro ao extrair áudio: {e}") | |
| raise HTTPException(status_code=500, detail=f"Erro ao extrair áudio do vídeo: {str(e)}") | |
| def separate_vocals(audio_path: str) -> str: | |
| """Separa vocais do áudio usando audio-separator com modelo UVR_MDXNET_KARA_2.onnx""" | |
| print(f"Iniciando separação de vocais do arquivo: {audio_path}") | |
| # Criar diretório temporário para saída | |
| temp_output_dir = tempfile.mkdtemp(prefix="vocal_separation_") | |
| try: | |
| # Inicializar o separador | |
| separator = Separator(output_dir=temp_output_dir) | |
| # Carregar modelo específico para vocais (UVR_MDXNET_KARA_2.onnx é melhor para vocais) | |
| print("Carregando modelo UVR_MDXNET_KARA_2.onnx...") | |
| separator.load_model('UVR_MDXNET_KARA_2.onnx') | |
| # Processar arquivo | |
| print("Processando separação de vocais...") | |
| separator.separate(audio_path) | |
| # Encontrar o arquivo de vocais gerado | |
| # O audio-separator geralmente gera arquivos com sufixos específicos | |
| base_name = os.path.splitext(os.path.basename(audio_path))[0] | |
| # Procurar pelo arquivo de vocais (pode ter diferentes sufixos dependendo do modelo) | |
| possible_vocal_files = [ | |
| f"{base_name}_(Vocals).wav", | |
| f"{base_name}_vocals.wav", | |
| f"{base_name}_Vocals.wav", | |
| f"{base_name}_(Vocal).wav" | |
| ] | |
| vocal_file_path = None | |
| for possible_file in possible_vocal_files: | |
| full_path = os.path.join(temp_output_dir, possible_file) | |
| if os.path.exists(full_path): | |
| vocal_file_path = full_path | |
| break | |
| # Se não encontrou pelos nomes padrão, procurar qualquer arquivo wav no diretório | |
| if not vocal_file_path: | |
| wav_files = [f for f in os.listdir(temp_output_dir) if f.endswith('.wav')] | |
| if wav_files: | |
| # Pegar o primeiro arquivo wav encontrado (assumindo que seja o vocal) | |
| vocal_file_path = os.path.join(temp_output_dir, wav_files[0]) | |
| if not vocal_file_path or not os.path.exists(vocal_file_path): | |
| raise HTTPException(status_code=500, detail="Arquivo de vocais não foi gerado corretamente") | |
| # Mover arquivo de vocais para um local temporário permanente | |
| vocal_temp = tempfile.NamedTemporaryFile(delete=False, suffix="_vocals.wav") | |
| vocal_final_path = vocal_temp.name | |
| vocal_temp.close() | |
| shutil.copy2(vocal_file_path, vocal_final_path) | |
| print(f"Vocais separados com sucesso: {vocal_final_path}") | |
| return vocal_final_path | |
| except Exception as e: | |
| print(f"Erro na separação de vocais: {e}") | |
| raise HTTPException(status_code=500, detail=f"Erro ao separar vocais: {str(e)}") | |
| finally: | |
| # Limpar diretório temporário de separação | |
| if os.path.exists(temp_output_dir): | |
| try: | |
| shutil.rmtree(temp_output_dir) | |
| print(f"Diretório temporário removido: {temp_output_dir}") | |
| except Exception as cleanup_error: | |
| print(f"Erro ao remover diretório temporário: {cleanup_error}") | |
| def format_time(seconds_float: float) -> str: | |
| """Converte segundos para formato de tempo SRT (HH:MM:SS,mmm) - versão melhorada""" | |
| # Calcula segundos totais e milissegundos | |
| total_seconds = int(seconds_float) | |
| milliseconds = int((seconds_float - total_seconds) * 1000) | |
| # Calcula horas, minutos e segundos restantes | |
| hours = total_seconds // 3600 | |
| minutes = (total_seconds % 3600) // 60 | |
| seconds = total_seconds % 60 | |
| return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}" | |
| def json_to_srt(segments_data) -> str: | |
| """ | |
| Converte dados de segmentos para formato SRT | |
| """ | |
| if not segments_data: | |
| return "" | |
| srt_lines = [] | |
| for segment in segments_data: | |
| segment_id = segment.get('id', 0) + 1 | |
| start_time = format_time(segment.get('start', 0.0)) | |
| end_time = format_time(segment.get('end', 0.0)) | |
| text = segment.get('text', '').strip() | |
| if text: # Só adiciona se há texto | |
| srt_lines.append(f"{segment_id}") | |
| srt_lines.append(f"{start_time} --> {end_time}") | |
| srt_lines.append(text) | |
| srt_lines.append("") # Linha em branco | |
| return '\n'.join(srt_lines) | |
| def convert_to_srt(transcription_data) -> str: | |
| """ | |
| Função para conversão usando apenas segments | |
| """ | |
| if hasattr(transcription_data, 'segments') and transcription_data.segments: | |
| return json_to_srt(transcription_data.segments) | |
| else: | |
| return "" | |
| def translate_subtitle_internal(content: str) -> str: | |
| """ | |
| Função interna para traduzir legendas usando Gemini | |
| Baseada na lógica do inference_sub.py | |
| """ | |
| try: | |
| print("Iniciando tradução da legenda...") | |
| api_key = os.environ.get("GEMINI_API_KEY") | |
| if not api_key: | |
| raise HTTPException(status_code=500, detail="GEMINI_API_KEY não configurada") | |
| client = genai.Client(api_key=api_key) | |
| model = "gemini-2.5-pro" | |
| # Instruções do sistema aprimoradas | |
| SYSTEM_INSTRUCTIONS = """ | |
| Você é um tradutor profissional de legendas especializado em tradução do inglês para o português brasileiro. | |
| Sua função é traduzir legendas mantendo a formatação SRT original intacta e seguindo os padrões da Netflix. | |
| REGRAS FUNDAMENTAIS: | |
| 1. NUNCA altere os timestamps (00:00:00,000 --> 00:00:00,000) | |
| 2. NUNCA altere os números das legendas (1, 2, 3, etc.) | |
| 3. Mantenha a formatação SRT exata: número, timestamp, texto traduzido, linha em branco | |
| 4. Traduza APENAS o texto das falas | |
| PADRÕES DE TRADUÇÃO: | |
| - Tradução natural para português brasileiro | |
| - Mantenha o tom e registro da fala original (formal/informal, gírias, etc.) | |
| - Preserve nomes próprios, lugares e marcas | |
| - Adapte expressões idiomáticas para equivalentes em português quando necessário | |
| - Use contrações naturais do português brasileiro (você → cê, para → pra, quando apropriado) | |
| FORMATAÇÃO NETFLIX: | |
| - Máximo de 2 linhas por legenda | |
| - Máximo de 42 caracteres por linha (incluindo espaços) | |
| - Use quebra de linha quando o texto for muito longo | |
| - Prefira quebras em pontos naturais da fala (após vírgulas, conjunções, etc.) | |
| - Centralize o texto quando possível | |
| PONTUAÇÃO E ESTILO: | |
| - Use pontuação adequada em português | |
| - Mantenha reticências (...) para hesitações ou falas interrompidas | |
| - Use travessão (–) para diálogos quando necessário | |
| - Evite pontos finais desnecessários em falas curtas | |
| Sempre retorne APENAS o conteúdo das legendas traduzidas, mantendo a formatação SRT original. | |
| """ | |
| # Primeiro exemplo | |
| EXAMPLE_INPUT_1 = """1 | |
| 00:00:00,000 --> 00:00:03,500 | |
| You could argue he'd done it to curry favor with the guards. | |
| 2 | |
| 00:00:04,379 --> 00:00:07,299 | |
| Or maybe make a few friends among us Khans. | |
| 3 | |
| 00:00:08,720 --> 00:00:12,199 | |
| Me, I think he did it just to feel normal again. | |
| 4 | |
| 00:00:13,179 --> 00:00:14,740 | |
| If only for a short while.""" | |
| EXAMPLE_OUTPUT_1 = """1 | |
| 00:00:00,000 --> 00:00:03,500 | |
| Você pode dizer que ele fez isso | |
| para agradar os guardas. | |
| 2 | |
| 00:00:04,379 --> 00:00:07,299 | |
| Ou talvez para fazer alguns amigos | |
| entre nós, os Khans. | |
| 3 | |
| 00:00:08,720 --> 00:00:12,199 | |
| Eu acho que ele fez isso só para se sentir | |
| normal de novo. | |
| 4 | |
| 00:00:13,179 --> 00:00:14,740 | |
| Mesmo que só por um tempo.""" | |
| # Segundo exemplo | |
| EXAMPLE_INPUT_2 = """1 | |
| 00:00:15,420 --> 00:00:18,890 | |
| I'm not saying you're wrong, but have you considered the alternatives? | |
| 2 | |
| 00:00:19,234 --> 00:00:21,567 | |
| What if we just... I don't know... talked to him? | |
| 3 | |
| 00:00:22,890 --> 00:00:26,234 | |
| Listen, Jack, this isn't some Hollywood movie where everything works out. | |
| 4 | |
| 00:00:27,123 --> 00:00:29,456 | |
| Sometimes you gotta make the hard choices.""" | |
| EXAMPLE_OUTPUT_2 = """1 | |
| 00:00:15,420 --> 00:00:18,890 | |
| Não tô dizendo que você tá errado, mas | |
| já pensou nas alternativas? | |
| 2 | |
| 00:00:19,234 --> 00:00:21,567 | |
| E se a gente só... sei lá... | |
| conversasse com ele? | |
| 3 | |
| 00:00:22,890 --> 00:00:26,234 | |
| Escuta, Jack, isso não é um filme de | |
| Hollywood onde tudo dá certo. | |
| 4 | |
| 00:00:27,123 --> 00:00:29,456 | |
| Às vezes você tem que fazer | |
| as escolhas difíceis.""" | |
| # Terceiro exemplo com diálogos | |
| EXAMPLE_INPUT_3 = """1 | |
| 00:00:30,789 --> 00:00:32,456 | |
| - Hey, what's up? | |
| - Not much, just chilling. | |
| 2 | |
| 00:00:33,567 --> 00:00:36,123 | |
| Did you see that new Netflix show everyone's talking about? | |
| 3 | |
| 00:00:37,234 --> 00:00:40,789 | |
| Yeah, it's incredible! The cinematography is absolutely stunning. | |
| 4 | |
| 00:00:41,890 --> 00:00:44,567 | |
| I can't believe they canceled it after just one season though.""" | |
| EXAMPLE_OUTPUT_3 = """1 | |
| 00:00:30,789 --> 00:00:32,456 | |
| – E aí, tudo bem? | |
| – De boa, só relaxando. | |
| 2 | |
| 00:00:33,567 --> 00:00:36,123 | |
| Você viu aquela série nova da Netflix | |
| que todo mundo tá falando? | |
| 3 | |
| 00:00:37,234 --> 00:00:40,789 | |
| Vi, é incrível! A cinematografia | |
| é absolutamente deslumbrante. | |
| 4 | |
| 00:00:41,890 --> 00:00:44,567 | |
| Não acredito que cancelaram depois | |
| de só uma temporada.""" | |
| # Estrutura de conversação correta com múltiplos exemplos | |
| contents = [ | |
| # Primeiro exemplo: usuário envia legenda | |
| types.Content( | |
| role="user", | |
| parts=[ | |
| types.Part.from_text(text=EXAMPLE_INPUT_1) | |
| ] | |
| ), | |
| # Primeiro exemplo: modelo responde com tradução | |
| types.Content( | |
| role="model", | |
| parts=[ | |
| types.Part.from_text(text=EXAMPLE_OUTPUT_1) | |
| ] | |
| ), | |
| # Segundo exemplo: usuário envia outra legenda | |
| types.Content( | |
| role="user", | |
| parts=[ | |
| types.Part.from_text(text=EXAMPLE_INPUT_2) | |
| ] | |
| ), | |
| # Segundo exemplo: modelo responde com tradução | |
| types.Content( | |
| role="model", | |
| parts=[ | |
| types.Part.from_text(text=EXAMPLE_OUTPUT_2) | |
| ] | |
| ), | |
| # Terceiro exemplo: usuário envia legenda com diálogos | |
| types.Content( | |
| role="user", | |
| parts=[ | |
| types.Part.from_text(text=EXAMPLE_INPUT_3) | |
| ] | |
| ), | |
| # Terceiro exemplo: modelo responde com tradução | |
| types.Content( | |
| role="model", | |
| parts=[ | |
| types.Part.from_text(text=EXAMPLE_OUTPUT_3) | |
| ] | |
| ), | |
| # Agora o usuário envia a legenda real para ser traduzida | |
| types.Content( | |
| role="user", | |
| parts=[ | |
| types.Part.from_text(text=content) | |
| ] | |
| ) | |
| ] | |
| config = types.GenerateContentConfig( | |
| system_instruction=SYSTEM_INSTRUCTIONS, | |
| response_mime_type="text/plain", | |
| max_output_tokens=4096, | |
| temperature=0.3, # Menos criatividade, mais precisão na tradução | |
| ) | |
| response_text = "" | |
| for chunk in client.models.generate_content_stream( | |
| model=model, | |
| contents=contents, | |
| config=config | |
| ): | |
| if chunk.text: | |
| response_text += chunk.text | |
| translated_content = response_text.strip() | |
| print("Tradução concluída com sucesso") | |
| return translated_content | |
| except Exception as e: | |
| print(f"Erro na tradução interna: {e}") | |
| # Retorna o conteúdo original se a tradução falhar | |
| return content | |
| def generate_srt_subtitle( | |
| url: str = Query(..., description="URL do arquivo de áudio (.wav) ou vídeo") | |
| ): | |
| """ | |
| Gera legenda em formato SRT a partir de arquivo de áudio ou vídeo | |
| - Se for .wav: separa vocais e transcreve | |
| - Se for vídeo: extrai áudio, separa vocais e transcreve | |
| - Usa modelo UVR_MDXNET_KARA_2.onnx para separação de vocais | |
| - Usa segmentação natural do Whisper (segments) | |
| - Detecção automática de idioma | |
| - Tradução automática sempre ativada | |
| """ | |
| local_file = None | |
| audio_file = None | |
| vocal_file = None | |
| try: | |
| # Determinar tipo de arquivo pela URL | |
| url_lower = url.lower() | |
| is_audio = url_lower.endswith('.wav') | |
| is_video = any(url_lower.endswith(ext) for ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm']) | |
| if not (is_audio or is_video): | |
| raise HTTPException( | |
| status_code=400, | |
| detail="URL deve ser de um arquivo de áudio (.wav) ou vídeo" | |
| ) | |
| if is_audio: | |
| local_file = download_file(url, ".wav") | |
| audio_file = local_file | |
| else: | |
| local_file = download_file(url, ".mp4") | |
| audio_file = extract_audio_from_video(local_file) | |
| # Separar vocais do áudio | |
| vocal_file = separate_vocals(audio_file) | |
| # Transcrição com configurações fixas otimizadas | |
| api_key = os.getenv("GROQ_API") | |
| if not api_key: | |
| raise HTTPException(status_code=500, detail="GROQ_API key não configurada") | |
| client = Groq(api_key=api_key) | |
| print(f"Iniciando transcrição com modelo: whisper-large-v3") | |
| with open(vocal_file, "rb") as file: | |
| transcription_params = { | |
| "file": (os.path.basename(vocal_file), file.read()), | |
| "model": "whisper-large-v3", | |
| "response_format": "verbose_json", | |
| "timestamp_granularities": ["segment"], | |
| "temperature": 0.0, | |
| # language é automaticamente detectado (não enviado) | |
| } | |
| transcription = client.audio.transcriptions.create(**transcription_params) | |
| # Converter para SRT usando segments | |
| srt_content_original = convert_to_srt(transcription) | |
| # Traduzir sempre | |
| srt_content = translate_subtitle_internal(srt_content_original) if srt_content_original else None | |
| return { | |
| "srt": srt_content, | |
| "duration": getattr(transcription, 'duration', 0), | |
| "language": getattr(transcription, 'language', 'unknown'), | |
| "model_used": "whisper-large-v3", | |
| "processing_method": "segments", | |
| "vocal_separation": "UVR_MDXNET_KARA_2.onnx", | |
| "translation_applied": True, | |
| "segment_count": len(transcription.segments) if hasattr(transcription, 'segments') and transcription.segments else 0, | |
| "subtitle_count": len([line for line in srt_content.split('\n') if line.strip().isdigit()]) if srt_content else 0 | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Erro inesperado: {str(e)}") | |
| finally: | |
| # Limpeza de arquivos temporários | |
| for temp_file in [local_file, audio_file, vocal_file]: | |
| if temp_file and os.path.exists(temp_file): | |
| try: | |
| os.unlink(temp_file) | |
| print(f"Arquivo temporário removido: {temp_file}") | |
| except Exception as cleanup_error: | |
| print(f"Erro ao remover arquivo temporário {temp_file}: {cleanup_error}") |