ultravox-pipeline / tests /integration /test_ultravox_direct.py
Ultravox Pipeline
feat: Reorganização completa dos testes e limpeza do sistema
c2c31f6
raw
history blame
8.94 kB
#!/usr/bin/env python3
"""
Teste direto do serviço Ultravox (Speech-to-Text + LLM)
Testa transcrição e geração de respostas diretamente
"""
import grpc
import asyncio
import time
import sys
import os
from datetime import datetime
# Adicionar paths
sys.path.append('/workspace/ultravox-pipeline/protos/generated')
sys.path.append('/workspace/ultravox-pipeline')
sys.path.append('/workspace/ultravox-pipeline/tests')
sys.path.append('/workspace/ultravox-pipeline/tests/integration')
import speech_pb2
import speech_pb2_grpc
from base_test import BasePipelineTest
from test_questions import TEST_QUESTIONS, validate_response
from colorama import Fore, Style
class UltravoxDirectTest(BasePipelineTest):
"""Teste direto do serviço Ultravox"""
def __init__(self):
# Import here to avoid circular dependencies
from tests.config_helper import test_config
super().__init__(
test_name="Ultravox Direct (STT + LLM)",
service_port=test_config.get_service_port('ultravox'),
service_type="ultravox"
)
self.channel = None
self.stub = None
async def setup_connection(self):
"""Estabelece conexão gRPC com o Ultravox"""
try:
self.channel = grpc.aio.insecure_channel(f'localhost:{self.service_port}')
self.stub = speech_pb2_grpc.SpeechServiceStub(self.channel)
print(f"{Fore.GREEN}✅ Conectado ao Ultravox na porta {self.service_port}{Style.RESET_ALL}")
return True
except Exception as e:
print(f"{Fore.RED}❌ Erro ao conectar ao Ultravox: {e}{Style.RESET_ALL}")
return False
async def test_single_question(self, question_data: dict, index: int) -> dict:
"""Testa uma única pergunta diretamente no Ultravox"""
result = {
"question": question_data["text"],
"success": False,
"metrics": {},
"validation": {},
"transcription": "",
"response": ""
}
try:
# Gerar áudio da pergunta
audio_data = self.create_audio_from_text(question_data["text"])
# Criar requisição
chunk = speech_pb2.AudioChunk()
chunk.audio_data = audio_data
chunk.sample_rate = 16000
chunk.session_id = f"test_ultravox_{index}_{int(time.time())}"
chunk.is_final_chunk = True
# Adicionar prompts opcionais
chunk.system_prompt = "Você é um assistente brasileiro prestativo. Responda de forma clara e direta em português."
chunk.user_prompt = "Responda à pergunta que você ouviu:"
# Processar áudio
start_time = time.time()
first_token_time = None
tokens = []
transcription_detected = False
async for token in self.stub.StreamingRecognize([chunk]):
current_time = time.time() - start_time
if first_token_time is None:
first_token_time = current_time
if token.text:
tokens.append(token.text)
# Verificar se é transcrição ou resposta
if not transcription_detected and "?" in token.text:
transcription_detected = True
result["transcription"] = "".join(tokens)
tokens = [] # Resetar para capturar resposta
# Verificar validação se disponível
if hasattr(token, 'validation') and token.validation:
if token.validation.status != speech_pb2.ValidationResult.ValidationStatus.VALID:
print(f"{Fore.YELLOW}⚠️ Validação: {token.validation.error_message}{Style.RESET_ALL}")
total_time = (time.time() - start_time) * 1000
# Juntar resposta
response_text = "".join(tokens)
# Se não detectou transcrição separada, toda a resposta é considerada
if not transcription_detected:
result["response"] = response_text
else:
result["response"] = response_text
# Métricas
result["metrics"] = {
"total_time_ms": total_time,
"first_token_ms": first_token_time * 1000 if first_token_time else 0,
"response_length": len(result["response"]),
"transcription_length": len(result["transcription"]),
"tokens_received": len(tokens)
}
# Validação da resposta
result["validation"] = validate_response(
result["response"] or result["transcription"],
question_data
)
result["success"] = True
except grpc.RpcError as e:
result["error"] = f"Erro gRPC: {e.details()}"
result["success"] = False
except Exception as e:
result["error"] = str(e)
result["success"] = False
return result
async def test_health_check(self):
"""Testa se o serviço está respondendo"""
print(f"\n{Fore.CYAN}🏥 Testando health check do Ultravox...{Style.RESET_ALL}")
try:
# Enviar áudio vazio como teste
chunk = speech_pb2.AudioChunk()
chunk.audio_data = b'\x00' * 1000 # Áudio silencioso
chunk.sample_rate = 16000
chunk.session_id = "health_check"
chunk.is_final_chunk = True
# Tentar processar
async for token in self.stub.StreamingRecognize([chunk]):
if token:
print(f"{Fore.GREEN}✅ Ultravox está respondendo{Style.RESET_ALL}")
return True
break
print(f"{Fore.GREEN}✅ Ultravox processou requisição{Style.RESET_ALL}")
return True
except Exception as e:
print(f"{Fore.RED}❌ Erro no health check: {e}{Style.RESET_ALL}")
return False
async def test_different_prompts(self):
"""Testa diferentes prompts de sistema"""
print(f"\n{Fore.CYAN}🎯 Testando diferentes prompts...{Style.RESET_ALL}")
prompts = [
("Professor", "Você é um professor de história brasileiro."),
("Médico", "Você é um médico brasileiro."),
("Chef", "Você é um chef de cozinha brasileiro."),
("Padrão", "") # Prompt vazio usa o padrão
]
# Pergunta fixa para comparação
test_audio = self.create_audio_from_text("Qual é a capital do Brasil?")
for name, system_prompt in prompts:
try:
chunk = speech_pb2.AudioChunk()
chunk.audio_data = test_audio
chunk.sample_rate = 16000
chunk.session_id = f"test_prompt_{name}"
chunk.is_final_chunk = True
if system_prompt:
chunk.system_prompt = system_prompt
response = []
async for token in self.stub.StreamingRecognize([chunk]):
if token.text:
response.append(token.text)
if len(response) > 5: # Limitar para teste rápido
break
response_text = "".join(response)[:50] # Primeiros 50 chars
print(f" {name}: {response_text}...")
except Exception as e:
print(f" {name}: ❌ Erro ({e})")
async def main():
"""Executa teste direto do Ultravox"""
print(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}")
print(f"{Fore.CYAN}🎯 TESTE DIRETO ULTRAVOX (STT + LLM){Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}")
test = UltravoxDirectTest()
# Conectar ao serviço
if await test.setup_connection():
# Testar health check
if await test.test_health_check():
# Testar diferentes prompts
await test.test_different_prompts()
# Executar todos os testes com as 10 perguntas
await test.run_all_tests()
else:
print(f"{Fore.RED}❌ Abortar testes - Ultravox não está respondendo{Style.RESET_ALL}")
# Fechar conexão
if test.channel:
await test.channel.close()
else:
print(f"{Fore.RED}❌ Não foi possível conectar ao Ultravox{Style.RESET_ALL}")
if __name__ == "__main__":
asyncio.run(main())