"""Gradio demo for SeamlessM4T speech/text translation tasks."""

import os
from typing import Union

import gradio as gr
import numpy as np
import torch
import torchaudio
from seamless_communication.models.inference.translator import Translator

from lang_list import (
    LANGUAGE_NAME_TO_CODE,
    S2ST_TARGET_LANGUAGE_NAMES,
    S2TT_TARGET_LANGUAGE_NAMES,
    T2TT_TARGET_LANGUAGE_NAMES,
    TEXT_SOURCE_LANGUAGE_NAMES,
)

DESCRIPTION = """# SeamlessM4T

[SeamlessM4T](https://github.com/facebookresearch/seamless_communication) is designed to provide high-quality
translation, allowing people from different linguistic communities to communicate effortlessly through speech and text.

This unified model enables multiple tasks like Speech-to-Speech (S2ST), Speech-to-Text (S2TT), Text-to-Speech (T2ST)
translation and more, without relying on multiple separate models.
"""

CACHE_EXAMPLES = os.getenv("CACHE_EXAMPLES") == "1"

# The first whitespace-separated token of each entry is the task code that the
# callbacks below extract with ``task_name.split()[0]``.
TASK_NAMES = [
    "S2ST (Speech to Speech translation)",
    "S2TT (Speech to Text translation)",
    "T2ST (Text to Speech translation)",
    "T2TT (Text to Text translation)",
    "ASR (Automatic Speech Recognition)",
]
AUDIO_SAMPLE_RATE = 16000.0
MAX_INPUT_AUDIO_LENGTH = 60  # in seconds
DEFAULT_TARGET_LANGUAGE = "French"

# Task codes grouped by the kind of input they read / output they produce.
_AUDIO_INPUT_TASKS = ("S2ST", "S2TT", "ASR")
_AUDIO_OUTPUT_TASKS = ("S2ST", "T2ST")
_TEXT_ONLY_OUTPUT_TASKS = ("S2TT", "T2TT", "ASR")

# Per-task input layout: (task reads audio input, choices for the target-language dropdown).
_TASK_INPUT_CONFIG = {
    "S2ST": (True, S2ST_TARGET_LANGUAGE_NAMES),
    "S2TT": (True, S2TT_TARGET_LANGUAGE_NAMES),
    "T2ST": (False, S2ST_TARGET_LANGUAGE_NAMES),
    "T2TT": (False, T2TT_TARGET_LANGUAGE_NAMES),
    "ASR": (True, S2TT_TARGET_LANGUAGE_NAMES),
}

# Shape of everything ``predict`` returns: optional (sample_rate, waveform) plus text.
PredictOutput = tuple[Union[tuple[int, np.ndarray], None], str]

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
translator = Translator(
    model_name_or_card="multitask_unity_large",
    vocoder_name_or_card="vocoder_36langs",
    device=device,
    sample_rate=AUDIO_SAMPLE_RATE,
)


def predict(
    task_name: str,
    audio_source: str,
    input_audio_mic: Union[str, None],
    input_audio_file: Union[str, None],
    input_text: Union[str, None],
    source_language: Union[str, None],
    target_language: str,
) -> PredictOutput:
    """Run one translation / recognition task and return its outputs.

    Args:
        task_name: Task label; only its first whitespace-separated token
            (e.g. ``"S2ST"``) is used.
        audio_source: ``"microphone"`` selects ``input_audio_mic``; any other
            value selects ``input_audio_file``. Ignored for text-input tasks.
        input_audio_mic: Path of the recorded audio file, or ``None``.
        input_audio_file: Path of the uploaded audio file, or ``None``.
        input_text: Source text for the text-input tasks.
        source_language: Display name of the source language; names missing
            from ``LANGUAGE_NAME_TO_CODE`` (including ``None``) map to ``None``.
        target_language: Display name of the target language.

    Returns:
        ``((sample_rate, waveform), text)`` for S2ST and T2ST, otherwise
        ``(None, text)``.

    Raises:
        gr.Error: If an audio task is requested but no audio was provided.
        KeyError: If ``target_language`` is not in ``LANGUAGE_NAME_TO_CODE``.
    """
    task_name = task_name.split()[0]
    source_language_code = LANGUAGE_NAME_TO_CODE.get(source_language, None)
    target_language_code = LANGUAGE_NAME_TO_CODE[target_language]

    if task_name in _AUDIO_INPUT_TASKS:
        input_data = input_audio_mic if audio_source == "microphone" else input_audio_file
        if input_data is None:
            # Without this guard the ``None`` path reaches torchaudio.load and
            # fails with an error that means nothing to the end user.
            raise gr.Error("No input audio was provided.")

        target_sr = int(AUDIO_SAMPLE_RATE)
        arr, org_sr = torchaudio.load(input_data)
        new_arr = torchaudio.functional.resample(arr, orig_freq=org_sr, new_freq=target_sr)
        max_length = MAX_INPUT_AUDIO_LENGTH * target_sr
        if new_arr.shape[1] > max_length:
            new_arr = new_arr[:, :max_length]
            gr.Warning(f"Input audio is too long. Only the first {MAX_INPUT_AUDIO_LENGTH} seconds is used.")
        # NOTE: this overwrites the file at ``input_data`` in place with the
        # resampled (and possibly truncated) audio; the translator then reads
        # that same path.
        torchaudio.save(input_data, new_arr, sample_rate=target_sr)
    else:
        input_data = input_text

    text_out, wav, sr = translator.predict(
        input=input_data,
        task_str=task_name,
        tgt_lang=target_language_code,
        src_lang=source_language_code,
    )
    if task_name in _AUDIO_OUTPUT_TASKS:
        return (sr, wav.cpu().detach().numpy()), text_out
    return None, text_out


def process_s2st_example(input_audio_file: str, target_language: str) -> PredictOutput:
    """Run ``predict`` for an S2ST example (audio file in, speech and text out)."""
    return predict(
        task_name="S2ST",
        audio_source="file",
        input_audio_mic=None,
        input_audio_file=input_audio_file,
        input_text=None,
        source_language=None,
        target_language=target_language,
    )


def process_s2tt_example(input_audio_file: str, target_language: str) -> PredictOutput:
    """Run ``predict`` for an S2TT example (audio file in, text out)."""
    return predict(
        task_name="S2TT",
        audio_source="file",
        input_audio_mic=None,
        input_audio_file=input_audio_file,
        input_text=None,
        source_language=None,
        target_language=target_language,
    )


def process_t2st_example(input_text: str, source_language: str, target_language: str) -> PredictOutput:
    """Run ``predict`` for a T2ST example (text in, speech and text out)."""
    return predict(
        task_name="T2ST",
        audio_source="",
        input_audio_mic=None,
        input_audio_file=None,
        input_text=input_text,
        source_language=source_language,
        target_language=target_language,
    )


def process_t2tt_example(input_text: str, source_language: str, target_language: str) -> PredictOutput:
    """Run ``predict`` for a T2TT example (text in, text out)."""
    return predict(
        task_name="T2TT",
        audio_source="",
        input_audio_mic=None,
        input_audio_file=None,
        input_text=input_text,
        source_language=source_language,
        target_language=target_language,
    )


def process_asr_example(input_audio_file: str, target_language: str) -> PredictOutput:
    """Run ``predict`` for an ASR example (audio file in, text out)."""
    return predict(
        task_name="ASR",
        audio_source="file",
        input_audio_mic=None,
        input_audio_file=input_audio_file,
        input_text=None,
        source_language=None,
        target_language=target_language,
    )


def update_audio_ui(audio_source: str) -> tuple[dict, dict]:
    """Show the audio widget matching ``audio_source`` and clear both widgets' values."""
    mic = audio_source == "microphone"
    return (
        gr.update(visible=mic, value=None),  # input_audio_mic
        gr.update(visible=not mic, value=None),  # input_audio_file
    )


def update_input_ui(task_name: str) -> tuple[dict, dict, dict, dict]:
    """Return visibility/choice updates for the input widgets of the selected task.

    Audio-input tasks show the audio box and hide the text box and source
    language; text-input tasks do the opposite. The target-language dropdown
    is always shown, with task-specific choices, and reset to the default.

    Raises:
        ValueError: If the task code is not one of the known tasks.
    """
    task_name = task_name.split()[0]
    try:
        uses_audio, target_choices = _TASK_INPUT_CONFIG[task_name]
    except KeyError:
        raise ValueError(f"Unknown task: {task_name}") from None
    return (
        gr.update(visible=uses_audio),  # audio_box
        gr.update(visible=not uses_audio),  # input_text
        gr.update(visible=not uses_audio),  # source_language
        gr.update(
            visible=True, choices=target_choices, value=DEFAULT_TARGET_LANGUAGE
        ),  # target_language
    )


def update_output_ui(task_name: str) -> tuple[dict, dict]:
    """Clear both outputs and show the audio output only for speech-output tasks.

    Raises:
        ValueError: If the task code is not one of the known tasks.
    """
    task_name = task_name.split()[0]
    if task_name in _AUDIO_OUTPUT_TASKS:
        return (
            gr.update(visible=True, value=None),  # output_audio
            gr.update(value=None),  # output_text
        )
    elif task_name in _TEXT_ONLY_OUTPUT_TASKS:
        return (
            gr.update(visible=False, value=None),  # output_audio
            gr.update(value=None),  # output_text
        )
    else:
        raise ValueError(f"Unknown task: {task_name}")


def update_example_ui(task_name: str) -> tuple[dict, dict, dict, dict, dict]:
    """Make visible only the example row that belongs to the selected task."""
    task_name = task_name.split()[0]
    return (
        gr.update(visible=task_name == "S2ST"),  # s2st_example_row
        gr.update(visible=task_name == "S2TT"),  # s2tt_example_row
        gr.update(visible=task_name == "T2ST"),  # t2st_example_row
        gr.update(visible=task_name == "T2TT"),  # t2tt_example_row
        gr.update(visible=task_name == "ASR"),  # asr_example_row
    )


with gr.Blocks(css="style.css") as demo:
    gr.Markdown(DESCRIPTION)
    gr.DuplicateButton(
        value="Duplicate Space for private use",
        elem_id="duplicate-button",
        visible=os.getenv("SHOW_DUPLICATE_BUTTON") == "1",
    )
    with gr.Group():
        task_name = gr.Dropdown(
            label="Task",
            choices=TASK_NAMES,
            value=TASK_NAMES[0],
        )
        with gr.Row():
            source_language = gr.Dropdown(
                label="Source language",
                choices=TEXT_SOURCE_LANGUAGE_NAMES,
                value="English",
                visible=False,
            )
            target_language = gr.Dropdown(
                label="Target language",
                choices=S2ST_TARGET_LANGUAGE_NAMES,
                value=DEFAULT_TARGET_LANGUAGE,
            )
        with gr.Row() as audio_box:
            audio_source = gr.Radio(
                label="Audio source",
                choices=["file", "microphone"],
                value="file",
            )
            input_audio_mic = gr.Audio(
                label="Input speech",
                type="filepath",
                source="microphone",
                visible=False,
            )
            input_audio_file = gr.Audio(
                label="Input speech",
                type="filepath",
                source="upload",
                visible=True,
            )
        input_text = gr.Textbox(label="Input text", visible=False)
        btn = gr.Button("Translate")
        with gr.Column():
            output_audio = gr.Audio(
                label="Translated speech",
                autoplay=False,
                streaming=False,
                type="numpy",
            )
            output_text = gr.Textbox(label="Translated text")

    # One example row per task; only the row of the selected task is visible
    # (initially S2ST, matching the default task).
    with gr.Row(visible=True) as s2st_example_row:
        s2st_examples = gr.Examples(
            examples=[
                ["assets/sample_input.mp3", "French"],
                ["assets/sample_input.mp3", "Mandarin Chinese"],
                ["assets/sample_input_2.mp3", "Hindi"],
                ["assets/sample_input_2.mp3", "Spanish"],
            ],
            inputs=[input_audio_file, target_language],
            outputs=[output_audio, output_text],
            fn=process_s2st_example,
            cache_examples=CACHE_EXAMPLES,
        )
    with gr.Row(visible=False) as s2tt_example_row:
        s2tt_examples = gr.Examples(
            examples=[
                ["assets/sample_input.mp3", "French"],
                ["assets/sample_input.mp3", "Mandarin Chinese"],
                ["assets/sample_input_2.mp3", "Hindi"],
                ["assets/sample_input_2.mp3", "Spanish"],
            ],
            inputs=[input_audio_file, target_language],
            outputs=[output_audio, output_text],
            fn=process_s2tt_example,
            cache_examples=CACHE_EXAMPLES,
        )
    with gr.Row(visible=False) as t2st_example_row:
        t2st_examples = gr.Examples(
            examples=[
                ["My favorite animal is the elephant.", "English", "French"],
                ["My favorite animal is the elephant.", "English", "Mandarin Chinese"],
                [
                    "Meta AI's Seamless M4T model is democratising spoken communication across language barriers",
                    "English",
                    "Hindi",
                ],
                [
                    "Meta AI's Seamless M4T model is democratising spoken communication across language barriers",
                    "English",
                    "Spanish",
                ],
            ],
            inputs=[input_text, source_language, target_language],
            outputs=[output_audio, output_text],
            fn=process_t2st_example,
            cache_examples=CACHE_EXAMPLES,
        )
    with gr.Row(visible=False) as t2tt_example_row:
        t2tt_examples = gr.Examples(
            examples=[
                ["My favorite animal is the elephant.", "English", "French"],
                ["My favorite animal is the elephant.", "English", "Mandarin Chinese"],
                [
                    "Meta AI's Seamless M4T model is democratising spoken communication across language barriers",
                    "English",
                    "Hindi",
                ],
                [
                    "Meta AI's Seamless M4T model is democratising spoken communication across language barriers",
                    "English",
                    "Spanish",
                ],
            ],
            inputs=[input_text, source_language, target_language],
            outputs=[output_audio, output_text],
            fn=process_t2tt_example,
            cache_examples=CACHE_EXAMPLES,
        )
    with gr.Row(visible=False) as asr_example_row:
        asr_examples = gr.Examples(
            examples=[
                ["assets/sample_input.mp3", "English"],
                ["assets/sample_input_2.mp3", "English"],
            ],
            inputs=[input_audio_file, target_language],
            outputs=[output_audio, output_text],
            fn=process_asr_example,
            cache_examples=CACHE_EXAMPLES,
        )

    audio_source.change(
        fn=update_audio_ui,
        inputs=audio_source,
        outputs=[
            input_audio_mic,
            input_audio_file,
        ],
        queue=False,
        api_name=False,
    )
    # Changing the task updates, in order: input widgets, output widgets, example rows.
    task_name.change(
        fn=update_input_ui,
        inputs=task_name,
        outputs=[
            audio_box,
            input_text,
            source_language,
            target_language,
        ],
        queue=False,
        api_name=False,
    ).then(
        fn=update_output_ui,
        inputs=task_name,
        outputs=[output_audio, output_text],
        queue=False,
        api_name=False,
    ).then(
        fn=update_example_ui,
        inputs=task_name,
        outputs=[
            s2st_example_row,
            s2tt_example_row,
            t2st_example_row,
            t2tt_example_row,
            asr_example_row,
        ],
        queue=False,
        api_name=False,
    )

    btn.click(
        fn=predict,
        inputs=[
            task_name,
            audio_source,
            input_audio_mic,
            input_audio_file,
            input_text,
            source_language,
            target_language,
        ],
        outputs=[output_audio, output_text],
        api_name="run",
    )

demo.queue(max_size=50).launch()