Spaces:
Runtime error
Runtime error
File size: 3,882 Bytes
b3385db 2c01ee6 b3385db 2c01ee6 b3385db 2c01ee6 b3385db |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
import asyncio
import json
import logging
import os
from functools import lru_cache
import edge_tts
import gradio as gr
from tts_service.utils import cache_path
from tts_service.voices import voice_manager
log = logging.getLogger(__name__)
@lru_cache(maxsize=None)
def import_voice_converter():
    """Return the RVC ``VoiceConverter`` instance, creating it on first use.

    The ``rvc`` import is performed inside the function, so it only runs when
    the function is first called. ``lru_cache`` then memoizes the constructed
    instance, so every later call returns that same object.
    """
    from rvc.infer.infer import VoiceConverter

    converter = VoiceConverter()
    return converter
# TTS
async def run_tts_script(
    text: str,
    voice_name: str,
    rate: int = 0,
    progress=gr.Progress(),  # noqa: B008
) -> tuple[str, str]:
    """Synthesize *text* with edge-tts, then convert it with the voice's RVC model.

    Both stages are cached on disk: the raw TTS audio and the converted audio
    each get a path from ``cache_path`` and are only produced when that path
    does not exist yet.

    Args:
        text: Text to speak. Leading/trailing whitespace is stripped.
        voice_name: Key into ``voice_manager.voices`` selecting the voice.
        rate: Speaking-rate offset in percent; sent to edge-tts as a signed
            string such as ``"+10%"`` or ``"-5%"``.
        progress: Progress callable invoked as ``progress(fraction, message)``.
            The TTS stage reports 0.0-0.5 and the conversion stage 0.5-1.0.

    Returns:
        A ``(status_message, output_path)`` tuple, where ``output_path`` is the
        path of the converted audio file as a string.

    Raises:
        Whatever ``voice_manager.voices[voice_name]`` raises for an unknown
        voice, plus any error raised by edge-tts or the voice converter. When
        the TTS stream fails, the partially written audio is removed before
        the error propagates.
    """

    async def update_progress(pct, msg) -> None:
        log.debug("Progress: %.1f%%: %s", pct * 100, msg)
        progress(pct, msg)
        # Yield control to the event loop once per update.
        await asyncio.sleep(0)

    log.info("Synthesizing text (%s chars)", len(text))
    await update_progress(0, "Starting...")

    voice = voice_manager.voices[voice_name]
    audio_format = "wav"
    text = text.strip()

    # Stage 1: text -> speech. The empty second argument keeps this cache
    # entry distinct from the converted one computed further down.
    output_tts_path = cache_path(voice.tts, "", rate, text, extension=audio_format)
    if not os.path.exists(output_tts_path):
        rate_spec = f"+{rate}%" if rate >= 0 else f"{rate}%"
        communicate = edge_tts.Communicate(text, voice.tts, rate=rate_spec)

        # Stream into a sibling ".part" file and only promote it to the cache
        # path once the stream has finished. Otherwise a failed or cancelled
        # request would leave a truncated file that every later call would
        # treat as a valid cache hit.
        partial_tts_path = f"{output_tts_path}.part"
        text_ptr = 0
        try:
            with open(partial_tts_path, "wb") as f:
                async for chunk in communicate.stream():
                    chunk_type = chunk["type"]
                    if chunk_type == "audio":
                        f.write(chunk["data"])
                    elif chunk_type == "WordBoundary":
                        chunk_text = chunk["text"]
                        # str.find returns -1 on a miss; str.index would raise
                        # ValueError and abort the whole synthesis.
                        text_index = text.find(chunk_text, text_ptr)
                        if text_index == -1:
                            log.warning("Extraneous text received from edge tts: %s", chunk_text)
                            continue
                        text_ptr = text_index + len(chunk_text)
                        pct_complete = text_ptr / len(text)
                        log.debug("%.1f%%: %s", pct_complete * 100, chunk)
                        await update_progress(pct_complete / 2, "Synthesizing...")
                    else:
                        log.warning("Unknown chunk type: %s: %s", chunk_type, json.dumps(chunk))
        except BaseException:
            # BaseException so that task cancellation is cleaned up as well;
            # the original error is always re-raised.
            try:
                os.remove(partial_tts_path)
            except FileNotFoundError:
                pass
            raise
        os.replace(partial_tts_path, output_tts_path)

    # Stage 2: voice conversion of the TTS audio, cached per voice name.
    output_rvc_path = cache_path(voice.tts, voice.name, rate, text, extension=audio_format)
    if not os.path.exists(output_rvc_path):
        infer_pipeline = import_voice_converter()
        await infer_pipeline.convert_audio(
            pitch=voice.pitch,
            filter_radius=voice.filter_radius,
            index_rate=voice.index_rate,
            volume_envelope=voice.rms_mix_rate,
            protect=voice.protect,
            hop_length=voice.hop_length,
            f0_method=voice.f0_method,
            audio_input_path=str(output_tts_path),
            audio_output_path=str(output_rvc_path),
            model_path=voice.model,
            index_path=voice.index,
            split_audio=True,
            f0_autotune=voice.autotune is not None,
            f0_autotune_strength=voice.autotune,
            clean_audio=voice.clean is not None,
            clean_strength=voice.clean,
            export_format=audio_format.upper(),
            upscale_audio=voice.upscale,
            f0_file=None,
            embedder_model=voice.embedder_model,
            embedder_model_custom=None,
            sid=0,
            formant_shifting=None,
            formant_qfrency=None,
            formant_timbre=None,
            post_process=None,
            reverb=None,
            pitch_shift=None,
            limiter=None,
            gain=None,
            distortion=None,
            chorus=None,
            bitcrush=None,
            clipping=None,
            compressor=None,
            delay=None,
            sliders=None,
            # NOTE(review): update_progress is a coroutine function, so this
            # callback returns an awaitable; presumably convert_audio awaits
            # it - confirm against the converter implementation.
            callback=lambda pct: update_progress(0.5 + pct / 2, "Converting..."),
        )

    log.info("Successfully synthesized text (%s chars)", len(text))
    return "Text synthesized successfully.", str(output_rvc_path)
# Prerequisites
|