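"""Voila inference script.

Loads one of the Voila model variants (token, audio-alpha, or autonomous),
formats a single-turn conversation, runs generation, and decodes any audio
tokens back to a 16 kHz waveform with the Voila audio tokenizer.
"""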
import argparse
from pathlib import Path

import soundfile as sf
import torch
import torchaudio
from transformers import AutoTokenizer

from model import VoilaAudioAlphaModel, VoilaModel, VoilaAutonomousModel
from spkr import SpeakerEmbedding
from voila_tokenizer import VoilaTokenizer
from tokenize_func import (
    voila_input_format,
    AUDIO_TOKEN_FORMAT,
    DEFAULT_AUDIO_TOKEN,
    DEFAULT_ASSISTANT_TOKEN,
)

def disable_torch_init():
    """
    Disable the redundant torch default initialization to accelerate model creation.
    """
    torch.nn.Linear.reset_parameters = lambda self: None
    torch.nn.LayerNorm.reset_parameters = lambda self: None

def load_model(model_name, audio_tokenizer_path):
    disable_torch_init()

    # Select the model class and input type from the checkpoint name.
    if "Voila-audio" in model_name:
        model_type = "audio"
        cls = VoilaAudioAlphaModel
    elif "Voila-auto" in model_name:
        model_type = "autonomous"
        cls = VoilaAutonomousModel
    else:
        model_type = "token"
        cls = VoilaModel

    model = cls.from_pretrained(
        model_name,
        torch_dtype=torch.bfloat16,
        use_flash_attention_2=True,
        use_cache=True,
    )
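    # Note: bfloat16 and FlashAttention 2 assume a CUDA GPU; FlashAttention 2
    # additionally requires an Ampere-or-newer architecture.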
    model = model.cuda()
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer_voila = VoilaTokenizer(model_path=audio_tokenizer_path, device="cuda")
    return model, tokenizer, tokenizer_voila, model_type

def is_audio_output_task(task_type):
    return task_type.endswith("ao") or "aiao" in task_type or "tts" in task_type
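
# Task types ending in "ao" (audio out), containing "aiao" (audio in, audio
# out), or containing "tts" all produce speech, so they need a reference
# speaker embedding (see the main block below).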

def eval_model(model, tokenizer, tokenizer_voila, model_type, task_type, history, ref_embs, ref_embs_mask, max_new_tokens=512):
    # step1: initializing
    num_codebooks = model.config.num_codebooks
    codebook_size = model.config.codebook_size
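    # Audio token ids occupy one contiguous block of the text vocabulary:
    # num_codebooks sub-blocks of codebook_size ids each. The min/max ids
    # below bound that block so generated steps can be classified as text
    # or audio.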
    AUDIO_MIN_TOKEN_ID = tokenizer.convert_tokens_to_ids(AUDIO_TOKEN_FORMAT.format(0))
    assert isinstance(AUDIO_MIN_TOKEN_ID, int)
    AUDIO_MAX_TOKEN_ID = tokenizer.convert_tokens_to_ids(AUDIO_TOKEN_FORMAT.format(codebook_size * num_codebooks - 1))
    assert isinstance(AUDIO_MAX_TOKEN_ID, int)
    AUDIO_TOKEN_ID = tokenizer.convert_tokens_to_ids(DEFAULT_AUDIO_TOKEN)
    assert isinstance(AUDIO_TOKEN_ID, int)
    ASSISTANT_TOKEN_ID = tokenizer.convert_tokens_to_ids(DEFAULT_ASSISTANT_TOKEN)
    assert isinstance(ASSISTANT_TOKEN_ID, int)

    # step2: set infer config
    data_cfg = {
        "input_type": model_type,
        "task_type": task_type,
        "num_codebooks": num_codebooks,
        "codebook_size": codebook_size,
    }

    # step3: infer
    input_ids, audio_datas, audio_data_masks, streaming_user_input_audio_tokens = voila_input_format(history, tokenizer, tokenizer_voila, data_cfg)

    # prepare user_streaming_generator to simulate streaming user input
    def get_input_generator(all_tokens):
        assert all_tokens is not None
        for i in range(len(all_tokens[0])):
            yield all_tokens[:, i]
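
    # Autonomous mode runs two parallel token streams (user input and
    # assistant output) concatenated along the codebook dimension; the user
    # stream is fed step by step via the generator above to simulate live
    # audio input.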
    if model_type == "autonomous":
        input_generator = get_input_generator(torch.as_tensor(streaming_user_input_audio_tokens).cuda())
        input_ids = [torch.as_tensor([ids]).transpose(1, 2).cuda() for ids in input_ids]  # transpose to [bs, seq, num_codebooks]
        input_ids = torch.cat(input_ids, dim=2)  # concat to [bs, seq, num_codebooks*2]
    else:
        input_ids = torch.as_tensor([input_ids]).transpose(1, 2).cuda()  # transpose to [bs, seq, num_codebooks]

    gen_params = {
        "input_ids": input_ids,
        "ref_embs": ref_embs,
        "ref_embs_mask": ref_embs_mask,
        "max_new_tokens": max_new_tokens,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
        "llm_audio_token_id": AUDIO_TOKEN_ID,
        "min_audio_token_id": AUDIO_MIN_TOKEN_ID,
        "temperature": 0.2,
        "top_k": 50,
        "audio_temperature": 0.8,
        "audio_top_k": 50,
    }
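    # Text and audio tokens are sampled with separate settings: near-greedy
    # for text (temperature 0.2), a higher temperature for audio (0.8).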
    if model_type == "audio":
        audio_datas = torch.tensor([audio_datas], dtype=torch.bfloat16).cuda()
        audio_data_masks = torch.tensor([audio_data_masks]).cuda()
        gen_params["audio_datas"] = audio_datas
        gen_params["audio_data_masks"] = audio_data_masks
    elif model_type == "autonomous":
        gen_params["input_generator"] = input_generator
        gen_params["llm_assistant_token_id"] = ASSISTANT_TOKEN_ID

    print(f"Input str: {tokenizer.decode(input_ids[0, :, 0])}")
    with torch.inference_mode():
        outputs = model.run_generate(**gen_params)
    if model_type == "autonomous":
        outputs = outputs.chunk(2, dim=2)[1]  # keep only the assistant half of the dual stream
    outputs = outputs[0].cpu().tolist()
    predict_outputs = outputs[input_ids.shape[1]:]  # drop the prompt, keep newly generated steps
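
    # Each generated step holds num_codebooks ids. A step whose first id
    # falls in the audio range is mapped back to per-codebook indices;
    # any other non-EOS step contributes a text token.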
    text_outputs = []
    audio_outputs = [[] for _ in range(num_codebooks)]
    for item in predict_outputs:
        if AUDIO_MIN_TOKEN_ID <= item[0] <= AUDIO_MAX_TOKEN_ID:
            for n, at in enumerate(item):
                audio_outputs[n].append((at - AUDIO_MIN_TOKEN_ID) % codebook_size)
        elif item[0] != tokenizer.eos_token_id:
            text_outputs.append(item[0])

    out = {
        "text": tokenizer.decode(text_outputs),
    }
    if is_audio_output_task(task_type):
        audio_values = tokenizer_voila.decode(torch.tensor(audio_outputs).cuda())
        out["audio"] = (audio_values.detach().cpu().numpy(), 16000)
    return out

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--instruction", type=str, default="")
    parser.add_argument("--input-text", type=str, default=None)
    parser.add_argument("--input-audio", type=str, default=None)
    parser.add_argument("--result-path", type=str, default="output")
    parser.add_argument("--ref-audio", type=str, default="examples/test1.mp3")
    parser.add_argument("--model-name", type=str, default="maitrix-org/Voila-chat")
    parser.add_argument("--audio-tokenizer-path", type=str, default="maitrix-org/Voila-Tokenizer")
    parser.add_argument("--task-type", type=str, default="chat_aiao")
    args = parser.parse_args()

    assert args.model_name in [
        "maitrix-org/Voila-audio-alpha",
        "maitrix-org/Voila-base",
        "maitrix-org/Voila-chat",
        "maitrix-org/Voila-autonomous-preview",
    ]

    # step0: Model loading
    model, tokenizer, tokenizer_voila, model_type = load_model(args.model_name, args.audio_tokenizer_path)

    # step1: prepare inputs
    Path(args.result_path).mkdir(exist_ok=True, parents=True)
    history = {
        "instruction": args.instruction,
        "conversations": [],
    }
    if args.input_text is not None:
        history["conversations"].append({"from": "user", "text": args.input_text})
    elif args.input_audio is not None:
        history["conversations"].append({"from": "user", "audio": {"file": args.input_audio}})
    else:
        raise ValueError("Please provide at least one of --input-text and --input-audio")
    history["conversations"].append({"from": "assistant"})
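    # The trailing assistant turn is an empty slot that generation fills in.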

    # step2: encode ref
    ref_embs, ref_embs_mask = None, None
    if is_audio_output_task(args.task_type):
        spkr_model = SpeakerEmbedding(device="cuda")
        wav, sr = torchaudio.load(args.ref_audio)
        ref_embs = spkr_model(wav, sr)
        ref_embs_mask = torch.tensor([1]).cuda()

    out = eval_model(model, tokenizer, tokenizer_voila, model_type, args.task_type, history, ref_embs, ref_embs_mask)
    print(f"Output str: {out['text']}")
    if "audio" in out:
        wav, sr = out["audio"]
        save_name = f"{args.result_path}/out.wav"
        sf.write(save_name, wav, sr)
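
# Example invocation (illustrative; assumes this script is saved as infer.py):
#   python infer.py --input-audio examples/test1.mp3 --task-type chat_aiao \
#       --model-name maitrix-org/Voila-chat --result-path output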