Spaces:
				
			
			
	
			
			
					
		Running
		
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
	File size: 4,339 Bytes
			
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD 2-Clause License
"""Riva neural machine translation (NMT) bot.
This bot enables speech-to-speech translation using Riva ASR, NMT and TTS services
with voice activity detection.
"""
import os
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.services.nim import NimLLMService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from nvidia_pipecat.pipeline.ace_pipeline_runner import ACEPipelineRunner, PipelineMetadata
from nvidia_pipecat.services.riva_nmt import RivaNMTService
from nvidia_pipecat.services.riva_speech import (
    RivaASRService,
    RivaTTSService,
)
from nvidia_pipecat.transports.network.ace_fastapi_websocket import (
    ACETransport,
    ACETransportParams,
)
from nvidia_pipecat.transports.services.ace_controller.routers.websocket_router import router as websocket_router
from nvidia_pipecat.utils.logging import setup_default_ace_logging
# Load variables from a local .env file (override=True lets .env values win
# over variables already present in the environment). This must run before
# anything reads the environment, e.g. NVIDIA_API_KEY in create_pipeline_task.
load_dotenv(override=True)
# Configure the nvidia_pipecat default logging at INFO level.
setup_default_ace_logging(level="INFO")
async def create_pipeline_task(pipeline_metadata: PipelineMetadata):
    """Create the pipeline to be run.

    Frames flow through the pipeline in this order:

    1. Transport input (with Silero VAD).
    2. Riva ASR, in ``language``.
    3. Riva NMT, ``language`` -> English.
    4. NIM LLM.
    5. Sentence aggregation.
    6. Riva NMT, English -> ``language``.
    7. Riva TTS.
    8. Transport output.

    Args:
        pipeline_metadata (PipelineMetadata): Metadata containing the websocket
            and stream id used to configure this pipeline.

    Returns:
        PipelineTask: The configured pipeline task for this stream.
    """
    transport = ACETransport(
        websocket=pipeline_metadata.websocket,
        params=ACETransportParams(
            vad_analyzer=SileroVADAnalyzer(),
        ),
    )

    # The same key is passed to the LLM, ASR and TTS services; read it once.
    api_key = os.getenv("NVIDIA_API_KEY")
    # Riva ASR and TTS are both reached at this endpoint.
    riva_server = "localhost:50051"

    llm = NimLLMService(
        api_key=api_key,
        model="nvdev/meta/llama-3.1-8b-instruct",
    )

    # Please update the stt and tts language, voice id as needed.
    # TTS voice ids per language: https://docs.nvidia.com/deeplearning/riva/user-guide/docs/tts/tts-overview.html
    # NOTE(review): `language` is ES_US, but the ASR model name below is an
    # en-US model and the voice id is an English voice -- confirm they match
    # the Riva models actually deployed for the chosen language.
    language = Language.ES_US
    voice_id = "English-US.Female-1"

    # nmt1: user's language -> English (input to the LLM).
    # nmt2: English -> user's language (input to TTS).
    nmt1 = RivaNMTService(source_language=language, target_language=Language.EN_US)
    nmt2 = RivaNMTService(source_language=Language.EN_US, target_language=language)

    stt = RivaASRService(
        server=riva_server,
        api_key=api_key,
        language=language,
        sample_rate=16000,
        model="parakeet-1.1b-en-US-asr-streaming-silero-vad-asr-bls-ensemble",
    )
    tts = RivaTTSService(
        server=riva_server,
        api_key=api_key,
        voice_id=voice_id,
        language=language,
        zero_shot_quality=20,
        sample_rate=16000,
        model="fastpitch-hifigan-tts",
    )
    sentence_aggregator = SentenceAggregator()

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            nmt1,
            llm,
            sentence_aggregator,
            nmt2,
            tts,
            transport.output(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
            enable_usage_metrics=True,
            send_initial_empty_metrics=True,
            report_only_initial_ttfb=True,
            start_metadata={"stream_id": pipeline_metadata.stream_id},
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Kick off the conversation by queueing a synthetic user transcription
        # ("Tell a story." in Spanish) as soon as a client connects.
        # time_now_iso8601 must be *called*: the frame's timestamp is a string,
        # not the function object.
        await task.queue_frames([TranscriptionFrame("Contar una historia.", "", time_now_iso8601())])

    return task
# ASGI application: the ACE websocket router, the pipeline runner that is
# given create_pipeline_task as its pipeline callback, and static web assets
# served from the "static" directory next to this file's parent directory.
app = FastAPI()
app.include_router(websocket_router)
runner = ACEPipelineRunner.create_instance(pipeline_callback=create_pipeline_task)
_STATIC_DIR = os.path.join(os.path.dirname(__file__), "../static")
app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
if __name__ == "__main__":
    # Serve the `app` object defined above directly. The import string
    # "bot:app" would make uvicorn import this file a second time as module
    # `bot` (re-running all top-level setup, including the runner creation),
    # and it only works if the file is named bot.py. An app instance is valid
    # here because there is a single worker and no auto-reload.
    uvicorn.run(app, host="0.0.0.0", port=8100, workers=1)
 | 
