# Spaces: Runtime error (status banner captured with the page; not part of the program)
#!/usr/bin/env python3
"""
Audiobook Generator with Web GUI - Complete Version with Cover Art Preview
Uses espeak-ng for TTS and Calibre for ebook conversion
Author: AI Assistant
Date: 2025-06-05
"""
import os | |
import sys | |
import subprocess | |
import tempfile | |
import shutil | |
import json | |
import re | |
from pathlib import Path | |
from typing import List, Dict, Optional, Tuple | |
import threading | |
import time | |
import wave | |
import io | |
import base64 | |
from dataclasses import dataclass | |
from urllib.parse import quote | |
import zipfile | |
import xml.etree.ElementTree as ET | |
from html.parser import HTMLParser | |
import html2text | |
from PIL import Image | |
import mutagen | |
from mutagen.id3 import ID3, APIC, TIT2, TPE1, TALB | |
from mutagen.mp4 import MP4, MP4Cover | |
from mutagen.flac import FLAC | |
from mutagen.oggvorbis import OggVorbis | |
import gradio as gr | |
from pydub import AudioSegment | |
from pydub.utils import make_chunks | |
# Global variables for progress tracking.
# current_progress is mutated in place by AudiobookGenerator.generate_audiobook:
#   "value"  - completion percentage (0-100)
#   "status" - human-readable description of the current step
#   "total"  - number of chapters being processed
current_progress = {"value": 0, "status": "Ready", "total": 0}
generation_thread = None
# Set to True to ask a running generation to stop at its next checkpoint.
stop_generation = False
@dataclass
class AudiobookConfig:
    """Configuration for audiobook generation.

    Declared as a dataclass so that it can be built with keyword arguments;
    without the decorator the annotated fields would not produce an
    ``__init__`` and construction would raise ``TypeError``.
    """
    # Path of the ebook / text file to convert.
    input_file: str
    # Directory that receives the generated audio files.
    output_dir: str
    # Voice as shown in the UI: "voice_name (lang_code) [gender]".
    voice: str
    # Values forwarded to espeak-ng's -s, -p and -a options respectively.
    speed: int
    pitch: int
    volume: int
    # Target audio format / file extension (e.g. "mp3", "wav").
    output_format: str
    # When True the text is split into chapters and per-chapter files are written.
    chapter_split: bool
    # When True metadata and cover art are embedded in the output files.
    include_cover: bool
    language: str
    book_title: str = "Audiobook"
    author: str = "Unknown Author"
class EspeakVoiceManager:
    """Manages espeak-ng voices and languages"""

    @staticmethod
    def _fallback_voices() -> Dict[str, List[Dict]]:
        """Return a minimal voice table used when espeak-ng cannot be queried."""
        return {"en": [{"name": "en", "code": "en", "gender": "U",
                        "display": "English (en) [U]"}]}

    @staticmethod
    def get_available_voices() -> Dict[str, List[Dict]]:
        """Get all available espeak-ng voices organized by language.

        Runs ``espeak-ng --voices`` and parses its whitespace-separated table
        (columns 2, 3 and 4 are used as language code, gender and voice name).

        Returns:
            Mapping of language code to a list of voice dicts with the keys
            ``name``, ``code``, ``gender`` and ``display``. A single English
            entry is returned when the command fails or yields no voices.
        """
        try:
            result = subprocess.run(['espeak-ng', '--voices'],
                                    capture_output=True, text=True, check=True)
        except (OSError, subprocess.SubprocessError) as e:
            print(f"Error getting voices: {e}")
            return EspeakVoiceManager._fallback_voices()

        voices_by_lang: Dict[str, List[Dict]] = {}
        for line in result.stdout.strip().split('\n')[1:]:  # Skip header
            parts = line.split()
            if len(parts) < 4:
                continue
            lang_code, gender, voice_name = parts[1], parts[2], parts[3]
            voices_by_lang.setdefault(lang_code, []).append({
                'name': voice_name,
                'code': lang_code,
                'gender': gender,
                'display': f"{voice_name} ({lang_code}) [{gender}]"
            })

        # An empty table would leave the UI without any selectable voice.
        return voices_by_lang or EspeakVoiceManager._fallback_voices()

    @staticmethod
    def get_voice_list() -> List[str]:
        """Get a flat list of all voice display strings, sorted by language code."""
        voices_by_lang = EspeakVoiceManager.get_available_voices()
        return [
            voice['display']
            for _lang, voices in sorted(voices_by_lang.items())
            for voice in voices
        ]

    @staticmethod
    def parse_voice_selection(voice_display: str) -> Tuple[str, str]:
        """Parse a display string into ``(voice_name, lang_code)``.

        The expected format is ``"voice_name (lang_code) [gender]"``. The
        language code is taken from the *last* parenthesised group so that
        voice names which themselves contain parentheses, such as
        ``English_(Great_Britain)``, are kept intact.

        Returns:
            ``("en", "en")`` when the string is empty or cannot be parsed.
        """
        if not voice_display:
            return "en", "en"

        # Anchored form: name, "(code)", optional "[gender]", end of string.
        match = re.match(r'^(.+?)\s*\(([^()]+)\)\s*(?:\[[^\]]*\])?\s*$', voice_display)
        if match is None:
            # Looser legacy form for strings with unexpected trailing text.
            match = re.match(r'([^(]+)\s*\(([^)]+)\)', voice_display)
        if match:
            return match.group(1).strip(), match.group(2).strip()
        return "en", "en"
class EPUBParser:
    """Direct EPUB parser as fallback to Calibre"""

    # XML namespaces used by the EPUB container and package (OPF) documents.
    _OPF = '{http://www.idpf.org/2007/opf}'
    _DC = '{http://purl.org/dc/elements/1.1/}'
    _CONTAINER = '{urn:oasis:names:tc:opendocument:xmlns:container}'

    @staticmethod
    def _resolve_member(names, opf_dir: str, href: Optional[str]) -> Optional[str]:
        """Map a manifest ``href`` to an archive member name, or None.

        The literal ``opf_dir/href`` is tried first; if that is not in the
        archive, the href is URL-decoded and normalised (``../`` collapsed),
        because manifest hrefs are relative IRIs rather than member names.
        """
        import posixpath
        from urllib.parse import unquote

        if not href:
            return None
        literal = f"{opf_dir}/{href}" if opf_dir else href
        decoded = unquote(href.split('#', 1)[0])
        normalised = posixpath.normpath(posixpath.join(opf_dir or '', decoded))
        for candidate in (literal, normalised):
            if candidate in names:
                return candidate
        return None

    @staticmethod
    def _write_temp_image(data: bytes, member_name: str) -> str:
        """Write image bytes to a persistent temp file and return its path.

        The file keeps the member's extension and is NOT deleted
        automatically; the caller owns it.
        """
        suffix = os.path.splitext(member_name)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_cover:
            temp_cover.write(data)
        return temp_cover.name

    @staticmethod
    def extract_text_and_metadata(epub_path: str) -> Tuple[str, Optional[str], str, str]:
        """Extract text, cover, title, and author from EPUB file.

        Args:
            epub_path: Path to the ``.epub`` archive.

        Returns:
            ``(text, cover_path, title, author)``. ``text`` is the spine
            documents converted with html2text and concatenated in reading
            order. ``cover_path`` is a temp file owned by the caller, or None.
            Title and author default to "Unknown Title" / "Unknown Author".

        Raises:
            Exception: wrapping any failure as "EPUB parsing failed: ...".
        """
        import posixpath

        opf_ns, dc_ns = EPUBParser._OPF, EPUBParser._DC
        try:
            with zipfile.ZipFile(epub_path, 'r') as zip_file:
                names = set(zip_file.namelist())  # O(1) membership tests below

                container_path = 'META-INF/container.xml'
                if container_path not in names:
                    raise Exception("Invalid EPUB: missing container.xml")
                # Bytes are handed to the parser so the XML declaration
                # decides the encoding.
                container_root = ET.fromstring(zip_file.read(container_path))

                rootfile = container_root.find(f'.//{EPUBParser._CONTAINER}rootfile')
                opf_path = rootfile.get('full-path') if rootfile is not None else None
                if not opf_path:
                    raise Exception("Could not find OPF file in EPUB")

                opf_root = ET.fromstring(zip_file.read(opf_path))

                # Metadata
                title = "Unknown Title"
                author = "Unknown Author"
                metadata = opf_root.find(f'.//{opf_ns}metadata')
                if metadata is not None:
                    title_elem = metadata.find(f'.//{dc_ns}title')
                    if title_elem is not None and title_elem.text:
                        title = title_elem.text.strip()
                    creator_elem = metadata.find(f'.//{dc_ns}creator')
                    if creator_elem is not None and creator_elem.text:
                        author = creator_elem.text.strip()

                # Spine gives the reading order as a list of manifest ids.
                spine = opf_root.find(f'.//{opf_ns}spine')
                spine_items = [] if spine is None else [
                    itemref.get('idref')
                    for itemref in spine.findall(f'.//{opf_ns}itemref')
                ]

                # Manifest maps ids to archive-relative hrefs.
                manifest = opf_root.find(f'.//{opf_ns}manifest')
                manifest_hrefs = {} if manifest is None else {
                    item.get('id'): item.get('href')
                    for item in manifest.findall(f'.//{opf_ns}item')
                }

                opf_dir = posixpath.dirname(opf_path)
                text_parts = []
                for item_id in spine_items:
                    if item_id not in manifest_hrefs:
                        continue
                    file_path = EPUBParser._resolve_member(
                        names, opf_dir, manifest_hrefs[item_id])
                    if file_path is None:
                        continue
                    try:
                        content = zip_file.read(file_path).decode('utf-8')
                        converter = html2text.HTML2Text()
                        converter.ignore_links = True
                        converter.ignore_images = True
                        text_parts.append(converter.handle(content))
                    except Exception as e:
                        # One unreadable document should not abort the book.
                        print(f"Error reading {file_path}: {e}")

                cover_path = EPUBParser._find_cover_image(zip_file, opf_root, opf_dir)
                return "\n\n".join(text_parts).strip(), cover_path, title, author
        except Exception as e:
            raise Exception(f"EPUB parsing failed: {e}") from e

    @staticmethod
    def _find_cover_image(zip_file, opf_root, opf_dir) -> Optional[str]:
        """Find and extract cover image from EPUB.

        Lookup order: the EPUB 2 ``<meta name="cover">`` reference, a
        manifest item carrying the EPUB 3 ``cover-image`` property, then a
        list of conventional file names.

        Returns:
            Path of a temp file holding the image (owned by the caller), or
            None when no cover is found or extraction fails.
        """
        opf_ns = EPUBParser._OPF
        try:
            names = set(zip_file.namelist())

            # cover_id starts as None so a package without <metadata> still
            # reaches the fallbacks below.
            cover_id = None
            metadata = opf_root.find(f'.//{opf_ns}metadata')
            if metadata is not None:
                for meta in metadata.findall(f'.//{opf_ns}meta'):
                    if meta.get('name') == 'cover':
                        cover_id = meta.get('content')
                        break

            manifest = opf_root.find(f'.//{opf_ns}manifest')
            items = [] if manifest is None else manifest.findall(f'.//{opf_ns}item')
            for item in items:
                is_cover = (
                    (cover_id and item.get('id') == cover_id)
                    or 'cover-image' in (item.get('properties') or '').split()
                )
                if not is_cover:
                    continue
                member = EPUBParser._resolve_member(names, opf_dir, item.get('href'))
                if member is not None:
                    return EPUBParser._write_temp_image(zip_file.read(member), member)

            # Fallback: look for common cover image names
            common_cover_names = [
                'cover.jpg', 'cover.jpeg', 'cover.png',
                'Cover.jpg', 'Cover.jpeg', 'Cover.png',
                'OEBPS/cover.jpg', 'OEBPS/Cover.jpg',
                'Images/cover.jpg', 'images/cover.jpg'
            ]
            for name in common_cover_names:
                if name in names:
                    return EPUBParser._write_temp_image(zip_file.read(name), name)
        except Exception as e:
            print(f"Error extracting cover: {e}")
        return None
class CalibreConverter:
    """Handles ebook conversion using Calibre with improved error handling"""

    @staticmethod
    def is_calibre_available() -> bool:
        """Check if Calibre is installed (``ebook-convert --version`` succeeds)."""
        try:
            subprocess.run(['ebook-convert', '--version'],
                           capture_output=True, text=True, check=True)
            return True
        except (subprocess.CalledProcessError, OSError):
            return False

    @staticmethod
    def get_supported_formats() -> List[str]:
        """Get list of ebook formats supported by Calibre"""
        return ['.epub', '.mobi', '.azw', '.azw3', '.fb2', '.lit', '.lrf',
                '.pdb', '.pdf', '.txt', '.rtf', '.odt', '.docx', '.html']

    @staticmethod
    def _parse_metadata_output(output: str) -> Tuple[str, str]:
        """Pull title and author out of ``ebook-meta`` text output.

        Field names are compared exactly, so the "Title sort" line cannot
        overwrite the real title. A trailing ``[...]`` on the author line
        (assumed to be Calibre's author-sort suffix) is dropped.
        """
        title = "Unknown Title"
        author = "Unknown Author"
        for line in output.splitlines():
            key, sep, value = line.partition(':')
            value = value.strip()
            if not sep or not value:
                continue
            key = key.strip()
            if key == 'Title':
                title = value
            elif key == 'Author(s)':
                author = re.sub(r'\s*\[[^\]]*\]\s*$', '', value) or value
        return title, author

    @staticmethod
    def extract_metadata(input_file: str) -> Tuple[str, str]:
        """Extract title and author from ebook using ``ebook-meta``.

        Returns:
            ``(title, author)``; "Unknown Title" / "Unknown Author" are used
            for anything that cannot be determined.
        """
        try:
            result = subprocess.run(['ebook-meta', input_file],
                                    capture_output=True, text=True, check=True)
            return CalibreConverter._parse_metadata_output(result.stdout)
        except Exception as e:
            print(f"Could not extract metadata: {e}")
            return "Unknown Title", "Unknown Author"

    @staticmethod
    def extract_cover(input_file: str, output_dir: str) -> Optional[str]:
        """Extract cover image from ebook.

        Tries ``ebook-meta --get-cover`` first, then (for EPUB files) the
        built-in EPUB parser.

        Returns:
            Path of the cover image inside ``output_dir``, or None.
        """
        cover_path = os.path.join(output_dir, "cover.jpg")
        try:
            result = subprocess.run(
                ['ebook-meta', input_file, '--get-cover', cover_path],
                capture_output=True, text=True)
            if result.returncode == 0 and os.path.exists(cover_path):
                return cover_path
        except OSError as e:
            # ebook-meta missing or not executable: fall through to the parser.
            print(f"Could not extract cover: {e}")

        if not input_file.lower().endswith('.epub'):
            return None

        epub_cover = None
        try:
            _, epub_cover, _, _ = EPUBParser.extract_text_and_metadata(input_file)
            if epub_cover:
                # Keep the real extension so later MIME detection is right.
                ext = os.path.splitext(epub_cover)[1] or '.jpg'
                target = os.path.join(output_dir, f"cover{ext}")
                shutil.copy2(epub_cover, target)
                return target
        except Exception as e:
            print(f"Could not extract cover: {e}")
        finally:
            # The parser hands back a temp file that nobody else removes.
            if epub_cover and os.path.exists(epub_cover):
                try:
                    os.unlink(epub_cover)
                except OSError:
                    pass
        return None

    @staticmethod
    def convert_to_text(input_file: str, output_dir: str) -> Tuple[str, Optional[str], str, str]:
        """Convert ebook to text and extract cover with metadata.

        Calibre is tried first with three progressively more permissive
        option sets. If it is unavailable or fails, EPUB, TXT and HTML
        inputs are handled by built-in fallbacks.

        Returns:
            ``(txt_path, cover_path, title, author)`` where ``txt_path`` is
            ``converted_text.txt`` inside ``output_dir``.

        Raises:
            Exception: when no conversion path works for the input format.
        """
        txt_path = os.path.join(output_dir, "converted_text.txt")
        title = "Unknown Title"
        author = "Unknown Author"

        # Try Calibre first
        if CalibreConverter.is_calibre_available():
            try:
                title, author = CalibreConverter.extract_metadata(input_file)
                base_cmd = ['ebook-convert', input_file, txt_path]
                conversion_attempts = [
                    # Standard conversion
                    base_cmd,
                    # With additional options
                    base_cmd + ['--no-default-epub-cover', '--max-line-length=0'],
                    # More permissive conversion
                    base_cmd + ['--no-default-epub-cover', '--max-line-length=0',
                                '--disable-markup-chapter-headings', '--linearize-tables'],
                ]
                conversion_success = False
                last_error = None
                for cmd in conversion_attempts:
                    print(f"Trying conversion with: {' '.join(cmd)}")
                    try:
                        subprocess.run(cmd, capture_output=True, text=True,
                                       check=True, timeout=300)  # 5 minute timeout
                    except subprocess.TimeoutExpired:
                        last_error = "Conversion timed out"
                    except subprocess.CalledProcessError as e:
                        last_error = f"Command failed with exit code {e.returncode}: {e.stderr}"
                        print(f"Conversion attempt failed: {last_error}")
                    else:
                        conversion_success = True
                        break

                if conversion_success and os.path.exists(txt_path):
                    cover_path = CalibreConverter.extract_cover(input_file, output_dir)
                    return txt_path, cover_path, title, author
                print(f"All Calibre conversion attempts failed. Last error: {last_error}")
            except Exception as e:
                print(f"Calibre conversion error: {e}")

        # Fallback methods for different formats
        file_ext = os.path.splitext(input_file)[1].lower()
        if file_ext == '.epub':
            try:
                print("Falling back to direct EPUB parsing...")
                text_content, cover_path, title, author = \
                    EPUBParser.extract_text_and_metadata(input_file)
                with open(txt_path, 'w', encoding='utf-8') as f:
                    f.write(text_content)
                return txt_path, cover_path, title, author
            except Exception as e:
                raise Exception(f"EPUB fallback parsing failed: {e}") from e
        elif file_ext == '.txt':
            # Just copy the text file
            shutil.copy2(input_file, txt_path)
            # Try to guess title from filename
            stem = os.path.splitext(os.path.basename(input_file))[0]
            title = stem.replace('_', ' ').replace('-', ' ').title()
            return txt_path, None, title, author
        elif file_ext in ('.html', '.htm'):
            try:
                print("Converting HTML to text...")
                with open(input_file, 'r', encoding='utf-8', errors='replace') as f:
                    html_content = f.read()
                converter = html2text.HTML2Text()
                converter.ignore_links = True
                converter.ignore_images = True
                text_content = converter.handle(html_content)
                with open(txt_path, 'w', encoding='utf-8') as f:
                    f.write(text_content)
                # DOTALL lets a <title> that spans several lines match.
                title_match = re.search(r'<title>(.*?)</title>', html_content,
                                        re.IGNORECASE | re.DOTALL)
                if title_match and title_match.group(1).strip():
                    title = ' '.join(title_match.group(1).split())
                return txt_path, None, title, author
            except Exception as e:
                raise Exception(f"HTML conversion failed: {e}") from e
        else:
            raise Exception(f"Unsupported format: {file_ext}. Calibre is required for this format but conversion failed.")
class TextProcessor:
    """Handles text processing and chapter splitting"""

    # Lines matching any of these (case-insensitively) start a new chapter.
    _CHAPTER_HEADING = re.compile('|'.join([
        r'^Chapter\s+\d+.*$',
        r'^Chapter\s+[IVXLCDM]+.*$',  # Roman numerals
        r'^\d+\.\s+.*$',
        r'^Part\s+\d+.*$',
        r'^Book\s+\d+.*$',
        r'^\*\*\*.*\*\*\*$',  # Markdown-style separators
        r'^---.*---$',
        r'^###\s+.*$',  # Markdown headers
        r'^##\s+.*$',
        r'^#\s+.*$',
    ]), re.IGNORECASE)
    _HEADING_KEYWORDS = frozenset(['chapter', 'part', 'book', 'prologue', 'epilogue'])
    _DEFAULT_TITLE = "Introduction"

    # Abbreviations expanded before synthesis.
    _ABBREVIATIONS = {
        'Mr.': 'Mister',
        'Mrs.': 'Missus',
        'Ms.': 'Miss',
        'Dr.': 'Doctor',
        'Prof.': 'Professor',
        'etc.': 'etcetera',
        'vs.': 'versus',
        'e.g.': 'for example',
        'i.e.': 'that is',
        'Inc.': 'Incorporated',
        'Ltd.': 'Limited',
        'Co.': 'Company',
    }
    # Only match at the start of a word, so "Co." does not fire inside "DisCo.".
    _ABBREVIATION_RE = re.compile(
        r'(?<!\w)(?:' + '|'.join(re.escape(a) for a in _ABBREVIATIONS) + r')'
    )
    _URL_RE = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )

    @staticmethod
    def read_text_file(file_path: str) -> str:
        """Read text file with encoding detection.

        A byte-order mark, if present, selects the encoding. NUL bytes in
        the first block suggest BOM-less UTF-16. Otherwise UTF-8 is tried,
        then cp1252, then latin-1. UTF-16 is never tried blindly because it
        "succeeds" on most even-length 8-bit files and returns garbage.

        Raises:
            Exception: if the file is empty/blank or cannot be decoded.
        """
        with open(file_path, 'rb') as f:
            head = f.read(4096)

        if head.startswith(b'\xef\xbb\xbf'):
            encodings = ['utf-8-sig']
        elif head.startswith((b'\xff\xfe\x00\x00', b'\x00\x00\xfe\xff')):
            encodings = ['utf-32']
        elif head.startswith((b'\xff\xfe', b'\xfe\xff')):
            encodings = ['utf-16']
        elif b'\x00' in head:
            # Mostly-ASCII UTF-16-LE has its NUL bytes at odd offsets.
            little_endian = head[1::2].count(0) >= head[0::2].count(0)
            encodings = (['utf-16-le', 'utf-16-be'] if little_endian
                         else ['utf-16-be', 'utf-16-le'])
        else:
            encodings = []
        encodings += ['utf-8', 'cp1252', 'latin-1']

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
            except (UnicodeDecodeError, UnicodeError):
                continue
            # Validate that we got reasonable text
            if content.strip():
                return content
        raise Exception("Could not decode text file with supported encodings")

    @staticmethod
    def split_into_chapters(text: str) -> List[Tuple[str, str]]:
        """Split text into ``(title, body)`` chapters.

        A line is a heading when it matches one of the chapter patterns, or
        when it is short (6-99 characters, at most 10 words) and contains a
        keyword such as "chapter" or "prologue". The heading line becomes
        the title and is not part of the body. Text before the first
        heading is titled "Introduction".

        Returns:
            The chapters in order, or ``[("Complete Book", text)]`` when
            fewer than two chapters are detected.
        """
        chapters: List[Tuple[str, str]] = []
        body_lines: List[str] = []
        current_title = TextProcessor._DEFAULT_TITLE

        for line in text.split('\n'):
            stripped = line.strip()
            if not stripped:
                body_lines.append('')
                continue

            is_heading = bool(TextProcessor._CHAPTER_HEADING.match(stripped))
            if not is_heading and 5 < len(stripped) < 100:
                words = stripped.split()
                is_heading = len(words) <= 10 and any(
                    word.lower() in TextProcessor._HEADING_KEYWORDS for word in words)

            if is_heading:
                body = '\n'.join(body_lines).strip()
                if body:
                    # Save previous chapter
                    chapters.append((current_title, body))
                    body_lines = []
                    current_title = stripped
                    continue
                if not chapters and current_title == TextProcessor._DEFAULT_TITLE:
                    # Heading on the very first line: use it as the title
                    # instead of reading it out under "Introduction".
                    current_title = stripped
                    continue
            body_lines.append(line)

        # Add final chapter
        final_body = '\n'.join(body_lines).strip()
        if final_body:
            chapters.append((current_title, final_body))

        # If no chapters found or only one very large chapter, treat as single book
        if len(chapters) <= 1:
            return [("Complete Book", text)]
        return chapters

    @staticmethod
    def clean_text_for_speech(text: str) -> str:
        """Clean text for better speech synthesis.

        Steps, in order: strip URLs and Markdown markup, collapse
        whitespace, expand abbreviations, verbalise dates and decimals,
        and add a space after punctuation that is glued to the next word.
        URLs are removed first because the later steps would break them up.
        """
        # Remove URLs
        text = TextProcessor._URL_RE.sub('', text)

        # Remove markdown formatting (done while line breaks still exist)
        text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)  # Bold
        text = re.sub(r'\*(.*?)\*', r'\1', text)  # Italic
        text = re.sub(r'`(.*?)`', r'\1', text)  # Code
        text = re.sub(r'^[ \t]*#{1,6}\s*', '', text, flags=re.MULTILINE)  # Headers

        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Handle common abbreviations
        text = TextProcessor._ABBREVIATION_RE.sub(
            lambda m: TextProcessor._ABBREVIATIONS[m.group(0)], text)

        # Handle numbers and dates
        text = re.sub(r'\b(\d{1,2})/(\d{1,2})/(\d{4})\b', r'\1 slash \2 slash \3', text)
        text = re.sub(r'\b(\d+)\.(\d+)\b', r'\1 point \2', text)

        # Add a pause after punctuation that runs straight into a letter.
        # Digits are left alone so "1,000" and "10:30" stay intact.
        text = re.sub(r'([.,;:!?])(?=[^\W\d_])', r'\1 ', text)

        # Remove multiple spaces
        return re.sub(r'\s+', ' ', text).strip()
class AudioMetadataManager:
    """Handles adding metadata and cover art to audio files"""

    @staticmethod
    def _load_cover(cover_file: Optional[str]) -> Optional[Tuple[bytes, str]]:
        """Read a cover image and guess its MIME type from the extension.

        Returns:
            ``(image_bytes, mime_type)``, or None when no usable file is given.
            Anything that is not ``.png`` is treated as JPEG.
        """
        if not cover_file or not os.path.exists(cover_file):
            return None
        with open(cover_file, 'rb') as f:
            cover_data = f.read()
        mime_type = 'image/png' if cover_file.lower().endswith('.png') else 'image/jpeg'
        return cover_data, mime_type

    @staticmethod
    def _build_picture(cover_data: bytes, mime_type: str):
        """Build a front-cover FLAC ``Picture`` block (also used for Ogg)."""
        from mutagen.flac import Picture

        picture = Picture()
        picture.type = 3  # Cover (front)
        picture.mime = mime_type
        picture.desc = 'Cover'
        picture.data = cover_data
        return picture

    @staticmethod
    def add_metadata(audio_file: str, title: str, author: str, cover_file: Optional[str] = None):
        """Add metadata and cover art to audio file using mutagen.

        The handler is chosen by file extension (.mp3, .m4a, .flac, .ogg).
        Other extensions, including .wav, are left untouched. Errors are
        printed rather than raised so that tagging stays best-effort.
        """
        handlers = {
            '.mp3': AudioMetadataManager._add_mp3_metadata,
            '.m4a': AudioMetadataManager._add_m4a_metadata,
            '.flac': AudioMetadataManager._add_flac_metadata,
            '.ogg': AudioMetadataManager._add_ogg_metadata,
        }
        try:
            file_ext = os.path.splitext(audio_file)[1].lower()
            handler = handlers.get(file_ext)
            if handler is not None:
                handler(audio_file, title, author, cover_file)
        except Exception as e:
            print(f"Error adding metadata to {audio_file}: {e}")

    @staticmethod
    def _add_mp3_metadata(audio_file: str, title: str, author: str, cover_file: Optional[str]):
        """Add ID3 title/artist/album tags and optional cover to an MP3 file."""
        try:
            try:
                audio = ID3(audio_file)
            except mutagen.id3.ID3NoHeaderError:
                # File has no ID3 tag yet: start a fresh one.
                audio = ID3()
            audio.add(TIT2(encoding=3, text=title))
            audio.add(TPE1(encoding=3, text=author))
            audio.add(TALB(encoding=3, text="Audiobook"))
            cover = AudioMetadataManager._load_cover(cover_file)
            if cover:
                cover_data, mime_type = cover
                audio.add(APIC(
                    encoding=3,
                    mime=mime_type,
                    type=3,  # Cover (front)
                    desc='Cover',
                    data=cover_data
                ))
            # The path is passed explicitly because a fresh ID3() has none.
            audio.save(audio_file)
        except Exception as e:
            print(f"Error adding MP3 metadata: {e}")

    @staticmethod
    def _add_m4a_metadata(audio_file: str, title: str, author: str, cover_file: Optional[str]):
        """Add title/artist/album atoms and optional cover to an M4A file."""
        try:
            audio = MP4(audio_file)
            audio['\xa9nam'] = title
            audio['\xa9ART'] = author
            audio['\xa9alb'] = "Audiobook"
            cover = AudioMetadataManager._load_cover(cover_file)
            if cover:
                cover_data, mime_type = cover
                image_format = (MP4Cover.FORMAT_PNG if mime_type == 'image/png'
                                else MP4Cover.FORMAT_JPEG)
                audio['covr'] = [MP4Cover(cover_data, image_format)]
            audio.save()
        except Exception as e:
            print(f"Error adding M4A metadata: {e}")

    @staticmethod
    def _add_flac_metadata(audio_file: str, title: str, author: str, cover_file: Optional[str]):
        """Add Vorbis comments and optional cover picture to a FLAC file."""
        try:
            audio = FLAC(audio_file)
            audio['TITLE'] = title
            audio['ARTIST'] = author
            audio['ALBUM'] = "Audiobook"
            cover = AudioMetadataManager._load_cover(cover_file)
            if cover:
                audio.clear_pictures()
                audio.add_picture(AudioMetadataManager._build_picture(*cover))
            audio.save()
        except Exception as e:
            print(f"Error adding FLAC metadata: {e}")

    @staticmethod
    def _add_ogg_metadata(audio_file: str, title: str, author: str, cover_file: Optional[str]):
        """Add Vorbis comments and optional cover picture to an OGG file."""
        try:
            audio = OggVorbis(audio_file)
            audio['TITLE'] = title
            audio['ARTIST'] = author
            audio['ALBUM'] = "Audiobook"
            cover = AudioMetadataManager._load_cover(cover_file)
            if cover:
                # Ogg Vorbis stores the cover as a base64-encoded FLAC picture
                # block in the METADATA_BLOCK_PICTURE comment.
                picture = AudioMetadataManager._build_picture(*cover)
                encoded = base64.b64encode(picture.write()).decode('ascii')
                audio['metadata_block_picture'] = [encoded]
            audio.save()
        except Exception as e:
            print(f"Error adding OGG metadata: {e}")
class AudiobookGenerator:
    """Main audiobook generation class with improved error handling"""

    # Texts longer than this are synthesised chunk by chunk.
    MAX_DIRECT_TEXT = 5000
    # Upper bound for a single chunk handed to espeak-ng.
    CHUNK_SIZE = 4000

    def __init__(self):
        self.temp_dir = None
        self.config = None

    @staticmethod
    def _run_espeak(text: str, output_file: str, voice: str, speed: int,
                    pitch: int, volume: int, timeout: int):
        """Run espeak-ng once, writing a WAV file, and return the CompletedProcess.

        The text is fed through stdin (``--stdin``) rather than as an
        argument, so text beginning with "-" cannot be read as an option.
        """
        cmd = [
            'espeak-ng',
            '-v', voice,
            '-s', str(speed),
            '-p', str(pitch),
            '-a', str(volume),
            '-w', output_file,
            '--stdin',
        ]
        return subprocess.run(cmd, input=text, capture_output=True, text=True,
                              encoding='utf-8', timeout=timeout)

    def generate_speech(self, text: str, output_file: str, voice: str,
                        speed: int, pitch: int, volume: int) -> bool:
        """Generate speech from text using espeak-ng with error handling.

        Returns:
            True when a non-empty ``output_file`` was produced, else False.
            Failures are printed, never raised.
        """
        try:
            # Split very long text into smaller chunks
            if len(text) > self.MAX_DIRECT_TEXT:
                return self._generate_speech_chunked(
                    text, output_file, voice, speed, pitch, volume)
            result = self._run_espeak(text, output_file, voice, speed, pitch,
                                      volume, timeout=300)
            if result.returncode != 0:
                print(f"Espeak error: {result.stderr}")
                return False
            return os.path.exists(output_file) and os.path.getsize(output_file) > 0
        except subprocess.TimeoutExpired:
            print("Speech generation timed out")
            return False
        except Exception as e:
            print(f"Error generating speech: {e}")
            return False

    @staticmethod
    def _split_text_into_chunks(text: str, max_len: int = 4000) -> List[str]:
        """Split text into chunks of at most ``max_len`` characters.

        Breaks fall on sentence boundaries and the closing punctuation is
        kept, so questions and exclamations keep their intonation. A single
        sentence longer than ``max_len`` is cut at the last space before the
        limit, or hard-cut when it contains none.
        """
        chunks: List[str] = []
        current = ""
        for sentence in re.split(r'(?<=[.!?])\s+', text.strip()):
            sentence = sentence.strip()
            while len(sentence) > max_len:
                if current:
                    chunks.append(current)
                    current = ""
                cut = sentence.rfind(' ', 0, max_len)
                if cut <= 0:
                    cut = max_len
                chunks.append(sentence[:cut].strip())
                sentence = sentence[cut:].strip()
            if not sentence:
                continue
            if current and len(current) + 1 + len(sentence) > max_len:
                chunks.append(current)
                current = sentence
            else:
                current = f"{current} {sentence}" if current else sentence
        if current:
            chunks.append(current)
        return chunks

    def _generate_speech_chunked(self, text: str, output_file: str, voice: str,
                                 speed: int, pitch: int, volume: int) -> bool:
        """Generate speech for long text by splitting into chunks.

        Each chunk is synthesised to ``<output_file>_chunk_<i>.wav``; the
        successful chunks are concatenated into ``output_file``. Chunk files
        are always removed, including after a timeout or other error.
        """
        attempted: List[str] = []
        try:
            succeeded: List[str] = []
            chunks = self._split_text_into_chunks(text, self.CHUNK_SIZE)
            for i, chunk in enumerate(chunks):
                temp_file = f"{output_file}_chunk_{i}.wav"
                attempted.append(temp_file)
                result = self._run_espeak(chunk, temp_file, voice, speed, pitch,
                                          volume, timeout=60)
                if result.returncode == 0 and os.path.exists(temp_file):
                    succeeded.append(temp_file)
                else:
                    print(f"Failed to generate chunk {i}: {result.stderr}")
            if not succeeded:
                return False
            self._combine_wav_files(succeeded, output_file)
            return os.path.exists(output_file)
        except Exception as e:
            print(f"Error in chunked speech generation: {e}")
            return False
        finally:
            for temp_file in attempted:
                try:
                    os.unlink(temp_file)
                except OSError:
                    pass

    def _combine_wav_files(self, wav_files: List[str], output_file: str):
        """Combine multiple WAV files into one, with 200 ms of silence after each.

        If combining fails, the first input is copied to ``output_file`` as
        a last resort.
        """
        try:
            combined = AudioSegment.empty()
            for wav_file in wav_files:
                if os.path.exists(wav_file):
                    combined += AudioSegment.from_wav(wav_file)
                    # Add small pause between chunks
                    combined += AudioSegment.silent(duration=200)  # 200ms
            combined.export(output_file, format="wav")
        except Exception as e:
            print(f"Error combining WAV files: {e}")
            # Fallback: just copy the first file
            if wav_files and os.path.exists(wav_files[0]):
                shutil.copy2(wav_files[0], output_file)

    @staticmethod
    def _export_audio(segment, output_file: str, format_type: str):
        """Export a pydub segment in the requested format (WAV when unknown)."""
        fmt = format_type.lower()
        if fmt == 'mp3':
            segment.export(output_file, format='mp3', bitrate='128k')
        elif fmt in ('ogg', 'flac'):
            segment.export(output_file, format=fmt)
        elif fmt == 'm4a':
            segment.export(output_file, format='mp4', codec='aac')
        else:
            segment.export(output_file, format='wav')

    def convert_audio_format(self, input_file: str, output_file: str,
                             format_type: str) -> bool:
        """Convert a WAV file to ``format_type``.

        mp3/ogg/flac/m4a are re-encoded; any other value copies the WAV
        file unchanged.

        Returns:
            True on success, False (with the error printed) otherwise.
        """
        try:
            if format_type.lower() in ('mp3', 'ogg', 'flac', 'm4a'):
                self._export_audio(AudioSegment.from_wav(input_file),
                                   output_file, format_type)
            else:  # wav
                shutil.copy2(input_file, output_file)
            return True
        except Exception as e:
            print(f"Error converting audio format: {e}")
            return False

    def _persist_cover(self, cover_file, output_dir: str, title: str):
        """Copy the extracted cover into ``output_dir`` and return the new path.

        The conversion step leaves the cover inside the temp directory (or
        in a stand-alone temp file), both of which are gone by the time the
        caller sees the result. The original path is returned unchanged
        when there is nothing to copy or the copy fails.
        """
        if not cover_file or not os.path.exists(cover_file):
            return cover_file
        ext = os.path.splitext(cover_file)[1] or '.jpg'
        target = os.path.join(output_dir, f"{self._sanitize_filename(title)}_cover{ext}")
        try:
            shutil.copy2(cover_file, target)
        except OSError as e:
            print(f"Could not keep cover image: {e}")
            return cover_file
        # A cover outside temp_dir is a stand-alone temp file that the final
        # cleanup would not remove.
        temp_root = os.path.abspath(self.temp_dir) + os.sep if self.temp_dir else None
        if temp_root and not os.path.abspath(cover_file).startswith(temp_root):
            try:
                os.unlink(cover_file)
            except OSError:
                pass
        return target

    def generate_audiobook(self, config: "AudiobookConfig",
                           progress_callback=None) -> Dict:
        """Main audiobook generation function with improved error handling.

        Converts the input to text, optionally splits it into chapters,
        synthesises each chapter, writes one combined file (plus
        per-chapter files when chapter splitting is on) and tags them.

        Args:
            config: Generation settings.
            progress_callback: Optional callable invoked with the shared
                ``current_progress`` dict after each status change.

        Returns:
            A dict that always has the keys ``success``, ``files``,
            ``error`` and ``metadata``, including when the run is cancelled
            or fails.
        """
        global current_progress, stop_generation
        self.config = config
        results = {"success": False, "files": [], "error": None, "metadata": {}}

        def report(**fields):
            current_progress.update(fields)
            if progress_callback:
                progress_callback(current_progress)

        def stopped():
            results["error"] = "Generation stopped by user"
            return results

        try:
            # Create temporary directory
            self.temp_dir = tempfile.mkdtemp()
            os.makedirs(config.output_dir, exist_ok=True)
            report(status="Processing input file...", value=5)

            # Convert ebook to text if needed
            cover_file = None
            title = config.book_title
            author = config.author
            file_ext = os.path.splitext(config.input_file)[1].lower()
            if file_ext == '.txt':
                text_content = TextProcessor.read_text_file(config.input_file)
                # Try to guess title from filename
                stem = os.path.splitext(os.path.basename(config.input_file))[0]
                title = stem.replace('_', ' ').replace('-', ' ').title()
            else:
                try:
                    text_file, cover_file, title, author = CalibreConverter.convert_to_text(
                        config.input_file, self.temp_dir
                    )
                    text_content = TextProcessor.read_text_file(text_file)
                except Exception as e:
                    raise Exception(f"Failed to convert ebook: {str(e)}") from e

            # Keep the cover somewhere that survives the temp-dir cleanup.
            cover_file = self._persist_cover(cover_file, config.output_dir, title)

            # Store metadata
            results["metadata"] = {
                "title": title,
                "author": author,
                "cover_file": cover_file
            }
            if stop_generation:
                return stopped()

            # Validate text content
            if not text_content or len(text_content.strip()) < 10:
                raise Exception("No readable text content found in the file")

            # Split into chapters if requested
            report(status="Processing text...", value=15)
            if config.chapter_split:
                chapters = TextProcessor.split_into_chapters(text_content)
            else:
                chapters = [("Complete Audiobook", text_content)]
            current_progress.update({"total": len(chapters)})

            # Parse voice selection
            voice_name, _lang_code = EspeakVoiceManager.parse_voice_selection(config.voice)

            # Generate audio for each chapter
            audio_files = []
            for i, (chapter_title, chapter_text) in enumerate(chapters):
                if stop_generation:
                    return stopped()
                report(status=f"Generating audio for: {chapter_title[:50]}...",
                       value=20 + (i * 60 // len(chapters)))

                # Clean text for speech
                clean_text = TextProcessor.clean_text_for_speech(chapter_text)
                if len(clean_text.strip()) < 5:
                    print(f"Skipping empty chapter: {chapter_title}")
                    continue

                # Generate speech
                wav_file = os.path.join(self.temp_dir, f"chapter_{i:03d}.wav")
                if not self.generate_speech(clean_text, wav_file, voice_name,
                                            config.speed, config.pitch, config.volume):
                    print(f"Failed to generate audio for chapter: {chapter_title}")
                    continue
                audio_files.append({
                    "wav_file": wav_file,
                    "title": chapter_title,
                    "index": i
                })

            if stop_generation:
                return stopped()
            if not audio_files:
                raise Exception("No audio files were successfully generated")

            # Create final audiobook file
            report(status="Creating final audiobook...", value=85)

            # Always create a single combined file
            final_filename = f"{self._sanitize_filename(title)}.{config.output_format}"
            final_file = os.path.join(config.output_dir, final_filename)
            self._combine_audio_files([f["wav_file"] for f in audio_files],
                                      final_file, config.output_format)
            if not os.path.exists(final_file):
                raise Exception("Failed to create final audiobook file")

            # Add metadata and cover art
            if config.include_cover:
                AudioMetadataManager.add_metadata(final_file, title, author, cover_file)

            output_files = [{
                "file": final_file,
                "title": title,
                "size": os.path.getsize(final_file)
            }]

            # Also create chapter files if requested
            if config.chapter_split and len(chapters) > 1:
                for audio_info in audio_files:
                    safe_chapter_title = self._sanitize_filename(audio_info["title"])
                    chapter_filename = f"{audio_info['index']:03d}_{safe_chapter_title}.{config.output_format}"
                    chapter_file = os.path.join(config.output_dir, chapter_filename)
                    if not self.convert_audio_format(audio_info["wav_file"], chapter_file,
                                                     config.output_format):
                        continue
                    if config.include_cover:
                        AudioMetadataManager.add_metadata(
                            chapter_file, audio_info["title"], author, cover_file)
                    output_files.append({
                        "file": chapter_file,
                        "title": audio_info["title"],
                        "size": os.path.getsize(chapter_file)
                    })

            results["files"] = output_files
            results["success"] = True
            report(status="Complete!", value=100)
        except Exception as e:
            error_msg = str(e)
            results["error"] = error_msg
            report(status=f"Error: {error_msg}", value=0)
        finally:
            # Cleanup temp directory
            if self.temp_dir and os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir, ignore_errors=True)
        return results

    def _sanitize_filename(self, filename: str) -> str:
        """Sanitize filename for filesystem compatibility.

        Removes ``<>:"/\\|?*``, turns whitespace runs into single
        underscores, and limits the result to 50 characters. Falls back to
        ``"audiobook"`` when nothing usable is left, so the output never
        becomes a bare extension.
        """
        # Remove invalid characters
        sanitized = re.sub(r'[<>:"/\\|?*]', '', filename)
        # Replace spaces with underscores
        sanitized = re.sub(r'\s+', '_', sanitized)
        # Remove extra underscores
        sanitized = re.sub(r'_+', '_', sanitized)
        # Limit length
        return sanitized[:50].strip('_') or "audiobook"

    def _combine_audio_files(self, audio_files: List[str], output_file: str, format_type: str):
        """Combine multiple WAV files into one file of ``format_type``.

        One second of silence is appended after each chapter. Errors are
        printed and re-raised.
        """
        try:
            combined = AudioSegment.empty()
            for audio_file in audio_files:
                if os.path.exists(audio_file):
                    combined += AudioSegment.from_wav(audio_file)
                    # Add small pause between chapters
                    combined += AudioSegment.silent(duration=1000)  # 1 second
            # Export combined audio
            self._export_audio(combined, output_file, format_type)
        except Exception as e:
            print(f"Error combining audio files: {e}")
            raise
# Gradio Interface Functions | |
def check_dependencies():
    """Check if required external command-line tools are available.

    Each tool is probed by running its version command. A tool counts as
    available only when the command can be started and exits with status 0.

    Returns:
        dict mapping ``"espeak-ng"``, ``"calibre"`` and ``"ffmpeg"`` (in
        that order) to ``True`` when the probe succeeded, else ``False``.
    """
    # Dependency name -> command used to probe it. Calibre is detected
    # through its ``ebook-convert`` executable.
    probes = {
        "espeak-ng": ['espeak-ng', '--version'],
        "calibre": ['ebook-convert', '--version'],
        "ffmpeg": ['ffmpeg', '-version'],
    }
    deps = {}
    for name, command in probes.items():
        try:
            subprocess.run(command, capture_output=True, check=True)
        except (OSError, subprocess.SubprocessError):
            # Executable missing / not runnable, or it exited non-zero.
            # Narrower than a bare ``except`` so Ctrl-C still propagates.
            deps[name] = False
        else:
            deps[name] = True
    return deps
def get_dependency_status():
    """Return a one-line, human-readable dependency summary.

    Every entry reported by ``check_dependencies()`` is rendered as a
    check-mark or cross icon followed by the dependency name, and the
    entries are joined with ``" | "``.
    """
    return " | ".join(
        f"{'✅' if found else '❌'} {name}"
        for name, found in check_dependencies().items()
    )
def generate_audiobook_wrapper(input_file, voice, speed, pitch, volume,
                               output_format, chapter_split, include_cover,
                               progress=gr.Progress()):
    """Wrapper function for the Gradio interface: run one generation.

    Args:
        input_file: Value of the file input. Either a filesystem path
            (``str`` / ``os.PathLike``) or an object exposing the path as
            ``.name``. Falsy means nothing was selected.
        voice: Selected voice label.
        speed: Speaking speed from the speed slider.
        pitch: Voice pitch from the pitch slider.
        volume: Volume from the volume slider.
        output_format: Output audio format name (e.g. ``"mp3"``).
        chapter_split: Whether per-chapter files are requested.
        include_cover: Whether cover art should be embedded.
        progress: Gradio progress tracker used to report progress.

    Returns:
        A 5-tuple ``(status message, output-directory message, list of
        generated file paths, metadata markdown, cover file path or None)``.
    """
    global stop_generation

    if not input_file:
        return "❌ Please select an input file", "", [], None, None

    # The file input is declared with type="filepath", which hands the
    # handler a plain path; reading ``.name`` on that would raise
    # AttributeError. Objects carrying the path in ``.name`` still work.
    if isinstance(input_file, (str, os.PathLike)):
        input_path = os.fspath(input_file)
    else:
        input_path = input_file.name

    # Reset stop flag
    stop_generation = False

    # Create output directory
    output_dir = os.path.join(os.getcwd(), "audiobook_output")
    os.makedirs(output_dir, exist_ok=True)

    # Create config
    config = AudiobookConfig(
        input_file=input_path,
        output_dir=output_dir,
        voice=voice,
        speed=speed,
        pitch=pitch,
        volume=volume,
        output_format=output_format,
        chapter_split=chapter_split,
        include_cover=include_cover,
        language="auto"
    )

    # Generate audiobook
    generator = AudiobookGenerator()

    def progress_callback(prog_data):
        # "value" is treated as a percentage; nothing is reported until a
        # positive total is known.
        if prog_data["total"] > 0:
            progress_val = prog_data["value"] / 100.0
            progress(progress_val, desc=prog_data["status"])

    # Run generation in current thread (Gradio handles threading)
    results = generator.generate_audiobook(config, progress_callback)

    if not results["success"]:
        error_msg = f"❌ Generation failed: {results.get('error', 'Unknown error')}"
        return error_msg, "", [], "", None

    file_list = [file_info["file"] for file_info in results["files"]]
    total_size = sum(file_info["size"] for file_info in results["files"])
    size_mb = total_size / (1024 * 1024)
    status_msg = f"✅ Generated {len(file_list)} audio file(s) ({size_mb:.1f} MB total)"

    # Get metadata for display
    metadata = results.get("metadata", {})
    title = metadata.get("title", "Unknown Title")
    author = metadata.get("author", "Unknown Author")
    cover_file = metadata.get("cover_file")
    metadata_display = f"**Title:** {title}\n**Author:** {author}"

    return status_msg, f"Output directory: {output_dir}", file_list, metadata_display, cover_file
def stop_generation_wrapper():
    """Request that the current generation run stops.

    Sets the module-level ``stop_generation`` flag to ``True`` and returns
    the status text to display.
    """
    global stop_generation
    stop_generation = True
    status_text = "⏹️ Stopping generation..."
    return status_text
def get_file_info(file_path):
    """Get information about the uploaded file, with a cover preview.

    Args:
        file_path: Value of the file input. Either a filesystem path
            (``str`` / ``os.PathLike``) or an object exposing the path as
            ``.name``. Falsy means nothing was selected.

    Returns:
        A 3-tuple ``(info text, cover image path or None, metadata
        markdown)``. Any error while reading the file is reported in the
        info text instead of being raised.
    """
    if not file_path:
        return "No file selected", None, ""

    try:
        # The file input is declared with type="filepath", which hands the
        # handler a plain path; reading ``.name`` on that would raise
        # AttributeError. Objects carrying the path in ``.name`` still work.
        if isinstance(file_path, (str, os.PathLike)):
            path = os.fspath(file_path)
        else:
            path = file_path.name

        file_size = os.path.getsize(path)
        size_mb = file_size / (1024 * 1024)
        file_ext = os.path.splitext(path)[1].lower()

        supported_formats = CalibreConverter.get_supported_formats()
        is_supported = file_ext in supported_formats or file_ext == '.txt'
        support_status = "✅ Supported" if is_supported else "❌ Not supported"

        deps = check_dependencies()
        if file_ext != '.txt' and not deps["calibre"]:
            if file_ext == '.epub':
                support_status += " (Direct EPUB parsing available)"
            else:
                support_status = "❌ Requires Calibre"

        file_info_text = f"File: {os.path.basename(path)}\nSize: {size_mb:.1f} MB\nFormat: {file_ext}\nStatus: {support_status}"

        # Try to extract cover and metadata for preview
        cover_image = None
        metadata_text = ""
        if file_ext == '.epub':
            try:
                temp_dir = tempfile.mkdtemp()
                try:
                    if deps["calibre"]:
                        title, author = CalibreConverter.extract_metadata(path)
                        cover_path = CalibreConverter.extract_cover(path, temp_dir)
                    else:
                        # Use direct EPUB parsing
                        _, cover_path, title, author = EPUBParser.extract_text_and_metadata(path)
                    metadata_text = f"**Title:** {title}\n**Author:** {author}"
                    if cover_path and os.path.exists(cover_path):
                        cover_image = cover_path
                except Exception as e:
                    print(f"Error extracting metadata: {e}")
                finally:
                    # The cover path is returned to the caller, so the
                    # directory holding it must outlive this function. Keep
                    # temp_dir only when the cover really lives inside it;
                    # otherwise it is removed so it does not leak.
                    temp_root = os.path.realpath(temp_dir) + os.sep
                    keep_temp_dir = bool(cover_image) and os.path.realpath(
                        cover_image).startswith(temp_root)
                    if not keep_temp_dir:
                        shutil.rmtree(temp_dir, ignore_errors=True)
            except Exception as e:
                print(f"Error reading EPUB: {e}")

        return file_info_text, cover_image, metadata_text
    except Exception as e:
        return f"Error reading file: {e}", None, ""
# Create Gradio Interface | |
def create_interface():
    """Build the Gradio web interface and return it without launching it.

    The page consists of an intro text, a dependency status line, a
    settings column (file input with preview, voice, speed/pitch/volume,
    output options), a results column (buttons, status, book information,
    generated files) and a help text. Three events are wired: file change
    -> ``get_file_info``, generate click -> ``generate_audiobook_wrapper``
    and stop click -> ``stop_generation_wrapper``.
    """
    # Voice choices, with a single fallback entry when none are reported.
    voice_choices = EspeakVoiceManager.get_voice_list()
    if not voice_choices:
        voice_choices = ["English (en) [U]"]

    custom_css = """
.dependency-status { font-family: monospace; font-size: 14px; }
.file-info { background: #f0f0f0; padding: 10px; border-radius: 5px; }
.cover-image { max-width: 200px; max-height: 300px; object-fit: contain; }
.metadata-display { background: #e8f4f8; padding: 10px; border-radius: 5px; }
"""

    intro_text = """
# 🎧 Audiobook Generator - Complete Version
Convert ebooks and text files to audiobooks using espeak-ng text-to-speech.
Supports multiple languages, formats, and includes cover art embedding.
**Features:**
- Single combined audiobook file output (default)
- Optional chapter splitting
- Cover art preview and embedding
- Metadata extraction and display
- Multiple output formats with proper metadata
"""

    help_text = """
### Output Behavior
**Default:** Creates a single combined audiobook file with proper metadata and cover art.
**Chapter Split Enabled:** Creates both the main audiobook file AND individual chapter files.
### Troubleshooting
**If Calibre conversion fails:**
- EPUB files will use direct parsing fallback
- HTML files will be converted using html2text
- Other formats require Calibre to be installed
**For best results:**
- Use text or EPUB files for most reliable conversion
- Install Calibre for full format support
- MP3 and M4A formats support cover art best
### Usage Instructions
1. **Upload a file**: Select an ebook or text file
2. **Preview**: View the extracted cover art and metadata
3. **Choose voice**: Select from available espeak-ng voices
4. **Adjust settings**: Configure speed, pitch, volume, and format
5. **Generate**: Click generate for a single audiobook file
6. **Download**: Get your complete audiobook with metadata
### Supported Formats
- **Input**: TXT, EPUB, MOBI, AZW, AZW3, PDF, FB2, HTML, RTF, ODT, DOCX
- **Output**: MP3, WAV, OGG, FLAC, M4A (with metadata support)
"""

    accepted_types = [
        ".txt", ".epub", ".mobi", ".azw", ".azw3", ".pdf",
        ".fb2", ".lit", ".pdb", ".rtf", ".odt", ".docx", ".html",
    ]

    with gr.Blocks(
        title="Audiobook Generator - Complete",
        theme=gr.themes.Soft(),
        css=custom_css
    ) as interface:
        gr.Markdown(intro_text)

        # Dependency status
        with gr.Row():
            gr.Markdown(
                f"**Dependencies:** {get_dependency_status()}",
                elem_classes=["dependency-status"]
            )

        with gr.Row():
            # Left column: everything the user configures.
            with gr.Column(scale=1):
                gr.Markdown("### Input Settings")
                input_file = gr.File(
                    label="Select Ebook or Text File",
                    file_types=accepted_types,
                    type="filepath"
                )
                with gr.Row():
                    with gr.Column(scale=2):
                        file_info = gr.Markdown("", elem_classes=["file-info"])
                        book_metadata = gr.Markdown("", elem_classes=["metadata-display"])
                    with gr.Column(scale=1):
                        cover_preview = gr.Image(
                            label="Cover Art Preview",
                            show_label=True,
                            elem_classes=["cover-image"],
                            interactive=False
                        )

                gr.Markdown("### Voice Settings")
                voice = gr.Dropdown(
                    choices=voice_choices,
                    value=voice_choices[0],
                    label="Voice/Language",
                    info="Select the voice and language for speech synthesis"
                )
                with gr.Row():
                    speed = gr.Slider(
                        minimum=80, maximum=300, value=175,
                        label="Speed (words per minute)",
                        info="Speaking speed"
                    )
                    pitch = gr.Slider(
                        minimum=0, maximum=99, value=50,
                        label="Pitch",
                        info="Voice pitch (0=low, 99=high)"
                    )
                    volume = gr.Slider(
                        minimum=0, maximum=200, value=100,
                        label="Volume",
                        info="Audio volume level"
                    )

                gr.Markdown("### Output Settings")
                output_format = gr.Dropdown(
                    choices=["mp3", "wav", "ogg", "flac", "m4a"],
                    value="mp3",
                    label="Output Format",
                    info="Audio file format"
                )
                with gr.Row():
                    chapter_split = gr.Checkbox(
                        label="Also Create Chapter Files",
                        value=False,
                        info="Create individual chapter files in addition to the main audiobook"
                    )
                    include_cover = gr.Checkbox(
                        label="Include Cover Art",
                        value=True,
                        info="Embed cover art in audio files"
                    )

            # Right column: actions and results.
            with gr.Column(scale=1):
                gr.Markdown("### Generation")
                generate_btn = gr.Button(
                    "🎵 Generate Audiobook",
                    variant="primary",
                    size="lg"
                )
                stop_btn = gr.Button(
                    "⏹️ Stop Generation",
                    variant="secondary"
                )
                status_output = gr.Markdown("")
                output_info = gr.Markdown("")

                gr.Markdown("### Book Information")
                final_metadata = gr.Markdown("", elem_classes=["metadata-display"])
                final_cover = gr.Image(
                    label="Final Cover Art",
                    show_label=True,
                    elem_classes=["cover-image"],
                    interactive=False
                )

                gr.Markdown("### Generated Files")
                output_files = gr.File(
                    label="Download Generated Audiobook Files",
                    file_count="multiple",
                    type="filepath",
                    interactive=False
                )

        gr.Markdown(help_text)

        # Event handlers
        generation_inputs = [
            input_file, voice, speed, pitch, volume,
            output_format, chapter_split, include_cover,
        ]
        generation_outputs = [
            status_output, output_info, output_files, final_metadata, final_cover,
        ]
        input_file.change(
            fn=get_file_info,
            inputs=[input_file],
            outputs=[file_info, cover_preview, book_metadata]
        )
        generate_btn.click(
            fn=generate_audiobook_wrapper,
            inputs=generation_inputs,
            outputs=generation_outputs
        )
        stop_btn.click(
            fn=stop_generation_wrapper,
            inputs=[],
            outputs=[status_output]
        )

    return interface
if __name__ == "__main__":
    # Script entry point: report dependencies, make sure the Python
    # packages and espeak-ng are present, then build and launch the UI.
    print("🎧 Starting Complete Audiobook Generator...")
    print(f"Dependencies: {get_dependency_status()}")

    # Install required Python packages if missing.
    # NOTE(review): these packages are also imported at the top of the
    # file, so a missing one would fail there before this code runs;
    # confirm whether this bootstrap is still needed.
    required_packages = ['html2text', 'pillow', 'mutagen']
    for package in required_packages:
        try:
            if package == 'pillow':
                # The pillow distribution is imported under the name PIL.
                import PIL
            else:
                __import__(package.replace('-', '_'))
        except ImportError:
            print(f"Installing {package}...")
            install = subprocess.run([sys.executable, '-m', 'pip', 'install', package])
            if install.returncode != 0:
                # Stop with a clear message instead of crashing on the
                # import below with a bare traceback.
                print(f"❌ Error: failed to install {package} (pip exit code {install.returncode})")
                sys.exit(1)

    # Import after installation
    import html2text
    from PIL import Image
    import mutagen

    # Check critical dependencies
    deps = check_dependencies()
    if not deps["espeak-ng"]:
        print("❌ Error: espeak-ng is required but not found!")
        print("Please install espeak-ng:")
        print(" Ubuntu/Debian: sudo apt install espeak-ng")
        print(" macOS: brew install espeak-ng")
        print(" Windows: Download from https://github.com/espeak-ng/espeak-ng/releases")
        sys.exit(1)
    if not deps["calibre"]:
        print("⚠️ Warning: Calibre not found. EPUB files will use direct parsing.")
        print("Install Calibre from: https://calibre-ebook.com/download")
    if not deps["ffmpeg"]:
        print("⚠️ Warning: FFmpeg not found. Using mutagen for metadata instead.")

    try:
        interface = create_interface()
        interface.launch(
            # Defaults are unchanged (127.0.0.1:7860); the environment
            # variables allow a different bind address/port, e.g. inside a
            # container, without editing the script.
            server_name=os.environ.get("GRADIO_SERVER_NAME", "127.0.0.1"),
            server_port=int(os.environ.get("GRADIO_SERVER_PORT", "7860")),
            share=True,
            show_error=True,
            quiet=False
        )
    except Exception as e:
        print(f"❌ Error starting interface: {e}")
        sys.exit(1)