import argparse import csv import docker import ebooklib import gradio as gr import hashlib import json import numpy as np import os import regex as re import requests import shutil import socket import subprocess import sys import threading import time import torch import torchaudio import urllib.request import uuid import zipfile import traceback from bs4 import BeautifulSoup from collections import Counter from collections.abc import MutableMapping from datetime import datetime from ebooklib import epub from glob import glob from huggingface_hub import hf_hub_download from iso639 import languages from multiprocessing import Manager, Event from pydub import AudioSegment from tqdm import tqdm from translate import Translator from TTS.api import TTS as XTTS from TTS.tts.configs.xtts_config import XttsConfig from TTS.tts.models.xtts import Xtts from urllib.parse import urlparse import lib.conf as conf import lib.lang as lang def inject_configs(target_namespace): # Extract variables from both modules and inject them into the target namespace for module in (conf, lang): target_namespace.update({k: v for k, v in vars(module).items() if not k.startswith('__')}) # Inject configurations into the global namespace of this module inject_configs(globals()) def recursive_proxy(data, manager=None): """Recursively convert a nested dictionary into Manager.dict proxies.""" if manager is None: manager = Manager() if isinstance(data, dict): proxy_dict = manager.dict() for key, value in data.items(): proxy_dict[key] = recursive_proxy(value, manager) return proxy_dict elif isinstance(data, list): proxy_list = manager.list() for item in data: proxy_list.append(recursive_proxy(item, manager)) return proxy_list elif isinstance(data, (str, int, float, bool, type(None))): # Scalars return data else: raise TypeError(f"Unsupported data type: {type(data)}") class ConversionContext: def __init__(self): self.manager = Manager() self.sessions = self.manager.dict() # Store all session-specific contexts self.cancellation_events = {} # Store multiprocessing.Event for each session def get_session(self, session_id): """Retrieve or initialize session-specific context""" if session_id not in self.sessions: self.sessions[session_id] = recursive_proxy({ "script_mode": NATIVE, "client": None, "language": default_language_code, "audiobooks_dir": None, "tmp_dir": None, "src": None, "id": session_id, "chapters_dir": None, "chapters_dir_sentences": None, "epub": None, "epub_path": None, "filename_noext": None, "fine_tuned": None, "voice_file": None, "custom_model": None, "custom_model_dir": None, "chapters": None, "cover": None, "metadata": { "title": None, "creator": None, "contributor": None, "language": None, "language_iso1": None, "identifier": None, "publisher": None, "date": None, "description": None, "subject": None, "rights": None, "format": None, "type": None, "coverage": None, "relation": None, "Source": None, "Modified": None, }, "status": "Idle", "progress": 0, "cancellation_requested": False }, manager=self.manager) return self.sessions[session_id] context = ConversionContext() is_gui_process = False class DependencyError(Exception): def __init__(self, message=None): super().__init__(message) # Automatically handle the exception when it's raised self.handle_exception() def handle_exception(self): # Print the full traceback of the exception traceback.print_exc() # Print the exception message print(f'Caught DependencyError: {self}') # Exit the script if it's not a web process if not is_gui_process: sys.exit(1) def prepare_dirs(src, session): try: resume = False os.makedirs(os.path.join(models_dir,'tts'), exist_ok=True) os.makedirs(session['tmp_dir'], exist_ok=True) os.makedirs(session['custom_model_dir'], exist_ok=True) os.makedirs(session['audiobooks_dir'], exist_ok=True) session['src'] = os.path.join(session['tmp_dir'], os.path.basename(src)) if os.path.exists(session['src']): if compare_files_by_hash(session['src'], src): resume = True if not resume: shutil.rmtree(session['chapters_dir'], ignore_errors=True) os.makedirs(session['chapters_dir'], exist_ok=True) os.makedirs(session['chapters_dir_sentences'], exist_ok=True) shutil.copy(src, session['src']) return True except Exception as e: raise DependencyError(e) def check_programs(prog_name, command, options): try: subprocess.run([command, options], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return True, None except FileNotFoundError: e = f'''********** Error: {prog_name} is not installed! if your OS calibre package version is not compatible you still can run ebook2audiobook.sh (linux/mac) or ebook2audiobook.cmd (windows) **********''' raise DependencyError(e) except subprocess.CalledProcessError: e = f'Error: There was an issue running {prog_name}.' raise DependencyError(e) def check_fine_tuned(fine_tuned, language): try: for parent, children in models.items(): if fine_tuned in children: if language_xtts.get(language): tts = 'xtts' else: tts = 'fairseq' if parent == tts: return parent return False except Exception as e: raise RuntimeError(e) def analyze_uploaded_file(zip_path, required_files=None): if required_files is None: required_files = default_model_files executable_extensions = {'.exe', '.bat', '.cmd', '.bash', '.bin', '.sh', '.msi', '.dll', '.com'} try: with zipfile.ZipFile(zip_path, 'r') as zf: files_in_zip = set() executables_found = False for file_info in zf.infolist(): file_name = file_info.filename if file_info.is_dir(): continue # Skip directories base_name = os.path.basename(file_name) files_in_zip.add(base_name) _, ext = os.path.splitext(base_name.lower()) if ext in executable_extensions: executables_found = True break missing_files = [f for f in required_files if f not in files_in_zip] is_valid = not executables_found and not missing_files return is_valid, except zipfile.BadZipFile: raise ValueError("error: The file is not a valid ZIP archive.") except Exception as e: raise RuntimeError(f'analyze_uploaded_file(): {e}') async def extract_custom_model(file_src, dest=None, session=None, required_files=None): try: progress_bar = None if is_gui_process: progress_bar = gr.Progress(track_tqdm=True) if dest is None: dest = session['custom_model_dir'] = os.path.join(models_dir, '__sessions', f"model-{session['id']}") os.makedirs(dest, exist_ok=True) if required_files is None: required_files = default_model_files dir_src = os.path.dirname(file_src) dir_name = os.path.basename(file_src).replace('.zip', '') with zipfile.ZipFile(file_src, 'r') as zip_ref: files = zip_ref.namelist() files_length = len(files) dir_tts = 'fairseq' xtts_config = 'config.json' # Check the model type config_data = {} if xtts_config in zip_ref.namelist(): with zip_ref.open(xtts_config) as file: config_data = json.load(file) if config_data.get('model') == 'xtts': dir_tts = 'xtts' dir_dest = os.path.join(dest, dir_tts, dir_name) os.makedirs(dir_dest, exist_ok=True) # Initialize progress bar with tqdm(total=100, unit='%') as t: # Track progress as a percentage for i, file in enumerate(files): if file in required_files: zip_ref.extract(file, dir_dest) progress_percentage = ((i + 1) / files_length) * 100 t.n = int(progress_percentage) t.refresh() if progress_bar is not None: progress_bar(downloaded / total_size) yield dir_name, progress_bar os.remove(file_src) print(f'Extracted files to {dir_dest}') yield dir_name, progress_bar return except Exception as e: raise DependencyError(e) def calculate_hash(filepath, hash_algorithm='sha256'): hash_func = hashlib.new(hash_algorithm) with open(filepath, 'rb') as file: while chunk := file.read(8192): # Read in chunks to handle large files hash_func.update(chunk) return hash_func.hexdigest() def compare_files_by_hash(file1, file2, hash_algorithm='sha256'): return calculate_hash(file1, hash_algorithm) == calculate_hash(file2, hash_algorithm) def has_metadata(f): try: b = epub.read_epub(f) metadata = b.get_metadata('DC', '') if metadata: return True else: return False except Exception as e: return False def convert_to_epub(session): if session['cancellation_requested']: stop_and_detach_tts() print('Cancel requested') return False if session['script_mode'] == DOCKER_UTILS: try: docker_dir = os.path.basename(session['tmp_dir']) docker_file_in = os.path.basename(session['src']) docker_file_out = os.path.basename(session['epub_path']) # Check if the input file is already an EPUB if docker_file_in.lower().endswith('.epub'): shutil.copy(session['src'], session['epub_path']) return True # Convert the ebook to EPUB format using utils Docker image container = session['client'].containers.run( docker_utils_image, command=f'ebook-convert /files/{docker_dir}/{docker_file_in} /files/{docker_dir}/{docker_file_out}', volumes={session['tmp_dir']: {'bind': f'/files/{docker_dir}', 'mode': 'rw'}}, remove=True, detach=False, stdout=True, stderr=True ) print(container.decode('utf-8')) return True except docker.errors.ContainerError as e: raise DependencyError(e) except docker.errors.ImageNotFound as e: raise DependencyError(e) except docker.errors.APIError as e: raise DependencyError(e) else: try: util_app = shutil.which('ebook-convert') subprocess.run([util_app, session['src'], session['epub_path']], check=True) return True except subprocess.CalledProcessError as e: raise DependencyError(e) def get_cover(session): try: if session['cancellation_requested']: stop_and_detach_tts() print('Cancel requested') return False cover_image = False cover_path = os.path.join(session['tmp_dir'], session['filename_noext'] + '.jpg') for item in session['epub'].get_items_of_type(ebooklib.ITEM_COVER): cover_image = item.get_content() break if not cover_image: for item in session['epub'].get_items_of_type(ebooklib.ITEM_IMAGE): if 'cover' in item.file_name.lower() or 'cover' in item.get_id().lower(): cover_image = item.get_content() break if cover_image: with open(cover_path, 'wb') as cover_file: cover_file.write(cover_image) return cover_path return True except Exception as e: raise DependencyError(e) def get_chapters(language, session): try: if session['cancellation_requested']: stop_and_detach_tts() print('Cancel requested') return False all_docs = list(session['epub'].get_items_of_type(ebooklib.ITEM_DOCUMENT)) if all_docs: all_docs = all_docs[1:] doc_patterns = [filter_pattern(str(doc)) for doc in all_docs if filter_pattern(str(doc))] most_common_pattern = filter_doc(doc_patterns) selected_docs = [doc for doc in all_docs if filter_pattern(str(doc)) == most_common_pattern] chapters = [filter_chapter(doc, language) for doc in selected_docs] if session['metadata'].get('creator'): intro = f"{session['metadata']['creator']}, {session['metadata']['title']};\n " chapters[0].insert(0, intro) return chapters return False except Exception as e: raise DependencyError(f'Error extracting main content pages: {e}') def filter_doc(doc_patterns): pattern_counter = Counter(doc_patterns) # Returns a list with one tuple: [(pattern, count)] most_common = pattern_counter.most_common(1) return most_common[0][0] if most_common else None def filter_pattern(doc_identifier): parts = doc_identifier.split(':') if len(parts) > 2: segment = parts[1] if re.search(r'[a-zA-Z]', segment) and re.search(r'\d', segment): return ''.join([char for char in segment if char.isalpha()]) elif re.match(r'^[a-zA-Z]+$', segment): return segment elif re.match(r'^\d+$', segment): return 'numbers' return None def filter_chapter(doc, language): soup = BeautifulSoup(doc.get_body_content(), 'html.parser') # Remove scripts and styles for script in soup(["script", "style"]): script.decompose() # Normalize lines and remove unnecessary spaces text = re.sub(r'(\r\n|\r|\n){3,}', '\r\n', soup.get_text().strip()) text = replace_roman_numbers(text) lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = '\n'.join(chunk for chunk in chunks if chunk) text = text.replace('»', '"').replace('«', '"') # Pattern 1: Add a space between UTF-8 characters and numbers text = re.sub(r'(?<=[\p{L}])(?=\d)|(?<=\d)(?=[\p{L}])', ' ', text) # Pattern 2: Split numbers into groups of 4 text = re.sub(r'(\d{4})(?=\d)', r'\1 ', text) chapter_sentences = get_sentences(text, language) return chapter_sentences def get_sentences(sentence, language, max_pauses=9): max_length = language_mapping[language]['char_limit'] punctuation = language_mapping[language]['punctuation'] sentence = sentence.replace(".", ";\n") parts = [] while len(sentence) > max_length or sum(sentence.count(p) for p in punctuation) > max_pauses: # Step 1: Look for the last period (.) within max_length possible_splits = [i for i, char in enumerate(sentence[:max_length]) if char == '.'] # Step 2: If no periods, look for the last comma (,) if not possible_splits: possible_splits = [i for i, char in enumerate(sentence[:max_length]) if char == ','] # Step 3: If still no splits, look for any other punctuation if not possible_splits: possible_splits = [i for i, char in enumerate(sentence[:max_length]) if char in punctuation] # Step 4: Determine where to split the sentence if possible_splits: split_at = possible_splits[-1] + 1 # Split at the last occurrence of punctuation else: # If no punctuation is found, split at the last space last_space = sentence.rfind(' ', 0, max_length) if last_space != -1: split_at = last_space + 1 else: # If no space is found, force split at max_length split_at = max_length # Add the split sentence to parts parts.append(sentence[:split_at].strip() + ' ') sentence = sentence[split_at:].strip() # Add the remaining sentence if any if sentence: parts.append(sentence.strip() + ' ') return parts def convert_chapters_to_audio(session): try: if session['cancellation_requested']: stop_and_detach_tts() print('Cancel requested') return False progress_bar = None params = {} if is_gui_process: progress_bar = gr.Progress(track_tqdm=True) params['tts_model'] = None ''' # List available TTS base models print("Available Models:") print("=================") for index, model in enumerate(XTTS().list_models(), 1): print(f"{index}. {model}") ''' if session['metadata']['language'] in language_xtts: params['tts_model'] = 'xtts' if session['custom_model'] is not None: print(f"Loading TTS {params['tts_model']} model from {session['custom_model']}...") model_path = os.path.join(session['custom_model'], 'model.pth') config_path = os.path.join(session['custom_model'],'config.json') vocab_path = os.path.join(session['custom_model'],'vocab.json') voice_path = os.path.join(session['custom_model'],'ref.wav') config = XttsConfig() config.models_dir = os.path.join(models_dir,'tts') config.load_json(config_path) params['tts'] = Xtts.init_from_config(config) params['tts'].load_checkpoint(config, checkpoint_path=model_path, vocab_path=vocab_path, eval=True) print('Computing speaker latents...') params['voice_file'] = session['voice_file'] if session['voice_file'] is not None else voice_path params['gpt_cond_latent'], params['speaker_embedding'] = params['tts'].get_conditioning_latents(audio_path=[params['voice_file']]) elif session['fine_tuned'] != 'std': print(f"Loading TTS {params['tts_model']} model from {session['fine_tuned']}...") hf_repo = models[params['tts_model']][session['fine_tuned']]['repo'] hf_sub = models[params['tts_model']][session['fine_tuned']]['sub'] cache_dir = os.path.join(models_dir,'tts') model_path = hf_hub_download(repo_id=hf_repo, filename=f"{hf_sub}/model.pth", cache_dir=cache_dir) config_path = hf_hub_download(repo_id=hf_repo, filename=f"{hf_sub}/config.json", cache_dir=cache_dir) vocab_path = hf_hub_download(repo_id=hf_repo, filename=f"{hf_sub}/vocab.json", cache_dir=cache_dir) config = XttsConfig() config.models_dir = cache_dir config.load_json(config_path) params['tts'] = Xtts.init_from_config(config) params['tts'].load_checkpoint(config, checkpoint_path=model_path, vocab_path=vocab_path, eval=True) print('Computing speaker latents...') params['voice_file'] = session['voice_file'] if session['voice_file'] is not None else models[params['tts_model']][session['fine_tuned']]['voice'] params['gpt_cond_latent'], params['speaker_embedding'] = params['tts'].get_conditioning_latents(audio_path=[params['voice_file']]) else: print(f"Loading TTS {params['tts_model']} model from {models[params['tts_model']][session['fine_tuned']]['repo']}...") params['tts'] = XTTS(model_name=models[params['tts_model']][session['fine_tuned']]['repo']) params['voice_file'] = session['voice_file'] if session['voice_file'] is not None else models[params['tts_model']][session['fine_tuned']]['voice'] params['tts'].to(session['device']) else: params['tts_model'] = 'fairseq' model_repo = models[params['tts_model']][session['fine_tuned']]['repo'].replace("[lang]", session['metadata']['language']) print(f"Loading TTS {model_repo} model from {model_repo}...") params['tts'] = XTTS(model_repo) params['voice_file'] = session['voice_file'] if session['voice_file'] is not None else models[params['tts_model']][session['fine_tuned']]['voice'] params['tts'].to(session['device']) resume_chapter = 0 resume_sentence = 0 # Check existing files to resume the process if it was interrupted existing_chapters = sorted([f for f in os.listdir(session['chapters_dir']) if f.endswith(f'.{audioproc_format}')]) existing_sentences = sorted([f for f in os.listdir(session['chapters_dir_sentences']) if f.endswith(f'.{audioproc_format}')]) if existing_chapters: count_chapter_files = len(existing_chapters) resume_chapter = count_chapter_files - 1 if count_chapter_files > 0 else 0 print(f'Resuming from chapter {count_chapter_files}') if existing_sentences: resume_sentence = len(existing_sentences) print(f'Resuming from sentence {resume_sentence}') total_chapters = len(session['chapters']) total_sentences = sum(len(array) for array in session['chapters']) current_sentence = 0 with tqdm(total=total_sentences, desc='convert_chapters_to_audio 0.00%', bar_format='{desc}: {n_fmt}/{total_fmt} ', unit='step', initial=resume_sentence) as t: t.n = resume_sentence t.refresh() for x in range(resume_chapter, total_chapters): chapter_num = x + 1 chapter_audio_file = f'chapter_{chapter_num}.{audioproc_format}' sentences = session['chapters'][x] start = current_sentence # Mark the starting sentence of the chapter print(f"\nChapter {chapter_num} containing {len(sentences)} sentences...") for i, sentence in enumerate(sentences): if current_sentence >= resume_sentence and resume_sentence > 0 or resume_sentence == 0: params['sentence_audio_file'] = os.path.join(session['chapters_dir_sentences'], f'{current_sentence}.{audioproc_format}') params['sentence'] = sentence if convert_sentence_to_audio(params, session): t.update(1) percentage = (current_sentence / total_sentences) * 100 t.set_description(f'Processing {percentage:.2f}%') print(f'Sentence: {sentence}') t.refresh() if progress_bar is not None: progress_bar(current_sentence / total_sentences) else: return False current_sentence += 1 end = current_sentence - 1 if combine_audio_sentences(chapter_audio_file, start, end, session): print(f'Combining chapter {chapter_num} to audio, sentence {start} to {end}') else: print('combine_audio_sentences() failed!') return False return True except Exception as e: raise DependencyError(e) def convert_sentence_to_audio(params, session): try: if session['cancellation_requested']: stop_and_detach_tts(params['tts']) print('Cancel requested') return False generation_params = { "temperature": session['temperature'], "length_penalty": session["length_penalty"], "repetition_penalty": session['repetition_penalty'], "num_beams": int(session['length_penalty']) + 1 if session["length_penalty"] > 1 else 1, "top_k": session['top_k'], "top_p": session['top_p'], "speed": session['speed'], "enable_text_splitting": session['enable_text_splitting'] } if params['tts_model'] == 'xtts': if session['custom_model'] is not None or session['fine_tuned'] != 'std': output = params['tts'].inference( text=params['sentence'], language=session['metadata']['language_iso1'], gpt_cond_latent=params['gpt_cond_latent'], speaker_embedding=params['speaker_embedding'], **generation_params ) torchaudio.save( params['sentence_audio_file'], torch.tensor(output[audioproc_format]).unsqueeze(0), sample_rate=24000 ) else: params['tts'].tts_to_file( text=params['sentence'], language=session['metadata']['language_iso1'], file_path=params['sentence_audio_file'], speaker_wav=params['voice_file'], **generation_params ) elif params['tts_model'] == 'fairseq': params['tts'].tts_with_vc_to_file( text=params['sentence'], file_path=params['sentence_audio_file'], speaker_wav=params['voice_file'].replace('_24khz','_16khz'), split_sentences=session['enable_text_splitting'] ) if os.path.exists(params['sentence_audio_file']): return True print(f"Cannot create {params['sentence_audio_file']}") return False except Exception as e: raise DependencyError(e) def combine_audio_sentences(chapter_audio_file, start, end, session): try: chapter_audio_file = os.path.join(session['chapters_dir'], chapter_audio_file) combined_audio = AudioSegment.empty() # Get all audio sentence files sorted by their numeric indices sentence_files = [f for f in os.listdir(session['chapters_dir_sentences']) if f.endswith(".wav")] sentences_dir_ordered = sorted(sentence_files, key=lambda x: int(re.search(r'\d+', x).group())) # Filter the files in the range [start, end] selected_files = [ file for file in sentences_dir_ordered if start <= int(''.join(filter(str.isdigit, os.path.basename(file)))) <= end ] for file in selected_files: if session['cancellation_requested']: stop_and_detach_tts(params['tts']) print('Cancel requested') return False if session['cancellation_requested']: msg = 'Cancel requested' raise ValueError(msg) audio_segment = AudioSegment.from_file(os.path.join(session['chapters_dir_sentences'],file), format=audioproc_format) combined_audio += audio_segment combined_audio.export(chapter_audio_file, format=audioproc_format) print(f'Combined audio saved to {chapter_audio_file}') return True except Exception as e: raise DependencyError(e) def combine_audio_chapters(session): def sort_key(chapter_file): numbers = re.findall(r'\d+', chapter_file) return int(numbers[0]) if numbers else 0 def assemble_audio(): try: combined_audio = AudioSegment.empty() batch_size = 256 # Process the chapter files in batches for i in range(0, len(chapter_files), batch_size): batch_files = chapter_files[i:i + batch_size] batch_audio = AudioSegment.empty() # Initialize an empty AudioSegment for the batch # Sequentially append each file in the current batch to the batch_audio for chapter_file in batch_files: if session['cancellation_requested']: print('Cancel requested') return False audio_segment = AudioSegment.from_wav(os.path.join(session['chapters_dir'],chapter_file)) batch_audio += audio_segment combined_audio += batch_audio combined_audio.export(assembled_audio, format=audioproc_format) print(f'Combined audio saved to {assembled_audio}') return True except Exception as e: raise DependencyError(e) def generate_ffmpeg_metadata(): try: if session['cancellation_requested']: print('Cancel requested') return False ffmpeg_metadata = ';FFMETADATA1\n' if session['metadata'].get('title'): ffmpeg_metadata += f"title={session['metadata']['title']}\n" if session['metadata'].get('creator'): ffmpeg_metadata += f"artist={session['metadata']['creator']}\n" if session['metadata'].get('language'): ffmpeg_metadata += f"language={session['metadata']['language']}\n\n" if session['metadata'].get('publisher'): ffmpeg_metadata += f"publisher={session['metadata']['publisher']}\n" if session['metadata'].get('description'): ffmpeg_metadata += f"description={session['metadata']['description']}\n" if session['metadata'].get('published'): # Check if the timestamp contains fractional seconds if '.' in session['metadata']['published']: # Parse with fractional seconds year = datetime.strptime(session['metadata']['published'], '%Y-%m-%dT%H:%M:%S.%f%z').year else: # Parse without fractional seconds year = datetime.strptime(session['metadata']['published'], '%Y-%m-%dT%H:%M:%S%z').year else: # If published is not provided, use the current year year = datetime.now().year ffmpeg_metadata += f'year={year}\n' if session['metadata'].get('identifiers') and isinstance(session['metadata'].get('identifiers'), dict): isbn = session['metadata']['identifiers'].get('isbn', None) if isbn: ffmpeg_metadata += f'isbn={isbn}\n' # ISBN mobi_asin = session['metadata']['identifiers'].get('mobi-asin', None) if mobi_asin: ffmpeg_metadata += f'asin={mobi_asin}\n' # ASIN start_time = 0 for index, chapter_file in enumerate(chapter_files): if session['cancellation_requested']: msg = 'Cancel requested' raise ValueError(msg) duration_ms = len(AudioSegment.from_wav(os.path.join(session['chapters_dir'],chapter_file))) ffmpeg_metadata += f'[CHAPTER]\nTIMEBASE=1/1000\nSTART={start_time}\n' ffmpeg_metadata += f'END={start_time + duration_ms}\ntitle=Chapter {index + 1}\n' start_time += duration_ms # Write the metadata to the file with open(metadata_file, 'w', encoding='utf-8') as file: file.write(ffmpeg_metadata) return True except Exception as e: raise DependencyError(e) def export_audio(): try: if session['cancellation_requested']: print('Cancel requested') return False ffmpeg_cover = None if session['script_mode'] == DOCKER_UTILS: docker_dir = os.path.basename(session['tmp_dir']) ffmpeg_combined_audio = f'/files/{docker_dir}/' + os.path.basename(assembled_audio) ffmpeg_metadata_file = f'/files/{docker_dir}/' + os.path.basename(metadata_file) ffmpeg_final_file = f'/files/{docker_dir}/' + os.path.basename(docker_final_file) if session['cover'] is not None: ffmpeg_cover = f'/files/{docker_dir}/' + os.path.basename(session['cover']) ffmpeg_cmd = ['ffmpeg', '-i', ffmpeg_combined_audio, '-i', ffmpeg_metadata_file] else: ffmpeg_combined_audio = assembled_audio ffmpeg_metadata_file = metadata_file ffmpeg_final_file = final_file if session['cover'] is not None: ffmpeg_cover = session['cover'] ffmpeg_cmd = [shutil.which('ffmpeg'), '-i', ffmpeg_combined_audio, '-i', ffmpeg_metadata_file] if ffmpeg_cover is not None: ffmpeg_cmd += ['-i', ffmpeg_cover, '-map', '0:a', '-map', '2:v'] else: ffmpeg_cmd += ['-map', '0:a'] ffmpeg_cmd += ['-map_metadata', '1', '-c:a', 'aac', '-b:a', '128k', '-ar', '44100'] if ffmpeg_cover is not None: if ffmpeg_cover.endswith('.png'): ffmpeg_cmd += ['-c:v', 'png', '-disposition:v', 'attached_pic'] # PNG cover else: ffmpeg_cmd += ['-c:v', 'copy', '-disposition:v', 'attached_pic'] # JPEG cover (no re-encoding needed) if ffmpeg_cover is not None and ffmpeg_cover.endswith('.png'): ffmpeg_cmd += ['-pix_fmt', 'yuv420p'] ffmpeg_cmd += ['-af', 'agate=threshold=-33dB:ratio=2:attack=5:release=100,acompressor=threshold=-20dB:ratio=2.5:attack=50:release=200:makeup=0dB,loudnorm=I=-19:TP=-3:LRA=7:linear=true'] ffmpeg_cmd += ['-movflags', '+faststart', '-y', ffmpeg_final_file] if session['script_mode'] == DOCKER_UTILS: try: container = session['client'].containers.run( docker_utils_image, command=ffmpeg_cmd, volumes={session['tmp_dir']: {'bind': f'/files/{docker_dir}', 'mode': 'rw'}}, remove=True, detach=False, stdout=True, stderr=True ) print(container.decode('utf-8')) if shutil.copy(docker_final_file, final_file): return True return False except docker.errors.ContainerError as e: raise DependencyError(e) except docker.errors.ImageNotFound as e: raise DependencyError(e) except docker.errors.APIError as e: raise DependencyError(e) else: try: subprocess.run(ffmpeg_cmd, env={}, check=True) return True except subprocess.CalledProcessError as e: raise DependencyError(e) except Exception as e: raise DependencyError(e) try: chapter_files = [f for f in os.listdir(session['chapters_dir']) if f.endswith(".wav")] chapter_files = sorted(chapter_files, key=lambda x: int(re.search(r'\d+', x).group())) assembled_audio = os.path.join(session['tmp_dir'], session['metadata']['title'] + '.' + audioproc_format) metadata_file = os.path.join(session['tmp_dir'], 'metadata.txt') if assemble_audio(): if generate_ffmpeg_metadata(): final_name = session['metadata']['title'] + '.' + audiobook_format docker_final_file = os.path.join(session['tmp_dir'], final_name) final_file = os.path.join(session['audiobooks_dir'], final_name) if export_audio(): shutil.rmtree(session['tmp_dir']) return final_file return None except Exception as e: raise DependencyError(e) def replace_roman_numbers(text): def roman_to_int(s): try: roman = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000, 'IV': 4, 'IX': 9, 'XL': 40, 'XC': 90, 'CD': 400, 'CM': 900} i = 0 num = 0 # Iterate over the string to calculate the integer value while i < len(s): # Check for two-character numerals (subtractive combinations) if i + 1 < len(s) and s[i:i+2] in roman: num += roman[s[i:i+2]] i += 2 else: # Add the value of the single character num += roman[s[i]] i += 1 return num except Exception as e: return s roman_chapter_pattern = re.compile( r'\b(chapter|volume|chapitre|tome|capitolo|capítulo|volumen|Kapitel|глава|том|κεφάλαιο|τόμος|capitul|poglavlje)\s' r'(M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})|[IVXLCDM]+)\b', re.IGNORECASE ) roman_numerals_with_period = re.compile( r'^(M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})|[IVXLCDM])\.+' ) def replace_chapter_match(match): chapter_word = match.group(1) roman_numeral = match.group(2) integer_value = roman_to_int(roman_numeral.upper()) return f'{chapter_word.capitalize()} {integer_value}' def replace_numeral_with_period(match): roman_numeral = match.group(1) integer_value = roman_to_int(roman_numeral) return f'{integer_value}.' text = roman_chapter_pattern.sub(replace_chapter_match, text) text = roman_numerals_with_period.sub(replace_numeral_with_period, text) return text def stop_and_detach_tts(tts=None): if tts is not None: if next(tts.parameters()).is_cuda: tts.to('cpu') del tts if torch.cuda.is_available(): torch.cuda.empty_cache() def delete_old_web_folders(root_dir): try: if not os.path.exists(root_dir): os.makedirs(root_dir) print(f'Created missing directory: {root_dir}') current_time = time.time() age_limit = current_time - interface_shared_expire * 60 * 60 # 24 hours in seconds for folder_name in os.listdir(root_dir): dir_path = os.path.join(root_dir, folder_name) if os.path.isdir(dir_path) and folder_name.startswith('web-'): folder_creation_time = os.path.getctime(dir_path) if folder_creation_time < age_limit: shutil.rmtree(dir_path) except Exception as e: raise DependencyError(e) def compare_file_metadata(f1, f2): if os.path.getsize(f1) != os.path.getsize(f2): return False if os.path.getmtime(f1) != os.path.getmtime(f2): return False return True def convert_ebook(args): try: global is_gui_process global context error = None try: if len(args['language']) == 2: lang_array = languages.get(alpha2=args['language']) if lang_array and lang_array.part3: args['language'] = lang_array.part3 else: args['language'] = None else: lang_array = languages.get(part3=args['language']) if not lang_array: args['language'] = None except Exception as e: args['language'] = None pass if args['language'] is not None and args['language'] in language_mapping.keys(): session_id = args['session'] if args['session'] is not None else str(uuid.uuid4()) session = context.get_session(session_id) session['id'] = session_id session['src'] = args['ebook'] session['script_mode'] = args['script_mode'] if args['script_mode'] is not None else NATIVE session['audiobooks_dir'] = args['audiobooks_dir'] is_gui_process = args['is_gui_process'] device = args['device'].lower() voice_file = args['voice'] language = args['language'] temperature = args['temperature'] length_penalty = args['length_penalty'] repetition_penalty = args['repetition_penalty'] top_k = args['top_k'] top_p = args['top_p'] speed = args['speed'] enable_text_splitting = args['enable_text_splitting'] if args['enable_text_splitting'] is not None else True custom_model_file = args['custom_model'] if args['custom_model'] != 'none' and args['custom_model'] is not None else None fine_tuned = args['fine_tuned'] if check_fine_tuned(args['fine_tuned'], args['language']) else None if not fine_tuned: raise ValueError('The fine tuned model does not exist.') if not os.path.splitext(args['ebook'])[1]: raise ValueError('The selected ebook file has no extension. Please select a valid file.') if session['script_mode'] == NATIVE: bool, e = check_programs('Calibre', 'calibre', '--version') if not bool: raise DependencyError(e) bool, e = check_programs('FFmpeg', 'ffmpeg', '-version') if not bool: raise DependencyError(e) elif session['script_mode'] == DOCKER_UTILS: session['client'] = docker.from_env() session['tmp_dir'] = os.path.join(processes_dir, f"ebook-{session['id']}") session['chapters_dir'] = os.path.join(session['tmp_dir'], f"chapters_{hashlib.md5(args['ebook'].encode()).hexdigest()}") session['chapters_dir_sentences'] = os.path.join(session['chapters_dir'], 'sentences') if not is_gui_process: print(f'*********** Session: {session_id}', '************* Store it in case of interruption or crash you can resume the conversion') session['custom_model_dir'] = os.path.join(models_dir,'__sessions',f"model-{session['id']}") if custom_model_file: session['custom_model'], progression_status = extract_custom_model(custom_model_file, session['custom_model_dir']) if not session['custom_model']: raise ValueError(f'{custom_model_file} could not be extracted or mandatory files are missing') if prepare_dirs(args['ebook'], session): session['filename_noext'] = os.path.splitext(os.path.basename(session['src']))[0] if not torch.cuda.is_available() or device == 'cpu': if device == 'gpu': print('GPU is not available on your device!') device = 'cpu' else: device = 'cuda' torch.device(device) print(f'Available Processor Unit: {device}') session['epub_path'] = os.path.join(session['tmp_dir'], '__' + session['filename_noext'] + '.epub') has_src_metadata = has_metadata(session['src']) if convert_to_epub(session): session['epub'] = epub.read_epub(session['epub_path'], {'ignore_ncx': True}) metadata = dict(session['metadata']) for key, value in metadata.items(): data = session['epub'].get_metadata('DC', key) if data: for value, attributes in data: if key == 'language' and not has_src_metadata: session['metadata'][key] = language else: session['metadata'][key] = value language_array = languages.get(part3=language) if language_array and language_array.part1: session['metadata']['language_iso1'] = language_array.part1 if session['metadata']['language'] == language or session['metadata']['language_iso1'] and session['metadata']['language'] == session['metadata']['language_iso1']: session['metadata']['title'] = os.path.splitext(os.path.basename(session['src']))[0] if not session['metadata']['title'] else session['metadata']['title'] session['metadata']['creator'] = False if not session['metadata']['creator'] else session['metadata']['creator'] session['cover'] = get_cover(session) if session['cover']: session['chapters'] = get_chapters(language, session) if session['chapters']: session['device'] = device session['temperature'] = temperature session['length_penalty'] = length_penalty session['repetition_penalty'] = repetition_penalty session['top_k'] = top_k session['top_p'] = top_p session['speed'] = speed session['enable_text_splitting'] = enable_text_splitting session['fine_tuned'] = fine_tuned session['voice_file'] = voice_file session['language'] = language if convert_chapters_to_audio(session): final_file = combine_audio_chapters(session) if final_file is not None: progress_status = f'Audiobook {os.path.basename(final_file)} created!' return progress_status, final_file else: error = 'combine_audio_chapters() error: final_file not created!' else: error = 'convert_chapters_to_audio() failed!' else: error = 'get_chapters() failed!' else: error = 'get_cover() failed!' else: error = f"WARNING: Ebook language: {session['metadata']['language']}, language selected: {language}" else: error = 'convert_to_epub() failed!' else: error = f"Temporary directory {session['tmp_dir']} not removed due to failure." else: error = f"Language {args['language']} is not supported." if session['cancellation_requested']: error = 'Cancelled' print(error) return error, None except Exception as e: print(f'convert_ebook() Exception: {e}') return e, None def web_interface(args): script_mode = args.script_mode is_gui_process = args.is_gui_process is_gui_shared = args.share is_converting = False audiobooks_dir = None ebook_src = None audiobook_file = None language_options = [ ( f"{details['name']} - {details['native_name']}" if details['name'] != details['native_name'] else details['name'], lang ) for lang, details in language_mapping.items() ] custom_model_options = None fine_tuned_options = list(models['xtts'].keys()) default_language_name = next((name for name, key in language_options if key == default_language_code), None) theme = gr.themes.Origin( primary_hue='amber', secondary_hue='green', neutral_hue='gray', radius_size='lg', font_mono=['JetBrains Mono', 'monospace', 'Consolas', 'Menlo', 'Liberation Mono'] ) with gr.Blocks(theme=theme) as interface: gr.HTML( ''' ''' ) gr.Markdown( f''' # Ebook2Audiobook v{version}
https://github.com/DrewThomasson/ebook2audiobook
Convert eBooks into immersive audiobooks with realistic voice TTS models.
Multiuser, multiprocessing, multithread on a geo cluster to share the conversion to the Grid. ''' ) with gr.Tabs(): gr_tab_main = gr.TabItem('Input Options') with gr_tab_main: with gr.Row(): with gr.Column(scale=3): with gr.Group(): gr_ebook_file = gr.File(label='EBook File (.epub, .mobi, .azw3, fb2, lrf, rb, snb, tcr, .pdf, .txt, .rtf, doc, .docx, .html, .odt, .azw)', file_types=['.epub', '.mobi', '.azw3', 'fb2', 'lrf', 'rb', 'snb', 'tcr', '.pdf', '.txt', '.rtf', 'doc', '.docx', '.html', '.odt', '.azw']) with gr.Group(): gr_voice_file = gr.File(label='*Cloning Voice (a .wav 24000hz for XTTS base model and 16000hz for FAIRSEQ base model, no more than 6 sec)', file_types=['.wav'], visible=interface_component_options['gr_voice_file']) gr.Markdown('

  * Optional

') with gr.Group(): gr_device = gr.Radio(label='Processor Unit', choices=['CPU', 'GPU'], value='CPU') with gr.Group(): gr_language = gr.Dropdown(label='Language', choices=[name for name, _ in language_options], value=default_language_name) with gr.Column(scale=3): gr_group_custom_model = gr.Group(visible=interface_component_options['gr_group_custom_model']) with gr_group_custom_model: gr_custom_model_file = gr.File(label='*Custom XTTS Model (a .zip containing config.json, vocab.json, model.pth, ref.wav)', file_types=['.zip']) gr_custom_model_list = gr.Dropdown(label='', choices=['none'], interactive=True) gr.Markdown('

  * Optional

') with gr.Group(): gr_session_status = gr.Textbox(label='Session') with gr.Group(): gr_tts_engine = gr.Dropdown(label='TTS Base', choices=[default_tts_engine], value=default_tts_engine, interactive=True) gr_fine_tuned = gr.Dropdown(label='Fine Tuned Models', choices=fine_tuned_options, value=default_fine_tuned, interactive=True) gr_tab_preferences = gr.TabItem('Audio Generation Preferences', visible=interface_component_options['gr_tab_preferences']) with gr_tab_preferences: gr.Markdown( ''' ### Customize Audio Generation Parameters Adjust the settings below to influence how the audio is generated. You can control the creativity, speed, repetition, and more. ''' ) gr_temperature = gr.Slider( label='Temperature', minimum=0.1, maximum=10.0, step=0.1, value=0.65, info='Higher values lead to more creative, unpredictable outputs. Lower values make it more monotone.' ) gr_length_penalty = gr.Slider( label='Length Penalty', minimum=0.5, maximum=10.0, step=0.1, value=1.0, info='Penalize longer sequences. Higher values produce shorter outputs. Not applied to custom models.' ) gr_repetition_penalty = gr.Slider( label='Repetition Penalty', minimum=1.0, maximum=10.0, step=0.1, value=2.5, info='Penalizes repeated phrases. Higher values reduce repetition.' ) gr_top_k = gr.Slider( label='Top-k Sampling', minimum=10, maximum=100, step=1, value=50, info='Lower values restrict outputs to more likely words and increase speed at which audio generates.' ) gr_top_p = gr.Slider( label='Top-p Sampling', minimum=0.1, maximum=1.0, step=.01, value=0.8, info='Controls cumulative probability for word selection. Lower values make the output more predictable and increase speed at which audio generates.' ) gr_speed = gr.Slider( label='Speed', minimum=0.5, maximum=3.0, step=0.1, value=1.0, info='Adjusts how fast the narrator will speak.' ) gr_enable_text_splitting = gr.Checkbox( label='Enable Text Splitting', value=True, info='Splits long texts into sentences to generate audio in chunks. Useful for very long inputs.' ) gr_state = gr.State(value="") # Initialize state for each user session gr_session = gr.Textbox(label='Session', visible=False) gr_conversion_progress = gr.Textbox(label='Progress') gr_convert_btn = gr.Button('Convert', variant='primary', interactive=False) gr_audio_player = gr.Audio(label='Listen', type='filepath', show_download_button=False, container=True, visible=False) gr_audiobooks_ddn = gr.Dropdown(choices=[], label='Audiobooks') gr_audiobook_link = gr.File(label='Download') gr_write_data = gr.JSON(visible=False) gr_read_data = gr.JSON(visible=False) gr_data = gr.State({}) gr_modal_html = gr.HTML() def show_modal(message): return f''' ''' def hide_modal(): return '' def update_interface(): nonlocal is_converting is_converting = False return gr.update('Convert', variant='primary', interactive=False), gr.update(), gr.update(value=audiobook_file), update_audiobooks_ddn(), hide_modal() def refresh_audiobook_list(): files = [] if audiobooks_dir is not None: if os.path.exists(audiobooks_dir): files = [f for f in os.listdir(audiobooks_dir)] files.sort(key=lambda x: os.path.getmtime(os.path.join(audiobooks_dir, x)), reverse=True) return files def change_gr_audiobooks_ddn(audiobook): if audiobooks_dir is not None: if audiobook: link = os.path.join(audiobooks_dir, audiobook) return link, link, gr.update(visible=True) return None, None, gr.update(visible=False) def update_convert_btn(upload_file, custom_model_file, session_id): session = context.get_session(session_id) if hasattr(upload_file, 'name') and not hasattr(custom_model_file, 'name'): yield gr.update(variant='primary', interactive=True) else: yield gr.update(variant='primary', interactive=False) return def update_audiobooks_ddn(): files = refresh_audiobook_list() return gr.update(choices=files, label='Audiobooks', value=files[0] if files else None) async def change_gr_ebook_file(f, session_id): nonlocal is_converting if context and session_id: session = context.get_session(session_id) if f is None: if is_converting: session['cancellation_requested'] = True yield show_modal('Cancellation requested, please wait...') return session['cancellation_requested'] = False yield hide_modal() return def change_gr_language(selected: str, session_id: str): nonlocal custom_model_options if selected == 'zzzz': new_language_name = default_language_name new_language_key = default_language_code else: new_language_name, new_language_key = next(((name, key) for name, key in language_options if key == selected), (None, None)) tts_engine_options = ['xtts'] if language_xtts.get(new_language_key, False) else ['fairseq'] fine_tuned_options = [ model_name for model_name, model_details in models.get(tts_engine_options[0], {}).items() if model_details.get('lang') == 'multi' or model_details.get('lang') == new_language_key ] custom_model_options = ['none'] if context and session_id: session = context.get_session(session_id) session['language'] = new_language_key custom_model_tts = check_custom_model_tts(session) custom_model_tts_dir = os.path.join(session['custom_model_dir'], custom_model_tts) if os.path.exists(custom_model_tts_dir): custom_model_options += os.listdir(custom_model_tts_dir) return ( gr.update(value=new_language_name), gr.update(choices=tts_engine_options, value=tts_engine_options[0]), gr.update(choices=fine_tuned_options, value=fine_tuned_options[0] if fine_tuned_options else 'none'), gr.update(choices=custom_model_options, value=custom_model_options[0]) ) def check_custom_model_tts(session): custom_model_tts = 'xtts' if not language_xtts.get(session['language']): custom_model_tts = 'fairseq' custom_model_tts_dir = os.path.join(session['custom_model_dir'], custom_model_tts) if not os.path.isdir(custom_model_tts_dir): os.makedirs(custom_model_tts_dir, exist_ok=True) return custom_model_tts def change_gr_custom_model_list(custom_model_list): if custom_model_list == 'none': return gr.update(visible=True) return gr.update(visible=False) async def change_gr_custom_model_file(custom_model_file, session_id): try: nonlocal custom_model_options, gr_custom_model_file, gr_conversion_progress if context and session_id: session = context.get_session(session_id) if custom_model_file is not None: if analyze_uploaded_file(custom_model_file): session['custom_model'], progress_status = extract_custom_model(custom_model_file, None, session) if session['custom_model']: custom_model_tts_dir = check_custom_model_tts(session) custom_model_options = ['none'] + os.listdir(os.path.join(session['custom_model_dir'], custom_model_tts_dir)) yield ( gr.update(visible=False), gr.update(choices=custom_model_options, value=session['custom_model']), gr.update(value=f"{session['custom_model']} added to the custom list") ) gr_custom_model_file = gr.File(label='*XTTS Model (a .zip containing config.json, vocab.json, model.pth, ref.wav)', value=None, file_types=['.zip']) return yield gr.update(), gr.update(), gr.update(value='Invalid file! Please upload a valid ZIP.') return except Exception as e: yield gr.update(), gr.update(), gr.update(value=f'Error: {str(e)}') return def change_gr_fine_tuned(fine_tuned): visible = False if fine_tuned == 'std': visible = True return gr.update(visible=visible) def change_gr_data(data): data['event'] = 'change_data' return data def change_gr_read_data(data): nonlocal audiobooks_dir nonlocal custom_model_options warning_text_extra = '' if not data: data = {'session_id': str(uuid.uuid4())} warning_text = f"Session: {data['session_id']}" else: if 'session_id' not in data: data['session_id'] = str(uuid.uuid4()) warning_text = data['session_id'] event = data.get('event', '') if event != 'load': return [gr.update(), gr.update(), gr.update(), gr.update(), gr.update()] session = context.get_session(data['session_id']) session['custom_model_dir'] = os.path.join(models_dir,'__sessions',f"model-{session['id']}") os.makedirs(session['custom_model_dir'], exist_ok=True) custom_model_tts_dir = check_custom_model_tts(session) custom_model_options = ['none'] + os.listdir(os.path.join(session['custom_model_dir'],custom_model_tts_dir)) if is_gui_shared: warning_text_extra = f' Note: access limit time: {interface_shared_expire} hours' audiobooks_dir = os.path.join(audiobooks_gradio_dir, f"web-{data['session_id']}") delete_old_web_folders(audiobooks_gradio_dir) else: audiobooks_dir = os.path.join(audiobooks_host_dir, f"web-{data['session_id']}") return [data, f'{warning_text}{warning_text_extra}', data['session_id'], update_audiobooks_ddn(), gr.update(choices=custom_model_options, value='none')] def submit_convert_btn( session, device, ebook_file, voice_file, language, custom_model_file, temperature, length_penalty, repetition_penalty, top_k, top_p, speed, enable_text_splitting, fine_tuned ): nonlocal is_converting args = { "is_gui_process": is_gui_process, "session": session, "script_mode": script_mode, "device": device.lower(), "ebook": ebook_file.name if ebook_file else None, "audiobooks_dir": audiobooks_dir, "voice": voice_file.name if voice_file else None, "language": next((key for name, key in language_options if name == language), None), "custom_model": next((key for name, key in language_options if name != 'none'), None), "temperature": float(temperature), "length_penalty": float(length_penalty), "repetition_penalty": float(repetition_penalty), "top_k": int(top_k), "top_p": float(top_p), "speed": float(speed), "enable_text_splitting": enable_text_splitting, "fine_tuned": fine_tuned } if args["ebook"] is None: return gr.update(value='Error: a file is required.') try: is_converting = True progress_status, audiobook_file = convert_ebook(args) if audiobook_file is None: if is_converting: return gr.update(value='Conversion cancelled.') else: return gr.update(value='Conversion failed.') else: return progress_status except Exception as e: return DependencyError(e) gr_ebook_file.change( fn=update_convert_btn, inputs=[gr_ebook_file, gr_custom_model_file, gr_session], outputs=gr_convert_btn ).then( fn=change_gr_ebook_file, inputs=[gr_ebook_file, gr_session], outputs=[gr_modal_html] ) gr_language.change( fn=lambda selected, session_id: change_gr_language(dict(language_options).get(selected, 'Unknown'), session_id), inputs=[gr_language, gr_session], outputs=[gr_language, gr_tts_engine, gr_fine_tuned, gr_custom_model_list] ) gr_audiobooks_ddn.change( fn=change_gr_audiobooks_ddn, inputs=gr_audiobooks_ddn, outputs=[gr_audiobook_link, gr_audio_player, gr_audio_player] ) gr_custom_model_file.change( fn=change_gr_custom_model_file, inputs=[gr_custom_model_file, gr_session], outputs=[gr_fine_tuned, gr_custom_model_list, gr_conversion_progress] ) gr_custom_model_list.change( fn=change_gr_custom_model_list, inputs=gr_custom_model_list, outputs=gr_fine_tuned ) gr_fine_tuned.change( fn=change_gr_fine_tuned, inputs=gr_fine_tuned, outputs=gr_group_custom_model ) gr_session.change( fn=change_gr_data, inputs=gr_data, outputs=gr_write_data ) gr_write_data.change( fn=None, inputs=gr_write_data, js=''' (data) => { localStorage.clear(); console.log(data); window.localStorage.setItem('data', JSON.stringify(data)); } ''' ) gr_read_data.change( fn=change_gr_read_data, inputs=gr_read_data, outputs=[gr_data, gr_session_status, gr_session, gr_audiobooks_ddn, gr_custom_model_list] ) gr_convert_btn.click( fn=submit_convert_btn, inputs=[ gr_session, gr_device, gr_ebook_file, gr_voice_file, gr_language, gr_custom_model_list, gr_temperature, gr_length_penalty, gr_repetition_penalty, gr_top_k, gr_top_p, gr_speed, gr_enable_text_splitting, gr_fine_tuned ], outputs=gr_conversion_progress ).then( fn=update_interface, inputs=None, outputs=[gr_convert_btn, gr_ebook_file, gr_audio_player, gr_audiobooks_ddn, gr_modal_html] ) interface.load( fn=None, js=''' () => { const dataStr = window.localStorage.getItem('data'); if (dataStr) { const obj = JSON.parse(dataStr); obj.event = 'load'; console.log(obj); return obj; } return null; } ''', outputs=gr_read_data ) try: interface.queue(default_concurrency_limit=interface_concurrency_limit).launch(server_name=interface_host, server_port=interface_port, share=is_gui_shared) except OSError as e: print(f'Connection error: {e}') except socket.error as e: print(f'Socket error: {e}') except KeyboardInterrupt: print('Server interrupted by user. Shutting down...') except Exception as e: print(f'An unexpected error occurred: {e}')