import gc
import glob
import logging
import os
import traceback

import cpuinfo
import numpy as np
import psutil
import torch

import utils
from bert_vits2 import Bert_VITS2
from contants import ModelType, config
from gpt_sovits.gpt_sovits import GPT_SoVITS
from logger import logger
from manager.observer import Subject
from utils.data_utils import HParams, check_is_none
from vits import VITS
from vits.hubert_vits import HuBert_VITS
from vits.w2v2_vits import W2V2_VITS


class ModelManager(Subject):
    def __init__(self, device=config.system.device):
        self.device = device
        self.logger = logger
        self.models = {
            # ModelType: {model_id: {"model_path": model_path, "config_path": config_path,
            #                        "model": model, "n_speakers": n_speakers}}
            # model_id is an int
            ModelType.VITS: {},
            ModelType.HUBERT_VITS: {},
            ModelType.W2V2_VITS: {},
            ModelType.BERT_VITS2: {},
            ModelType.GPT_SOVITS: {},
        }
        self.sid2model = {
            # ModelType: [{"real_id": real_id, "model": model, "model_id": model_id}]
            ModelType.VITS: [],
            ModelType.HUBERT_VITS: [],
            ModelType.W2V2_VITS: [],
            ModelType.BERT_VITS2: [],
            ModelType.GPT_SOVITS: [],
        }
        self.voice_speakers = {
            ModelType.VITS.value: [],
            ModelType.HUBERT_VITS.value: [],
            ModelType.W2V2_VITS.value: [],
            ModelType.BERT_VITS2.value: [],
            ModelType.GPT_SOVITS.value: [],
        }
        self.emotion_reference = None
        self.hubert = None
        self.dimensional_emotion_model = None
        self.tts_front = None
        self.bert_models = {}
        self.model_handler = None
        self.emotion_model = None
        self.processor = None
        self.voice_objs_count = 0
        self._observers = []
        self.model_class_map = {
            ModelType.VITS: VITS,
            ModelType.HUBERT_VITS: HuBert_VITS,
            ModelType.W2V2_VITS: W2V2_VITS,
            ModelType.BERT_VITS2: Bert_VITS2,
            ModelType.GPT_SOVITS: GPT_SoVITS,
        }
        self.available_tts_model = set()

    def model_init(self):
        if config.tts_config.auto_load:
            models = self.scan_path()
        else:
            models = config.tts_config.asdict().get("models")

        for model in models:
            self.load_model(model_path=model.get("model_path"),
                            config_path=model.get("config_path"),
                            sovits_path=model.get("sovits_path"),
                            gpt_path=model.get("gpt_path"))

        dimensional_emotion_model_path = os.path.join(config.abs_path, config.system.data_path,
                                                      config.model_config.dimensional_emotion_model)
        if os.path.isfile(dimensional_emotion_model_path) and self.dimensional_emotion_model is None:
            self.dimensional_emotion_model = self.load_dimensional_emotion_model(dimensional_emotion_model_path)

        self.log_device_info()

        if self.vits_speakers_count != 0:
            self.logger.info(f"[{ModelType.VITS.value}] {self.vits_speakers_count} speakers")
        if self.hubert_speakers_count != 0:
            self.logger.info(f"[{ModelType.HUBERT_VITS.value}] {self.hubert_speakers_count} speakers")
        if self.w2v2_speakers_count != 0:
            self.logger.info(f"[{ModelType.W2V2_VITS.value}] {self.w2v2_speakers_count} speakers")
        if self.bert_vits2_speakers_count != 0:
            self.logger.info(f"[{ModelType.BERT_VITS2.value}] {self.bert_vits2_speakers_count} speakers")
        if self.gpt_sovits_speakers_count != 0:
            self.logger.info(f"[{ModelType.GPT_SOVITS.value}] {self.gpt_sovits_speakers_count} speakers")
        self.logger.info(f"{self.speakers_count} speakers in total.")
        if self.speakers_count == 0:
            self.logger.warning("No model was loaded.")

    # These are properties: model_init() and other callers read them as plain attributes.
    @property
    def vits_speakers(self):
        return self.voice_speakers[ModelType.VITS.value]

    @property
    def speakers_count(self):
        return (self.vits_speakers_count + self.hubert_speakers_count + self.w2v2_speakers_count
                + self.bert_vits2_speakers_count + self.gpt_sovits_speakers_count)

    @property
    def vits_speakers_count(self):
        return len(self.voice_speakers[ModelType.VITS.value])

    @property
    def hubert_speakers_count(self):
        return len(self.voice_speakers[ModelType.HUBERT_VITS.value])

    @property
    def w2v2_speakers_count(self):
        return len(self.voice_speakers[ModelType.W2V2_VITS.value])

    @property
    def w2v2_emotion_count(self):
        return len(self.emotion_reference) if self.emotion_reference is not None else 0

    @property
    def bert_vits2_speakers_count(self):
        return len(self.voice_speakers[ModelType.BERT_VITS2.value])

    @property
    def gpt_sovits_speakers_count(self):
        return len(self.voice_speakers[ModelType.GPT_SOVITS.value])

    # Register an observer
    def attach(self, observer):
        self._observers.append(observer)

    # Remove an observer
    def detach(self, observer):
        self._observers.remove(observer)

    # Notify all observers
    def notify(self, event_type, **kwargs):
        for observer in self._observers:
            observer.update(event_type, **kwargs)
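
    # A minimal usage sketch of the observer hookup (illustrative only; the
    # observer class below is hypothetical, not part of this repo — any object
    # with an update(event_type, **kwargs) method works):
    #
    #     class LoggingObserver:
    #         def update(self, event_type, **kwargs):
    #             print(f"event={event_type}, manager={kwargs.get('model_manager')}")
    #
    #     manager = ModelManager()
    #     manager.attach(LoggingObserver())
    #     manager.notify("model_loaded", model_manager=manager)  # observer prints the event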

    def log_device_info(self):
        cuda_available = torch.cuda.is_available()
        self.logger.info(
            f"PyTorch Version: {torch.__version__} CUDA available: {cuda_available} Device type: {self.device.type}")
        if self.device.type == 'cuda':
            if cuda_available:
                device_name = torch.cuda.get_device_name(self.device.index)
                gpu_memory_info = round(torch.cuda.get_device_properties(self.device).total_memory / 1024 ** 3)  # GB
                self.logger.info(
                    f"Using GPU on {device_name} {gpu_memory_info}GB, GPU Device Index: {self.device.index}")
            else:
                self.logger.warning("GPU device specified, but CUDA is not available.")
        else:
            cpu_info = cpuinfo.get_cpu_info()
            cpu_name = cpu_info.get("brand_raw")
            cpu_count = psutil.cpu_count(logical=False)
            thread_count = psutil.cpu_count(logical=True)
            memory_info = psutil.virtual_memory()
            total_memory = round(memory_info.total / (1024 ** 3))
            self.logger.info(
                f"Using CPU on {cpu_name} with {cpu_count} cores and {thread_count} threads. Total memory: {total_memory}GB")

    def relative_to_absolute_path(self, *paths):
        absolute_paths = []
        for path in paths:
            if path is None:
                # Bail out for the whole call: callers pass path pairs that are
                # either all set or all unset.
                return None
            path = os.path.normpath(path)
            if path.startswith('models'):
                # Path is already relative to the data directory
                path = os.path.join(config.abs_path, config.system.data_path, path)
            else:
                # Path is relative to the models directory
                path = os.path.join(config.abs_path, config.system.data_path, config.tts_config.models_path,
                                    path)
            absolute_paths.append(path)
        return absolute_paths

    def absolute_to_relative_path(self, *paths):
        relative_paths = []
        for path in paths:
            if path is None:
                relative_paths.append(None)
                continue
            # Compute the path relative to the models directory
            relative_path = os.path.relpath(path, os.path.join(config.abs_path, config.system.data_path,
                                                               config.tts_config.models_path))
            relative_paths.append(relative_path)
        return relative_paths
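
    # Illustrative round trip between the two helpers above, assuming
    # config.abs_path="/app", data_path="data", models_path="models"
    # (all values hypothetical):
    #
    #     manager.relative_to_absolute_path("vits/G.pth")
    #     # -> ["/app/data/models/vits/G.pth"]
    #     manager.absolute_to_relative_path("/app/data/models/vits/G.pth")
    #     # -> ["vits/G.pth"]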

    def _load_model_from_path(self, model_path, config_path, sovits_path, gpt_path):
        if check_is_none(sovits_path, gpt_path):
            hps = utils.get_hparams_from_file(config_path)
            model_type = self.recognition_model_type(hps)
        else:
            # A (sovits_path, gpt_path) pair means GPT-SoVITS, which has no hparams file
            hps = None
            model_type = ModelType.GPT_SOVITS

        model_args = {
            "model_type": model_type,
            "model_path": model_path,
            "config_path": config_path,
            "sovits_path": sovits_path,
            "gpt_path": gpt_path,
            "config": hps,
            "device": self.device,
        }
        model_class = self.model_class_map[model_type]
        model = model_class(**model_args)

        if model_type == ModelType.VITS:
            bert_embedding = getattr(hps.data, 'bert_embedding', getattr(hps.model, 'bert_embedding', False))
            if bert_embedding and self.tts_front is None:
                self.load_VITS_PinYin_model(
                    os.path.join(config.abs_path, config.system.data_path, config.model_config.vits_chinese_bert))
            if not config.vits_config.dynamic_loading:
                model.load_model()
            self.available_tts_model.add(ModelType.VITS.value)
        elif model_type == ModelType.W2V2_VITS:
            if self.emotion_reference is None:
                self.emotion_reference = self.load_npy(
                    os.path.join(config.abs_path, config.system.data_path,
                                 config.model_config.dimensional_emotion_npy))
            model.load_model(emotion_reference=self.emotion_reference,
                             dimensional_emotion_model=self.dimensional_emotion_model)
            self.available_tts_model.add(ModelType.W2V2_VITS.value)
        elif model_type == ModelType.HUBERT_VITS:
            if self.hubert is None:
                self.hubert = self.load_hubert_model(
                    os.path.join(config.abs_path, config.system.data_path, config.model_config.hubert_soft_0d54a1f4))
            model.load_model(hubert=self.hubert)
        elif model_type == ModelType.BERT_VITS2:
            if self.model_handler is None:
                from manager.model_handler import ModelHandler
                self.model_handler = ModelHandler(self.device)
            for bert_model_name in model.bert_model_names.values():
                self.model_handler.load_bert(bert_model_name)
            if model.hps_ms.model.emotion_embedding == 1:
                self.model_handler.load_emotion()
            elif model.hps_ms.model.emotion_embedding == 2:
                self.model_handler.load_clap()
            model.load_model(self.model_handler)
            self.available_tts_model.add(ModelType.BERT_VITS2.value)
        elif model_type == ModelType.GPT_SOVITS:
            if self.model_handler is None:
                from manager.model_handler import ModelHandler
                self.model_handler = ModelHandler(self.device)
            self.model_handler.load_ssl()
            self.model_handler.load_bert("CHINESE_ROBERTA_WWM_EXT_LARGE")
            model.load_model(self.model_handler)

        sid2model = []
        speakers = []
        new_id = len(self.voice_speakers[model_type.value])
        # Next free model_id within this model type
        model_id = max([-1] + list(self.models[model_type].keys())) + 1

        for real_id, name in enumerate(model.speakers):
            sid2model.append({"real_id": real_id, "model": model, "model_id": model_id})
            speakers.append({"id": new_id, "name": name, "lang": model.lang})
            new_id += 1

        model_data = {
            "model": model,
            "model_type": model_type,
            "model_id": model_id,
            "model_path": model_path,
            "config": hps,
            "sovits_path": sovits_path,
            "gpt_path": gpt_path,
            "sid2model": sid2model,
            "speakers": speakers,
        }

        if model_type == ModelType.GPT_SOVITS:
            self.logger.info(
                f"model_type:{model_type.value} model_id:{model_id} sovits_path:{sovits_path} gpt_path:{gpt_path}")
        else:
            self.logger.info(
                f"model_type:{model_type.value} model_id:{model_id} n_speakers:{len(speakers)} model_path:{model_path}")
        return model_data

    def load_model(self, model_path: str, config_path: str, sovits_path: str, gpt_path: str):
        try:
            if not check_is_none(model_path, config_path):
                model_path, config_path = self.relative_to_absolute_path(model_path, config_path)
            else:
                sovits_path, gpt_path = self.relative_to_absolute_path(sovits_path, gpt_path)
            model_data = self._load_model_from_path(model_path, config_path, sovits_path, gpt_path)
            model_id = model_data["model_id"]
            sid2model = model_data["sid2model"]
            model_type = model_data["model_type"]

            self.models[model_type][model_id] = {
                "model_type": model_data.get("model_type"),
                "model_path": model_path,
                "config_path": config_path,
                "sovits_path": sovits_path,
                "gpt_path": gpt_path,
                "model": model_data.get("model"),
                "n_speakers": len(model_data["speakers"]),
            }
            self.sid2model[model_type].extend(sid2model)
            self.voice_speakers[model_type.value].extend(model_data["speakers"])

            self.notify("model_loaded", model_manager=self)
            state = True
        except Exception as e:
            self.logger.error(f"Loading failed. {e}")
            self.logger.error(traceback.format_exc())
            state = False
        return state
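
    # Illustrative calls with hypothetical file names (relative paths are
    # resolved against the models directory):
    #
    #     # VITS-family model: a weights .pth plus an hparams json
    #     manager.load_model(model_path="vits/G_953000.pth", config_path="vits/config.json",
    #                        sovits_path=None, gpt_path=None)
    #     # GPT-SoVITS model: a SoVITS .pth plus a GPT .ckpt, no json
    #     manager.load_model(model_path=None, config_path=None,
    #                        sovits_path="gpt_sovits/demo.pth", gpt_path="gpt_sovits/demo.ckpt")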

    def unload_model(self, model_type_value: str, model_id: str):
        state = False
        model_type = ModelType(model_type_value)
        model_id = int(model_id)
        try:
            if model_id in self.models[model_type]:
                model_data = self.models[model_type][model_id]
                model = model_data.get("model")
                n_speakers = model_data.get("n_speakers")

                # Find the first speaker slot owned by this model
                start = 0
                for key, value in self.models[model_type].items():
                    if key == model_id:
                        break
                    start += value.get("n_speakers")

                # Release shared components that were loaded for this model
                if model_type == ModelType.BERT_VITS2:
                    for bert_model_name in model.bert_model_names.values():
                        self.model_handler.release_bert(bert_model_name)
                    if model.version == "2.1":
                        self.model_handler.release_emotion()
                    elif model.version in ["2.2", "extra", "2.4"]:
                        self.model_handler.release_clap()
                elif model_type == ModelType.GPT_SOVITS:
                    self.model_handler.release_bert("CHINESE_ROBERTA_WWM_EXT_LARGE")
                    self.model_handler.release_ssl_model()

                del self.sid2model[model_type][start:start + n_speakers]
                del self.voice_speakers[model_type.value][start:start + n_speakers]
                del self.models[model_type][model_id]

                # Reassign contiguous speaker ids after removal
                for new_id, speaker in enumerate(self.voice_speakers[model_type.value]):
                    speaker["id"] = new_id

                gc.collect()
                torch.cuda.empty_cache()
                state = True
                self.notify("model_unloaded", model_manager=self)
                self.logger.info("Unloading succeeded.")
        except Exception as e:
            self.logger.error(traceback.format_exc())
            self.logger.error(f"Unloading failed. {e}")
            state = False
        return state

    def load_dimensional_emotion_model(self, model_path):
        dimensional_emotion_model = None
        try:
            import audonnx
            root = os.path.dirname(model_path)
            model_file = model_path
            dimensional_emotion_model = audonnx.load(root=root, model_file=model_file)
            self.notify("model_loaded", model_manager=self)
        except Exception as e:
            self.logger.warning(f"Load DIMENSIONAL_EMOTION_MODEL failed {e}")
        return dimensional_emotion_model

    def unload_dimensional_emotion_model(self):
        self.dimensional_emotion_model = None
        self.notify("model_unloaded", model_manager=self)

    def load_hubert_model(self, model_path):
        """HuBERT-VITS"""
        hubert = None
        try:
            from vits.hubert_model import hubert_soft
            hubert = hubert_soft(model_path)
        except Exception as e:
            self.logger.warning(f"Load HUBERT_SOFT_MODEL failed {e}")
        return hubert

    def unload_hubert_model(self):
        self.hubert = None
        self.notify("model_unloaded", model_manager=self)

    def load_VITS_PinYin_model(self, bert_path):
        """vits_chinese"""
        from vits.text.vits_pinyin import VITS_PinYin
        if self.tts_front is None:
            self.tts_front = VITS_PinYin(bert_path, self.device)

    def reorder_model(self, old_index, new_index):
        """Reorder models: move the model at old_index to new_index.

        Note: this assumes a list-like container; with the current dict layout
        of self.models it is effectively unused.
        """
        if 0 <= old_index < len(self.models) and 0 <= new_index < len(self.models):
            model = self.models[old_index]
            del self.models[old_index]
            self.models.insert(new_index, model)

    def get_models_path(self):
        """Return a list of loaded model paths; each item holds the model_type plus its path fields."""
        info = []
        for models in self.models.values():
            for model in models.values():
                info.append({
                    "model_type": model.get("model_type"),
                    "model_path": model.get("model_path"),
                    "config_path": model.get("config_path"),
                    "sovits_path": model.get("sovits_path"),
                    "gpt_path": model.get("gpt_path"),
                })
        return info

    def get_models_path_by_type(self):
        """Return the loaded model paths grouped by model type."""
        info = {
            ModelType.VITS.value: [],
            ModelType.HUBERT_VITS.value: [],
            ModelType.W2V2_VITS.value: [],
            ModelType.BERT_VITS2.value: [],
            ModelType.GPT_SOVITS.value: [],
        }
        for model_type, models in self.models.items():
            for model in models.values():
                info[model_type.value].append({
                    "model_path": model.get("model_path"),
                    "config_path": model.get("config_path"),
                    "sovits_path": model.get("sovits_path"),
                    "gpt_path": model.get("gpt_path"),
                })
        return info

    def get_models_info(self):
        """Return, per model type, the model file paths (relative to the models directory) and speaker counts."""
        info = {
            ModelType.VITS.value: [],
            ModelType.HUBERT_VITS.value: [],
            ModelType.W2V2_VITS.value: [],
            ModelType.BERT_VITS2.value: [],
            ModelType.GPT_SOVITS.value: [],
        }
        for model_type, model_data in self.models.items():
            if model_type != ModelType.GPT_SOVITS:
                for model_id, model in model_data.items():
                    model_path = model.get("model_path")
                    config_path = model.get("config_path")
                    model_path = self.absolute_to_relative_path(model_path)[0].replace("\\", "/")
                    config_path = self.absolute_to_relative_path(config_path)[0].replace("\\", "/")
                    info[model_type.value].append(
                        {"model_id": model_id,
                         "model_path": model_path,
                         "config_path": config_path,
                         "n_speakers": model.get("n_speakers")})
            else:
                for model_id, model in model_data.items():
                    sovits_path = model.get("sovits_path")
                    gpt_path = model.get("gpt_path")
                    sovits_path = self.absolute_to_relative_path(sovits_path)[0].replace("\\", "/")
                    gpt_path = self.absolute_to_relative_path(gpt_path)[0].replace("\\", "/")
                    info[model_type.value].append(
                        {"model_id": model_id,
                         "sovits_path": sovits_path,
                         "gpt_path": gpt_path,
                         "n_speakers": model.get("n_speakers")})
        return info

    def get_model_by_index(self, model_type, model_id):
        """Return the model for the given model type and id, or None if not loaded."""
        model_data = self.models[model_type].get(model_id)
        return model_data.get("model") if model_data is not None else None

    def clear_all(self):
        """Unload all models, keeping the per-type containers in place."""
        for models in self.models.values():
            models.clear()

    def recognition_model_type(self, hps: HParams) -> ModelType:
        symbols = getattr(hps, "symbols", None)
        emotion_embedding = getattr(hps.data, "emotion_embedding", False)

        if "use_spk_conditioned_encoder" in hps.model:
            # Only Bert-VITS2 configs carry this flag
            return ModelType.BERT_VITS2

        if symbols is not None:
            model_type = ModelType.W2V2_VITS if emotion_embedding else ModelType.VITS
        else:
            model_type = ModelType.HUBERT_VITS
        return model_type
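
    # Detection cheat sheet for the method above (config.json fragments are
    # hypothetical, trimmed to the fields that drive the decision):
    #
    #     {"model": {"use_spk_conditioned_encoder": true}}          -> BERT_VITS2
    #     {"symbols": [...], "data": {}}                            -> VITS
    #     {"symbols": [...], "data": {"emotion_embedding": true}}   -> W2V2_VITS
    #     {"data": {}}  (no "symbols" key)                          -> HUBERT_VITS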

    def _load_npy_from_path(self, path):
        model_extension = os.path.splitext(path)[1]
        if model_extension != ".npy":
            raise ValueError(f"Unsupported model type: {model_extension}")
        return np.load(path).reshape(-1, 1024)

    def load_npy(self, emotion_reference_npy):
        emotion_reference = np.empty((0, 1024))

        if isinstance(emotion_reference_npy, list):
            for i in emotion_reference_npy:
                emotion_reference = np.append(emotion_reference, self._load_npy_from_path(i), axis=0)
        elif os.path.isdir(emotion_reference_npy):
            for root, dirs, files in os.walk(emotion_reference_npy):
                for file_name in files:
                    if file_name.endswith(".npy"):
                        file_path = os.path.join(root, file_name)
                        emotion_reference = np.append(emotion_reference, self._load_npy_from_path(file_path),
                                                      axis=0)
        elif os.path.isfile(emotion_reference_npy):
            emotion_reference = self._load_npy_from_path(emotion_reference_npy)

        self.logger.info(f"Loaded emotional dimension npy, {len(emotion_reference)} rows in total")
        return emotion_reference
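
    # Illustrative inputs (file names are hypothetical): each .npy holds one or
    # more 1024-dim emotion embeddings, so all of these return an (N, 1024) array:
    #
    #     manager.load_npy("emotions/angry.npy")      # single file
    #     manager.load_npy("emotions/")               # directory, walked recursively
    #     manager.load_npy(["a.npy", "b.npy"])        # explicit list, concatenated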

    def scan_path(self):
        folder_path = os.path.join(config.abs_path, config.system.data_path, config.tts_config.models_path)
        model_paths = glob.glob(folder_path + "/**/*.pth", recursive=True)
        all_paths = []

        for model_id, pth_path in enumerate(model_paths):
            pth_name = os.path.basename(pth_path)
            if pth_name.startswith(("D_", "DUR_")):
                # Skip discriminator and duration checkpoints
                continue
            dir_name = os.path.dirname(pth_path)
            config_paths = glob.glob(dir_name + "/*.json")
            gpt_paths = glob.glob(dir_name + "/*.ckpt")

            model_path, config_path, sovits_path, gpt_path, model_type = None, None, None, None, None
            if len(config_paths) > 0:
                # A .pth next to a .json is a VITS-family model
                model_path = pth_path
                config_path = config_paths[0]
            elif len(gpt_paths) > 0:
                # A .pth next to a .ckpt is a GPT-SoVITS pair
                gpt_path = gpt_paths[0]
                sovits_path = pth_path
                model_type = ModelType.GPT_SOVITS
            else:
                continue

            all_paths.append({
                "model_id": model_id,
                "model_type": model_type,
                "model_path": model_path,
                "config_path": config_path,
                "sovits_path": sovits_path,
                "gpt_path": gpt_path,
            })
        return all_paths

    def scan_unload_path(self):
        all_paths = self.scan_path()
        unload_paths = []
        loaded_paths = []
        loaded_paths_2 = []

        for model in self.get_models_path():
            # Collect only the paths of models that are already loaded
            if model.get("model_type") == ModelType.GPT_SOVITS:
                sovits_path, gpt_path = self.absolute_to_relative_path(model.get("sovits_path"),
                                                                       model.get("gpt_path"))
                sovits_path, gpt_path = sovits_path.replace("\\", "/"), gpt_path.replace("\\", "/")
                loaded_paths_2.append((sovits_path, gpt_path))
            else:
                model_path = self.absolute_to_relative_path(model.get("model_path"))[0].replace("\\", "/")
                loaded_paths.append(model_path)

        for info in all_paths:
            # Convert absolute paths to relative ones and normalize separators to '/'
            if info.get("model_type") == ModelType.GPT_SOVITS:
                sovits_path, gpt_path = self.absolute_to_relative_path(info.get("sovits_path"),
                                                                       info.get("gpt_path"))
                sovits_path, gpt_path = sovits_path.replace("\\", "/"), gpt_path.replace("\\", "/")
                if not self.is_path_loaded((sovits_path, gpt_path), loaded_paths_2):
                    info.update(
                        {"model_type": info.get("model_type").value, "sovits_path": sovits_path, "gpt_path": gpt_path})
                    unload_paths.append(info)
            else:
                model_path, config_path = self.absolute_to_relative_path(info.get("model_path"),
                                                                         info.get("config_path"))
                model_path, config_path = model_path.replace("\\", "/"), config_path.replace("\\", "/")
                if not self.is_path_loaded(model_path, loaded_paths):
                    info.update({"model_path": model_path, "config_path": config_path})
                    unload_paths.append(info)
        return unload_paths

    def is_path_loaded(self, paths, loaded_paths):
        if isinstance(paths, tuple):
            # GPT-SoVITS models are identified by a (sovits_path, gpt_path) pair;
            # checking the type (rather than len(paths) == 2) avoids matching a
            # two-character path string.
            sovits_path, gpt_path = paths
            for loaded_path in loaded_paths:
                if sovits_path == loaded_path[0] and gpt_path == loaded_path[1]:
                    return True
        else:
            for loaded_path in loaded_paths:
                if paths == loaded_path:
                    return True
        return False
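

if __name__ == "__main__":
    # A minimal usage sketch, assuming a valid config and model files under the
    # configured models directory; nothing here is required by the API server.
    manager = ModelManager()
    manager.model_init()  # auto-load or load from config, then log device/speaker info
    print(manager.get_models_info())
    print(f"{manager.speakers_count} speakers loaded")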