|
|
|
|
|
""" |
|
|
Teste direto do serviço Ultravox (Speech-to-Text + LLM) |
|
|
Testa transcrição e geração de respostas diretamente |
|
|
""" |
|
|
|
|
|
import grpc |
|
|
import asyncio |
|
|
import time |
|
|
import sys |
|
|
import os |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
sys.path.append('/workspace/ultravox-pipeline/protos/generated') |
|
|
sys.path.append('/workspace/ultravox-pipeline') |
|
|
sys.path.append('/workspace/ultravox-pipeline/tests') |
|
|
sys.path.append('/workspace/ultravox-pipeline/tests/integration') |
|
|
|
|
|
import speech_pb2 |
|
|
import speech_pb2_grpc |
|
|
from base_test import BasePipelineTest |
|
|
from test_questions import TEST_QUESTIONS, validate_response |
|
|
from colorama import Fore, Style |
|
|
|
|
|
class UltravoxDirectTest(BasePipelineTest): |
|
|
"""Teste direto do serviço Ultravox""" |
|
|
|
|
|
def __init__(self): |
|
|
|
|
|
from tests.config_helper import test_config |
|
|
|
|
|
super().__init__( |
|
|
test_name="Ultravox Direct (STT + LLM)", |
|
|
service_port=test_config.get_service_port('ultravox'), |
|
|
service_type="ultravox" |
|
|
) |
|
|
self.channel = None |
|
|
self.stub = None |
|
|
|
|
|
async def setup_connection(self): |
|
|
"""Estabelece conexão gRPC com o Ultravox""" |
|
|
try: |
|
|
self.channel = grpc.aio.insecure_channel(f'localhost:{self.service_port}') |
|
|
self.stub = speech_pb2_grpc.SpeechServiceStub(self.channel) |
|
|
print(f"{Fore.GREEN}✅ Conectado ao Ultravox na porta {self.service_port}{Style.RESET_ALL}") |
|
|
return True |
|
|
except Exception as e: |
|
|
print(f"{Fore.RED}❌ Erro ao conectar ao Ultravox: {e}{Style.RESET_ALL}") |
|
|
return False |
|
|
|
|
|
async def test_single_question(self, question_data: dict, index: int) -> dict: |
|
|
"""Testa uma única pergunta diretamente no Ultravox""" |
|
|
result = { |
|
|
"question": question_data["text"], |
|
|
"success": False, |
|
|
"metrics": {}, |
|
|
"validation": {}, |
|
|
"transcription": "", |
|
|
"response": "" |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
audio_data = self.create_audio_from_text(question_data["text"]) |
|
|
|
|
|
|
|
|
chunk = speech_pb2.AudioChunk() |
|
|
chunk.audio_data = audio_data |
|
|
chunk.sample_rate = 16000 |
|
|
chunk.session_id = f"test_ultravox_{index}_{int(time.time())}" |
|
|
chunk.is_final_chunk = True |
|
|
|
|
|
|
|
|
chunk.system_prompt = "Você é um assistente brasileiro prestativo. Responda de forma clara e direta em português." |
|
|
chunk.user_prompt = "Responda à pergunta que você ouviu:" |
|
|
|
|
|
|
|
|
start_time = time.time() |
|
|
first_token_time = None |
|
|
tokens = [] |
|
|
transcription_detected = False |
|
|
|
|
|
async for token in self.stub.StreamingRecognize([chunk]): |
|
|
current_time = time.time() - start_time |
|
|
|
|
|
if first_token_time is None: |
|
|
first_token_time = current_time |
|
|
|
|
|
if token.text: |
|
|
tokens.append(token.text) |
|
|
|
|
|
|
|
|
if not transcription_detected and "?" in token.text: |
|
|
transcription_detected = True |
|
|
result["transcription"] = "".join(tokens) |
|
|
tokens = [] |
|
|
|
|
|
|
|
|
if hasattr(token, 'validation') and token.validation: |
|
|
if token.validation.status != speech_pb2.ValidationResult.ValidationStatus.VALID: |
|
|
print(f"{Fore.YELLOW}⚠️ Validação: {token.validation.error_message}{Style.RESET_ALL}") |
|
|
|
|
|
total_time = (time.time() - start_time) * 1000 |
|
|
|
|
|
|
|
|
response_text = "".join(tokens) |
|
|
|
|
|
|
|
|
if not transcription_detected: |
|
|
result["response"] = response_text |
|
|
else: |
|
|
result["response"] = response_text |
|
|
|
|
|
|
|
|
result["metrics"] = { |
|
|
"total_time_ms": total_time, |
|
|
"first_token_ms": first_token_time * 1000 if first_token_time else 0, |
|
|
"response_length": len(result["response"]), |
|
|
"transcription_length": len(result["transcription"]), |
|
|
"tokens_received": len(tokens) |
|
|
} |
|
|
|
|
|
|
|
|
result["validation"] = validate_response( |
|
|
result["response"] or result["transcription"], |
|
|
question_data |
|
|
) |
|
|
|
|
|
result["success"] = True |
|
|
|
|
|
except grpc.RpcError as e: |
|
|
result["error"] = f"Erro gRPC: {e.details()}" |
|
|
result["success"] = False |
|
|
except Exception as e: |
|
|
result["error"] = str(e) |
|
|
result["success"] = False |
|
|
|
|
|
return result |
|
|
|
|
|
async def test_health_check(self): |
|
|
"""Testa se o serviço está respondendo""" |
|
|
print(f"\n{Fore.CYAN}🏥 Testando health check do Ultravox...{Style.RESET_ALL}") |
|
|
|
|
|
try: |
|
|
|
|
|
chunk = speech_pb2.AudioChunk() |
|
|
chunk.audio_data = b'\x00' * 1000 |
|
|
chunk.sample_rate = 16000 |
|
|
chunk.session_id = "health_check" |
|
|
chunk.is_final_chunk = True |
|
|
|
|
|
|
|
|
async for token in self.stub.StreamingRecognize([chunk]): |
|
|
if token: |
|
|
print(f"{Fore.GREEN}✅ Ultravox está respondendo{Style.RESET_ALL}") |
|
|
return True |
|
|
break |
|
|
|
|
|
print(f"{Fore.GREEN}✅ Ultravox processou requisição{Style.RESET_ALL}") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
print(f"{Fore.RED}❌ Erro no health check: {e}{Style.RESET_ALL}") |
|
|
return False |
|
|
|
|
|
async def test_different_prompts(self): |
|
|
"""Testa diferentes prompts de sistema""" |
|
|
print(f"\n{Fore.CYAN}🎯 Testando diferentes prompts...{Style.RESET_ALL}") |
|
|
|
|
|
prompts = [ |
|
|
("Professor", "Você é um professor de história brasileiro."), |
|
|
("Médico", "Você é um médico brasileiro."), |
|
|
("Chef", "Você é um chef de cozinha brasileiro."), |
|
|
("Padrão", "") |
|
|
] |
|
|
|
|
|
|
|
|
test_audio = self.create_audio_from_text("Qual é a capital do Brasil?") |
|
|
|
|
|
for name, system_prompt in prompts: |
|
|
try: |
|
|
chunk = speech_pb2.AudioChunk() |
|
|
chunk.audio_data = test_audio |
|
|
chunk.sample_rate = 16000 |
|
|
chunk.session_id = f"test_prompt_{name}" |
|
|
chunk.is_final_chunk = True |
|
|
|
|
|
if system_prompt: |
|
|
chunk.system_prompt = system_prompt |
|
|
|
|
|
response = [] |
|
|
async for token in self.stub.StreamingRecognize([chunk]): |
|
|
if token.text: |
|
|
response.append(token.text) |
|
|
if len(response) > 5: |
|
|
break |
|
|
|
|
|
response_text = "".join(response)[:50] |
|
|
print(f" {name}: {response_text}...") |
|
|
|
|
|
except Exception as e: |
|
|
print(f" {name}: ❌ Erro ({e})") |
|
|
|
|
|
|
|
|
async def main(): |
|
|
"""Executa teste direto do Ultravox""" |
|
|
print(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}") |
|
|
print(f"{Fore.CYAN}🎯 TESTE DIRETO ULTRAVOX (STT + LLM){Style.RESET_ALL}") |
|
|
print(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}") |
|
|
|
|
|
test = UltravoxDirectTest() |
|
|
|
|
|
|
|
|
if await test.setup_connection(): |
|
|
|
|
|
if await test.test_health_check(): |
|
|
|
|
|
await test.test_different_prompts() |
|
|
|
|
|
|
|
|
await test.run_all_tests() |
|
|
else: |
|
|
print(f"{Fore.RED}❌ Abortar testes - Ultravox não está respondendo{Style.RESET_ALL}") |
|
|
|
|
|
|
|
|
if test.channel: |
|
|
await test.channel.close() |
|
|
else: |
|
|
print(f"{Fore.RED}❌ Não foi possível conectar ao Ultravox{Style.RESET_ALL}") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
asyncio.run(main()) |