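# NOTE: the following lines are web-page residue from the file viewer this
# source was copied from; they are kept as comments so the module parses.
# rufflet17's picture
# Update gradio_tabs/single.py
# f2c5dfc verified
# raw
# history blame
# 77.7 kB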
import datetime
from datetime import timezone, timedelta # タイムゾーン対応のために追加
import json
import os
import re # ファイル名サニタイズ用
import sys
import torch
import numpy as np # シード設定用
import random # シード設定用
from pathlib import Path
import time # sleep用
import gradio as gr
import shutil # フォルダ/ファイル名変更用, ファイルコピー用
import pyopenjtalk
import io # メモリ上でのファイル操作用
from pydub import AudioSegment # 結合機能のために追加
import hashlib # メタデータハッシュ化用
import math # ダミー計算用, 容量計算用
import tempfile # 一時ファイル作成用
import functools
import uuid # 結合ファイルの一意な名前生成のために追加
from typing import Dict, Any, List, Tuple, Optional, Set
# --- Logging settings ---
# When True, detailed logs are written to the terminal and to the UI.
# When False, everything except errors and important notices is suppressed.
ENABLE_LOGGING = False
# (Original note: mocks and helpers such as TTSModelHolder / MockTTSModel were
# described as "omitted because unchanged"; they are in fact defined below.)
# --- Time zone definition ---
# JST (UTC+9) is defined once as a module-level constant.
JST = timezone(timedelta(hours=9), 'JST')
# --- Mocks (normally these would be imported from the library) ---
class TTSModelHolder:
    """Mock registry of the model folders found under a root directory.

    The holder seeds an empty root directory with sample model folders, keeps
    a sorted list of sub-directory names in ``model_names`` and hands out
    ``MockTTSModel`` instances from :meth:`get_model`.

    Attributes are plain instance fields:
      * ``root_dir``      -- ``Path`` of the directory that is scanned.
      * ``model_names``   -- sorted names of the sub-directories of ``root_dir``.
      * ``current_model`` -- the model returned by the last ``get_model`` call.
    """

    def __init__(self, root_dir="model_assets"):
        """Prepare ``root_dir`` (creating samples if it is empty) and scan it."""
        self.root_dir = Path(root_dir)
        self.model_names = []
        self.current_model = None
        self._setup_root_dir_and_samples()
        self.refresh()  # initial scan

    @staticmethod
    def _create_sample_model(model_dir, weight_names, style2id, **dump_kwargs):
        """Create one sample model folder: empty weight files plus config.json.

        ``dump_kwargs`` are forwarded to ``json.dump`` (e.g. ``indent=2``).
        """
        model_dir.mkdir(parents=True, exist_ok=True)
        for weight_name in weight_names:
            (model_dir / weight_name).touch()
        # Always write UTF-8 so the file does not depend on the locale encoding.
        with open(model_dir / "config.json", "w", encoding="utf-8") as f:
            json.dump({"data": {"style2id": style2id}}, f, **dump_kwargs)

    def _setup_root_dir_and_samples(self):
        """Ensure the root directory exists and seed sample models if it is empty."""
        root = self.root_dir
        if not root.is_dir():
            root.mkdir(parents=True, exist_ok=True)

        # Samples are created only once, when the directory has no entries.
        # NOTE(review): the FN/whisper samples are assumed to belong to this
        # same "directory is empty" branch, as the docstring describes.
        if any(root.iterdir()):
            return

        if ENABLE_LOGGING:
            print("No models found in model_assets. Creating sample models...")

        # Sample model 1
        self._create_sample_model(
            root / "MyModel1",
            ["G_0.safetensors"],
            {"Neutral": 0, "喜び": 1, "悲しみ": 2},
            indent=2,
        )

        # Sample model 2 (several safetensors files and custom style settings)
        mikeneko_dir = root / "mikeneko"
        self._create_sample_model(
            mikeneko_dir,
            ["G_mikeneko_v1.safetensors", "G_mikeneko_v2_experimental.safetensors"],
            {"Neutral": 0, "1": 1, "2": 2},
            indent=2,
        )
        style_settings_data = {
            "styles": {
                "Neutral": {"display_name": "Neutral", "weight": 1.0},
                "1": {"display_name": "クール", "weight": 0.8},
                "2": {"display_name": "可愛い", "weight": 1.2},
            }
        }
        with open(mikeneko_dir / "style_settings.json", "w", encoding="utf-8") as f:
            json.dump(style_settings_data, f, indent=2, ensure_ascii=False)

        # FN models (FN1 .. FN10)
        if ENABLE_LOGGING:
            print("Creating FN models (FN1-10)...")
        for i in range(1, 11):
            self._create_sample_model(root / f"FN{i}", ["G_0.safetensors"], {"Neutral": 0})

        # 'whisper' model (the UI code filters this name out of its lists)
        if ENABLE_LOGGING:
            print("Creating 'whisper' model...")
        self._create_sample_model(root / "whisper", ["G_0.safetensors"], {"Neutral": 0})

    def refresh(self) -> List[str]:
        """Rescan the root directory and return the updated model name list.

        ``model_names`` becomes the sorted names of all sub-directories, or an
        empty list when the root directory does not exist.
        """
        if self.root_dir.is_dir():
            self.model_names = sorted([d.name for d in self.root_dir.iterdir() if d.is_dir()])
            if ENABLE_LOGGING:
                print(f"TTSModelHolder model list refreshed. Known models: {self.model_names}")
        else:
            self.model_names = []
            if ENABLE_LOGGING:
                print("TTSModelHolder root directory not found.")
        return self.model_names

    def get_model(self, model_name, model_path):
        """Return a mock model for ``model_name`` and remember it as current.

        ``model_path`` is only used for the log line. Raises ``ValueError``
        when ``model_name`` is not in the list produced by the last
        :meth:`refresh`.
        """
        if ENABLE_LOGGING:
            print(f"Loading model: {model_name} (file: {Path(model_path).name})")
        if model_name not in self.model_names:
            error_msg = (
                f"Model '{model_name}' is not in the known list of TTSModelHolder. "
                f"Current list: {self.model_names}. "
                "Please refresh the model list by toggling the symlink checkbox or clicking the refresh button."
            )
            if ENABLE_LOGGING:
                print(f"[ERROR] {error_msg}")
            raise ValueError(error_msg)
        self.current_model = MockTTSModel()
        return self.current_model
class MockTTSModel:
    """Stand-in TTS model that produces random noise instead of speech."""

    def __init__(self):
        # Speaker name -> speaker id table exposed to callers.
        self.spk2id = dict(speaker_0=0, speaker_1=1)

    def infer(self, text, **kwargs):
        """Return ``(sampling_rate, int16 noise array)`` for ``text``.

        The clip lasts ``max(1, len(text) // 5)`` seconds multiplied by the
        ``length`` keyword (default 1.0). All other keywords are only logged.
        """
        speed_factor = kwargs.get('length', 1.0)
        if ENABLE_LOGGING:
            print(f"Inferencing with text '{text}' and style: {kwargs.get('style')} and weight: {kwargs.get('style_weight')}, length_scale: {speed_factor}")
        sample_rate = 44100
        seconds = max(1, len(text) // 5) * speed_factor
        sample_count = int(sample_rate * seconds)
        # Gaussian noise scaled to roughly 10% of the int16 range.
        noise = torch.randn(sample_count) * 0.1 * 32767
        return sample_rate, noise.numpy().astype("int16")
class InvalidToneError(Exception):
    """Synthesis error type caught around model inference in this file.

    NOTE(review): presumably raised by the real backend for invalid tone
    input -- confirm against the library this mock replaces.
    """
class _ClassProperty:
    """Read-only descriptor that evaluates its getter against the owning class.

    Replaces the ``@classmethod`` + ``@property`` stack, which was deprecated
    in Python 3.11 and no longer works from Python 3.13. Works for access
    through the class as well as through an instance.
    """

    def __init__(self, fget):
        self._fget = fget

    def __get__(self, instance, owner=None):
        if owner is None:
            owner = type(instance)
        return self._fget(owner)


class Languages:
    """Language codes offered by the synthesis UI."""

    JP, EN, ZH = "JP", "EN", "ZH"

    @_ClassProperty
    def value(cls):
        """Return a new list with every language code (JP, EN, ZH)."""
        return [cls.JP, cls.EN, cls.ZH]
# Gradio theme name.
# NOTE(review): not referenced in the visible part of this file -- confirm use.
GRADIO_THEME = "soft"
# Default values used for the synthesis sliders/controls in the UI.
DEFAULT_ASSIST_TEXT_WEIGHT=0.5
DEFAULT_LENGTH=1.0
DEFAULT_NOISE=0.6
DEFAULT_NOISEW=0.8
DEFAULT_SDP_RATIO=0.2
# Style key that load_styles_from_model_folder always guarantees to exist.
DEFAULT_STYLE="Neutral"
DEFAULT_STYLE_WEIGHT=1.0
# Default pause (ms) pre-filled in the merge controls of the "keep" tab.
DEFAULT_WORKBENCH_PAUSE = 250
# Output size limit, expressed in GB and derived bytes.
OUTPUT_SIZE_LIMIT_GB = 5
OUTPUT_SIZE_LIMIT_BYTES = OUTPUT_SIZE_LIMIT_GB * 1024**3
# --- Helper functions ---
# File name of the optional per-model style override file.
STYLE_CONFIG_FILENAME_IN_MODEL_DIR = "style_settings.json"
# Root directory that contains one folder per model.
assets_root_path = Path("model_assets")
# Characters rejected in the text to synthesize (checked with re.search), and
# the human-readable version of the same set shown in the text box hint.
INVALID_FILENAME_CHARS_PATTERN = r'[\\/*:"<>|_]'
INVALID_FILENAME_CHARS_FOR_DISPLAY = r'\ / * : " < > | _'
def sanitize_filename(name: str) -> str:
    """Return ``name`` with characters unusable in file names replaced by '-'.

    The replaced characters are: backslash, slash, asterisk, question mark,
    colon, double quote, angle brackets and pipe.
    """
    replacement_table = str.maketrans({ch: '-' for ch in '\\/*?:"<>|'})
    return name.translate(replacement_table)
def parse_merged_model_name(name: str) -> Optional[Tuple[str, List[int]]]:
    """Parse a merged-model folder name of the form ``A_70p_B_30p``.

    Returns ``(display_name, percentages)`` with the components ordered by
    descending percentage (e.g. ``("A 70% B 30%", [70, 30])``), or ``None``
    when the whole name does not consist of ``<label>_<digits>p`` parts
    joined by underscores.
    """
    components = [
        (match.group(1), match.group(2))
        for match in re.finditer(r'([^_]+)_(\d+)p', name)
    ]
    if not components:
        return None

    # Reject names that contain anything besides the matched components.
    rebuilt = "_".join(f"{label}_{percent}p" for label, percent in components)
    if rebuilt != name:
        return None

    ranked = sorted(components, key=lambda pair: int(pair[1]), reverse=True)
    display_name = " ".join(f"{label} {percent}%" for label, percent in ranked)
    return display_name, [int(percent) for _, percent in ranked]
def format_and_sort_model_names(dir_list: List[str]) -> List[Tuple[str, str]]:
    """Build ``(display_name, folder_name)`` pairs for a list of folders.

    Names that parse as merged models come first, ordered by their percentage
    lists in descending order; all remaining names follow in ascending order
    and use the folder name as their display name.
    """
    merged_entries = []
    plain_entries = []
    for folder in dir_list:
        parsed = parse_merged_model_name(folder)
        if parsed:
            label, weights = parsed
            merged_entries.append((weights, label, folder))
        else:
            plain_entries.append((folder, folder))

    # Sort on the percentage list only, so ties keep their input order.
    merged_entries.sort(key=lambda entry: entry[0], reverse=True)
    ordered = [(label, folder) for _, label, folder in merged_entries]
    ordered += sorted(plain_entries)
    return ordered
def sort_models_by_custom_order(model_list: List[str], custom_order: List[str]) -> List[str]:
    """Order models by a custom priority list.

    Models named in ``custom_order`` come first, in that order; every other
    model follows in sorted order. Names in ``custom_order`` that are not in
    ``model_list`` are ignored, and duplicates are emitted once.
    """
    pending = set(model_list)
    prioritized = []
    for candidate in custom_order:
        if candidate not in pending:
            continue
        pending.discard(candidate)
        prioritized.append(candidate)
    return prioritized + sorted(pending)
def set_random_seed(seed: int):
    """Seed torch, CUDA (when available), NumPy and ``random`` with ``seed``.

    A negative seed leaves every generator untouched.
    """
    if seed < 0:
        return
    if ENABLE_LOGGING:
        print(f"Setting random seed to: {seed}")
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        for seed_cuda in (torch.cuda.manual_seed, torch.cuda.manual_seed_all):
            seed_cuda(seed)
    np.random.seed(seed)
    random.seed(seed)
def get_directory_size(directory_path: Path) -> int:
    """Return the total size in bytes of the regular files under a directory.

    Symbolic links are not counted, files whose size cannot be read are
    skipped, and 0 is returned if a ``FileNotFoundError`` escapes the walk.
    """
    byte_count = 0
    try:
        for folder, _subdirs, file_names in os.walk(directory_path):
            for file_name in file_names:
                full_path = os.path.join(folder, file_name)
                if os.path.islink(full_path):
                    continue
                try:
                    byte_count += os.path.getsize(full_path)
                except OSError:
                    # Unreadable or vanished file: leave it out of the total.
                    continue
    except FileNotFoundError:
        return 0
    return byte_count
def format_bytes(size_bytes: int) -> str:
    """Format a byte count as a human-readable string such as ``"1.5 KB"``.

    Units go from B to TB in steps of 1024; the value is rounded to two
    decimals. Sizes beyond the largest unit are expressed in TB instead of
    failing with an ``IndexError``.

    Raises:
        ValueError: if ``size_bytes`` is negative.
    """
    if size_bytes == 0:
        return "0 B"
    if size_bytes < 0:
        raise ValueError(f"size_bytes must be non-negative, got {size_bytes}")

    units = ("B", "KB", "MB", "GB", "TB")
    value = float(size_bytes)
    unit_index = 0
    # Repeated division by 1024 is exact in binary floating point, so unit
    # boundaries are not affected by the rounding errors of math.log, and the
    # index can never run past the unit table.
    while value >= 1024 and unit_index < len(units) - 1:
        value /= 1024
        unit_index += 1
    return f"{round(value, 2)} {units[unit_index]}"
# --- pyopenjtalk-related helper functions ---
# Part-of-speech labels treated as independent (content) words.
JIRITSUGO_POS = ["名詞", "動詞", "形容詞", "副詞", "連体詞", "接続詞", "感動詞", "接頭詞"]


def is_jirisugo(morpheme):
    """Return True when the morpheme's ``pos`` is an independent-word class.

    Symbols ('記号') are never treated as independent words.
    """
    pos = morpheme['pos']
    return pos != '記号' and pos in JIRITSUGO_POS
def contains_kanji(text):
    """Return True if ``text`` has a character in the range U+4E00..U+9FAF."""
    return any('\u4e00' <= ch <= '\u9faf' for ch in text)
def kata2hira(text):
    """Convert katakana in ``text`` to hiragana, leaving other characters as is.

    Characters from 'ァ' to 'ヶ' are shifted down by 96 code points; anything
    else (including the long-vowel mark 'ー') is copied unchanged.
    """
    lowest, highest = ord("ァ"), ord("ヶ")
    converted = []
    for ch in text:
        code_point = ord(ch)
        if lowest <= code_point <= highest:
            converted.append(chr(code_point - 96))
        else:
            converted.append(ch)
    return "".join(converted)
def is_only_katakana(text):
    """Return True if ``text`` is non-empty and made only of katakana.

    Accepted characters are U+30A1..U+30F6 and the long-vowel mark U+30FC.
    """
    if not text:
        return False
    return all(ch == '\u30FC' or '\u30A1' <= ch <= '\u30F6' for ch in text)
def hiraganize_kanji_parts(block_text):
    """Replace the kanji-bearing morphemes of ``block_text`` with hiragana.

    Each morpheme returned by ``pyopenjtalk.run_frontend`` that contains kanji
    is replaced by its reading (the ``read`` field when present and not '*',
    otherwise ``pyopenjtalk.g2p(..., kana=True)``) converted to hiragana.
    Other morphemes are kept as they are. If the frontend returns nothing,
    the input is returned unchanged.
    """
    tokens = pyopenjtalk.run_frontend(block_text)
    if not tokens:
        return block_text

    pieces = []
    for token in tokens:
        surface = token['string']
        if not contains_kanji(surface):
            pieces.append(surface)
            continue
        if 'read' in token and token['read'] != '*':
            katakana_reading = token['read']
        else:
            katakana_reading = pyopenjtalk.g2p(surface, kana=True)
        pieces.append(kata2hira(katakana_reading))
    return "".join(pieces)
def split_into_bunsetsu(text):
    """Split ``text`` into phrase-like units based on pyopenjtalk morphemes.

    A new unit starts at every independent word or symbol; other morphemes are
    appended to the unit in progress. Afterwards, any unit (except the first)
    that is a substring of the punctuation string is glued onto the preceding
    unit. Returns an empty list for empty input.
    """
    if not text:
        return []

    raw_units = []
    pending = ""
    for token in pyopenjtalk.run_frontend(text):
        surface = token['string']
        opens_new_unit = (not pending) or is_jirisugo(token) or token['pos'] == '記号'
        if not opens_new_unit:
            pending += surface
            continue
        if pending:
            raw_units.append(pending)
        pending = surface
    if pending:
        raw_units.append(pending)

    merged_units = []
    for position, unit in enumerate(raw_units):
        is_trailing_punctuation = position > 0 and unit in "。、!?.,"
        if is_trailing_punctuation and merged_units:
            merged_units[-1] += unit
        else:
            merged_units.append(unit)
    return merged_units
def create_katakana_mixed_sentence(text, ratio=0.3):
    """Return ``text`` with a random subset of its phrase units in katakana.

    The text is split with ``split_into_bunsetsu``; about ``ratio`` of the
    units (at least one when ``ratio`` is positive, all of them when
    ``ratio`` is 1 or more) are picked with ``random.sample`` and replaced by
    ``pyopenjtalk.g2p(unit, kana=True)``.

    A zero, negative or NaN ratio converts nothing. Previously a sufficiently
    negative ratio reached ``random.sample`` with a negative ``k`` and raised
    ``ValueError``, and NaN/inf ratios crashed inside ``round``.

    Returns an empty string for empty input or when no units are produced.
    """
    if not text:
        return ""
    bunsetsu_list = split_into_bunsetsu(text)
    if not bunsetsu_list:
        return ""

    total = len(bunsetsu_list)
    if ratio >= 1:
        num_to_convert = total
    elif ratio > 0:
        # Any positive ratio converts at least one unit.
        num_to_convert = max(1, round(total * ratio))
    else:
        # Zero, negative and NaN ratios all mean "convert nothing".
        num_to_convert = 0

    if num_to_convert == 0:
        return "".join(bunsetsu_list)

    indices_to_convert = random.sample(range(total), k=num_to_convert)
    mixed_units = list(bunsetsu_list)
    for index in indices_to_convert:
        mixed_units[index] = pyopenjtalk.g2p(mixed_units[index], kana=True)
    return "".join(mixed_units)
def process_random_hiraganization(blocks):
    """Randomly rewrite some blocks in hiragana and return the new list.

    * Block containing kanji: 25% whole block read as hiragana, 25% only the
      kanji morphemes replaced by hiragana, 50% unchanged.
    * Block made only of katakana: 50% converted to hiragana.
    * Any other block: unchanged.
    """
    converted = []
    for piece in blocks:
        replacement = piece
        if contains_kanji(piece):
            roll = random.random()
            if roll < 0.25:
                replacement = kata2hira(pyopenjtalk.g2p(piece, kana=True))
            elif roll < 0.5:
                replacement = hiraganize_kanji_parts(piece)
        elif is_only_katakana(piece) and random.random() < 0.5:
            replacement = kata2hira(piece)
        converted.append(replacement)
    return converted
def generate_one_variation(base_text, mode: int, ratio: float) -> List[str]:
    """Produce one randomized variation of ``base_text`` as a list of blocks.

    The text first goes through ``create_katakana_mixed_sentence`` with
    ``ratio``, is then split into blocks according to ``mode``, and finally
    each block goes through ``process_random_hiraganization``.

    Split modes handled here:
      * 2 -- one block per morpheme.
      * 3 -- runs of morphemes sharing the same independent/dependent class,
             with symbols as separate blocks.
      * 4 -- ``split_into_bunsetsu``.
      * 5 -- cut after punctuation, conjunctive particles, sentence-final
             verbs/adjectives and the auxiliaries だ/です/ます.
      * anything else -- the whole text as a single block.

    Returns an empty list when mode 3 gets no morphemes.
    """
    text_to_process = create_katakana_mixed_sentence(base_text, ratio)
    if mode == 2: results = [m['string'] for m in pyopenjtalk.run_frontend(text_to_process)]
    elif mode == 3:
        morphemes = pyopenjtalk.run_frontend(text_to_process)
        if not morphemes: return []
        result_list, current_unit, is_current_jiri = [], "", False
        initial_type_set = False
        # Take the initial class from the first non-symbol morpheme.
        for m in morphemes:
            if m['pos'] != '記号': is_current_jiri = is_jirisugo(m); initial_type_set = True; break
        # Only symbols present: return them one per block.
        if not initial_type_set: return [m['string'] for m in morphemes]
        for m in morphemes:
            word = m['string']
            if m['pos'] == '記号':
                # A symbol closes the unit in progress and stands alone.
                if current_unit: result_list.append(current_unit)
                result_list.append(word); current_unit = ""
                continue
            is_m_jiri = is_jirisugo(m)
            if current_unit and is_m_jiri != is_current_jiri:
                # Class changed: close the unit and start a new one.
                result_list.append(current_unit); current_unit = word; is_current_jiri = is_m_jiri
            else:
                current_unit += word; is_current_jiri = is_m_jiri
        if current_unit: result_list.append(current_unit)
        results = result_list
    elif mode == 4: results = split_into_bunsetsu(text_to_process)
    elif mode == 5:
        morphemes = pyopenjtalk.run_frontend(text_to_process)
        result_list, current_unit = [], ""
        for m in morphemes:
            current_unit += m['string']
            if m['pos'] == '記号' and m['string'] in ['。', '!', '?', '.', '、', ',']: result_list.append(current_unit.strip()); current_unit = ""
            elif m['pos'] == '助詞' and m['pos_group1'] == '接続助詞': result_list.append(current_unit.strip()); current_unit = ""
            # NOTE(review): this compares `ctype` with '終止形', which reads like a
            # conjugation *form* label; verify the key and value pyopenjtalk emits.
            elif m['pos'] in ['動詞', '形容詞'] and m['ctype'] == '終止形': result_list.append(current_unit.strip()); current_unit = ""
            elif m['pos'] == '助動詞' and m['string'] in ['だ', 'です', 'ます']: result_list.append(current_unit.strip()); current_unit = ""
        if current_unit.strip(): result_list.append(current_unit.strip())
        # Drop blocks that became empty after stripping.
        results = [c for c in result_list if c]
    else: results = [text_to_process]
    if results: results = process_random_hiraganization(results)
    return results
# --- End of pyopenjtalk-related helper functions ---
def find_safetensors_files_webui(model_dir_path_str: str):
    """Return the sorted ``*.safetensors`` file names directly inside a folder.

    Returns an empty list when the path is not an existing directory.
    """
    folder = Path(model_dir_path_str)
    if folder.is_dir():
        names = [entry.name for entry in folder.glob("*.safetensors")]
        names.sort()
        return names
    return []
def load_styles_from_model_folder(model_asset_path: Path) -> Dict[str, Any]:
    """Collect the style table of a model folder.

    Styles come from two optional files inside ``model_asset_path``:

    1. ``config.json`` -- every key of ``data.style2id`` becomes a style whose
       display name is the key and whose weight is the default weight.
    2. the style settings file -- entries under ``styles`` are merged on top,
       adding new styles or overriding fields of existing ones.

    Read/parse errors are swallowed (and logged when logging is enabled). The
    default style is always present in the result.
    """
    styles: Dict[str, Any] = {}

    base_config_file = model_asset_path / "config.json"
    if base_config_file.exists():
        try:
            with open(base_config_file, 'r', encoding='utf-8') as handle:
                loaded = json.load(handle)
            style2id = None
            if isinstance(loaded, dict) and "data" in loaded:
                data_section = loaded["data"]
                if "style2id" in data_section:
                    style2id = data_section["style2id"]
            if isinstance(style2id, dict):
                for name in style2id:
                    styles[name] = {"display_name": name, "weight": DEFAULT_STYLE_WEIGHT}
        except Exception as e:
            if ENABLE_LOGGING:
                print(f"Warning: Failed to load or parse {base_config_file}: {e}")

    override_file = model_asset_path / STYLE_CONFIG_FILENAME_IN_MODEL_DIR
    if override_file.exists():
        try:
            with open(override_file, 'r', encoding='utf-8') as handle:
                overrides = json.load(handle)
            has_style_table = (
                isinstance(overrides, dict)
                and "styles" in overrides
                and isinstance(overrides["styles"], dict)
            )
            if has_style_table:
                for key, fields in overrides["styles"].items():
                    styles.setdefault(key, {}).update(fields)
        except Exception as e:
            if ENABLE_LOGGING:
                print(f"Warning: Failed to load or parse {override_file}: {e}")

    # Guarantee the default style (this also covers the empty-table case).
    if DEFAULT_STYLE not in styles:
        styles[DEFAULT_STYLE] = {"display_name": DEFAULT_STYLE, "weight": DEFAULT_STYLE_WEIGHT}
    return styles
def process_single_synthesis_webui(
    model_holder_ref: TTSModelHolder, current_model_name: str, current_model_file_path_str: str,
    text_to_synthesize: str, language_arg: str, speaker_name_arg: Optional[str],
    style_arg: str, style_display_name_arg: str, style_weight_arg: float,
    seed_arg: int,
    reference_audio_path_arg: Optional[str],
    length_scale_arg: float, noise_scale_arg: float, noise_scale_w_arg: float, sdp_ratio_arg: float,
    pitch_scale_arg: float, intonation_scale_arg: float, use_assist_text_arg: bool,
    assist_text_arg: Optional[str], assist_text_weight_arg: float
) -> Tuple[bool, List[str], Optional[Tuple[int, np.ndarray]]]:
    """Load a model and synthesize one utterance.

    Returns ``(ok, log_messages, audio)``. On success ``audio`` is the
    ``(sampling_rate, samples)`` pair returned by the model's ``infer``; on
    failure ``ok`` is False, ``audio`` is None and the last log message
    describes the error. No exception from model loading or inference is
    propagated to the caller.

    ``length_scale_arg`` is passed to the model as its reciprocal (1.0 when it
    is zero). The assist text is only forwarded when ``use_assist_text_arg``
    is true. ``style_display_name_arg`` is not used by this function.
    """
    weights_file = Path(current_model_file_path_str)
    messages: List[str] = []

    def failure(reason: str):
        # Record the error in the log and build the failure result.
        messages.append(f"❌ [エラー] {reason}")
        return False, messages, None

    set_random_seed(seed_arg)
    if seed_arg >= 0 and ENABLE_LOGGING:
        messages.append(f"乱数シードを {seed_arg} に固定しました。")

    try:
        model_holder_ref.get_model(current_model_name, weights_file)
        if model_holder_ref.current_model is None:
            return failure(f"モデルのロード失敗: {current_model_name} (ファイル: {weights_file.name})")
        if ENABLE_LOGGING:
            messages.append(f"使用モデル: {current_model_name} (ファイル: {weights_file.name})")
    except Exception as exc:
        return failure(f"モデルロードエラー '{current_model_name}' (ファイル: {weights_file.name}): {exc}")

    # Resolve the speaker id: requested name if known, else the first entry.
    active_model = model_holder_ref.current_model
    speaker_id = 0
    if active_model and hasattr(active_model, 'spk2id'):
        speaker_table = active_model.spk2id
        if speaker_name_arg and speaker_name_arg in speaker_table:
            speaker_id = speaker_table[speaker_name_arg]
        elif speaker_table:
            speaker_id = next(iter(speaker_table.values()))

    if ENABLE_LOGGING:
        messages.append("音声合成中...")

    try:
        if length_scale_arg != 0:
            length_for_model = 1.0 / length_scale_arg
        else:
            length_for_model = 1.0
        infer_kwargs = dict(
            text=text_to_synthesize,
            language=language_arg,
            reference_audio_path=reference_audio_path_arg,
            sdp_ratio=sdp_ratio_arg,
            noise=noise_scale_arg,
            noise_w=noise_scale_w_arg,
            length=length_for_model,
            assist_text=assist_text_arg if use_assist_text_arg else None,
            assist_text_weight=assist_text_weight_arg,
            style=style_arg,
            style_weight=style_weight_arg,
            speaker_id=speaker_id,
            pitch_scale=pitch_scale_arg,
            intonation_scale=intonation_scale_arg,
        )
        sr, audio_data = active_model.infer(**infer_kwargs)
    except Exception as exc:
        # Known synthesis problems and unexpected ones get different labels.
        if isinstance(exc, (InvalidToneError, ValueError)):
            return failure(f"合成エラー: {exc}")
        return failure(f"予期せぬエラー: {exc}")

    return True, messages, (sr, audio_data)
def create_synthesis_app(model_holder: TTSModelHolder) -> gr.Blocks:
MERGER_CACHE_PATH = Path("/tmp/sbv2_merger_cache")
# 文字数制限の定数を定義
MAX_TEXT_LENGTH = 35
is_merger_cache_available = False
if sys.platform != "win32":
try:
MERGER_CACHE_PATH.mkdir(parents=True, exist_ok=True)
is_merger_cache_available = MERGER_CACHE_PATH.is_dir()
if is_merger_cache_available:
if ENABLE_LOGGING:
print(f"Merger cache directory is available at: {MERGER_CACHE_PATH}")
else:
if ENABLE_LOGGING:
print(f"Warning: Merger cache path {MERGER_CACHE_PATH} exists but is not a directory.")
except OSError as e:
if ENABLE_LOGGING:
print(f"Warning: Could not create or access merger cache directory {MERGER_CACHE_PATH}: {e}")
NORMAL_MODE_MODEL_ORDER = [
"mikeneko",
"MyModel1",
]
FN_MODE_MODEL_ORDER = [f"FN{i}" for i in range(1, 11)] # FN1, FN2, ... FN10 の順
custom_css = """
.audio-output-row { display: flex !important; flex-wrap: wrap !important; gap: 10px !important; }
.audio-item-column { flex-grow: 0 !important; flex-shrink: 0 !important; width: var(--audio-width, 250px) !important; background-color: #f8f9fa; padding: 8px; border-radius: 8px; border: 1px solid #dee2e6; }
.dummy-column { border: none !important; background: none !important; padding: 0 !important; margin: 0 !important; }
.compact-audio .wrap.svelte-1w9aqb2 { min-height: 40px !important; }
.compact-audio audio.svelte-1w9aqb2 { height: 40px !important; }
.workbench-item-container { border-bottom: 1px solid #dee2e6; padding: 8px 5px; }
.workbench-top-row { align-items: flex-start !important; }
.workbench-buttons-row { justify-content: space-between !important; }
.text-center { text-align: center; }
"""
with gr.Blocks(css=custom_css) as app:
MAX_AUDIO_OUTPUTS = 4
ITEMS_PER_ROW = 4
MAX_WORKBENCH_ITEMS = 8
all_styles_data_state = gr.State({})
synthesized_wav_files_state = gr.State([])
workbench_state = gr.State([])
merged_preview_state = gr.State({})
def update_workbench_ui(workbench_list: List[Dict]) -> Tuple:
updates = []
for i in range(MAX_WORKBENCH_ITEMS):
if i < len(workbench_list):
item = workbench_list[i]
is_merged = item.get("is_merged", False)
if is_merged:
info_text = (
f"**Text:** {item['text']}\n\n"
f"**Models:** {item['model']}"
)
else:
info_text = (
f"**Text:** {item['text']}\n\n"
f"**Model:** {item['model']}\n\n"
f"**Style:** {item['style']} (Weight: {item['style_weight']:.2f})"
)
wav_path = item['audio_path']
mp3_path = str(Path(wav_path).with_suffix('.mp3'))
playback_path = mp3_path if Path(mp3_path).exists() else wav_path
updates.extend([
gr.update(visible=True),
gr.update(value=f"**{i+1}**"),
gr.update(value=playback_path),
gr.update(value=wav_path, visible=True),
gr.update(value=info_text)
])
else:
updates.extend([
gr.update(visible=False),
gr.update(value=""),
gr.update(value=None),
gr.update(value=None, visible=False),
gr.update(value="")
])
return tuple(updates)
with gr.Tabs():
with gr.Tab("読み上げ"):
gr.Markdown("## 読み上げ")
with gr.Row():
with gr.Column(scale=3):
# infoに文字数制限を追記
text_input = gr.TextArea(
label="読み上げたいテキスト", lines=3, placeholder="ここにテキストを入力",
value="こんにちは、今日もいい天気ですね。", interactive=True,
info=f"最大{MAX_TEXT_LENGTH}文字まで。使用できない文字: {INVALID_FILENAME_CHARS_FOR_DISPLAY}"
)
generate_button = gr.Button("音声合成実行", variant="primary", interactive=True)
with gr.Column(visible=False) as audio_output_area:
gr.Markdown("#### 合成結果")
with gr.Row(elem_classes="audio-output-row"):
audio_item_columns = []
audio_outputs = []
download_buttons = []
to_workbench_buttons = []
synthesized_text_states = []
dummy_audio_item_columns = []
for i in range(MAX_AUDIO_OUTPUTS):
synthesized_text_states.append(gr.State(""))
with gr.Column(visible=False, elem_classes="audio-item-column") as audio_col:
audio_outputs.append(gr.Audio(
label=f"結果 {i+1}", elem_classes="compact-audio",
type="filepath", interactive=False
))
download_buttons.append(gr.DownloadButton("ダウンロード", scale=2, visible=False))
with gr.Row():
to_workbench_buttons.append(gr.Button("🛠️ キープ", scale=2))
audio_item_columns.append(audio_col)
for i in range(ITEMS_PER_ROW - 1):
with gr.Column(visible=False, elem_classes="audio-item-column dummy-column") as dummy_col:
pass
dummy_audio_item_columns.append(dummy_col)
with gr.Accordion("ステータス", open=True):
# ▼▼▼ 修正 ▼▼▼
# Textboxを複数行表示可能に変更
status_textbox = gr.Textbox(interactive=False, lines=1, max_lines=4, autoscroll=True, show_label=False, placeholder="ここにログが表示されます...")
# ▲▲▲ 修正 ▲▲▲
with gr.Column(scale=1):
with gr.Row():
use_fn_model_mode_checkbox = gr.Checkbox(label="FNモデル", value=False, interactive=True, scale=2)
use_symlink_mode_checkbox = gr.Checkbox(label="融☆合モデル", value=False, interactive=True, scale=2, visible=is_merger_cache_available)
refresh_model_list_button = gr.Button("再読込", scale=1)
selected_model_dropdown = gr.Dropdown(label="話者", choices=[], value=None, interactive=True)
current_styles_dropdown = gr.Dropdown(label="スタイル", choices=[], type="value", interactive=True)
style_weight_for_synth_slider = gr.Slider(label="スタイル強度", minimum=0.0, maximum=20.0, value=DEFAULT_STYLE_WEIGHT, step=0.1, info="自動的に推奨強度に設定されます", interactive=True)
batch_count_slider = gr.Slider(label="生成数", value=1, minimum=1, maximum=MAX_AUDIO_OUTPUTS, step=1, interactive=True)
with gr.Accordion("合成パラメータ", open=False):
length_scale_slider = gr.Slider(label="話速", minimum=0.5, maximum=2.0, value=DEFAULT_LENGTH, step=0.05, interactive=True)
pitch_scale_slider = gr.Slider(label="音高", minimum=0.5, maximum=2.0, value=1.0, step=0.01, interactive=True)
intonation_scale_slider = gr.Slider(label="抑揚", minimum=0.0, maximum=2.0, value=1.0, step=0.1, interactive=True)
with gr.Accordion("その他", open=False):
noise_scale_slider = gr.Slider(label="ノイズ強度", minimum=0.0, maximum=2.0, value=DEFAULT_NOISE, step=0.05, interactive=True)
noise_scale_w_slider = gr.Slider(label="持続時間ノイズ強度", minimum=0.0, maximum=2.0, value=DEFAULT_NOISEW, step=0.05, interactive=True)
sdp_ratio_slider = gr.Slider(label="SDP比率", minimum=0.0, maximum=1.0, value=DEFAULT_SDP_RATIO, step=0.05, interactive=True)
with gr.Accordion("設定", open=False):
language_dropdown = gr.Dropdown(label="言語", choices=Languages.value, value="JP", interactive=True)
seed_input = gr.Number(label="Seed", value=-1, info="再現性確保用。-1でランダム", precision=0, interactive=True)
player_width_slider = gr.Slider(label="プレイヤーの横幅 (px)", minimum=150, maximum=800, value=250, step=10, interactive=True)
speaker_name_textbox = gr.Textbox(label="話者名 (モデル依存、空欄で自動)", interactive=True)
reference_audio_input = gr.Audio(label="参照音声 (スタイル指定を上書き)", type="filepath", interactive=True)
use_assist_text_checkbox = gr.Checkbox(label="アシストテキスト使用", value=False, interactive=True)
assist_text_textbox = gr.Textbox(label="アシストテキスト", lines=2, visible=False, interactive=True)
assist_text_weight_slider = gr.Slider(label="アシスト強度", minimum=0.0, maximum=1.0, value=DEFAULT_ASSIST_TEXT_WEIGHT, step=0.05, visible=False, interactive=True)
js_injector_html = gr.HTML(visible=False)
with gr.Accordion("発音ガチャ設定", open=False):
generation_mode_radio = gr.Radio(label="生成モード", choices=["発音ガチャ1", "発音ガチャ2"], value="発音ガチャ1", interactive=True)
random_text_mode_slider = gr.Slider(label="分割の単位", minimum=1, maximum=4, value=1, step=1, info="1:形態素, 2:チャンク, 3:文節, 4:節", interactive=True)
random_text_ratio_textbox = gr.Textbox(label="カタカナ化の割合", value="0.5, 1", info="カンマ区切りで複数指定可。指定値からランダムに1つ使用。", interactive=True)
with gr.Tab("キープ"):
gr.Markdown("## キープ\n読み上げタブで生成した音声をここにストックし、結合や保存ができます。最大8個まで保持できます。")
workbench_items = []
all_workbench_ui_components = []
with gr.Row(variant="panel"):
with gr.Column(scale=3):
with gr.Row():
left_workbench_col = gr.Column(scale=1)
right_workbench_col = gr.Column(scale=1)
with gr.Column(scale=1):
with gr.Blocks():
with gr.Row():
first_audio_num_input = gr.Number(label="前半", value=1, minimum=1, step=1, precision=0, interactive=True)
second_audio_num_input = gr.Number(label="後半", value=2, minimum=1, step=1, precision=0, interactive=True)
merge_pause_input = gr.Number(label="間のポーズ(ms)", value=DEFAULT_WORKBENCH_PAUSE, minimum=-10000, step=10, info="マイナスで重ね合わせ(オーバーレイ)", interactive=True)
with gr.Row():
merge_preview_button = gr.Button("1.結合&プレビュー", variant="primary")
add_merged_to_workbench_button = gr.Button("2.結合した音声を追加", variant="primary")
delete_originals_checkbox = gr.Checkbox(label="結合時に自動で元ファイルを削除", value=False, interactive=True)
preview_audio_player = gr.Audio(label="結合結果プレビュー", interactive=False, type="filepath")
preview_download_button = gr.DownloadButton("プレビューをダウンロード", visible=False)
ITEMS_PER_COLUMN = 4
for i in range(MAX_WORKBENCH_ITEMS):
parent_column = left_workbench_col if i < ITEMS_PER_COLUMN else right_workbench_col
with parent_column:
with gr.Column(visible=False, elem_classes="workbench-item-container") as item_container:
with gr.Row(elem_classes="workbench-top-row"):
with gr.Column(scale=1, min_width=40):
item_num_display = gr.Markdown(f"**{i+1}**", elem_classes=["text-center"])
with gr.Column(scale=4, min_width=160):
audio = gr.Audio(label=f"音声 {i+1}", interactive=False, type="filepath")
with gr.Column(scale=5):
info = gr.Markdown()
with gr.Row(elem_classes="workbench-buttons-row"):
download = gr.DownloadButton("ダウンロード", size="sm", visible=False)
delete_btn = gr.Button("削除", variant="primary", size="sm")
workbench_items.append({"container": item_container, "item_num_display": item_num_display, "audio": audio, "download": download, "info": info, "delete_btn": delete_btn})
for item in workbench_items:
all_workbench_ui_components.extend([item["container"], item["item_num_display"], item["audio"], item["download"], item["info"]])
# --- UIイベントハンドラ関数 (action_refresh_model_list を修正) ---
def load_styles_for_ui(selected_model_name: Optional[str]):
if not selected_model_name: return gr.update(choices=[], value=None), gr.update(value=DEFAULT_STYLE_WEIGHT), {}
model_path = assets_root_path / selected_model_name
styles_map = load_styles_from_model_folder(model_path)
display_names = [data.get("display_name", key) for key, data in styles_map.items()]
default_display_name, default_weight = None, DEFAULT_STYLE_WEIGHT
if DEFAULT_STYLE in styles_map:
default_display_name = styles_map[DEFAULT_STYLE].get("display_name", DEFAULT_STYLE)
default_weight = styles_map[DEFAULT_STYLE].get("weight", DEFAULT_STYLE_WEIGHT)
elif display_names:
first_key = next(iter(styles_map))
default_display_name = styles_map[first_key].get("display_name", first_key)
default_weight = styles_map[first_key].get("weight", DEFAULT_STYLE_WEIGHT)
return gr.update(choices=display_names, value=default_display_name), gr.update(value=default_weight), styles_map
def action_refresh_model_list(use_fn_model_mode: bool, use_symlink_mode: bool):
"""モデルリストを再読み込みし、UIとバックエンドの状態を同期させる。"""
MERGER_CACHE_PATH = Path("/tmp/sbv2_merger_cache")
if use_fn_model_mode:
use_symlink_mode = False
if assets_root_path.exists():
for item in assets_root_path.iterdir():
if item.is_symlink():
try:
item.unlink()
except OSError as e:
if ENABLE_LOGGING:
print(f"Failed to remove symlink {item}: {e}")
if use_symlink_mode:
if MERGER_CACHE_PATH.exists() and MERGER_CACHE_PATH.is_dir():
for item in MERGER_CACHE_PATH.iterdir():
if item.is_dir() and item.name != 'whisper':
target_link = assets_root_path / item.name
if not target_link.exists():
try:
os.symlink(item, target_link)
except OSError as e:
if ENABLE_LOGGING:
print(f"Warning: Could not create symlink for {item.name}: {e}")
else:
if ENABLE_LOGGING:
print(f"Warning: Symlink mode is on, but {MERGER_CACHE_PATH} does not exist or is not a directory.")
model_holder.refresh()
fn_model_pattern = re.compile(r'^FN([1-9]|10)$')
current_available_models = model_holder.model_names
final_choices = []
final_value_for_style_load = None
if use_fn_model_mode:
ui_model_list = [name for name in current_available_models if fn_model_pattern.match(name)]
final_choices = sort_models_by_custom_order(ui_model_list, FN_MODE_MODEL_ORDER)
elif use_symlink_mode:
ui_model_list_names = [p.name for p in assets_root_path.iterdir() if p.is_symlink()]
final_choices = format_and_sort_model_names(ui_model_list_names)
else:
ui_model_list = [
name for name in current_available_models
if name != 'whisper'
and not fn_model_pattern.match(name)
and not (assets_root_path / name).is_symlink()
]
final_choices = sort_models_by_custom_order(ui_model_list, NORMAL_MODE_MODEL_ORDER)
if not final_choices:
# 選択肢が空の場合、エラーを防ぐためにダミー項目を設定し、ドロップダウンを無効化
model_dropdown_update = gr.update(
choices=["(利用可能なモデルがありません)"],
value="(利用可能なモデルがありません)",
interactive=False
)
final_value_for_style_load = None
else:
# 選択肢がある場合、通常通り設定
is_tuple_choices = isinstance(final_choices[0], tuple)
actual_value = final_choices[0][1] if is_tuple_choices else final_choices[0]
model_dropdown_update = gr.update(
choices=final_choices,
value=actual_value,
interactive=True
)
final_value_for_style_load = actual_value
style_dropdown_update, style_weight_update, styles_data_state_update = load_styles_for_ui(final_value_for_style_load)
return model_dropdown_update, style_dropdown_update, style_weight_update, styles_data_state_update
def on_model_select_change(selected_model_name: Optional[str]):
style_dropdown_update, style_weight_update, styles_data_state_update = load_styles_for_ui(selected_model_name)
return style_dropdown_update, style_weight_update, styles_data_state_update
def on_style_dropdown_select(selected_display_name: Optional[str], styles_data: Dict[str, Any]):
if not selected_display_name or not styles_data: return gr.update(value=DEFAULT_STYLE_WEIGHT)
for _, data in styles_data.items():
if data.get("display_name") == selected_display_name:
return gr.update(value=data.get("weight", DEFAULT_STYLE_WEIGHT))
return gr.update(value=DEFAULT_STYLE_WEIGHT)
def action_run_synthesis(
model_name: Optional[str],
style_display_name: Optional[str], style_weight_for_synth: float,
text: str, generation_mode: str, batch_count: int,
lang: str, seed: int, speaker: str, ref_audio: Optional[str],
length: float, pitch: float, intonation:float,
noise:float, noise_w:float, sdp_r:float,
use_assist:bool, assist_text:Optional[str], assist_w:float,
random_text_mode: int, random_text_ratio_str: str,
styles_data: Dict[str, Any],
progress=gr.Progress(track_tqdm=True)
):
error_outputs = []
error_outputs.append("エラーが発生しました。") # status_textbox
error_outputs.append(gr.update(visible=False)) # audio_output_area
for _ in range(MAX_AUDIO_OUTPUTS):
error_outputs.extend([
gr.update(visible=False),
gr.update(value=None),
gr.update(value=None, visible=False),
])
for _ in range(ITEMS_PER_ROW - 1):
error_outputs.append(gr.update(visible=False))
for _ in range(MAX_AUDIO_OUTPUTS):
error_outputs.append("")
error_outputs.append([])
if re.search(INVALID_FILENAME_CHARS_PATTERN, text):
found_chars = "".join(sorted(list(set(re.findall(INVALID_FILENAME_CHARS_PATTERN, text)))))
error_outputs[0] = f"❌ [エラー] テキストに使用できない文字が含まれています: {found_chars}"
return tuple(error_outputs)
if not model_name or model_name == "(利用可能なモデルがありません)": # ダミー項目もチェック
error_outputs[0] = "❌ [エラー] モデルが選択されていません。"
return tuple(error_outputs)
if not text.strip():
error_outputs[0] = "❌ [エラー] テキストが入力されていません。"
return tuple(error_outputs)
if len(text) > MAX_TEXT_LENGTH:
error_outputs[0] = f"❌ [エラー] テキストが長すぎます。{MAX_TEXT_LENGTH}文字以下にしてください。(現在: {len(text)}文字)"
return tuple(error_outputs)
if not style_display_name:
error_outputs[0] = "❌ [エラー] スタイルが選択されていません。"
return tuple(error_outputs)
internal_style_key = None
for key, data in styles_data.items():
if data.get("display_name") == style_display_name: internal_style_key = key; break
if not internal_style_key:
error_outputs[0] = f"❌ [エラー] スタイル '{style_display_name}' の内部キーが見つかりません。"
return tuple(error_outputs)
all_logs = []
model_path = assets_root_path / model_name
files = find_safetensors_files_webui(str(model_path))
if not files:
error_outputs[0] = f"❌ [エラー] モデルフォルダ '{model_name}' に .safetensors ファイルが見つかりません。"
return tuple(error_outputs)
actual_model_file_to_load = str(model_path / files[0])
if ENABLE_LOGGING:
all_logs.append(f"[自動選択] 使用モデルファイル: {files[0]}")
batch_count = int(batch_count)
if batch_count <= 0: batch_count = 1
final_wav_paths = []
final_mp3_paths = []
generated_texts = []
def save_audio_files(audio_segment: AudioSegment, base_filename: str) -> Optional[Tuple[str, str]]:
try:
temp_dir = Path(tempfile.gettempdir())
output_path_wav = temp_dir / f"{base_filename}.wav"
count = 1
while output_path_wav.exists():
output_path_wav = temp_dir / f"{base_filename}-{count}.wav"
count += 1
output_path_mp3 = output_path_wav.with_suffix('.mp3')
audio_segment.export(output_path_wav, format="wav")
audio_segment.export(output_path_mp3, format="mp3", bitrate="192k")
return str(output_path_wav), str(output_path_mp3)
except Exception as e:
all_logs.append(f"❌ [エラー] 一時音声ファイルの保存に失敗: {e}")
return None
if generation_mode == "発音ガチャ2":
try:
ratio_list = [float(x.strip()) for x in random_text_ratio_str.split(',') if x.strip()]
if not ratio_list:
ratio_list = [0.5]
all_logs.append("⚠️ [警告] カタカナ化の割合に有効な数値が指定されなかったため、0.5 を使用します。")
except ValueError:
ratio_list = [0.5]
all_logs.append("⚠️ [警告] カタカナ化の割合の解析に失敗したため、0.5 を使用します。")
internal_mode = int(random_text_mode) + 1
if ENABLE_LOGGING:
all_logs.append(f"--- テキスト変換モード ---")
all_logs.append(f"粒度: {random_text_mode} (内部モード: {internal_mode}), カタカナ化割合候補: {ratio_list}")
generated_variations: Dict[str, List[str]] = {}
max_attempts = batch_count * 20
for _ in progress.tqdm(range(max_attempts), desc="テキストバリエーション生成中", total=max_attempts):
if len(generated_variations) >= batch_count: break
processed_blocks_list = generate_one_variation(text, internal_mode, random.choice(ratio_list))
if not processed_blocks_list: continue
temp_text = "".join(processed_blocks_list)
final_text = re.sub(r'\([^)]*\)', '', temp_text).strip()
if final_text and final_text not in generated_variations:
generated_variations[final_text] = processed_blocks_list
if len(generated_variations) < batch_count:
all_logs.append(f"⚠️ [警告] {batch_count}個のユニークなテキストを生成できませんでした。({len(generated_variations)}個のみ生成)")
for i, (final_text, processed_blocks_list) in enumerate(progress.tqdm(generated_variations.items(), desc=f"{len(generated_variations)}件の音声を生成中")):
if ENABLE_LOGGING:
all_logs.append(f"--- 生成 {i+1}/{len(generated_variations)} ---")
all_logs.append(f" ┠ 分割パターン: {' / '.join(processed_blocks_list)}")
all_logs.append(f" ┗ 合成テキスト: \"{final_text[:50]}{'...' if len(final_text)>50 else ''}\"")
success, logs, audio_tuple = process_single_synthesis_webui(model_holder, model_name, actual_model_file_to_load, final_text, lang, speaker or None, internal_style_key, style_display_name, style_weight_for_synth, -1, ref_audio or None, length, noise, noise_w, sdp_r, pitch, intonation, use_assist, assist_text or None, assist_w)
all_logs.extend([f" {log}" for log in logs])
if success and audio_tuple:
all_logs.append(f"✅ 音声合成が完了しました。(変換後テキスト: 「{final_text}」)")
sr, audio_data = audio_tuple
audio_segment = AudioSegment(data=audio_data.tobytes(), sample_width=audio_data.dtype.itemsize, frame_rate=sr, channels=1)
sanitized_model_name = sanitize_filename(model_name)
sanitized_style_name = sanitize_filename(style_display_name)
style_weight_str = f"{style_weight_for_synth:.1f}".replace('.', '.')
text_for_filename = sanitize_filename(text[:30]) if text else "no-text"
base_filename = f"{sanitized_model_name}-{sanitized_style_name}-{style_weight_str}-{text_for_filename}"
saved_paths = save_audio_files(audio_segment, base_filename)
if saved_paths:
final_wav_paths.append(saved_paths[0])
final_mp3_paths.append(saved_paths[1])
generated_texts.append(text)
if len(final_wav_paths) == 0:
all_logs.append("ℹ️ 音声は生成されませんでした。")
else: # 発音ガチャ1 モード
if ENABLE_LOGGING:
all_logs.append("--- 標準モード ---")
start_seed = int(seed)
for i in progress.tqdm(range(batch_count), desc=f"{batch_count}件の音声を生成中"):
current_seed = start_seed + i if start_seed >= 0 else -1
if ENABLE_LOGGING:
all_logs.append(f"--- 生成 {i+1}/{batch_count} (Seed: {current_seed if current_seed >= 0 else 'Random'}) ---")
all_logs.append(f" ┗ 合成テキスト: \"{text[:50]}{'...' if len(text)>50 else ''}\"")
success, logs, audio_tuple = process_single_synthesis_webui(model_holder, model_name, actual_model_file_to_load, text, lang, speaker or None, internal_style_key, style_display_name, style_weight_for_synth, current_seed, ref_audio or None, length, noise, noise_w, sdp_r, pitch, intonation, use_assist, assist_text or None, assist_w)
all_logs.extend([f" {log}" for log in logs])
if success and audio_tuple:
sr, audio_data = audio_tuple
audio_segment = AudioSegment(data=audio_data.tobytes(), sample_width=audio_data.dtype.itemsize, frame_rate=sr, channels=1)
sanitized_model_name = sanitize_filename(model_name)
sanitized_style_name = sanitize_filename(style_display_name)
style_weight_str = f"{style_weight_for_synth:.1f}".replace('.', '.')
text_for_filename = sanitize_filename(text[:30]) if text else "no-text"
base_filename = f"{sanitized_model_name}-{sanitized_style_name}-{style_weight_str}-{text_for_filename}"
saved_paths = save_audio_files(audio_segment, base_filename)
if saved_paths:
final_wav_paths.append(saved_paths[0])
final_mp3_paths.append(saved_paths[1])
generated_texts.append(text)
num_generated = len(final_wav_paths)
if num_generated > 0:
all_logs.append(f"✅ {num_generated}件の音声合成が完了しました。")
else:
all_logs.append("ℹ️ 音声は生成されませんでした。")
final_outputs = []
if ENABLE_LOGGING:
status_message = "\n".join(all_logs)
else:
essential_logs = [log for log in all_logs if any(prefix in log for prefix in ["✅", "❌", "⚠️", "ℹ️"])]
status_message = "\n".join(essential_logs)
final_outputs.append(status_message)
num_generated = len(final_wav_paths)
final_outputs.append(gr.update(visible=num_generated > 0))
for i in range(MAX_AUDIO_OUTPUTS):
is_visible = i < num_generated
mp3_val = final_mp3_paths[i] if is_visible else None
wav_val = final_wav_paths[i] if is_visible else None
final_outputs.append(gr.update(visible=is_visible))
final_outputs.append(gr.update(value=mp3_val))
final_outputs.append(gr.update(value=wav_val, visible=is_visible))
num_dummies_needed = (ITEMS_PER_ROW - (num_generated % ITEMS_PER_ROW)) % ITEMS_PER_ROW if num_generated > 0 else 0
for i in range(ITEMS_PER_ROW - 1):
final_outputs.append(gr.update(visible=i < num_dummies_needed))
for i in range(MAX_AUDIO_OUTPUTS):
text_val = generated_texts[i] if i < num_generated else ""
final_outputs.append(text_val)
final_outputs.append(final_wav_paths)
return tuple(final_outputs)
def add_to_workbench(
    current_status: str,
    current_workbench_list: List[Dict],
    wav_audio_path: Optional[str],
    text: str, model: str, style_display_name: str, style_weight: float
) -> Tuple:
    """Append a freshly synthesized clip to the keep list ("workbench").

    Args:
        current_status: Text currently shown in the status box; only reused
            when ENABLE_LOGGING is on.
        current_workbench_list: Current keep-list state (may be None/empty).
        wav_audio_path: Path of the WAV file to keep.
        text, model, style_display_name, style_weight: Metadata stored
            alongside the clip.

    Returns:
        ``(status_text, new_workbench_list)`` followed by whatever tuple
        ``update_workbench_ui`` returns for the resulting list.
    """
    log_messages = []
    kept_items = current_workbench_list or []

    def _leave_unchanged():
        # Early exits report the collected messages as-is and keep the list untouched.
        joined = "\n".join(log_messages)
        if ENABLE_LOGGING:
            status = (current_status + "\n" + joined).strip()
        else:
            status = joined
        return (status, kept_items) + update_workbench_ui(kept_items)

    # The WAV file must be given and still be on disk.
    if not (wav_audio_path and Path(wav_audio_path).exists()):
        log_messages.append("⚠️ [キープ追加エラー] 追加する音声ファイル(WAV)が見つかりません。")
        return _leave_unchanged()

    # Refuse to keep the same file twice.
    already_kept = False
    for entry in kept_items:
        if entry['audio_path'] == wav_audio_path:
            already_kept = True
            break
    if already_kept:
        log_messages.append("ℹ️ この音声はすでにキープに存在します。")
        return _leave_unchanged()

    # When the model name parses as a merged-model name, show its first component.
    parsed_result = parse_merged_model_name(model)
    if parsed_result:
        display_model_name, _ = parsed_result
    else:
        display_model_name = model

    new_item = {
        "audio_path": wav_audio_path,
        "text": text,
        "model": display_model_name,
        "original_models": [model],
        "style": style_display_name,
        "style_weight": style_weight,
        "timestamp": datetime.datetime.now(JST).isoformat(),
        "is_merged": False,
    }
    updated_list = kept_items + [new_item]

    if len(updated_list) > MAX_WORKBENCH_ITEMS:
        # Over capacity: drop the head of the list and clean up its temp files.
        evicted = updated_list.pop(0)
        try:
            evicted_wav = Path(evicted['audio_path'])
            evicted_mp3 = evicted_wav.with_suffix('.mp3')
            for candidate in (evicted_wav, evicted_mp3):
                # Only files sitting directly in the temp directory are removed.
                if candidate.exists() and str(candidate.parent) == tempfile.gettempdir():
                    candidate.unlink()
        except Exception as e:
            if ENABLE_LOGGING:
                print(f"Warning: Failed to delete old workbench audio file: {e}")
        log_messages.append(f"ℹ️ キープのアイテムが最大数({MAX_WORKBENCH_ITEMS})に達したため、一番古いアイテムを削除しました。")

    ui_updates = update_workbench_ui(updated_list)
    log_messages.append("✅ キープに音声を追加しました。")

    if ENABLE_LOGGING:
        final_status = (current_status + "\n" + "\n".join(log_messages)).strip()
    else:
        # With logging off, only lines carrying a status marker are shown.
        markers = ["✅", "❌", "⚠️", "ℹ️"]
        essential_logs = [line for line in log_messages if any(mark in line for mark in markers)]
        final_status = "\n".join(essential_logs).strip()
    return (final_status, updated_list) + ui_updates
def remove_from_workbench(current_status: str, index_to_remove: int, current_workbench_list: List[Dict]) -> Tuple:
    """Remove one entry from the keep list and clean up its temp audio files.

    Args:
        current_status: Text currently shown in the status box; only reused
            when ENABLE_LOGGING is on.
        index_to_remove: Zero-based position of the entry to drop.
        current_workbench_list: Current keep-list state (may be None/empty).

    Returns:
        ``(status_text, new_workbench_list)`` followed by the tuple returned
        by ``update_workbench_ui`` for the resulting list.
    """
    log_messages = []
    entries = current_workbench_list or []

    # Out-of-range index: nothing to do, leave the list as it is.
    if index_to_remove < 0 or index_to_remove >= len(entries):
        unchanged_status = current_status if ENABLE_LOGGING else ""
        return (unchanged_status, entries) + update_workbench_ui(entries)

    target = entries[index_to_remove]
    try:
        path_to_delete_wav = Path(target['audio_path'])
        path_to_delete_mp3 = path_to_delete_wav.with_suffix('.mp3')
        if not path_to_delete_wav.exists():
            log_messages.append(f"✅ キープからアイテム #{index_to_remove + 1} を削除しました。(関連ファイルなし)")
        elif str(path_to_delete_wav.parent) == tempfile.gettempdir():
            # Temp-dir files belong to this app: remove the WAV and its MP3 twin.
            path_to_delete_wav.unlink()
            if path_to_delete_mp3.exists():
                path_to_delete_mp3.unlink()
            log_messages.append(f"✅ キープからアイテム #{index_to_remove + 1} を削除し、一時ファイル(WAV/MP3)をクリーンアップしました。")
        else:
            # Files outside the temp directory are left on disk.
            log_messages.append(f"✅ キープからアイテム #{index_to_remove + 1} を削除しました。(ファイルは保持: {path_to_delete_wav.name})")
    except Exception as e:
        log_messages.append(f"⚠️ キープのアイテム #{index_to_remove + 1} のファイル削除中にエラー: {e}")

    # The entry is dropped from the list even if file cleanup failed.
    updated_list = entries[:index_to_remove] + entries[index_to_remove + 1:]
    ui_updates = update_workbench_ui(updated_list)

    if ENABLE_LOGGING:
        final_status = (current_status + "\n" + "\n".join(log_messages)).strip()
    else:
        markers = ["✅", "❌", "⚠️", "ℹ️"]
        essential_logs = [line for line in log_messages if any(mark in line for mark in markers)]
        final_status = "\n".join(essential_logs).strip()
    return (final_status, updated_list) + ui_updates
def action_merge_preview(current_status: str, first_audio_num: int, second_audio_num: int, pause_ms: int, workbench_list: List[Dict], progress=gr.Progress(track_tqdm=True)):
    """Join two kept clips into one preview and write it to temp WAV/MP3 files.

    Args:
        current_status: Text currently shown in the status box; only reused
            when ENABLE_LOGGING is on.
        first_audio_num: 1-based keep-list number of the first clip.
        second_audio_num: 1-based keep-list number of the second clip.
        pause_ms: Gap between the clips in milliseconds. A negative value
            overlaps the clips by that many milliseconds instead.
        workbench_list: Current keep-list state.
        progress: Gradio progress reporter.

    Returns:
        ``(status_text, mp3_path, download_button_update, metadata)``.
        On any failure the last three are ``None``, a hidden/empty update
        and ``{}``.
    """
    log_messages = []

    def build_status():
        # Verbose mode appends to the existing status; otherwise only lines
        # carrying a status marker are shown.
        if ENABLE_LOGGING:
            return ((current_status or "") + "\n" + "\n".join(log_messages)).strip()
        essential_logs = [log for log in log_messages if any(prefix in log for prefix in ["✅", "❌", "⚠️", "ℹ️"])]
        return "\n".join(essential_logs).strip()

    def create_error_return():
        return (build_status(), None, gr.update(value=None, visible=False), {})

    if not workbench_list:
        log_messages.append("⚠️ [結合プレビュー警告] キープに音声がありません。")
        return create_error_return()

    # The number inputs can arrive empty (None) or non-numeric; report that
    # instead of letting int() raise.
    try:
        idx1, idx2 = int(first_audio_num) - 1, int(second_audio_num) - 1
    except (TypeError, ValueError):
        log_messages.append("⚠️ [結合プレビュー警告] 音声番号が正しく指定されていません。")
        return create_error_return()

    if not (0 <= idx1 < len(workbench_list) and 0 <= idx2 < len(workbench_list)):
        log_messages.append(f"⚠️ [結合プレビュー警告] 指定された番号(#{first_audio_num}, #{second_audio_num})の音声が見つかりません。")
        return create_error_return()

    item1, item2 = workbench_list[idx1], workbench_list[idx2]
    audio_path1, audio_path2 = item1.get("audio_path"), item2.get("audio_path")
    if not audio_path1 or not Path(audio_path1).exists() or not audio_path2 or not Path(audio_path2).exists():
        log_messages.append("❌ [結合プレビューエラー] 音声ファイル(WAV)が見つかりません。ファイルが削除された可能性があります。")
        return create_error_return()

    progress(0, desc="結合準備中...")
    try:
        segment1, segment2 = AudioSegment.from_file(audio_path1), AudioSegment.from_file(audio_path2)
        pause_duration = int(pause_ms)
        if pause_duration >= 0:
            # Non-negative pause: first clip, silence, second clip.
            combined_audio = segment1 + AudioSegment.silent(duration=pause_duration) + segment2
            if ENABLE_LOGGING:
                log_messages.append(f"音声 #{first_audio_num} と #{second_audio_num} を {pause_duration}ms のポーズを挟んで結合しました。")
        else:
            # Negative pause: overlay the start of the second clip onto the
            # end of the first, clamped to the shorter clip's length.
            overlap_duration = abs(pause_duration)
            max_possible_overlap = min(len(segment1), len(segment2))
            if overlap_duration > max_possible_overlap:
                log_messages.append(f"ℹ️ オーバーラップ長({overlap_duration}ms)が可能な最大値({max_possible_overlap}ms)を超えるため、自動的に調整されました。")
                overlap_duration = max_possible_overlap
            combined_audio = AudioSegment.silent(duration=len(segment1) + len(segment2) - overlap_duration)
            combined_audio = combined_audio.overlay(segment1, position=0).overlay(segment2, position=len(segment1) - overlap_duration)
            if ENABLE_LOGGING:
                log_messages.append(f"音声 #{first_audio_num} と #{second_audio_num} を {overlap_duration}ms 重ねて(オーバーレイして)結合しました。")
        progress(1, desc="結合完了")
    except Exception as e:
        log_messages.append(f"❌ [結合プレビューエラー] 音声の結合中にエラーが発生しました: {e}")
        return create_error_return()

    # --- File name: "<models>-<text>" built from both source entries ---
    # 1. Union of the original model names, sorted for a stable name.
    original_models1 = item1.get('original_models', [])
    original_models2 = item2.get('original_models', [])
    sorted_original_models = sorted(set(original_models1 + original_models2))
    model_part = "_".join([sanitize_filename(name) for name in sorted_original_models])
    # 2. Both texts joined, limited to 50 characters before sanitizing.
    text1 = item1.get('text', '')
    text2 = item2.get('text', '')
    combined_text = f"{text1}_{text2}"
    text_part = sanitize_filename(combined_text[:50])
    # 3. Fall back to a random name when either part is empty.
    base_filename = f"{model_part}-{text_part}" if model_part and text_part else f"merged_{uuid.uuid4().hex[:8]}"
    # 4. Pick a temp path whose WAV *and* MP3 names are both unused, so no
    #    existing file is overwritten.
    temp_dir = Path(tempfile.gettempdir())
    wav_temp_path = temp_dir / f"{base_filename}.wav"
    count = 1
    while wav_temp_path.exists() or wav_temp_path.with_suffix('.mp3').exists():
        wav_temp_path = temp_dir / f"{base_filename}-{count}.wav"
        count += 1
    mp3_temp_path = wav_temp_path.with_suffix('.mp3')

    # Saving can fail (disk full, missing encoder, ...); surface that in the
    # status box and do not leave half-written files behind.
    try:
        combined_audio.export(wav_temp_path, format="wav")
        combined_audio.export(mp3_temp_path, format="mp3", bitrate="192k")
    except Exception as e:
        for leftover in (wav_temp_path, mp3_temp_path):
            try:
                leftover.unlink()
            except OSError:
                # Best effort: the file may never have been created.
                pass
        log_messages.append(f"❌ [結合プレビューエラー] 結合音声の保存に失敗しました: {e}")
        return create_error_return()

    # Display model names are stored " | "-separated on each entry.
    display_models1, display_models2 = item1.get('model', '').split(' | '), item2.get('model', '').split(' | ')
    all_display_models = {m.strip() for m in display_models1 + display_models2 if m.strip()}
    metadata = {
        "text": f"{item1.get('text', '')} | {item2.get('text', '')}",
        "display_models": sorted(all_display_models),
        "original_models": sorted_original_models,
        "audio_path": str(wav_temp_path),
        "timestamp": datetime.datetime.now(JST).isoformat()
    }
    log_messages.append("✅ 結合プレビューが生成されました。")
    return build_status(), str(mp3_temp_path), gr.update(value=str(wav_temp_path), visible=True), metadata
def action_add_merged_to_workbench(current_status: str, preview_data: Dict, current_workbench_list: List[Dict], delete_originals: bool, first_audio_num: int, second_audio_num: int) -> Tuple:
    """Put the merged preview clip at the top of the keep list.

    Args:
        current_status: Text currently shown in the status box; only reused
            when ENABLE_LOGGING is on.
        preview_data: Metadata dict of the merge preview; must contain
            ``"audio_path"``.
        current_workbench_list: Current keep-list state (may be None/empty).
        delete_originals: When true, the entries numbered
            ``first_audio_num`` / ``second_audio_num`` are removed from the
            list and their temp files deleted.
        first_audio_num: 1-based number of the first source entry.
        second_audio_num: 1-based number of the second source entry.

    Returns:
        ``(status_text, new_workbench_list)`` followed by the tuple returned
        by ``update_workbench_ui`` for the resulting list. On error the list
        is returned unchanged.
    """
    log_messages = []
    safe_workbench_list = current_workbench_list or []

    def build_status():
        if ENABLE_LOGGING:
            return ((current_status or "") + "\n" + "\n".join(log_messages)).strip()
        essential_logs = [log for log in log_messages if any(prefix in log for prefix in ["✅", "❌", "⚠️", "ℹ️"])]
        return "\n".join(essential_logs).strip()

    def create_error_return():
        return (build_status(), safe_workbench_list) + update_workbench_ui(safe_workbench_list)

    def discard_temp_audio(item):
        # Delete the entry's WAV/MP3 pair, but only files that sit directly
        # in the temp directory; anything else is left alone.
        wav_path = Path(item['audio_path'])
        for candidate in (wav_path, wav_path.with_suffix('.mp3')):
            if candidate.exists() and str(candidate.parent) == tempfile.gettempdir():
                candidate.unlink()

    if not preview_data or "audio_path" not in preview_data:
        log_messages.append("⚠️ [キープ追加エラー] 追加する結合済み音声がありません。先にプレビューを生成してください。")
        return create_error_return()
    src_path = Path(preview_data["audio_path"])
    if not src_path.exists():
        log_messages.append("⚠️ [キープ追加エラー] 結合済み音声ファイルが見つかりません。")
        return create_error_return()

    # Same guard as the single-clip add: two entries sharing one file would
    # break each other when one is removed, and with delete_originals the
    # second click could delete the merged file itself.
    if any(item.get('audio_path') == str(src_path) for item in safe_workbench_list):
        log_messages.append("ℹ️ この音声はすでにキープに存在します。")
        return create_error_return()

    new_merged_item = {
        "audio_path": str(src_path),
        "text": preview_data.get("text", "N/A"),
        "model": " | ".join(preview_data.get("display_models", [])),
        "original_models": preview_data.get("original_models", []),
        "style": "N/A",
        "style_weight": 0.0,
        "timestamp": preview_data.get("timestamp"),
        "is_merged": True,
    }

    if delete_originals:
        # The number inputs can arrive empty (None) or non-numeric.
        try:
            idx1, idx2 = int(first_audio_num) - 1, int(second_audio_num) - 1
        except (TypeError, ValueError):
            log_messages.append("⚠️ [キープ追加エラー] 削除する元の音声番号が正しく指定されていません。")
            return create_error_return()
        indices_to_remove = {idx1, idx2}
        items_to_delete, remaining_list = [], []
        for i, item in enumerate(safe_workbench_list):
            if i in indices_to_remove:
                items_to_delete.append(item)
            else:
                remaining_list.append(item)
        for item_to_remove in items_to_delete:
            try:
                discard_temp_audio(item_to_remove)
            except Exception as e:
                log_messages.append(f"⚠️ 元の音声ファイル削除中にエラー: {e}")
        final_workbench_list = [new_merged_item] + remaining_list
        log_messages.append(f"✅ 結合音声をキープに追加し、元の音声(#{idx1+1}, #{idx2+1})を削除しました。")
    else:
        final_workbench_list = [new_merged_item] + safe_workbench_list
        log_messages.append("✅ 結合済みの音声をキープの一番上に追加しました。")

    if len(final_workbench_list) > MAX_WORKBENCH_ITEMS:
        # The merged clip was inserted at the front, so the overflow entry
        # is taken from the end of the list.
        item_to_remove = final_workbench_list.pop(-1)
        try:
            discard_temp_audio(item_to_remove)
        except Exception as e:
            if ENABLE_LOGGING:
                print(f"Warning: Failed to delete old workbench audio file: {e}")
        log_messages.append(f"ℹ️ キープが最大数({MAX_WORKBENCH_ITEMS})に達したため一番古いアイテムを削除しました。")

    ui_updates = update_workbench_ui(final_workbench_list)
    return (build_status(), final_workbench_list) + ui_updates
# --- イベントリスナー接続 ---
def on_fn_mode_change(is_fn_mode_on: bool) -> gr.Checkbox:
    """Return an update that un-checks the target checkbox when FN mode is switched on.

    When FN mode is switched off, an empty update (no change) is returned.
    """
    return gr.update(value=False) if is_fn_mode_on else gr.update()
def on_symlink_mode_change(is_symlink_mode_on: bool) -> gr.Checkbox:
    """Return an update that un-checks the target checkbox when symlink mode is switched on.

    When symlink mode is switched off, an empty update (no change) is returned.
    """
    return gr.update(value=False) if is_symlink_mode_on else gr.update()
# Inputs/outputs shared by every trigger that re-runs action_refresh_model_list:
# both mode checkboxes go in; the model dropdown, style dropdown, style-weight
# slider and the styles state come out.
refresh_inputs = [use_fn_model_mode_checkbox, use_symlink_mode_checkbox]
refresh_outputs = [selected_model_dropdown, current_styles_dropdown, style_weight_for_synth_slider, all_styles_data_state]
# The two mode checkboxes are mutually exclusive: switching one on un-checks the
# other (via on_fn_mode_change / on_symlink_mode_change), then the model list is
# refreshed.
use_fn_model_mode_checkbox.change(on_fn_mode_change, inputs=[use_fn_model_mode_checkbox], outputs=[use_symlink_mode_checkbox]).then(action_refresh_model_list, inputs=refresh_inputs, outputs=refresh_outputs)
use_symlink_mode_checkbox.change(on_symlink_mode_change, inputs=[use_symlink_mode_checkbox], outputs=[use_fn_model_mode_checkbox]).then(action_refresh_model_list, inputs=refresh_inputs, outputs=refresh_outputs)
# The model list is also refreshed on button click and once when the app loads.
refresh_model_list_button.click(fn=action_refresh_model_list, inputs=refresh_inputs, outputs=refresh_outputs)
app.load(fn=action_refresh_model_list, inputs=refresh_inputs, outputs=refresh_outputs)
# Selecting a model updates the style dropdown, style weight and styles state;
# selecting a style updates the style-weight slider.
selected_model_dropdown.change(on_model_select_change, inputs=[selected_model_dropdown], outputs=[current_styles_dropdown, style_weight_for_synth_slider, all_styles_data_state])
current_styles_dropdown.change(on_style_dropdown_select, inputs=[current_styles_dropdown, all_styles_data_state], outputs=[style_weight_for_synth_slider])
# The assist-text textbox and weight slider are visible only while the checkbox is on.
use_assist_text_checkbox.change(lambda x: (gr.update(visible=x), gr.update(visible=x)), inputs=[use_assist_text_checkbox], outputs=[assist_text_textbox, assist_text_weight_slider])
# Output list for the generate button. The order mirrors the tuple that
# action_run_synthesis builds: status text, output area, then
# (column, audio player, download button) per slot, the dummy columns, the
# per-slot text states, and finally the list of generated WAV paths.
generate_outputs = [status_textbox, audio_output_area]
for i in range(MAX_AUDIO_OUTPUTS):
generate_outputs.extend([audio_item_columns[i], audio_outputs[i], download_buttons[i]])
generate_outputs.extend(dummy_audio_item_columns)
generate_outputs.extend(synthesized_text_states)
generate_outputs.append(synthesized_wav_files_state)
# NOTE(review): inputs are passed positionally, so their order has to match the
# parameter order of action_run_synthesis (signature not visible here) — confirm
# before reordering anything in this list.
generate_button.click(
fn=action_run_synthesis,
inputs=[
selected_model_dropdown,
current_styles_dropdown, style_weight_for_synth_slider,
text_input, generation_mode_radio, batch_count_slider,
language_dropdown, seed_input, speaker_name_textbox,
reference_audio_input,
length_scale_slider, pitch_scale_slider, intonation_scale_slider,
noise_scale_slider, noise_scale_w_slider, sdp_ratio_slider,
use_assist_text_checkbox, assist_text_textbox, assist_text_weight_slider,
random_text_mode_slider, random_text_ratio_textbox,
all_styles_data_state
],
outputs=generate_outputs
)
# One "keep" button per output slot. `idx=i` binds the slot index as a default
# argument so each lambda keeps its own index; the lambda picks that slot's WAV
# path out of the shared list (None when the slot has no file).
for i in range(MAX_AUDIO_OUTPUTS):
to_workbench_buttons[i].click(
fn=lambda current_status, workbench_list, text, model, style_display, style_weight, all_wavs, idx=i: \
add_to_workbench(
current_status, workbench_list,
all_wavs[idx] if all_wavs and idx < len(all_wavs) else None,
text, model, style_display, style_weight
),
inputs=[
status_textbox, workbench_state, synthesized_text_states[i],
selected_model_dropdown, current_styles_dropdown, style_weight_for_synth_slider,
synthesized_wav_files_state
],
outputs=[status_textbox, workbench_state] + all_workbench_ui_components
)
# One delete button per keep-list row; `current_i=i` binds the row index the
# same way as above.
for i, item in enumerate(workbench_items):
item["delete_btn"].click(
fn=lambda s, w, current_i=i: remove_from_workbench(s, current_i, w),
inputs=[status_textbox, workbench_state],
outputs=[status_textbox, workbench_state] + all_workbench_ui_components,
)
# Merge preview: joins the two numbered keep-list clips and stores the preview
# metadata in merged_preview_state.
merge_preview_button.click(
fn=action_merge_preview,
inputs=[
status_textbox,
first_audio_num_input,
second_audio_num_input,
merge_pause_input,
workbench_state
],
outputs=[status_textbox, preview_audio_player, preview_download_button, merged_preview_state]
)
# Adds the previewed merge to the keep list. The clip numbers are read again
# from the inputs at click time.
# NOTE(review): if the numbers were edited after the preview was generated,
# "delete originals" would target the newly entered numbers — confirm intended.
add_merged_to_workbench_button.click(
fn=action_add_merged_to_workbench,
inputs=[
status_textbox,
merged_preview_state,
workbench_state,
delete_originals_checkbox,
first_audio_num_input,
second_audio_num_input
],
outputs=[status_textbox, workbench_state] + all_workbench_ui_components
)
# On slider release, sends a <script> snippet that sets the --audio-width CSS
# variable as the new value of js_injector_html.
# NOTE(review): whether the browser actually executes a script delivered as an
# HTML component value depends on the Gradio version — confirm.
player_width_slider.release(lambda w: f"<script>document.documentElement.style.setProperty('--audio-width', '{w}px');</script>", inputs=[player_width_slider], outputs=[js_injector_html])
return app
# --- アプリケーションの起動 ---
if __name__ == "__main__":
    # Start from a clean slate: an existing model_assets folder is deleted
    # before the mock holder is constructed.
    stale_assets_dir = Path("model_assets")
    if stale_assets_dir.exists():
        shutil.rmtree("model_assets")

    mock_model_holder = TTSModelHolder()
    if ENABLE_LOGGING:
        print(f"Initial models loaded by TTSModelHolder: {mock_model_holder.model_names}")
    app = create_synthesis_app(mock_model_holder)

    # Directories Gradio is allowed to serve files from.
    assets_dir_path = assets_root_path.resolve()
    assets_dir_path.mkdir(exist_ok=True)
    allowed_paths = [str(assets_dir_path)]

    # The merger cache is only added on non-Windows platforms, and only if
    # the directory already exists.
    merger_cache_path = Path("/tmp/sbv2_merger_cache")
    if sys.platform != "win32" and merger_cache_path.is_dir():
        allowed_paths.append(str(merger_cache_path.resolve()))

    output_dir_path = Path("output").resolve()
    output_dir_path.mkdir(exist_ok=True, parents=True)
    # Generated audio is written to the system temp directory, so that has
    # to be served as well.
    allowed_paths += [str(output_dir_path), tempfile.gettempdir()]

    print(f"Gradioに次のパスへのアクセスを許可します: {', '.join(allowed_paths)}")
    app.launch(allowed_paths=allowed_paths)