Spaces:
Running
Running
| """ | |
| Audiobook Generator - English Source to Multi-Language Audio | |
Supports 63 languages with preset voices, voice cloning, and emotional AI voices.
| Deploy as a Hugging Face Space: | |
| 1. Create a new Space (SDK: Gradio) | |
| 2. Upload app.py and requirements.txt | |
| 3. Add required API secrets in Settings | |
| """ | |
| import os | |
| import base64 | |
| import json | |
| import pathlib | |
| import shutil | |
| import struct | |
| import subprocess | |
| import tempfile | |
| import time | |
| import re | |
| import gradio as gr | |
| import requests as http_requests | |
| from openai import OpenAI | |
| try: | |
| import pypdf | |
| HAS_PYPDF = True | |
| except ImportError: | |
| HAS_PYPDF = False | |
| try: | |
| import docx | |
| HAS_DOCX = True | |
| except ImportError: | |
| HAS_DOCX = False | |
# ==========================================
# CONFIGURATION
# ==========================================
# Qwen omni model used for translation and preset-voice narration (text + audio out).
OMNI_MODEL = "qwen3.5-omni-plus"
# TTS model that cloned voices are enrolled against.
TTS_VC_MODEL = "qwen3-tts-vc-2026-01-22"
# Voice-enrollment (cloning) service model name.
VOICE_CLONE_MODEL = "qwen-voice-enrollment"
# OpenAI-compatible endpoint (chat/completions) for DashScope International.
DASHSCOPE_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
# Native DashScope REST API root (used for enrollment and cloned-voice synthesis).
DASHSCOPE_API_URL = "https://dashscope-intl.aliyuncs.com/api/v1"
VOICE_CLONE_URL = f"{DASHSCOPE_API_URL}/services/audio/tts/customization"
TTS_SYNTHESIS_URL = f"{DASHSCOPE_API_URL}/services/aigc/multimodal-generation/generation"
# YourVoic API
YOURVOIC_TTS_URL = "https://yourvoic.com/api/v1/tts/generate"
YOURVOIC_VOICES_URL = "https://yourvoic.com/api/v1/voices"
# Upper bound on characters per TTS request; text is chunked to this size.
MAX_CHARS_PER_CHUNK = 1500
# ==========================================
# LANGUAGES - split by engine
# ==========================================
# "engine": "qwen" = Qwen Preset + Clone, "yourvoic" = YourVoic only
# Each entry: "code" = short language code, "yourvoic" = locale tag sent to YourVoic.
LANGUAGES = {
    # -- Qwen Core (11 languages) --
    "English": {"code": "en", "engine": "qwen", "yourvoic": "en-US"},
    "Chinese (Mandarin)": {"code": "zh", "engine": "qwen", "yourvoic": "zh-CN"},
    "Japanese": {"code": "ja", "engine": "qwen", "yourvoic": "ja-JP"},
    "Korean": {"code": "ko", "engine": "qwen", "yourvoic": "ko-KR"},
    "German": {"code": "de", "engine": "qwen", "yourvoic": "de-DE"},
    "French": {"code": "fr", "engine": "qwen", "yourvoic": "fr-FR"},
    "Russian": {"code": "ru", "engine": "qwen", "yourvoic": "ru-RU"},
    "Portuguese": {"code": "pt", "engine": "qwen", "yourvoic": "pt-BR"},
    "Spanish": {"code": "es", "engine": "qwen", "yourvoic": "es-ES"},
    "Italian": {"code": "it", "engine": "qwen", "yourvoic": "it-IT"},
    "Arabic": {"code": "ar", "engine": "qwen", "yourvoic": "ar-SA"},
    # -- YourVoic: African --
    "Afrikaans": {"code": "af", "engine": "yourvoic", "yourvoic": "af-ZA"},
    "Amharic": {"code": "am", "engine": "yourvoic", "yourvoic": "am-ET"},
    "Swahili": {"code": "sw", "engine": "yourvoic", "yourvoic": "sw-KE"},
    "Malagasy": {"code": "mg", "engine": "yourvoic", "yourvoic": "mg-MG"},
    # -- YourVoic: Indian --
    "Hindi": {"code": "hi", "engine": "yourvoic", "yourvoic": "hi-IN"},
    "Bengali": {"code": "bn", "engine": "yourvoic", "yourvoic": "bn-IN"},
    "Marathi": {"code": "mr", "engine": "yourvoic", "yourvoic": "mr-IN"},
    "Telugu": {"code": "te", "engine": "yourvoic", "yourvoic": "te-IN"},
    "Tamil": {"code": "ta", "engine": "yourvoic", "yourvoic": "ta-IN"},
    "Gujarati": {"code": "gu", "engine": "yourvoic", "yourvoic": "gu-IN"},
    "Kannada": {"code": "kn", "engine": "yourvoic", "yourvoic": "kn-IN"},
    "Malayalam": {"code": "ml", "engine": "yourvoic", "yourvoic": "ml-IN"},
    "Punjabi": {"code": "pa", "engine": "yourvoic", "yourvoic": "pa-IN"},
    "Odia": {"code": "or", "engine": "yourvoic", "yourvoic": "or-IN"},
    "Assamese": {"code": "as", "engine": "yourvoic", "yourvoic": "as-IN"},
    "Sindhi": {"code": "sd", "engine": "yourvoic", "yourvoic": "sd-IN"},
    # -- YourVoic: South Asian --
    "Urdu": {"code": "ur", "engine": "yourvoic", "yourvoic": "ur-PK"},
    "Nepali": {"code": "ne", "engine": "yourvoic", "yourvoic": "ne-NP"},
    "Sinhala": {"code": "si", "engine": "yourvoic", "yourvoic": "si-LK"},
    "Pashto": {"code": "ps", "engine": "yourvoic", "yourvoic": "ps-AF"},
    # -- YourVoic: Southeast Asian --
    "Indonesian": {"code": "id", "engine": "yourvoic", "yourvoic": "id-ID"},
    "Malay": {"code": "ms", "engine": "yourvoic", "yourvoic": "ms-MY"},
    "Vietnamese": {"code": "vi", "engine": "yourvoic", "yourvoic": "vi-VN"},
    "Thai": {"code": "th", "engine": "yourvoic", "yourvoic": "th-TH"},
    "Filipino": {"code": "fil", "engine": "yourvoic", "yourvoic": "fil-PH"},
    "Javanese": {"code": "jv", "engine": "yourvoic", "yourvoic": "jv-ID"},
    "Cebuano": {"code": "ceb", "engine": "yourvoic", "yourvoic": "ceb-PH"},
    "Lao": {"code": "lo", "engine": "yourvoic", "yourvoic": "lo-LA"},
    "Burmese": {"code": "my", "engine": "yourvoic", "yourvoic": "my-MM"},
    # -- YourVoic: East Asian --
    "Chinese (Taiwan)": {"code": "zh-TW", "engine": "yourvoic", "yourvoic": "zh-TW"},
    "Cantonese": {"code": "yue", "engine": "yourvoic", "yourvoic": "yue-HK"},
    # -- YourVoic: Middle Eastern --
    "Turkish": {"code": "tr", "engine": "yourvoic", "yourvoic": "tr-TR"},
    "Hebrew": {"code": "he", "engine": "yourvoic", "yourvoic": "he-IL"},
    "Persian (Farsi)": {"code": "fa", "engine": "yourvoic", "yourvoic": "fa-IR"},
    "Azerbaijani": {"code": "az", "engine": "yourvoic", "yourvoic": "az-AZ"},
    # -- YourVoic: European --
    "Dutch": {"code": "nl", "engine": "yourvoic", "yourvoic": "nl-NL"},
    "Romanian": {"code": "ro", "engine": "yourvoic", "yourvoic": "ro-RO"},
    "Polish": {"code": "pl", "engine": "yourvoic", "yourvoic": "pl-PL"},
    "Ukrainian": {"code": "uk", "engine": "yourvoic", "yourvoic": "uk-UA"},
    "Greek": {"code": "el", "engine": "yourvoic", "yourvoic": "el-GR"},
    "Swedish": {"code": "sv", "engine": "yourvoic", "yourvoic": "sv-SE"},
    "Serbian": {"code": "sr", "engine": "yourvoic", "yourvoic": "sr-RS"},
    "Catalan": {"code": "ca", "engine": "yourvoic", "yourvoic": "ca-ES"},
    "Albanian": {"code": "sq", "engine": "yourvoic", "yourvoic": "sq-AL"},
    "Danish": {"code": "da", "engine": "yourvoic", "yourvoic": "da-DK"},
    "Norwegian": {"code": "no", "engine": "yourvoic", "yourvoic": "nb-NO"},
    "Finnish": {"code": "fi", "engine": "yourvoic", "yourvoic": "fi-FI"},
    "Slovak": {"code": "sk", "engine": "yourvoic", "yourvoic": "sk-SK"},
    "Belarusian": {"code": "be", "engine": "yourvoic", "yourvoic": "be-BY"},
    "Armenian": {"code": "hy", "engine": "yourvoic", "yourvoic": "hy-AM"},
    "Georgian": {"code": "ka", "engine": "yourvoic", "yourvoic": "ka-GE"},
    # -- YourVoic: Central Asian --
    "Mongolian": {"code": "mn", "engine": "yourvoic", "yourvoic": "mn-MN"},
}
# Languages narratable with Qwen preset voices.
QWEN_LANGUAGES = {k for k, v in LANGUAGES.items() if v["engine"] == "qwen"}
# Subset of Qwen languages supported by the voice-clone model (no Arabic).
VOICE_CLONE_LANGUAGES = {
    "English", "Chinese (Mandarin)", "Japanese", "Korean", "German",
    "French", "Russian", "Portuguese", "Spanish", "Italian",
}
# Languages served exclusively by the YourVoic engine.
YOURVOIC_LANGUAGES = {k for k, v in LANGUAGES.items() if v["engine"] == "yourvoic"}
# Qwen preset voices shown in the UI; the part before " -- " is the API voice name
# (see get_voice_name).
PRESET_VOICES = [
    "Cherry -- Sunny, friendly", "Serena -- Gentle, soft",
    "Jennifer -- Cinematic narrator", "Katerina -- Mature, rich rhythm",
    "Ethan -- Warm, energetic", "Ryan -- Dramatic, rhythmic",
    "Kai -- Soothing, calm", "Neil -- Precise, clear",
    "Lenn -- Rational, steady", "Eldric Sage -- Authoritative narrator",
    "Arthur -- Classic, mature", "Bella -- Elegant, warm",
    "Vivian -- Professional, clear", "Seren -- Calm, measured",
    "Dolce -- Sweet, melodic", "Bellona -- Strong, commanding",
    "Vincent -- Rich, theatrical", "Andre -- Deep, resonant",
    "Mia -- Young, versatile", "Aiden -- Young, lively",
]
# YourVoic voices mapped by language
# Confirmed voices + Peter as universal fallback for unconfirmed
# NOTE: generate_speech_yourvoic_with_retry mutates this dict at runtime,
# caching whichever voice actually worked for a language.
YOURVOIC_VOICE_MAP = {
    # Indian - confirmed from yourvoic.com
    "Hindi": ["Rahul", "Deepika", "Aditya"],
    "Bengali": ["Sneha", "Aryan"],
    "Marathi": ["Anjali", "Rohan"],
    "Telugu": ["Arjun", "Lakshmi"],
    "Tamil": ["Priya", "Kumar"],
    "Gujarati": ["Rahul", "Meera"],
    "Kannada": ["Divya", "Karthik"],
    "Malayalam": ["Nikhil", "Ammu"],
    "Punjabi": ["Vikram", "Simran"],
    "Odia": ["Kavya", "Subham"],
    # All other YourVoic languages use Peter as default
    # The retry logic will discover correct voices via API if Peter fails
}
# Default voices list (shown initially, updates dynamically per language)
YOURVOIC_VOICES_DEFAULT = ["Peter -- Universal fallback"]
def get_voices_for_language(language):
    """Return the voice-dropdown labels ("Name -- descriptor") for *language*."""
    known = YOURVOIC_VOICE_MAP.get(language, [])
    choices = [f"{name} -- {language}" for name in known]
    # Peter is always offered as a last-resort option.
    listed_names = {label.split(" --")[0] for label in choices}
    if "Peter" not in listed_names:
        choices.append("Peter -- Universal fallback")
    return choices
def get_yourvoic_voice_for_language(language, selected_voice):
    """Resolve *selected_voice* (a dropdown label) to a single voice name valid
    for *language*.

    Resolution order:
      1. the user's selection, when it is in the confirmed list for the language;
      2. the first confirmed voice for the language;
      3. the first voice reported by the YourVoic voices API;
      4. "Peter" as the universal fallback.
    """
    voice_name = get_voice_name(selected_voice)
    valid_voices = YOURVOIC_VOICE_MAP.get(language, [])
    # If selected voice is confirmed valid for this language, use it
    if voice_name in valid_voices:
        return voice_name
    # If we have confirmed voices for this language, use the first one
    if valid_voices:
        return valid_voices[0]
    # No confirmed voices - query the API.
    yourvoic_lang = LANGUAGES.get(language, {}).get("yourvoic", "en-US")
    api_voices = _fetch_yourvoic_voice(yourvoic_lang)
    if api_voices:
        # BUG FIX: _fetch_yourvoic_voice returns a *list* of candidate names;
        # the original returned the whole list as if it were one voice name.
        return api_voices[0]
    return "Peter"  # ultimate fallback
# Cache for API-fetched voices, keyed by "locale:model".
_yourvoic_voice_cache = {}
def _fetch_yourvoic_voice(yourvoic_lang, model="aura-prime"):
    """Query YourVoic /v1/voices endpoint to get valid voices for a language + model.

    Despite the singular name, returns a *list* of voice names (deduplicated,
    drawn from the first ~10 entries) on success, or None when the API key is
    missing or the lookup fails.  Successful results are memoized in
    _yourvoic_voice_cache.
    """
    cache_key = f"{yourvoic_lang}:{model}"
    if cache_key in _yourvoic_voice_cache:
        return _yourvoic_voice_cache[cache_key]
    yv_key = os.environ.get("YOURVOIC_API_KEY", "")
    if not yv_key:
        return None
    # Try with model parameter first, then without
    for url_params in [
        f"?language={yourvoic_lang}&model={model}",
        f"?language={yourvoic_lang}",
    ]:
        try:
            resp = http_requests.get(
                f"{YOURVOIC_VOICES_URL}{url_params}",
                headers={"X-API-Key": yv_key},
                timeout=15,
            )
            print(f"[YourVoic] Voices API {url_params}: status={resp.status_code}")
            if resp.status_code == 200:
                data = resp.json()
                # Response shape varies: bare list, {"voices": [...]}, or {"data": [...]}.
                voices = data if isinstance(data, list) else data.get("voices", data.get("data", []))
                if voices and isinstance(voices[0], dict):
                    # Return all voice names for trying
                    all_names = []
                    for v in voices[:10]:  # first 10
                        # Voice-name field also varies; first non-empty wins.
                        for field in ["id", "name", "voice_id", "voice"]:
                            if field in v and v[field]:
                                all_names.append(str(v[field]))
                                break
                    if all_names:
                        # Deduplicate preserving order
                        seen = set()
                        unique = [x for x in all_names if not (x in seen or seen.add(x))]
                        print(f"[YourVoic] Available voices for {yourvoic_lang}: {unique[:5]}")
                        _yourvoic_voice_cache[cache_key] = unique
                        return unique
        except Exception as e:
            # Best effort: log and fall through to the next URL variant.
            print(f"[YourVoic] Voice lookup failed for {yourvoic_lang}: {e}")
    return None
def generate_speech_yourvoic_with_retry(client, text, voice, yv_model, emotion, language, lang_config,
                                        translate, api_key, chunk_index, output_dir):
    """Wrapper that tries multiple voice names if the first one fails.

    Candidate order: the user's selection (front), confirmed voices for the
    language, a few universal English voices, then API-discovered voices.
    On success the working voice is promoted into YOURVOIC_VOICE_MAP so later
    chunks skip the retries.  Returns (wav_path, transcript, error) with the
    same contract as generate_speech_yourvoic.
    """
    yourvoic_lang = lang_config.get("yourvoic", "en-US")
    # Get list of candidate voices
    candidates = []
    # 1. Try hardcoded voices for this language
    hardcoded = YOURVOIC_VOICE_MAP.get(language, [])
    candidates.extend(hardcoded)
    # 2. Try user-selected voice (inserted at the front of the queue)
    user_voice = get_voice_name(voice)
    if user_voice not in candidates:
        candidates.insert(0, user_voice)
    # 3. Try universal English voices (work for many languages like Swahili)
    for universal in ["Peter", "Sarah", "Caleb"]:
        if universal not in candidates:
            candidates.append(universal)
    # 4. Try API-fetched voices last
    api_voices = _fetch_yourvoic_voice(yourvoic_lang, yv_model)
    if api_voices:
        for av in api_voices:
            if av not in candidates:
                candidates.append(av)
    # Deduplicate preserving order
    seen = set()
    candidates = [x for x in candidates if not (x in seen or seen.add(x))]
    # Try each candidate until one works
    for i, candidate_voice in enumerate(candidates[:8]):  # try up to 8
        print(f"[YourVoic] Trying voice '{candidate_voice}' for {language} (attempt {i+1})")
        wav_path, transcript, error = generate_speech_yourvoic(
            client, text, candidate_voice, yv_model, emotion, language, lang_config,
            translate, api_key, chunk_index, output_dir,
        )
        if wav_path:
            # Cache this working voice for future chunks
            if language not in YOURVOIC_VOICE_MAP or not YOURVOIC_VOICE_MAP.get(language):
                YOURVOIC_VOICE_MAP[language] = [candidate_voice]
            elif candidate_voice not in YOURVOIC_VOICE_MAP[language]:
                YOURVOIC_VOICE_MAP[language].insert(0, candidate_voice)
            return wav_path, transcript, None
        if error and "Invalid voice name" not in str(error):
            # Non-voice error (credits, etc) - don't try more voices
            return None, transcript, error
    return None, text, f"No valid voice found for {language}. This language may not be supported on your plan. Tried: {candidates[:8]}"
# UI labels for YourVoic models; the alias before " -- " is mapped to the real
# API model name by get_yourvoic_model().
YOURVOIC_MODELS = [
    "balanced -- Balanced quality and speed (recommended)",
    "lite -- Fast, good for previews",
    "premium -- Premium quality (paid plans only)",
    "fast -- Fast with good quality",
    "realtime -- Fastest, real-time apps",
]
# Emotion presets sent in the TTS payload ("neutral" means the field is omitted).
YOURVOIC_EMOTIONS = [
    "neutral", "friendly", "hopeful", "cheerful", "sad",
    "excited", "angry", "terrified", "shouting", "whispering",
]
def get_voice_name(label):
    """Strip the ' -- description' suffix from a dropdown label, leaving the bare voice name."""
    name, _sep, _description = label.partition("--")
    return name.strip()
def get_yourvoic_model(label):
    """Map anonymous model label to actual API model name.

    Unknown aliases fall back to "aura-prime".
    """
    alias = label.partition("--")[0].strip()
    alias_to_api = {
        "balanced": "aura-prime",
        "lite": "aura-lite",
        "premium": "aura-max",
        "fast": "rapid-max",
        "realtime": "rapid-flash",
    }
    return alias_to_api.get(alias, "aura-prime")
| # ========================================== | |
| # AUDIO HELPERS | |
| # ========================================== | |
def base64_to_wav(b64_data, output_path):
    """Decode base64 PCM data and wrap it in a 24 kHz mono 16-bit WAV container."""
    pcm = base64.b64decode(b64_data)
    sample_rate, channels, bits = 24000, 1, 16
    byte_rate = sample_rate * channels * bits // 8
    block_align = channels * bits // 8
    # Standard 44-byte RIFF/WAVE header (PCM, fmt chunk of 16 bytes) in one pack.
    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF", 36 + len(pcm), b"WAVE",
        b"fmt ", 16, 1, channels, sample_rate, byte_rate, block_align, bits,
        b"data", len(pcm),
    )
    with open(output_path, "wb") as f:
        f.write(header + pcm)
def concatenate_wavs(wav_files, output_path):
    """Concatenate WAV files into output_path via ffmpeg's concat demuxer.

    No-op for an empty list; a single file is copied directly without
    invoking ffmpeg.  The temporary concat list file is always removed,
    even when ffmpeg fails (the original leaked it because check=True
    raised before os.remove ran).
    """
    if not wav_files:
        return
    if len(wav_files) == 1:
        shutil.copy2(wav_files[0], output_path)
        return
    list_file = output_path + ".txt"
    try:
        with open(list_file, "w") as f:
            for wav in wav_files:
                # concat-demuxer quoting: a single quote inside a quoted
                # name is written as '\''
                f.write("file '{}'\n".format(wav.replace("'", "'\\''")))
        subprocess.run(
            ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
             "-i", list_file, "-c", "copy", output_path],
            capture_output=True, check=True,
        )
    finally:
        try:
            os.remove(list_file)
        except OSError:
            pass
def generate_silence(duration_sec, output_path):
    """Write duration_sec seconds of 24 kHz mono 16-bit silence to output_path via ffmpeg."""
    command = [
        "ffmpeg", "-y", "-f", "lavfi", "-i", "anullsrc=r=24000:cl=mono",
        "-t", str(duration_sec), "-acodec", "pcm_s16le", output_path,
    ]
    subprocess.run(command, capture_output=True, check=True)
| # ========================================== | |
| # DOCUMENT EXTRACTION | |
| # ========================================== | |
def extract_text_from_file(filepath):
    """Extract plain text from a .pdf, .docx, or plain-text file.

    PDF pages / docx paragraphs are joined with blank lines; any other
    extension is read as UTF-8 text (undecodable bytes replaced).
    Raises gr.Error when the needed parser library is missing or the
    format is legacy .doc.
    """
    ext = os.path.splitext(filepath)[1].lower()
    if ext == ".pdf":
        if not HAS_PYPDF:
            raise gr.Error("pypdf not installed.")
        reader = pypdf.PdfReader(filepath)
        # Extract each page once — the original called extract_text() twice
        # per page (once for the filter, once for the value).
        page_texts = (p.extract_text() for p in reader.pages)
        return "\n\n".join(t.strip() for t in page_texts if t)
    elif ext in (".docx", ".doc"):
        if ext == ".doc":
            raise gr.Error("Please save as .docx or .pdf.")
        if not HAS_DOCX:
            raise gr.Error("python-docx not installed.")
        doc = docx.Document(filepath)
        return "\n\n".join(p.text.strip() for p in doc.paragraphs if p.text.strip())
    else:
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            return f.read()
| # ========================================== | |
| # TEXT SPLITTING | |
| # ========================================== | |
def split_text_into_chunks(text, max_chars=None):
    """Split *text* into TTS-sized chunks of at most ~max_chars characters.

    Splits on blank lines (paragraphs) first; a paragraph longer than
    max_chars is further split on sentence boundaries.  A single sentence
    longer than max_chars is kept whole, so chunks can exceed the limit in
    that edge case.  Returns [] for empty/whitespace-only input.

    FIX: the default is now late-bound (None sentinel) instead of capturing
    MAX_CHARS_PER_CHUNK at function-definition time, so later changes to the
    module constant take effect; callers' behavior is unchanged.
    """
    if max_chars is None:
        max_chars = MAX_CHARS_PER_CHUNK
    text = text.strip()
    if not text:
        return []
    if len(text) <= max_chars:
        return [text]
    chunks, paragraphs, current = [], re.split(r"\n\s*\n", text), ""
    for para in paragraphs:
        para = para.strip()
        if not para:
            continue
        if len(current) + len(para) + 2 <= max_chars:
            # Paragraph fits into the running chunk (with a blank-line joiner).
            current = (current + "\n\n" + para).strip()
        else:
            if current:
                chunks.append(current)
            if len(para) > max_chars:
                # Oversized paragraph: fall back to sentence-level packing.
                sentences = re.split(r"(?<=[.!?])\s+", para)
                current = ""
                for s in sentences:
                    if len(current) + len(s) + 1 <= max_chars:
                        current = (current + " " + s).strip()
                    else:
                        if current:
                            chunks.append(current)
                        current = s
            else:
                current = para
    if current:
        chunks.append(current)
    return chunks
| # ========================================== | |
| # VOICE CLONING | |
| # ========================================== | |
def prepare_clone_audio(audio_path):
    """Normalize a voice sample to at most 60 s of 24 kHz mono 16-bit PCM.

    Probes the duration with ffprobe, raises ValueError for samples under
    10 seconds, and returns the path of the prepared temporary WAV file.
    """
    probe = subprocess.run(
        ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", audio_path],
        capture_output=True, text=True,
    )
    duration = float(probe.stdout.strip())
    if duration < 10:
        raise ValueError(f"Audio too short ({duration:.1f}s). Need at least 10 seconds.")
    prepared_path = audio_path + "_prepared.wav"
    if duration <= 60:
        command = ["ffmpeg", "-y", "-i", audio_path]
    else:
        # Long sample: keep a 60 s window, skipping up to 5 s from the start.
        offset = min(5, duration - 60)
        command = ["ffmpeg", "-y", "-ss", str(offset), "-t", "60", "-i", audio_path]
    command += ["-ar", "24000", "-ac", "1", "-acodec", "pcm_s16le", prepared_path]
    subprocess.run(command, capture_output=True, check=True)
    return prepared_path
def clone_voice(audio_path, api_key):
    """Enroll a cloned voice from *audio_path* and return its DashScope voice id.

    Raises RuntimeError when the enrollment request is rejected.
    """
    prepared = prepare_clone_audio(audio_path)
    audio_b64 = base64.b64encode(pathlib.Path(prepared).read_bytes()).decode()
    try:
        os.remove(prepared)
    except OSError:
        pass  # best-effort cleanup of the temporary file
    payload = {
        "model": VOICE_CLONE_MODEL,
        "input": {
            "action": "create", "target_model": TTS_VC_MODEL,
            "preferred_name": "audiobook_voice",
            "audio": {"data": f"data:audio/wav;base64,{audio_b64}"},
        },
    }
    resp = http_requests.post(
        VOICE_CLONE_URL,
        json=payload,
        headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
        timeout=60,
    )
    if resp.status_code != 200:
        raise RuntimeError(f"Voice clone failed: {resp.text[:300]}")
    return resp.json()["output"]["voice"]
| # ========================================== | |
| # TRANSLATION | |
| # ========================================== | |
def translate_text(client, text, target_language, lang_config):
    """Translate English *text* into *target_language* using the omni chat model."""
    messages = [
        {"role": "system", "content": f"Translate English to {target_language}. Output ONLY the translation."},
        {"role": "user", "content": f"Translate:\n\n{text}"},
    ]
    reply = client.chat.completions.create(
        model=OMNI_MODEL,
        modalities=["text"],
        messages=messages,
    )
    return reply.choices[0].message.content.strip()
| # ========================================== | |
| # TTS MODE 1: PRESET VOICE | |
| # ========================================== | |
def generate_speech_preset(client, text, voice, language, lang_config, translate, chunk_index, output_dir):
    """Narrate one text chunk with a preset Qwen omni voice.

    Streams a chat completion with text+audio modalities; audio deltas arrive
    as base64 fragments that are concatenated and written as a WAV file.  When
    *translate* is set (and the target isn't English) the model is prompted to
    translate and narrate in a single step.

    Returns (wav_path, transcript) on success, or (None, error_message) on
    failure — note the caller distinguishes the two only by the None path.
    """
    output_wav = os.path.join(output_dir, f"chunk_{chunk_index:04d}.wav")
    if translate and language != "English":
        sys_prompt = (f"Translate English to {language} "
                      f"and narrate expressively. Respond ONLY with spoken {language} narration.")
        user_text = f"Translate into {language} and narrate:\n\n{text}"
    else:
        sys_prompt = "Narrate expressively as an audiobook. Respond ONLY with narration."
        user_text = f"Narrate:\n\n{text}"
    try:
        completion = client.chat.completions.create(
            model=OMNI_MODEL,
            messages=[{"role": "system", "content": sys_prompt}, {"role": "user", "content": user_text}],
            modalities=["text", "audio"], audio={"voice": voice, "format": "wav"},
            stream=True, stream_options={"include_usage": True},
        )
        audio_parts, text_parts = [], []
        for event in completion:
            if not event.choices:
                continue  # e.g. trailing usage-only event
            delta = event.choices[0].delta
            if hasattr(delta, "content") and delta.content:
                text_parts.append(delta.content)
            # The audio delta may be a dict or an attribute object depending
            # on the SDK version — handle both shapes.
            if hasattr(delta, "audio") and delta.audio:
                if isinstance(delta.audio, dict) and "data" in delta.audio:
                    audio_parts.append(delta.audio["data"])
                elif hasattr(delta.audio, "data") and delta.audio.data:
                    audio_parts.append(delta.audio.data)
        transcript = "".join(text_parts)
        if audio_parts:
            base64_to_wav("".join(audio_parts), output_wav)
            return output_wav, transcript
        return None, "No audio received"
    except Exception as e:
        # The error string is surfaced to the user via the transcript slot.
        return None, str(e)
| # ========================================== | |
| # TTS MODE 2: CLONED VOICE | |
| # ========================================== | |
def generate_speech_cloned(client, text, voice_id, language, lang_config, translate, api_key, chunk_index, output_dir):
    """Synthesize one chunk with an enrolled (cloned) voice.

    Returns (wav_path, spoken_text, error); wav_path is None on failure and
    error then carries a short description.
    """
    output_wav = os.path.join(output_dir, f"vc_chunk_{chunk_index:04d}.wav")
    if translate and language != "English":
        final_text = translate_text(client, text, language, lang_config)
    else:
        final_text = text
    # The synthesis API expects the language spelled out, not an ISO code.
    lang_map = {
        "English": "English", "Chinese (Mandarin)": "Chinese", "Japanese": "Japanese",
        "Korean": "Korean", "German": "German", "French": "French",
        "Russian": "Russian", "Portuguese": "Portuguese", "Spanish": "Spanish", "Italian": "Italian",
    }
    payload = {
        "model": TTS_VC_MODEL,
        "input": {"text": final_text, "voice": voice_id, "language_type": lang_map.get(language, "English")},
    }
    resp = http_requests.post(
        TTS_SYNTHESIS_URL,
        json=payload,
        headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
        timeout=120,
    )
    if resp.status_code != 200:
        return None, final_text, f"TTS failed ({resp.status_code})"
    audio_url = resp.json().get("output", {}).get("audio", {}).get("url")
    if not audio_url:
        return None, final_text, "No audio URL"
    audio_resp = http_requests.get(audio_url, timeout=120)
    with open(output_wav, "wb") as f:
        f.write(audio_resp.content)
    return output_wav, final_text, None
| # ========================================== | |
| # TTS MODE 3: EMOTIONAL AI VOICES | |
| # ========================================== | |
def generate_speech_yourvoic(client, text, voice, yv_model, emotion, language, lang_config, translate,
                             api_key, chunk_index, output_dir):
    """Generate speech using emotional AI voice API.

    Optionally translates the chunk first (requires DASHSCOPE_API_KEY and an
    OpenAI client), posts to the YourVoic TTS endpoint, then converts the
    returned MP3 to 24 kHz mono WAV so all chunks concatenate uniformly.
    Returns (wav_path, transcript, error); wav_path is None on failure.
    """
    output_file = os.path.join(output_dir, f"yv_chunk_{chunk_index:04d}.mp3")
    # Translate if needed
    final_text = text
    transcript = text
    if translate and language != "English":
        try:
            ds_key = os.environ.get("DASHSCOPE_API_KEY", "")
            if ds_key and client:
                final_text = translate_text(client, text, language, lang_config)
                transcript = final_text
        except Exception as e:
            # Best effort: fall back to narrating the English source text.
            print(f"[YourVoic] Translation failed, using English: {e}")
    # Build request - voice is passed directly (already resolved by caller)
    yourvoic_lang = lang_config.get("yourvoic", "en-US")
    print(f"[YourVoic] Language: {language}, voice: {voice}")
    payload = {
        "text": final_text,
        "voice": voice,
        "language": yourvoic_lang,
        "model": yv_model,
        "speed": 0.9,  # presumably < 1.0 slows narration slightly — confirm API semantics
    }
    # Add emotion if not neutral
    if emotion and emotion != "neutral":
        payload["emotion"] = emotion
    headers = {
        "X-API-Key": api_key,
        "Content-Type": "application/json",
    }
    try:
        resp = http_requests.post(YOURVOIC_TTS_URL, json=payload, headers=headers, timeout=120)
        print(f"[YourVoic] Chunk {chunk_index}: status={resp.status_code}, size={len(resp.content)} bytes")
        if resp.status_code != 200:
            error_msg = resp.text[:200]
            print(f"[YourVoic] Error: {error_msg}")
            return None, transcript, f"YourVoic API error ({resp.status_code}): {error_msg}"
        # Check if response is JSON (contains audio_url) or direct audio bytes
        content_type = resp.headers.get("Content-Type", "")
        if "application/json" in content_type:
            data = resp.json()
            audio_url = data.get("audio_url") or data.get("url")
            if audio_url:
                audio_resp = http_requests.get(audio_url, timeout=120)
                with open(output_file, "wb") as f:
                    f.write(audio_resp.content)
            else:
                return None, transcript, f"No audio URL in response: {json.dumps(data)[:200]}"
        else:
            # Direct audio bytes
            with open(output_file, "wb") as f:
                f.write(resp.content)
        # Convert MP3 to WAV for consistent concatenation
        output_wav = output_file.replace(".mp3", ".wav")
        subprocess.run(
            ["ffmpeg", "-y", "-i", output_file, "-ar", "24000", "-ac", "1",
             "-acodec", "pcm_s16le", output_wav],
            capture_output=True, check=True,
        )
        return output_wav, transcript, None
    except Exception as e:
        return None, transcript, str(e)
| # ========================================== | |
| # MAIN PIPELINE | |
| # ========================================== | |
def generate_audiobook(text_input, file_input, target_language, voice_mode,
                       preset_voice_label, clone_audio, yourvoic_voice_label,
                       yourvoic_model_label, yourvoic_emotion,
                       add_pauses, progress=gr.Progress()):
    """End-to-end pipeline: text/file -> chunks -> per-chunk TTS -> one MP3.

    Returns (mp3_path, stats_markdown, transcript_text) for the Gradio
    outputs.  Raises gr.Error for user-facing problems (missing input,
    missing API keys, unsupported clone language).  Failed chunks are
    replaced by 2 s of silence so the audiobook stays contiguous.
    """
    # Resolve text: uploaded file wins over the text box.
    if file_input is not None:
        progress(0.02, desc="Extracting text from document...")
        text = extract_text_from_file(file_input)
    elif text_input and text_input.strip():
        text = text_input.strip()
    else:
        raise gr.Error("Please provide text or upload a file.")
    if len(text) < 10:
        raise gr.Error("Text is too short.")
    ds_key = os.environ.get("DASHSCOPE_API_KEY", "")
    yv_key = os.environ.get("YOURVOIC_API_KEY", "")
    lang_config = LANGUAGES[target_language]
    lang_engine = lang_config["engine"]
    use_clone = voice_mode == "Clone a Voice"
    use_yourvoic = voice_mode == "Emotional AI"
    translate = target_language != "English"
    # Auto-correct engine if language requires it
    if lang_engine == "yourvoic" and not use_yourvoic:
        # Language only supported by YourVoic, force switch
        use_yourvoic = True
        use_clone = False
    elif lang_engine == "qwen" and use_yourvoic:
        # User chose YourVoic but language is Qwen-only — allow it since YourVoic
        # supports most languages, but Qwen languages also work on YourVoic
        pass
    # Validate keys
    if use_yourvoic:
        if not yv_key:
            raise gr.Error("Voice API key for emotional voices not set. Add YOURVOIC_API_KEY in Settings > Secrets.")
        if translate and not ds_key:
            raise gr.Error("Translation API key not set. Add DASHSCOPE_API_KEY in Settings > Secrets.")
    else:
        if not ds_key:
            raise gr.Error("Voice API key not set. Add DASHSCOPE_API_KEY in Settings > Secrets.")
    # Client may be None in YourVoic mode without translation.
    client = OpenAI(api_key=ds_key, base_url=DASHSCOPE_BASE_URL) if ds_key else None
    tmp_dir = tempfile.mkdtemp(prefix="audiobook_")
    # Voice cloning setup
    cloned_voice_id = None
    if use_clone:
        if clone_audio is None:
            raise gr.Error("Upload a voice sample for cloning.")
        if target_language not in VOICE_CLONE_LANGUAGES:
            raise gr.Error(f"Voice cloning supports: {', '.join(sorted(VOICE_CLONE_LANGUAGES))}")
        progress(0.03, desc="Cloning voice...")
        cloned_voice_id = clone_voice(clone_audio, ds_key)
    try:
        progress(0.08, desc="Splitting text...")
        chunks = split_text_into_chunks(text)
        total_chunks = len(chunks)
        total_chars = sum(len(c) for c in chunks)
        audio_files, all_transcripts = [], []
        # One shared 1.5 s silence file reused between chunks when pauses are on.
        silence_path = os.path.join(tmp_dir, "silence.wav")
        if add_pauses:
            generate_silence(1.5, silence_path)
        for i, chunk in enumerate(chunks):
            # Map chunk index into the 10%..88% progress band.
            frac = 0.10 + 0.78 * (i / total_chunks)
            progress(frac, desc=f"Narrating chunk {i+1}/{total_chunks}...")
            wav_path, transcript, error = None, None, None
            if use_yourvoic:
                yv_voice = yourvoic_voice_label
                yv_model = get_yourvoic_model(yourvoic_model_label)
                wav_path, transcript, error = generate_speech_yourvoic_with_retry(
                    client, chunk, yv_voice, yv_model, yourvoic_emotion,
                    target_language, lang_config, translate,
                    yv_key, i, tmp_dir,
                )
            elif use_clone:
                wav_path, transcript, error = generate_speech_cloned(
                    client, chunk, cloned_voice_id, target_language,
                    lang_config, translate, ds_key, i, tmp_dir,
                )
            else:
                voice = get_voice_name(preset_voice_label)
                wav_path, transcript = generate_speech_preset(
                    client, chunk, voice, target_language,
                    lang_config, translate, i, tmp_dir,
                )
                # Preset mode returns the error message in the transcript slot.
                error = None if wav_path else transcript
            if wav_path:
                audio_files.append(wav_path)
            else:
                # Keep timing intact: substitute 2 s of silence for the failure.
                all_transcripts.append(f"Chunk {i+1} failed: {error}")
                fail_sil = os.path.join(tmp_dir, f"fail_{i:04d}.wav")
                generate_silence(2.0, fail_sil)
                audio_files.append(fail_sil)
            if transcript and "failed" not in str(transcript).lower():
                all_transcripts.append(transcript)
            if add_pauses and i < total_chunks - 1 and audio_files:
                audio_files.append(silence_path)
        if not audio_files:
            raise gr.Error("No audio was generated.")
        progress(0.90, desc="Assembling audiobook...")
        final_audio = os.path.join(tmp_dir, "audiobook.wav")
        concatenate_wavs(audio_files, final_audio)
        progress(0.95, desc="Converting to MP3...")
        final_mp3 = os.path.join(tmp_dir, "audiobook.mp3")
        subprocess.run(
            ["ffmpeg", "-y", "-i", final_audio, "-codec:a", "libmp3lame",
             "-b:a", "128k", "-ar", "24000", "-ac", "1", final_mp3],
            capture_output=True, check=True,
        )
        progress(1.0, desc="Done!")
        audio_size = os.path.getsize(final_mp3) / (1024 * 1024)
        # Build the summary card shown next to the player.
        if use_yourvoic:
            voice_info = f"Emotional AI: {yourvoic_voice_label} ({yourvoic_emotion})"
            mode_info = f"Emotional AI Engine"
        elif use_clone:
            voice_info = f"Cloned (ID: {cloned_voice_id[:20]}...)"
            mode_info = "Voice Clone Engine"
        else:
            voice_info = preset_voice_label
            mode_info = "Premium AI Engine"
        stats = (
            f"**Audiobook Generated!**\n\n"
            f"- **Source:** {total_chars:,} characters in {total_chunks} chunks\n"
            f"- **Language:** {target_language}\n"
            f"- **Voice:** {voice_info}\n"
            f"- **File size:** {audio_size:.1f} MB\n"
        )
        transcript_text = "\n\n---\n\n".join(all_transcripts) if all_transcripts else ""
        return final_mp3, stats, transcript_text
    except gr.Error:
        raise
    except Exception as e:
        # Wrap unexpected failures so Gradio shows a readable message.
        raise gr.Error(f"Pipeline error: {str(e)}")
| # ========================================== | |
| # GRADIO UI | |
| # ========================================== | |
# Demo passage inserted into the text box by the "Load Sample Text" button.
SAMPLE_TEXT = """Chapter 1: The Beginning
The old lighthouse stood at the edge of the world, or so it seemed to the girl who had lived in its shadow all her life. Each morning, she would climb the winding iron staircase to the lamp room, counting exactly one hundred and forty-seven steps, and watch the sun rise from the sea like a great golden coin.
"One day," she whispered to the seagulls, "I'll follow that sun to wherever it goes."
Her name was Elena, and she was seventeen years old. She had hair the color of dark honey and eyes that changed with the weather - grey in storms, green in sunlight.
The lighthouse keeper, her grandfather, was a man of few words but many stories. He kept them locked away like treasures in a chest, only bringing them out on winter nights when the storms howled outside.
"Tell me about the ships," Elena would say, curling up in the worn armchair by the fire.
And he would smile - that slow, careful smile that seemed to cost him something each time - and begin."""
# Markdown header rendered at the top of the Gradio page.
DESCRIPTION = """
# Audiobook Generator
### English Text to Multi-Language Audiobook
"""
# Build the language dropdown: Qwen core languages first, then the YourVoic
# languages grouped by region in a curated display order.  Data-driven
# replacement for seven near-identical copy-pasted loops; order and filtering
# are unchanged (names absent from LANGUAGES are silently skipped, exactly as
# the original `if name in LANGUAGES` guards did).
_YOURVOIC_REGIONS = [
    # African
    ["Afrikaans", "Amharic", "Swahili", "Malagasy"],
    # Indian
    ["Hindi", "Bengali", "Marathi", "Telugu", "Tamil", "Gujarati", "Kannada",
     "Malayalam", "Punjabi", "Odia", "Assamese", "Sindhi"],
    # South Asian
    ["Urdu", "Nepali", "Sinhala", "Pashto"],
    # Southeast Asian
    ["Indonesian", "Malay", "Vietnamese", "Thai", "Filipino",
     "Javanese", "Cebuano", "Lao", "Burmese"],
    # East Asian
    ["Chinese (Taiwan)", "Cantonese"],
    # Middle Eastern
    ["Turkish", "Hebrew", "Persian (Farsi)", "Azerbaijani"],
    # European
    ["Dutch", "Romanian", "Polish", "Ukrainian", "Greek", "Swedish", "Serbian",
     "Catalan", "Albanian", "Danish", "Norwegian", "Finnish", "Slovak",
     "Belarusian", "Armenian", "Georgian"],
    # Central Asian
    ["Mongolian"],
]

# Qwen languages first (in LANGUAGES insertion order), then each region.
lang_choices = [name for name, cfg in LANGUAGES.items() if cfg["engine"] == "qwen"]
for _region in _YOURVOIC_REGIONS:
    lang_choices.extend(name for name in _region if name in LANGUAGES)
def clean_language_name(choice):
    """Normalize a dropdown selection by trimming surrounding whitespace."""
    return choice.strip()
def auto_select_engine(language_name):
    """Return the TTS engine registered for *language_name*.

    Falls back to "qwen" for any language not present in LANGUAGES.
    """
    if language_name not in LANGUAGES:
        return "qwen"
    return LANGUAGES[language_name]["engine"]
def on_language_change(lang_choice):
    """Auto-switch visible controls and update voice choices based on language.

    Returns a tuple of 8 gr.update() objects matching the event handler's
    output wiring, in order: (preset_voice, yv_voice, yv_model, yv_emotion,
    engine_label, use_clone, clone_audio, clone_info).

    Fix: the engine_label values were f-strings with no placeholders
    (ruff F541) — replaced with plain string literals; text is unchanged.
    """
    lang = clean_language_name(lang_choice)
    engine = auto_select_engine(lang)
    if engine == "yourvoic":
        # YourVoic languages: show the emotional-AI controls with a
        # per-language voice list; cloning is hidden AND unchecked so a
        # stale checkbox state cannot leak into generation.
        voice_choices = get_voices_for_language(lang)
        default_voice = voice_choices[0] if voice_choices else "Peter -- Universal fallback"
        return (
            gr.update(visible=False),  # preset_voice
            gr.update(visible=True, choices=voice_choices, value=default_voice),  # yv_voice
            gr.update(visible=True),  # yv_model
            gr.update(visible=True),  # yv_emotion
            gr.update(value="Engine: Emotional AI Voices"),  # engine_label
            gr.update(visible=False, value=False),  # use_clone (hide + uncheck)
            gr.update(visible=False),  # clone_audio
            gr.update(visible=False),  # clone_info
        )
    # Qwen languages: preset narrator voices plus the optional cloning toggle.
    return (
        gr.update(visible=True),  # preset_voice
        gr.update(visible=False),  # yv_voice
        gr.update(visible=False),  # yv_model
        gr.update(visible=False),  # yv_emotion
        gr.update(value="Engine: Premium AI Voices"),  # engine_label
        gr.update(visible=True),  # use_clone
        gr.update(visible=False),  # clone_audio
        gr.update(visible=False),  # clone_info
    )
def on_clone_toggle(use_clone):
    """Show or hide the voice-sample upload and its help note together."""
    visible = bool(use_clone)
    return gr.update(visible=visible), gr.update(visible=visible)
def generate_wrapper(text_input, file_input, language_choice, use_clone,
                     preset_voice, clone_audio, yv_voice, yv_model, yv_emotion,
                     add_pauses, progress=gr.Progress()):
    """Translate UI state into the voice_mode string generate_audiobook expects.

    The cloning checkbox wins over the language's engine; otherwise the
    engine registered for the language decides the mode.
    """
    language = clean_language_name(language_choice)
    engine = auto_select_engine(language)
    if use_clone:
        voice_mode = "Clone a Voice"
    else:
        voice_mode = "Emotional AI" if engine == "yourvoic" else "Preset Voice"
    return generate_audiobook(
        text_input, file_input, language, voice_mode,
        preset_voice, clone_audio, yv_voice, yv_model, yv_emotion,
        add_pauses, progress,
    )
# ---- Gradio UI layout and event wiring (module level; runs on import) ----
with gr.Blocks(title="Audiobook Generator") as demo:
    gr.Markdown(DESCRIPTION)
    with gr.Row():
        with gr.Column(scale=1):
            # Input column: source text, optional document upload, and all
            # voice/engine controls. Visibility of the engine-specific
            # widgets is driven by on_language_change.
            text_input = gr.Textbox(label="English Text", placeholder="Paste your English text here...",
                                    lines=10, max_lines=25)
            file_input = gr.File(label="Or Upload (.txt, .md, .pdf, .docx)",
                                 file_types=[".txt", ".md", ".text", ".pdf", ".docx", ".doc"], type="filepath")
            sample_btn = gr.Button("Load Sample Text", variant="secondary", size="sm")
            target_lang = gr.Dropdown(choices=lang_choices, value="English", label="Target Language",
                                      info="The right voice engine is selected automatically based on language.")
            engine_label = gr.Markdown(value="", visible=False)
            # Qwen preset voice (visible for Qwen languages)
            preset_voice = gr.Dropdown(choices=PRESET_VOICES, value="Jennifer -- Cinematic narrator",
                                       label="Narrator Voice", visible=True)
            # YourVoic controls (visible for YourVoic languages)
            yv_voice = gr.Dropdown(choices=YOURVOIC_VOICES_DEFAULT, value="Peter -- Universal fallback",
                                   label="Voice", visible=False, allow_custom_value=True,
                                   info="Voices update automatically per language.")
            yv_model = gr.Dropdown(choices=YOURVOIC_MODELS, value="balanced -- Balanced quality and speed (recommended)",
                                   label="AI Model", visible=False)
            yv_emotion = gr.Dropdown(choices=YOURVOIC_EMOTIONS, value="friendly",
                                     label="Emotion Style", visible=False,
                                     info="Add emotional expression to the narration")
            # Voice cloning toggle (optional, works for Qwen languages only)
            use_clone = gr.Checkbox(value=False, label="Use Voice Cloning (10 core languages only)",
                                    info="Clone a voice from audio sample instead of using preset")
            clone_audio = gr.Audio(label="Voice Sample (10s-3min)", type="filepath", visible=False)
            clone_info = gr.Markdown(
                value="> 10-180s clear speech, no background noise. Supports: EN, ZH, JA, KO, DE, FR, RU, PT, ES, IT.",
                visible=False,
            )
            add_pauses = gr.Checkbox(value=True, label="Add pauses between sections", info="1.5s silence between chunks")
            generate_btn = gr.Button("Generate Audiobook", variant="primary", size="lg")
        with gr.Column(scale=1):
            # Output column: the rendered audiobook, generation stats, and a
            # collapsible transcript of the translated/narrated text.
            audio_output = gr.Audio(label="Generated Audiobook", type="filepath")
            stats_output = gr.Markdown(label="Generation Stats")
            with gr.Accordion("Translation / Narration Transcript", open=False):
                transcript_output = gr.Markdown()
    # Events
    sample_btn.click(fn=lambda: SAMPLE_TEXT, outputs=text_input)
    # Output order here must match the 8-tuple returned by on_language_change.
    target_lang.change(
        fn=on_language_change, inputs=target_lang,
        outputs=[preset_voice, yv_voice, yv_model, yv_emotion, engine_label,
                 use_clone, clone_audio, clone_info],
    )
    use_clone.change(fn=on_clone_toggle, inputs=use_clone, outputs=[clone_audio, clone_info])
    # Input order here must match generate_wrapper's positional parameters.
    generate_btn.click(
        fn=generate_wrapper,
        inputs=[text_input, file_input, target_lang, use_clone,
                preset_voice, clone_audio, yv_voice, yv_model, yv_emotion, add_pauses],
        outputs=[audio_output, stats_output, transcript_output],
    )
    gr.Markdown(
        "---\n"
    )
# Standard script entry point; Space runtimes import app.py and call launch.
if __name__ == "__main__":
    demo.launch()