# Chapter_Extractor.py - Module-level chapter extraction functions
import os
import re
import sys
import json
import threading
import time
import shutil
import hashlib
import warnings

# Lazy import for PatternManager to speed up ProcessPoolExecutor worker startup on Windows.
# The heavy TransateKRtoEN import is deferred until actually needed.
_PatternManager = None
_PM = None

def _get_pattern_manager():
    """Lazy initialization of PatternManager to avoid slow imports in worker processes"""
    global _PatternManager, _PM
    if _PatternManager is None:
        from TransateKRtoEN import PatternManager as PM_Class
        _PatternManager = PM_Class
        _PM = PM_Class()
    return _PM

# For backward compatibility - property-like access
class _LazyPM:
    def __getattr__(self, name):
        return getattr(_get_pattern_manager(), name)

PM = _LazyPM()
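
# Illustrative usage of the lazy proxy (the attribute names below appear later in
# this module): the first attribute access on PM triggers the deferred TransateKRtoEN
# import and instantiation; subsequent accesses reuse the cached instance.
#   patterns = PM.CHAPTER_PATTERNS   # first touch performs the heavy import
#   nums = PM.CHINESE_NUMS           # later touches are cheap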
from bs4 import BeautifulSoup
try:
    from bs4 import XMLParsedAsHTMLWarning
    warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
except ImportError:
    pass
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from collections import Counter

# Stop request function (can be overridden)
def is_stop_requested():
    """Check if stop has been requested - default implementation"""
    return False
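
# Host applications can override this hook to wire in real cancellation. A minimal
# sketch, assuming the host owns a threading.Event named stop_event (hypothetical):
#   import Chapter_Extractor
#   Chapter_Extractor.is_stop_requested = lambda: stop_event.is_set()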
# Progress bar for terminal output
class ProgressBar:
    """Simple in-place progress bar for terminal output"""
    _last_line_length = 0

    @classmethod
    def update(cls, current, total, prefix="Progress", bar_length=30):
        if total == 0:
            return
        percent = min(100, int(100 * current / total))
        filled = int(bar_length * current / total)
        bar = '█' * filled + '░' * (bar_length - filled)
        line = f"\r{prefix}: [{bar}] {current}/{total} ({percent}%)"
        # Pad with spaces to fully overwrite any longer previous line
        if len(line) < cls._last_line_length:
            line += ' ' * (cls._last_line_length - len(line))
        cls._last_line_length = len(line)
        print(line, end='', flush=True)

    @classmethod
    def finish(cls):
        print()
        cls._last_line_length = 0
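
# Illustrative usage: call update() inside a loop and finish() once afterwards so the
# cursor moves past the in-place line.
#   for i in range(1, 101):
#       ProgressBar.update(i, 100, prefix="Working")
#   ProgressBar.finish()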
# Helper for resource filename sanitization
def sanitize_resource_filename(filename):
    """Sanitize resource filenames to be filesystem-safe"""
    import unicodedata
    # Normalize unicode - use NFC to preserve Korean/CJK characters.
    # NFKD would decompose Korean Hangul into jamo components, corrupting them.
    filename = unicodedata.normalize('NFC', filename)
    # Remove or replace problematic characters
    filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
    return filename
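
# Illustrative examples (expected results under the rules above): NFC keeps Hangul
# intact while reserved filesystem characters become underscores.
#   sanitize_resource_filename('표지:이미지?.png')  # -> '표지_이미지_.png'
#   sanitize_resource_filename('ch<1>/intro.css')    # -> 'ch_1__intro.css'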
def _get_best_parser():
    """Determine the best parser available, preferring lxml for CJK text"""
    try:
        import lxml
        return 'lxml'
    except ImportError:
        return 'html.parser'

def _sort_by_opf_spine(chapters, opf_path):
    """Sort chapters according to OPF spine order"""
    try:
        import xml.etree.ElementTree as ET
        # Read OPF file
        with open(opf_path, 'r', encoding='utf-8') as f:
            opf_content = f.read()
        # Parse OPF
        root = ET.fromstring(opf_content)
        # Find namespaces
        ns = {'opf': 'http://www.idpf.org/2007/opf'}
        if root.tag.startswith('{'):
            default_ns = root.tag[1:root.tag.index('}')]
            ns = {'opf': default_ns}
        # Build manifest map (id -> href)
        manifest = {}
        for item in root.findall('.//opf:manifest/opf:item', ns):
            item_id = item.get('id')
            href = item.get('href')
            if item_id and href:
                manifest[item_id] = href
        # Get spine order
        spine_order = []
        spine = root.find('.//opf:spine', ns)
        if spine is not None:
            for itemref in spine.findall('opf:itemref', ns):
                idref = itemref.get('idref')
                if idref and idref in manifest:
                    href = manifest[idref]
                    spine_order.append(href)
        if not spine_order:
            print("⚠️ No spine order found in OPF, keeping original order")
            return chapters
        # Create a mapping of filenames to spine position
        spine_map = {}
        for idx, href in enumerate(spine_order):
            # Try different matching strategies
            basename = os.path.basename(href)
            spine_map[basename] = idx
            spine_map[href] = idx
            # Also store without extension for flexible matching
            name_no_ext = os.path.splitext(basename)[0]
            spine_map[name_no_ext] = idx
        print(f"📋 OPF spine contains {len(spine_order)} items")

        # Sort chapters based on spine order
        def get_spine_position(chapter):
            # Try to match chapter to spine
            filename = chapter.get('filename', '')
            basename = chapter.get('original_basename', '')
            # Try exact filename match
            if filename in spine_map:
                return spine_map[filename]
            # Try basename match
            if basename in spine_map:
                return spine_map[basename]
            # Try basename of filename
            if filename:
                fname_base = os.path.basename(filename)
                if fname_base in spine_map:
                    return spine_map[fname_base]
            # Try without extension
            if basename:
                if basename + '.html' in spine_map:
                    return spine_map[basename + '.html']
                if basename + '.xhtml' in spine_map:
                    return spine_map[basename + '.xhtml']
            # Fallback: sort unmatched chapters after all spine items, ordered by chapter number
            return 1000000 + chapter.get('num', 0)
        # Sort chapters
        sorted_chapters = sorted(chapters, key=get_spine_position)
        # Renumber chapters based on new order
        for idx, chapter in enumerate(sorted_chapters, 1):
            chapter['spine_order'] = idx
            # Optionally update chapter numbers to match spine order:
            # chapter['num'] = idx  # Uncomment if you want to renumber
        # Log reordering info
        reordered_count = 0
        for idx, chapter in enumerate(sorted_chapters):
            original_idx = chapters.index(chapter)
            if original_idx != idx:
                reordered_count += 1
        if reordered_count > 0:
            print(f"🔄 Reordered {reordered_count} chapters to match OPF spine")
        else:
            print("✅ Chapter order already matches OPF spine")
        return sorted_chapters
    except Exception as e:
        print(f"⚠️ Could not sort by OPF spine: {e}")
        import traceback
        traceback.print_exc()
        return chapters
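
# Illustrative call with minimal hypothetical chapter dicts: matching uses the
# 'filename'/'original_basename' keys, and 'num' only feeds the fallback position.
#   chs = [{'num': 2, 'filename': 'OEBPS/ch002.xhtml', 'original_basename': 'ch002.xhtml'},
#          {'num': 1, 'filename': 'OEBPS/ch001.xhtml', 'original_basename': 'ch001.xhtml'}]
#   chs = _sort_by_opf_spine(chs, os.path.join(output_dir, 'content.opf'))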
def protect_angle_brackets_with_korean(text: str) -> str:
    """Protect CJK text in angle brackets from HTML parsing"""
    if text is None:
        return ""
    # Extended pattern to include Korean, Chinese, and Japanese characters
    cjk_pattern = r'[가-힣ㄱ-ㅎㅏ-ㅣ一-龿ぁ-ゟァ-ヿ]'
    bracket_pattern = rf'<([^<>]*{cjk_pattern}[^<>]*)>'

    def replace_brackets(match):
        # Escape to HTML entities so the parser treats this as text, not as a tag
        content = match.group(1)
        return f'&lt;{content}&gt;'

    return re.sub(bracket_pattern, replace_brackets, text)
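
# Illustrative behavior: angle brackets around CJK text are entity-escaped so the HTML
# parser treats them as text, while ordinary markup is left untouched.
#   protect_angle_brackets_with_korean('<주인공>이 말했다')  # -> '&lt;주인공&gt;이 말했다'
#   protect_angle_brackets_with_korean('<p>plain</p>')        # -> '<p>plain</p>'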
def ensure_all_opf_chapters_extracted(zf, chapters, out):
    """Ensure ALL chapters from OPF spine are extracted, not just what ChapterExtractor found"""
    # Parse OPF to get ALL chapters in the spine
    opf_chapters = []
    try:
        # Find content.opf
        opf_content = None
        for name in zf.namelist():
            if name.endswith('content.opf'):
                opf_content = zf.read(name)
                break
        if not opf_content:
            return chapters  # No OPF, return original
        import xml.etree.ElementTree as ET
        root = ET.fromstring(opf_content)
        # Handle namespaces
        ns = {'opf': 'http://www.idpf.org/2007/opf'}
        if root.tag.startswith('{'):
            default_ns = root.tag[1:root.tag.index('}')]
            ns = {'opf': default_ns}
        # Get manifest
        manifest = {}
        for item in root.findall('.//opf:manifest/opf:item', ns):
            item_id = item.get('id')
            href = item.get('href')
            media_type = item.get('media-type', '')
            if item_id and href and ('html' in media_type.lower() or href.endswith(('.html', '.xhtml', '.htm'))):
                manifest[item_id] = href
        # Get spine order
        spine = root.find('.//opf:spine', ns)
        if spine is not None:  # Element truthiness is unreliable; compare against None
            for itemref in spine.findall('opf:itemref', ns):
                idref = itemref.get('idref')
                if idref and idref in manifest:
                    href = manifest[idref]
                    filename = os.path.basename(href)
                    # Skip nav, toc, cover - BUT only if the filename has NO numbers.
                    # Files with numbers like 'nav01' or 'toc05' are real chapters.
                    has_numbers = bool(re.search(r'\d', filename))
                    if not has_numbers and any(skip in filename.lower() for skip in ['nav', 'toc', 'cover']):
                        continue
                    opf_chapters.append(href)
| print(f"📚 OPF spine contains {len(opf_chapters)} chapters") | |
| # Check which OPF chapters are missing from extraction | |
| extracted_files = set() | |
| for c in chapters: | |
| if 'filename' in c: | |
| extracted_files.add(c['filename']) | |
| if 'original_basename' in c: | |
| extracted_files.add(c['original_basename']) | |
| missing_chapters = [] | |
| for opf_chapter in opf_chapters: | |
| basename = os.path.basename(opf_chapter) | |
| if basename not in extracted_files and opf_chapter not in extracted_files: | |
| missing_chapters.append(opf_chapter) | |
| if missing_chapters: | |
| print(f"⚠️ {len(missing_chapters)} chapters in OPF but not extracted!") | |
| print(f" Missing: {missing_chapters[:5]}{'...' if len(missing_chapters) > 5 else ''}") | |
| # Extract the missing chapters | |
| for href in missing_chapters: | |
| try: | |
| # Read the chapter content | |
| content = zf.read(href).decode('utf-8') | |
| # Extract chapter number | |
| import re | |
| basename = os.path.basename(href) | |
| matches = re.findall(r'(\d+)', basename) | |
| if matches: | |
| chapter_num = int(matches[-1]) | |
| else: | |
| chapter_num = len(chapters) + 1 | |
| # Create chapter entry | |
| from bs4 import BeautifulSoup | |
| parser = 'lxml' if 'lxml' in sys.modules else 'html.parser' | |
| soup = BeautifulSoup(content, parser) | |
| # Get title | |
| title = "Chapter " + str(chapter_num) | |
| title_tag = soup.find('title') | |
| if title_tag: | |
| title = title_tag.get_text().strip() or title | |
| else: | |
| for tag in ['h1', 'h2', 'h3']: | |
| header = soup.find(tag) | |
| if header: | |
| title = header.get_text().strip() or title | |
| break | |
| # Save the chapter file | |
| output_filename = f"chapter_{chapter_num:04d}_{basename}" | |
| output_path = os.path.join(out, output_filename) | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| f.write(content) | |
| # Add to chapters list | |
| new_chapter = { | |
| 'num': chapter_num, | |
| 'title': title, | |
| 'body': content, | |
| 'filename': href, | |
| 'original_basename': basename, | |
| 'file_size': len(content), | |
| 'has_images': bool(soup.find_all('img')), | |
| 'detection_method': 'opf_recovery', | |
| 'content_hash': None # Will be calculated later | |
| } | |
| chapters.append(new_chapter) | |
| print(f" ✅ Recovered chapter {chapter_num}: {basename}") | |
| except Exception as e: | |
| print(f" ❌ Failed to extract {href}: {e}") | |
| # Re-sort chapters by number | |
| chapters.sort(key=lambda x: x['num']) | |
| print(f"✅ Total chapters after OPF recovery: {len(chapters)}") | |
| except Exception as e: | |
| print(f"⚠️ Error checking OPF chapters: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return chapters | |
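
# Illustrative usage (hypothetical paths): run this after the main extraction pass to
# recover spine entries the extractor missed.
#   import zipfile
#   with zipfile.ZipFile('book.epub') as zf:
#       chapters = ensure_all_opf_chapters_extracted(zf, chapters, 'output_dir')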
def extract_chapters(zf, output_dir, parser=None, progress_callback=None, pattern_manager=None):
    """Extract chapters and all resources from EPUB using ThreadPoolExecutor

    Args:
        zf: ZipFile object of the EPUB
        output_dir: Output directory for extracted files
        parser: BeautifulSoup parser to use ('lxml' or 'html.parser')
        progress_callback: Optional callback for progress updates
        pattern_manager: Optional PatternManager instance for chapter detection
    """
    # Initialize defaults if not provided
    if parser is None:
        parser = _get_best_parser()
    # pattern_manager is no longer used - kept for API compatibility

    # Check stop at the very beginning
    if is_stop_requested():
        print("❌ Extraction stopped by user")
        return []
    print("🚀 Starting EPUB extraction with ThreadPoolExecutor...")
    print(f"📄 Using parser: {parser} {'(optimized for CJK)' if parser == 'lxml' else '(standard)'}")
    # Initial progress
    if progress_callback:
        progress_callback("Starting EPUB extraction...")
    # First, extract and save content.opf for reference
    for name in zf.namelist():
        if name.endswith('.opf'):
            try:
                opf_content = zf.read(name).decode('utf-8', errors='ignore')
                opf_output_path = os.path.join(output_dir, 'content.opf')
                with open(opf_output_path, 'w', encoding='utf-8') as f:
                    f.write(opf_content)
                print(f"📋 Saved OPF file: {name} → content.opf")
                break
            except Exception as e:
                print(f"⚠️ Could not save OPF file: {e}")
    # Get extraction mode from environment
    extraction_mode = os.getenv("EXTRACTION_MODE", "smart").lower()
    print(f"✅ Using {extraction_mode.capitalize()} extraction mode")
    # Get number of workers from environment or use default
    max_workers = int(os.getenv("EXTRACTION_WORKERS", "2"))
    print(f"🔧 Using {max_workers} workers for parallel processing")

    extracted_resources = _extract_all_resources(zf, output_dir, progress_callback)
    # Check stop after resource extraction
    if is_stop_requested():
        print("❌ Extraction stopped by user")
        return []

    metadata_path = os.path.join(output_dir, 'metadata.json')
    if os.path.exists(metadata_path):
        print("📋 Loading existing metadata...")
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
    else:
        print("📋 Extracting fresh metadata...")
        metadata = _extract_epub_metadata(zf)
        print(f"📋 Extracted metadata: {list(metadata.keys())}")

    chapters, detected_language = _extract_chapters_universal(zf, extraction_mode, parser, progress_callback, pattern_manager)
    # Sort chapters according to OPF spine order if available
    opf_path = os.path.join(output_dir, 'content.opf')
    if os.path.exists(opf_path) and chapters:
        print("📋 Sorting chapters according to OPF spine order...")
        chapters = _sort_by_opf_spine(chapters, opf_path)
        print("✅ Chapters sorted according to OPF reading order")
    # Check stop after chapter extraction
    if is_stop_requested():
        print("❌ Extraction stopped by user")
        return []
    if not chapters:
        print("❌ No chapters could be extracted!")
        return []

    chapters_info_path = os.path.join(output_dir, 'chapters_info.json')
    chapters_info = []
    chapters_info_lock = threading.Lock()

    def process_chapter(chapter):
        """Process a single chapter"""
        # Check stop in worker
        if is_stop_requested():
            return None
        info = {
            'num': chapter['num'],
            'title': chapter['title'],
            'original_filename': chapter.get('filename', ''),
            'has_images': chapter.get('has_images', False),
            'image_count': chapter.get('image_count', 0),
            'text_length': chapter.get('file_size', len(chapter.get('body', ''))),
            'detection_method': chapter.get('detection_method', 'unknown'),
            'content_hash': chapter.get('content_hash', '')
        }
        if chapter.get('has_images'):
            try:
                soup = BeautifulSoup(chapter.get('body', ''), parser)
                images = soup.find_all('img')
                info['images'] = [img.get('src', '') for img in images]
            except Exception:  # was a bare except; don't swallow KeyboardInterrupt/SystemExit
                info['images'] = []
        return info
    # Process chapters in parallel
    print(f"🔄 Processing {len(chapters)} chapters in parallel...")
    if progress_callback:
        progress_callback(f"Processing {len(chapters)} chapters...")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_chapter = {
            executor.submit(process_chapter, chapter): chapter
            for chapter in chapters
        }
        # Process completed tasks
        completed = 0
        for future in as_completed(future_to_chapter):
            if is_stop_requested():
                print("❌ Extraction stopped by user")
                # Cancel remaining futures
                for f in future_to_chapter:
                    f.cancel()
                return []
            try:
                result = future.result()
                if result:
                    with chapters_info_lock:
                        chapters_info.append(result)
                completed += 1
                # Yield to GUI periodically (can be disabled for max speed)
                if completed % 5 == 0 and os.getenv("ENABLE_GUI_YIELD", "1") == "1":
                    time.sleep(0.001)
                # Progress updates
                if completed % 10 == 0 or completed == len(chapters):
                    if progress_callback:
                        progress_msg = f"Processed {completed}/{len(chapters)} chapters"
                        progress_callback(progress_msg)
                    else:
                        # Show progress bar in terminal
                        ProgressBar.update(completed, len(chapters), prefix="📊 Processing metadata")
            except Exception as e:
                chapter = future_to_chapter[future]
                print(f"   ❌ Error processing chapter {chapter['num']}: {e}")
    # Finish progress bar
    if not progress_callback:
        ProgressBar.finish()
    # Sort chapters_info by chapter number to maintain order
    chapters_info.sort(key=lambda x: x['num'])
    print(f"✅ Successfully processed {len(chapters_info)} chapters")
    with open(chapters_info_path, 'w', encoding='utf-8') as f:
        json.dump(chapters_info, f, ensure_ascii=False, indent=2)
    print("💾 Saved detailed chapter info to: chapters_info.json")

    metadata.update({
        'chapter_count': len(chapters),
        'detected_language': detected_language,
        'extracted_resources': extracted_resources,
        'extraction_mode': extraction_mode,
        'extraction_summary': {
            'total_chapters': len(chapters),
            'chapter_range': f"{chapters[0]['num']}-{chapters[-1]['num']}",
            'resources_extracted': sum(len(files) for files in extracted_resources.values())
        }
    })
    metadata['chapter_titles'] = {
        str(c['num']): c['title'] for c in chapters
    }
    with open(metadata_path, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, ensure_ascii=False, indent=2)
    print(f"💾 Saved comprehensive metadata to: {metadata_path}")

    _create_extraction_report(output_dir, metadata, chapters, extracted_resources)
    _log_extraction_summary(chapters, extracted_resources, detected_language)
    print(f"🔍 VERIFICATION: {extraction_mode.capitalize()} chapter extraction completed successfully")
    print(f"⚡ Used {max_workers} workers for parallel processing")
    return chapters
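
# Illustrative end-to-end call (hypothetical paths; the environment variables are the
# ones this module reads and are optional):
#   import zipfile
#   os.environ['EXTRACTION_MODE'] = 'smart'      # smart | comprehensive | full | enhanced
#   os.environ['EXTRACTION_WORKERS'] = '4'
#   with zipfile.ZipFile('book.epub') as zf:
#       chapters = extract_chapters(zf, 'output_dir')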
def _extract_all_resources(zf, output_dir, progress_callback=None):
    """Extract all resources with parallel processing"""
    extracted_resources = {
        'css': [],
        'fonts': [],
        'images': [],
        'epub_structure': [],
        'other': []
    }
    # Check if already extracted
    extraction_marker = os.path.join(output_dir, '.resources_extracted')
    if os.path.exists(extraction_marker):
        print("📦 Resources already extracted, skipping...")
        return _count_existing_resources(output_dir, extracted_resources)
    _cleanup_old_resources(output_dir)
    # Create directories
    for resource_type in ['css', 'fonts', 'images']:
        os.makedirs(os.path.join(output_dir, resource_type), exist_ok=True)
    # Only print if no callback (avoid duplicates in subprocess)
    if not progress_callback:
        print("📦 Extracting resources in parallel...")
    # Get list of files to process
    file_list = [f for f in zf.namelist() if not f.endswith('/') and os.path.basename(f)]
    # Thread-safe lock for extracted_resources
    resource_lock = threading.Lock()

    def extract_single_resource(file_path):
        if is_stop_requested():
            return None
        try:
            file_data = zf.read(file_path)
            resource_info = _categorize_resource(file_path, os.path.basename(file_path))
            if resource_info:
                resource_type, target_dir, safe_filename = resource_info
                target_path = os.path.join(output_dir, target_dir, safe_filename) if target_dir else os.path.join(output_dir, safe_filename)
                with open(target_path, 'wb') as f:
                    f.write(file_data)
                # Thread-safe update
                with resource_lock:
                    extracted_resources[resource_type].append(safe_filename)
                return (resource_type, safe_filename)
        except Exception as e:
            print(f"[WARNING] Failed to extract {file_path}: {e}")
        return None

    # Process files in parallel
    total_resources = len(file_list)
    extracted_count = 0
    # Use the same worker count as chapter processing
    resource_workers = int(os.getenv("EXTRACTION_WORKERS", "2"))
    with ThreadPoolExecutor(max_workers=resource_workers) as executor:
        futures = {executor.submit(extract_single_resource, file_path): file_path
                   for file_path in file_list}
        for future in as_completed(futures):
            if is_stop_requested():
                executor.shutdown(wait=False)
                break
            extracted_count += 1
            # Progress update every 20 files
            if extracted_count % 20 == 0:
                if progress_callback:
                    progress_callback(f"Extracting resources: {extracted_count}/{total_resources}")
                else:
                    # Print progress bar in terminal
                    ProgressBar.update(extracted_count, total_resources, prefix="📦 Extracting resources")
            # Yield to GUI periodically (can be disabled for max speed)
            if extracted_count % 10 == 0 and os.getenv("ENABLE_GUI_YIELD", "1") == "1":
                time.sleep(0.001)
            result = future.result()
            if result:
                resource_type, filename = result
                # Only print for important resources
                if extracted_count < 10 or resource_type in ['css', 'fonts']:
                    print(f"   📄 Extracted {resource_type}: {filename}")
    # Show 100% completion
    if progress_callback:
        progress_callback(f"Extracting resources: {total_resources}/{total_resources}")
    else:
        ProgressBar.update(total_resources, total_resources, prefix="📦 Extracting resources")
        ProgressBar.finish()
    # Mark as complete
    with open(extraction_marker, 'w') as f:
        f.write(f"Resources extracted at {time.time()}")
    _validate_critical_files(output_dir, extracted_resources)
    return extracted_resources
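
# Illustrative note (hypothetical path): extraction is skipped while the marker file
# exists, so delete it to force a fresh resource pass.
#   os.remove(os.path.join('output_dir', '.resources_extracted'))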
def _extract_chapters_universal(zf, extraction_mode="smart", parser=None, progress_callback=None, pattern_manager=None):
    """Universal chapter extraction with four modes: smart, comprehensive, full, enhanced

    All modes properly merge Section/Chapter pairs.
    Enhanced mode uses html2text for superior text processing.
    Parallel processing is used for improved performance.
    """
    # Initialize defaults if not provided
    if parser is None:
        parser = _get_best_parser()
    # pattern_manager is no longer used - kept for API compatibility

    # Check stop at the beginning
    if is_stop_requested():
        print("❌ Chapter extraction stopped by user")
        return [], 'unknown'

    # Initialize enhanced extractor if using enhanced mode
    enhanced_extractor = None
    enhanced_filtering = extraction_mode  # Default fallback
    preserve_structure = True
    # Check if the user wants to translate special files (info.xhtml, message.xhtml, etc.).
    # By default, skip them as they're typically metadata/navigation.
    translate_special = os.getenv('TRANSLATE_SPECIAL_FILES', '0') == '1'
    if translate_special:
        print("📝 Special files translation is ENABLED (info.xhtml, message.xhtml, etc.)")
    else:
        print("📝 Special files translation is DISABLED - skipping navigation/metadata files")
    if extraction_mode == "enhanced":
        print("🚀 Initializing Enhanced extraction mode with html2text...")
        # Get enhanced mode configuration from environment
        enhanced_filtering = os.getenv("ENHANCED_FILTERING", "smart")
        # Avoid 'full' with html2text to prevent XML declaration artifacts; use 'comprehensive' instead
        if str(enhanced_filtering).lower() == 'full':
            enhanced_filtering = 'comprehensive'
        preserve_structure = os.getenv("ENHANCED_PRESERVE_STRUCTURE", "1") == "1"
        print(f"   • Enhanced filtering level: {enhanced_filtering}")
        print(f"   • Preserve structure: {preserve_structure}")
        # Try to initialize the enhanced extractor
        try:
            # Import our enhanced extractor (assumed to be in the same directory or importable)
            from enhanced_text_extractor import EnhancedTextExtractor
            enhanced_extractor = EnhancedTextExtractor(
                filtering_mode=enhanced_filtering,
                preserve_structure=preserve_structure
            )
            print("✅ Enhanced text extractor initialized successfully")
        except ImportError as e:
            print(f"❌ Enhanced text extractor module not found: {e}")
            print("❌ Cannot use enhanced extraction mode. Please install enhanced_text_extractor or select a different extraction mode.")
            raise
        except Exception as e:
            print(f"❌ Enhanced extractor initialization failed: {e}")
            print("❌ Cannot use enhanced extraction mode. Please select a different extraction mode.")
            raise
    chapters = []
    sample_texts = []
    # First phase: collect HTML files
    html_files = []
    file_list = zf.namelist()
    total_files = len(file_list)
    # Update progress for file collection
    if progress_callback and total_files > 100:
        progress_callback(f"Scanning {total_files} files in EPUB...")
    elif total_files > 100 and not progress_callback:
        # Print initial message for progress bar (only if no callback)
        print(f"📂 Scanning {total_files} files in EPUB...")
    for idx, name in enumerate(file_list):
        # Check stop while collecting files
        if is_stop_requested():
            print("❌ Chapter extraction stopped by user")
            return [], 'unknown'
        # Yield to GUI every 50 files (can be disabled for max speed)
        if idx % 50 == 0 and idx > 0:
            if os.getenv("ENABLE_GUI_YIELD", "1") == "1":
                time.sleep(0.001)  # Brief yield to GUI
            if total_files > 100:
                if progress_callback:
                    progress_callback(f"Scanning files: {idx}/{total_files}")
                else:
                    # Print progress bar in terminal
                    ProgressBar.update(idx, total_files, prefix="📂 Scanning files")
        if name.lower().endswith(('.xhtml', '.html', '.htm')):
            basename = os.path.basename(name).lower()
            # Skip cover files unless special file translation is enabled
            if basename in ['cover.html', 'cover.xhtml', 'cover.htm']:
                if not translate_special:
                    print(f"[SKIP] Cover file excluded: {name}")
                    continue
                else:
                    print(f"[INCLUDE] Cover file included (special files enabled): {name}")
            # All filtering is controlled by the TRANSLATE_SPECIAL_FILES toggle and the
            # extraction mode; there are no hardcoded special file patterns.
            html_files.append(name)
    # Print final 100% progress update before finishing
    if total_files > 100:
        if progress_callback:
            progress_callback(f"Scanning files: {total_files}/{total_files}")
        else:
            # Show 100% completion
            ProgressBar.update(total_files, total_files, prefix="📂 Scanning files")
    # Finish progress bar if we were using it
    if total_files > 100 and not progress_callback:
        ProgressBar.finish()
    # Mode descriptions, including enhanced mode
    mode_description = {
        "smart": "potential content files",
        "comprehensive": "HTML files",
        "full": "ALL HTML/XHTML files (no filtering)",
        "enhanced": f"files (enhanced with {enhanced_filtering} filtering)"
    }
    print(f"📚 Found {len(html_files)} {mode_description.get(extraction_mode, 'files')} in EPUB")
    # Sort files to ensure proper order
    html_files.sort()
    # Check if merging is disabled via environment variable
    disable_merging = os.getenv("DISABLE_CHAPTER_MERGING", "0") == "1"
    processed_files = set()
    merge_candidates = {}  # Store potential merges without reading files yet
    if disable_merging:
        print("📌 Chapter merging is DISABLED - processing all files independently")
    else:
        print("📌 Chapter merging is ENABLED")
        # Only do merging logic if not disabled
        file_groups = {}
        # Group files by their base number to detect Section/Chapter pairs
        for file_path in html_files:
            filename = os.path.basename(file_path)
            # Try different patterns to extract the base number
            base_num = None
            # Pattern 1: "No00014" from "No00014Section.xhtml"
            match = re.match(r'(No\d+)', filename)
            if match:
                base_num = match.group(1)
            else:
                # Pattern 2: "0014" from "0014_section.html" or "0014_chapter.html"
                match = re.match(r'^(\d+)[_\-]', filename)
                if match:
                    base_num = match.group(1)
                else:
                    # Pattern 3: just numbers at the start
                    match = re.match(r'^(\d+)', filename)
                    if match:
                        base_num = match.group(1)
            if base_num:
                if base_num not in file_groups:
                    file_groups[base_num] = []
                file_groups[base_num].append(file_path)
        # Identify merge candidates WITHOUT reading files yet
        for base_num, group_files in sorted(file_groups.items()):
            if len(group_files) == 2:
                # Check if we have a Section/Chapter pair based on filenames only
                section_file = None
                chapter_file = None
                for file_path in group_files:
                    basename = os.path.basename(file_path)
                    # Strict detection - the filename must contain 'section' or 'chapter', but not both
                    if 'section' in basename.lower() and 'chapter' not in basename.lower():
                        section_file = file_path
                    elif 'chapter' in basename.lower() and 'section' not in basename.lower():
                        chapter_file = file_path
                if section_file and chapter_file:
                    # Store as a potential merge candidate
                    merge_candidates[chapter_file] = section_file
                    processed_files.add(section_file)
                    print(f"[DEBUG] Potential merge candidate: {base_num}")
                    print(f"   Section: {os.path.basename(section_file)}")
                    print(f"   Chapter: {os.path.basename(chapter_file)}")
    # Filter out section files that were marked for merging
    files_to_process = []
    for file_path in html_files:
        if not disable_merging and file_path in processed_files:
            print(f"[DEBUG] Skipping section file: {file_path}")
            continue
        files_to_process.append(file_path)
    print(f"📚 Processing {len(files_to_process)} files after merge analysis")
    if progress_callback:
        progress_callback(f"Preparing to process {len(files_to_process)} chapters...")
    # Initialize collections for aggregating results
    file_size_groups = {}
    h1_count = 0
    h2_count = 0
    skipped_files = []
    # Progress tracking
    total_files = len(files_to_process)
    # Prepare arguments for parallel processing
    zip_file_path = zf.filename
    # Process files in parallel or sequentially based on file count.
    # Only print if no callback (avoid duplicates).
    if not progress_callback:
        print(f"🚀 Processing {len(files_to_process)} HTML files...")
    # Initial progress - no message needed, the progress bar will show
    candidate_chapters = []  # For smart mode
    chapters_direct = []     # For other modes
    # Decide whether to use parallel processing
    use_parallel = len(files_to_process) > 10
    if use_parallel:
        # Get worker count from environment variable
        max_workers = int(os.getenv("EXTRACTION_WORKERS", "2"))
        print(f"📦 Using parallel processing with {max_workers} workers...")
        if progress_callback:
            progress_callback(f"Starting {max_workers} extraction workers...")
        # Use ProcessPoolExecutor for true multi-process parallelism.
        # All worker functions are at module level and picklable, so processes are safe.
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Submit all files for processing
            future_to_file = {
                executor.submit(
                    _process_single_html_file,
                    file_path=file_path,
                    file_index=idx,
                    zip_file_path=zip_file_path,
                    parser=parser,
                    merge_candidates=merge_candidates,
                    disable_merging=disable_merging,
                    enhanced_extractor=enhanced_extractor,
                    extraction_mode=extraction_mode,
                    enhanced_filtering=enhanced_filtering,
                    preserve_structure=preserve_structure,
                    protect_angle_brackets_func=protect_angle_brackets_with_korean,
                    pattern_manager=pattern_manager,
                    files_to_process=files_to_process,
                    is_stop_requested=is_stop_requested
                ): (file_path, idx)
                for idx, file_path in enumerate(files_to_process)
            }
            # Collect results as they complete, with progress tracking
            processed_count = 0
            for future in as_completed(future_to_file):
                if is_stop_requested():
                    print("❌ Chapter processing stopped by user")
                    executor.shutdown(wait=False)
                    return [], 'unknown'
                try:
                    # Unpack result from _process_single_html_file
                    result = future.result()
                    chapter_info, h1_found, h2_found, file_size, sample_text, skipped_info = result
                    # Update progress
                    processed_count += 1
                    if processed_count % 5 == 0:
                        if progress_callback:
                            progress_msg = f"Processing chapters: {processed_count}/{total_files} ({processed_count*100//total_files}%)"
                            progress_callback(progress_msg)
                        else:
                            # Print progress bar in terminal
                            ProgressBar.update(processed_count, total_files, prefix="📚 Processing chapters")
                    # Aggregate header counts
                    if h1_found:
                        h1_count += 1
                    if h2_found:
                        h2_count += 1
                    # Collect file size groups and sample texts
                    if chapter_info:
                        effective_mode = enhanced_filtering if extraction_mode == "enhanced" else extraction_mode
                        if effective_mode == "smart" and file_size > 0:
                            if file_size not in file_size_groups:
                                file_size_groups[file_size] = []
                            file_path, _ = future_to_file[future]
                            file_size_groups[file_size].append(file_path)
                        # Collect sample texts
                        if sample_text and len(sample_texts) < 5:
                            sample_texts.append(sample_text)
                        # For smart mode with merging enabled, collect candidates;
                        # otherwise, add directly to chapters
                        if effective_mode == "smart" and not disable_merging:
                            candidate_chapters.append(chapter_info)
                        else:
                            chapters_direct.append(chapter_info)
                    # Collect skipped info
                    if skipped_info:
                        skipped_files.append(skipped_info)
                except Exception as e:
                    file_path, idx = future_to_file[future]
                    print(f"[ERROR] Process error processing {file_path}: {e}")
                    import traceback
                    traceback.print_exc()
        # Show 100% completion
        if progress_callback:
            progress_callback(f"Processing chapters: {total_files}/{total_files} (100%)")
        else:
            ProgressBar.update(total_files, total_files, prefix="📚 Processing chapters")
    else:
        print("📦 Using sequential processing (small file count)...")
        # Process files sequentially for small EPUBs
        for idx, file_path in enumerate(files_to_process):
            if is_stop_requested():
                print("❌ Chapter processing stopped by user")
                return [], 'unknown'
            # Call the module-level function directly
            result = _process_single_html_file(
                file_path=file_path,
                file_index=idx,
                zip_file_path=zip_file_path,
                parser=parser,
                merge_candidates=merge_candidates,
                disable_merging=disable_merging,
                enhanced_extractor=enhanced_extractor,
                extraction_mode=extraction_mode,
                enhanced_filtering=enhanced_filtering,
                preserve_structure=preserve_structure,
                protect_angle_brackets_func=protect_angle_brackets_with_korean,
                pattern_manager=pattern_manager,
                files_to_process=files_to_process,
                is_stop_requested=is_stop_requested
            )
            # Unpack result
            chapter_info, h1_found, h2_found, file_size, sample_text, skipped_info = result
            # Update progress
            if (idx + 1) % 5 == 0:
                if progress_callback:
                    progress_msg = f"Processing chapters: {idx+1}/{total_files} ({(idx+1)*100//total_files}%)"
                    progress_callback(progress_msg)
                else:
                    # Print progress bar in terminal
                    ProgressBar.update(idx + 1, total_files, prefix="📚 Processing chapters")
            # Aggregate header counts
            if h1_found:
                h1_count += 1
            if h2_found:
                h2_count += 1
            # Collect file size groups and sample texts
            if chapter_info:
                effective_mode = enhanced_filtering if extraction_mode == "enhanced" else extraction_mode
                if effective_mode == "smart" and file_size > 0:
                    if file_size not in file_size_groups:
                        file_size_groups[file_size] = []
                    file_size_groups[file_size].append(file_path)
                # Collect sample texts
                if sample_text and len(sample_texts) < 5:
                    sample_texts.append(sample_text)
                # For smart mode with merging enabled, collect candidates;
                # otherwise, add directly to chapters
                if effective_mode == "smart" and not disable_merging:
                    candidate_chapters.append(chapter_info)
                else:
                    chapters_direct.append(chapter_info)
            # Collect skipped info
            if skipped_info:
                skipped_files.append(skipped_info)
        # Show 100% completion for sequential mode
        if progress_callback:
            progress_callback(f"Processing chapters: {total_files}/{total_files} (100%)")
        else:
            ProgressBar.update(total_files, total_files, prefix="📚 Processing chapters")
    # Final progress update and progress bar cleanup
    if not progress_callback:
        ProgressBar.finish()
    else:
        progress_callback(f"Chapter processing complete: {len(candidate_chapters) + len(chapters_direct)} chapters")
    # Print skip summary if any files were skipped
    if skipped_files:
        print(f"\n📊 Skipped {len(skipped_files)} files during processing:")
        empty_count = sum(1 for _, reason, _ in skipped_files if reason == 'empty')
        if empty_count > 0:
            print(f"   • {empty_count} nearly empty files")
        # Show first 3 examples if debug enabled
        if os.getenv('DEBUG_SKIP_MESSAGES', '0') == '1' and skipped_files:
            print("   Examples:")
            for path, reason, size in skipped_files[:3]:
                print(f"     - {os.path.basename(path)} ({size} chars)")
    # Sort direct chapters by file index to maintain order
    chapters_direct.sort(key=lambda x: x["file_index"])
    # Post-process smart mode candidates (only when merging is enabled)
    effective_mode = enhanced_filtering if extraction_mode == "enhanced" else extraction_mode
    _smart_mode_skips = []  # Skip records for the summary below; must exist before the append calls
    if effective_mode == "smart" and candidate_chapters and not disable_merging:
        # Check stop before post-processing
        if is_stop_requested():
            print("❌ Chapter post-processing stopped by user")
            return [], 'unknown'  # nothing has been assembled into chapters yet
        print(f"\n[SMART MODE] Processing {len(candidate_chapters)} candidate files...")
        # Sort candidates by file index to maintain order
        candidate_chapters.sort(key=lambda x: x["file_index"])
        # Debug: show what files we have
        section_files = [c for c in candidate_chapters if 'section' in c['original_basename'].lower()]
        chapter_files = [c for c in candidate_chapters if 'chapter' in c['original_basename'].lower() and 'section' not in c['original_basename'].lower()]
        other_files = [c for c in candidate_chapters if c not in section_files and c not in chapter_files]
        print(f"   📊 File breakdown:")
        print(f"      • Section files: {len(section_files)}")
        print(f"      • Chapter files: {len(chapter_files)}")
        print(f"      • Other files: {len(other_files)}")
        # Original smart mode logic when merging is enabled.
        # First, separate files with detected chapter numbers from those without.
        numbered_chapters = []
        unnumbered_chapters = []
        for idx, chapter in enumerate(candidate_chapters):
            # Yield periodically during categorization (can be disabled for max speed)
            if idx % 10 == 0 and idx > 0 and os.getenv("ENABLE_GUI_YIELD", "1") == "1":
                time.sleep(0.001)
            if chapter["num"] is not None:
                numbered_chapters.append(chapter)
            else:
                unnumbered_chapters.append(chapter)
        print(f"   • Files with chapter numbers: {len(numbered_chapters)}")
        print(f"   • Files without chapter numbers: {len(unnumbered_chapters)}")
        # Check if we have hash-based filenames (no numbered chapters found)
        if not numbered_chapters and unnumbered_chapters:
            print("   ⚠️ No chapter numbers found - likely hash-based filenames")
            print("   → Using file order as chapter sequence")
            # Sort by file index to maintain order
            unnumbered_chapters.sort(key=lambda x: x["file_index"])
            # Assign sequential numbers
            for i, chapter in enumerate(unnumbered_chapters, 1):
                chapter["num"] = i
                chapter["detection_method"] = f"{extraction_mode}_hash_filename_sequential" if extraction_mode == "enhanced" else "hash_filename_sequential"
                if not chapter["title"] or chapter["title"] == chapter["original_basename"]:
                    chapter["title"] = f"Chapter {i}"
            chapters = unnumbered_chapters
        else:
            # We have some numbered chapters
            chapters = numbered_chapters
            # For unnumbered files, check if they might be duplicates or appendices
            if unnumbered_chapters:
                print(f"   → Analyzing {len(unnumbered_chapters)} unnumbered files...")
                # Get the max chapter number
                max_num = max(c["num"] for c in numbered_chapters)
                # Check each unnumbered file
                for chapter in unnumbered_chapters:
                    # Check stop in post-processing loop
                    if is_stop_requested():
                        print("❌ Chapter post-processing stopped by user")
                        return chapters, 'unknown'
                    # Check if it's very small (might be a separator or note)
                    if chapter["file_size"] < 200:
                        # Collect for summary instead of printing
                        # Note: _smart_mode_skips is defined in the outer scope
                        _smart_mode_skips.append(('small', chapter['filename'], chapter['file_size']))
                        continue
                    # Check if it has a similar size to existing chapters (might be a duplicate)
                    size = chapter["file_size"]
                    similar_chapters = [c for c in numbered_chapters
                                        if abs(c["file_size"] - size) < 50]
                    if similar_chapters:
                        # Might be a duplicate, skip it (collect for summary)
                        _smart_mode_skips.append(('duplicate', chapter['filename'], len(similar_chapters)))
                        continue
                    # Otherwise, add as an appendix
                    max_num += 1
                    chapter["num"] = max_num
                    chapter["detection_method"] = f"{extraction_mode}_appendix_sequential" if extraction_mode == "enhanced" else "appendix_sequential"
                    if not chapter["title"] or chapter["title"] == chapter["original_basename"]:
                        chapter["title"] = f"Appendix {max_num}"
                    chapters.append(chapter)
                    print(f"   [ADD] Added as chapter {max_num}: {chapter['filename']}")
    else:
        # For other modes, or smart mode with merging disabled
        chapters = chapters_direct
    # Print smart mode skip summary if any
    if _smart_mode_skips:
        print(f"\n📊 Smart mode filtering summary:")
        small_count = sum(1 for reason, _, _ in _smart_mode_skips if reason == 'small')
        dup_count = sum(1 for reason, _, _ in _smart_mode_skips if reason == 'duplicate')
        if small_count > 0:
            print(f"   • Skipped {small_count} very small files")
        if dup_count > 0:
            print(f"   • Skipped {dup_count} possible duplicates")
        # Show examples if debug enabled
        if os.getenv('DEBUG_SKIP_MESSAGES', '0') == '1':
            print("   Examples:")
            for reason, filename, detail in _smart_mode_skips[:3]:
                if reason == 'small':
                    print(f"     - {filename} ({detail} chars)")
                else:
                    print(f"     - {filename} (similar to {detail} chapters)")
        # Clear the list
        _smart_mode_skips = []
    # Sort chapters by number
    chapters.sort(key=lambda x: x["num"])
    # Ensure chapter numbers are integers.
    # (When merging is disabled, all chapters should have integer numbers anyway.)
    for chapter in chapters:
        if isinstance(chapter["num"], float):
            chapter["num"] = int(chapter["num"])
    # Final validation
    if chapters:
        print(f"\n✅ Final chapter count: {len(chapters)}")
        print(f"   • Chapter range: {chapters[0]['num']} - {chapters[-1]['num']}")
        # Enhanced mode summary
        if extraction_mode == "enhanced":
            enhanced_count = sum(1 for c in chapters if c.get('enhanced_extraction', False))
            total_chars = sum(len(c.get('body', '')) for c in chapters if c.get('enhanced_extraction', False))
            avg_chars = total_chars // enhanced_count if enhanced_count > 0 else 0
            print(f"   🚀 Enhanced extraction: {enhanced_count}/{len(chapters)} chapters, {total_chars:,} total chars (avg: {avg_chars:,})")
        # Check for gaps
        chapter_nums = [c["num"] for c in chapters]
        expected_nums = list(range(min(chapter_nums), max(chapter_nums) + 1))
        missing = set(expected_nums) - set(chapter_nums)
        if missing:
            print(f"   ⚠️ Missing chapter numbers: {sorted(missing)}")
    # Language detection
    combined_sample = ' '.join(sample_texts) if effective_mode == "smart" else ''
    detected_language = _detect_content_language(combined_sample) if combined_sample else 'unknown'
    if chapters:
        _print_extraction_summary(chapters, detected_language, extraction_mode,
                                  h1_count if effective_mode == "smart" else 0,
                                  h2_count if effective_mode == "smart" else 0,
                                  file_size_groups if effective_mode == "smart" else {})
    return chapters, detected_language
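
# Illustrative environment toggles read by _extract_chapters_universal (all optional):
#   os.environ['DISABLE_CHAPTER_MERGING'] = '1'   # treat Section/Chapter files independently
#   os.environ['TRANSLATE_SPECIAL_FILES'] = '1'   # include cover/info/message files
#   os.environ['ENABLE_GUI_YIELD'] = '0'          # skip the 1 ms GUI yields for maximum speed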
def _extract_chapter_info(soup, file_path, content_text, html_content, pattern_manager):
    """Extract chapter number and title from various sources with parallel pattern matching"""
    chapter_num = None
    chapter_title = None
    detection_method = None
    # SPECIAL HANDLING: when we have Section/Chapter pairs, differentiate them
    filename = os.path.basename(file_path)
    # Handle different naming patterns for Section/Chapter files
    if 'section' in filename.lower() and 'chapter' not in filename.lower():
        # For Section files, add 0.1 to the base number.
        # Try different patterns:
        match = re.search(r'No(\d+)', filename)
        if not match:
            match = re.search(r'^(\d+)[_\-]', filename)
        if not match:
            match = re.search(r'^(\d+)', filename)
        if match:
            base_num = int(match.group(1))
            chapter_num = base_num + 0.1  # Section gets .1
            detection_method = "filename_section_special"
    elif 'chapter' in filename.lower() and 'section' not in filename.lower():
        # For Chapter files, use the base number.
        # Try different patterns:
        match = re.search(r'No(\d+)', filename)
        if not match:
            match = re.search(r'^(\d+)[_\-]', filename)
        if not match:
            match = re.search(r'^(\d+)', filename)
        if match:
            chapter_num = int(match.group(1))
            detection_method = "filename_chapter_special"
    # If not handled by the special logic, continue with normal extraction
    if not chapter_num:
        # Try the filename first - use parallel pattern matching for better performance
        chapter_patterns = [(pattern, flags, method) for pattern, flags, method in PM.CHAPTER_PATTERNS
                            if method.endswith('_number')]
        if len(chapter_patterns) > 3:  # Only parallelize if we have enough patterns
            # Parallel pattern matching for the filename
            with ThreadPoolExecutor(max_workers=min(4, len(chapter_patterns))) as executor:
                def try_pattern(pattern_info):
                    pattern, flags, method = pattern_info
                    match = re.search(pattern, file_path, flags)
                    if match:
                        try:
                            num_str = match.group(1)
                            if num_str.isdigit():
                                return int(num_str), f"filename_{method}"
                            elif method == 'chinese_chapter_cn':
                                # _convert_chinese_number ignores its pattern_manager argument
                                converted = _convert_chinese_number(num_str, None)
                                if converted:
                                    return converted, f"filename_{method}"
                        except (ValueError, IndexError):
                            pass
                    return None, None
                # Submit all patterns
                futures = [executor.submit(try_pattern, pattern_info) for pattern_info in chapter_patterns]
                # Check results as they complete
                for future in as_completed(futures):
                    try:
                        num, method = future.result()
                        if num:
                            chapter_num = num
                            detection_method = method
                            # Cancel remaining futures
                            for f in futures:
                                f.cancel()
                            break
                    except Exception:
                        continue
        else:
            # Sequential processing for small pattern sets
            for pattern, flags, method in chapter_patterns:
                match = re.search(pattern, file_path, flags)
                if match:
                    try:
                        num_str = match.group(1)
                        if num_str.isdigit():
                            chapter_num = int(num_str)
                            detection_method = f"filename_{method}"
                            break
                        elif method == 'chinese_chapter_cn':
                            # _convert_chinese_number ignores its pattern_manager argument
                            converted = _convert_chinese_number(num_str, None)
                            if converted:
                                chapter_num = converted
                                detection_method = f"filename_{method}"
                                break
                    except (ValueError, IndexError):
                        continue
    # Try content if not found in the filename
    if not chapter_num:
        # Check ignore settings for batch translation
        batch_translate_active = os.getenv('BATCH_TRANSLATE_HEADERS', '0') == '1'
        use_title_tag = os.getenv('USE_TITLE', '0') == '1' or not batch_translate_active
        ignore_header_tags = os.getenv('IGNORE_HEADER', '0') == '1' and batch_translate_active
        # Prepare all text sources to check in parallel
        text_sources = []
        # Add the title tag if using titles
        if use_title_tag and soup.title and soup.title.string:
            title_text = soup.title.string.strip()
            text_sources.append(("title", title_text, True))  # True means this can be chapter_title
        # Add headers if not ignored
        if not ignore_header_tags:
            for header_tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
                headers = soup.find_all(header_tag)
                for header in headers[:3]:  # Limit to the first 3 of each type
                    header_text = header.get_text(strip=True)
                    if header_text:
                        text_sources.append((f"header_{header_tag}", header_text, True))
        # Add first paragraphs
        first_elements = soup.find_all(['p', 'div'])[:5]
        for elem in first_elements:
            elem_text = elem.get_text(strip=True)
            if elem_text:
                text_sources.append(("content", elem_text, False))  # False means don't use as chapter_title
        # Process text sources in parallel if we have many
        if len(text_sources) > 5:
            with ThreadPoolExecutor(max_workers=min(6, len(text_sources))) as executor:
                def extract_from_source(source_info):
                    source_type, text, can_be_title = source_info
                    num, method = _extract_from_text(text, source_type, pattern_manager)
                    return num, method, text if (num and can_be_title) else None
                # Submit all text sources
                future_to_source = {executor.submit(extract_from_source, source): source
                                    for source in text_sources}
                # Process results as they complete
                for future in as_completed(future_to_source):
                    try:
                        num, method, title = future.result()
                        if num:
                            chapter_num = num
                            detection_method = method
                            if title and not chapter_title:
                                chapter_title = title
                            # Cancel remaining futures
                            for f in future_to_source:
                                f.cancel()
                            break
                    except Exception:
                        continue
        else:
            # Sequential processing for small text sets
            for source_type, text, can_be_title in text_sources:
                num, method = _extract_from_text(text, source_type, pattern_manager)
                if num:
                    chapter_num = num
                    detection_method = method
                    if can_be_title and not chapter_title:
                        chapter_title = text
                    break
    # Final fallback to filename patterns
    if not chapter_num:
        filename_base = os.path.basename(file_path)
        # Parallel pattern matching for filename extraction
        if len(PM.FILENAME_EXTRACT_PATTERNS) > 3:
            with ThreadPoolExecutor(max_workers=min(4, len(PM.FILENAME_EXTRACT_PATTERNS))) as executor:
                def try_filename_pattern(pattern):
                    match = re.search(pattern, filename_base, re.IGNORECASE)
                    if match:
                        try:
                            return int(match.group(1))
                        except (ValueError, IndexError):
                            pass
                    return None
                futures = [executor.submit(try_filename_pattern, pattern)
                           for pattern in PM.FILENAME_EXTRACT_PATTERNS]
                for future in as_completed(futures):
                    try:
                        num = future.result()
                        if num:
                            chapter_num = num
                            detection_method = "filename_number"
                            for f in futures:
                                f.cancel()
                            break
                    except Exception:
                        continue
        else:
            # Sequential for small pattern sets
            for pattern in PM.FILENAME_EXTRACT_PATTERNS:
                match = re.search(pattern, filename_base, re.IGNORECASE)
                if match:
                    chapter_num = int(match.group(1))
                    detection_method = "filename_number"
                    break
    # Extract the title if not already found (with ignore settings support)
    if not chapter_title:
        # Check settings for batch translation
        batch_translate_active = os.getenv('BATCH_TRANSLATE_HEADERS', '0') == '1'
        use_title_tag = os.getenv('USE_TITLE', '0') == '1' or not batch_translate_active
        ignore_header_tags = os.getenv('IGNORE_HEADER', '0') == '1' and batch_translate_active
        # Try the title tag if using titles
        if use_title_tag and soup.title and soup.title.string:
            chapter_title = soup.title.string.strip()
        # Try header tags if not ignored and no title found
        if not chapter_title and not ignore_header_tags:
            for header_tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
                header = soup.find(header_tag)
                if header:
                    chapter_title = header.get_text(strip=True)
                    break
    # Final fallback
    if not chapter_title:
        chapter_title = f"Chapter {chapter_num}" if chapter_num else None
    chapter_title = re.sub(r'\s+', ' ', chapter_title).strip() if chapter_title else None
    return chapter_num, chapter_title, detection_method

| def _extract_from_text(text, source_type, pattern_manager): | |
| """Extract chapter number from text using patterns with parallel matching for large pattern sets""" | |
| # Get patterns that don't end with '_number' | |
| text_patterns = [(pattern, flags, method) for pattern, flags, method in PM.CHAPTER_PATTERNS | |
| if not method.endswith('_number')] | |
| # Only use parallel processing if we have many patterns | |
| if len(text_patterns) > 5: | |
| with ThreadPoolExecutor(max_workers=min(4, len(text_patterns))) as executor: | |
| def try_text_pattern(pattern_info): | |
| pattern, flags, method = pattern_info | |
| match = re.search(pattern, text, flags) | |
| if match: | |
| try: | |
| num_str = match.group(1) | |
| if num_str.isdigit(): | |
| return int(num_str), f"{source_type}_{method}" | |
| elif method == 'chinese_chapter_cn': | |
| from TransateKRtoEN import PatternManager | |
| pm = None # No longer needed | |
| converted = _convert_chinese_number(num_str, pm) | |
| if converted: | |
| return converted, f"{source_type}_{method}" | |
| except (ValueError, IndexError): | |
| pass | |
| return None, None | |
| # Submit all patterns | |
| futures = [executor.submit(try_text_pattern, pattern_info) for pattern_info in text_patterns] | |
| # Check results as they complete | |
| for future in as_completed(futures): | |
| try: | |
| num, method = future.result() | |
| if num: | |
| # Cancel remaining futures | |
| for f in futures: | |
| f.cancel() | |
| return num, method | |
| except Exception: | |
| continue | |
| else: | |
| # Sequential processing for small pattern sets | |
| for pattern, flags, method in text_patterns: | |
| match = re.search(pattern, text, flags) | |
| if match: | |
| try: | |
| num_str = match.group(1) | |
| if num_str.isdigit(): | |
| return int(num_str), f"{source_type}_{method}" | |
| elif method == 'chinese_chapter_cn': | |
| from TransateKRtoEN import PatternManager | |
| pm = None # No longer needed | |
| converted = _convert_chinese_number(num_str, pm) | |
| if converted: | |
| return converted, f"{source_type}_{method}" | |
| except (ValueError, IndexError): | |
| continue | |
| return None, None | |
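| # Illustrative flow (the exact regexes live in PM.CHAPTER_PATTERNS, so the pattern is an | |
| # assumption): a pattern tagged 'chinese_chapter_cn' matching '第十二章' captures '十二'; | |
| # str.isdigit() is False for CJK numerals, so the capture is routed through | |
| # _convert_chinese_number and, for source_type 'body', returned as | |
| # (12, 'body_chinese_chapter_cn'). | |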
| def _convert_chinese_number(cn_num, pattern_manager=None): | |
| """Convert a Chinese numeral string to an integer (direct lookups plus 十-compounds up to 99); pattern_manager is unused and kept for signature compatibility""" | |
| if cn_num in PM.CHINESE_NUMS: | |
| return PM.CHINESE_NUMS[cn_num] | |
| if '十' in cn_num: | |
| parts = cn_num.split('十') | |
| if len(parts) == 2: | |
| tens = PM.CHINESE_NUMS.get(parts[0], 1) if parts[0] else 1 | |
| ones = PM.CHINESE_NUMS.get(parts[1], 0) if parts[1] else 0 | |
| return tens * 10 + ones | |
| return None | |
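| # Illustrative conversions (assuming PM.CHINESE_NUMS maps single numerals such as 二→2, 三→3, 五→5): | |
| #   _convert_chinese_number('五')     -> 5    (direct table lookup) | |
| #   _convert_chinese_number('十')     -> 10   (empty tens/ones parts default to 1 and 0) | |
| #   _convert_chinese_number('十五')   -> 15 | |
| #   _convert_chinese_number('二十三') -> 23   (2 * 10 + 3) | |
| #   _convert_chinese_number('一百')   -> None (百/千 compounds are not handled) | |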
| def _detect_content_language(text_sample): | |
| """Detect the primary language of content with parallel processing for large texts""" | |
| # For very short texts, use sequential processing | |
| if len(text_sample) < 1000: | |
| scripts = { | |
| 'korean': 0, | |
| 'japanese_hiragana': 0, | |
| 'japanese_katakana': 0, | |
| 'chinese': 0, | |
| 'latin': 0 | |
| } | |
| for char in text_sample: | |
| code = ord(char) | |
| if 0xAC00 <= code <= 0xD7AF: | |
| scripts['korean'] += 1 | |
| elif 0x3040 <= code <= 0x309F: | |
| scripts['japanese_hiragana'] += 1 | |
| elif 0x30A0 <= code <= 0x30FF: | |
| scripts['japanese_katakana'] += 1 | |
| elif 0x4E00 <= code <= 0x9FFF: | |
| scripts['chinese'] += 1 | |
| elif 0x0020 <= code <= 0x007F: | |
| scripts['latin'] += 1 | |
| else: | |
| # For longer texts, use parallel processing | |
| # Split text into chunks for parallel processing | |
| chunk_size = max(500, len(text_sample) // (os.cpu_count() or 4)) | |
| chunks = [text_sample[i:i + chunk_size] for i in range(0, len(text_sample), chunk_size)] | |
| # Thread-safe accumulator | |
| scripts_lock = threading.Lock() | |
| scripts = { | |
| 'korean': 0, | |
| 'japanese_hiragana': 0, | |
| 'japanese_katakana': 0, | |
| 'chinese': 0, | |
| 'latin': 0 | |
| } | |
| def process_chunk(text_chunk): | |
| """Process a chunk of text and return script counts""" | |
| local_scripts = { | |
| 'korean': 0, | |
| 'japanese_hiragana': 0, | |
| 'japanese_katakana': 0, | |
| 'chinese': 0, | |
| 'latin': 0 | |
| } | |
| for char in text_chunk: | |
| code = ord(char) | |
| if 0xAC00 <= code <= 0xD7AF: | |
| local_scripts['korean'] += 1 | |
| elif 0x3040 <= code <= 0x309F: | |
| local_scripts['japanese_hiragana'] += 1 | |
| elif 0x30A0 <= code <= 0x30FF: | |
| local_scripts['japanese_katakana'] += 1 | |
| elif 0x4E00 <= code <= 0x9FFF: | |
| local_scripts['chinese'] += 1 | |
| elif 0x0020 <= code <= 0x007F: | |
| local_scripts['latin'] += 1 | |
| return local_scripts | |
| # Process chunks in parallel | |
| with ThreadPoolExecutor(max_workers=min(os.cpu_count() or 4, len(chunks))) as executor: | |
| # Submit all chunks | |
| futures = [executor.submit(process_chunk, chunk) for chunk in chunks] | |
| # Collect results | |
| for future in as_completed(futures): | |
| try: | |
| chunk_scripts = future.result() | |
| # Accumulate results (the lock is defensive; as_completed yields in the main thread) | |
| with scripts_lock: | |
| for script, count in chunk_scripts.items(): | |
| scripts[script] += count | |
| except Exception as e: | |
| print(f"[WARNING] Error processing chunk in language detection: {e}") | |
| # Language determination thresholds (shared by the sequential and parallel paths) | |
| total_cjk = scripts['korean'] + scripts['japanese_hiragana'] + scripts['japanese_katakana'] + scripts['chinese'] | |
| if scripts['korean'] > total_cjk * 0.3: | |
| return 'korean' | |
| elif scripts['japanese_hiragana'] + scripts['japanese_katakana'] > total_cjk * 0.2: | |
| return 'japanese' | |
| elif scripts['chinese'] > total_cjk * 0.3: | |
| return 'chinese' | |
| elif scripts['latin'] > len(text_sample) * 0.7: | |
| return 'english' | |
| else: | |
| return 'unknown' | |
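| # Reading the thresholds above: a sample whose CJK characters are >30% Hangul reports | |
| # 'korean', >20% kana reports 'japanese', >30% Han ideographs reports 'chinese', and a | |
| # sample that is >70% ASCII overall reports 'english'. Illustrative calls: | |
| #   _detect_content_language('안녕하세요 여러분')           # -> 'korean' | |
| #   _detect_content_language('A plain English sentence.')  # -> 'english' | |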
| # Global flag to track if language has been printed | |
| _language_printed = False | |
| def _print_extraction_summary(chapters, detected_language, extraction_mode, h1_count, h2_count, file_size_groups): | |
| """Print extraction summary""" | |
| global _language_printed | |
| print(f"\n📊 Chapter Extraction Summary ({extraction_mode.capitalize()} Mode):") | |
| print(f" • Total chapters extracted: {len(chapters)}") | |
| # Format chapter range handling both int and float | |
| first_num = chapters[0]['num'] | |
| last_num = chapters[-1]['num'] | |
| print(f" • Chapter range: {first_num} to {last_num}") | |
| # Only print detected language once per session | |
| if not _language_printed and detected_language and detected_language != 'unknown': | |
| print(f" 🌐 Detected language: {detected_language}") | |
| _language_printed = True | |
| if extraction_mode == "smart": | |
| print(f" • Primary header type: {'<h2>' if h2_count > h1_count else '<h1>'}") | |
| image_only_count = sum(1 for c in chapters if c.get('is_image_only', False)) | |
| text_only_count = sum(1 for c in chapters if not c.get('has_images', False) and c.get('file_size', 0) >= 500) | |
| mixed_count = sum(1 for c in chapters if c.get('has_images', False) and c.get('file_size', 0) >= 500) | |
| empty_count = sum(1 for c in chapters if c.get('file_size', 0) < 50) | |
| print(f" • Text-only chapters: {text_only_count}") | |
| print(f" • Image-only chapters: {image_only_count}") | |
| print(f" • Mixed content chapters: {mixed_count}") | |
| print(f" • Empty/minimal content: {empty_count}") | |
| # Check for merged chapters | |
| merged_count = sum(1 for c in chapters if c.get('was_merged', False)) | |
| if merged_count > 0: | |
| print(f" • Merged chapters: {merged_count}") | |
| # Check for missing chapters (only meaningful for integer sequences; range() would fail on floats) | |
| if isinstance(first_num, int) and isinstance(last_num, int): | |
| expected_chapters = set(range(first_num, last_num + 1)) | |
| actual_chapters = set(c['num'] for c in chapters) | |
| missing = expected_chapters - actual_chapters | |
| if missing: | |
| print(f" ⚠️ Missing chapter numbers: {sorted(missing)}") | |
| if extraction_mode == "smart": | |
| method_stats = Counter(c['detection_method'] for c in chapters) | |
| print(f" 📈 Detection methods used:") | |
| for method, count in method_stats.most_common(): | |
| print(f" • {method}: {count} chapters") | |
| duplicate_size_groups = [size for size, files in file_size_groups.items() if len(files) > 1] | |
| if duplicate_size_groups: | |
| print(f" ⚠️ Found {len(duplicate_size_groups)} file size groups with potential duplicates") | |
| else: | |
| # empty_count is only computed in the smart branch above; compute it here for other modes | |
| empty_count = sum(1 for c in chapters if c.get('file_size', 0) < 50) | |
| print(f" • Empty/placeholder: {empty_count}") | |
| if extraction_mode == "full": | |
| print(f" 🔍 Full extraction preserved all HTML structure and tags") | |
| def _extract_epub_metadata(zf): | |
| """Extract comprehensive metadata from EPUB file including all custom fields""" | |
| meta = {} | |
| # Use lxml for XML if available | |
| try: | |
| import lxml | |
| xml_parser = 'lxml-xml' | |
| except ImportError: | |
| xml_parser = 'xml' | |
| try: | |
| for name in zf.namelist(): | |
| if name.lower().endswith('.opf'): | |
| opf_content = zf.read(name) | |
| soup = BeautifulSoup(opf_content, xml_parser) | |
| # Extract ALL Dublin Core elements (expanded list) | |
| dc_elements = ['title', 'creator', 'subject', 'description', | |
| 'publisher', 'contributor', 'date', 'type', | |
| 'format', 'identifier', 'source', 'language', | |
| 'relation', 'coverage', 'rights'] | |
| for element in dc_elements: | |
| tag = soup.find(element) | |
| if tag and tag.get_text(strip=True): | |
| meta[element] = tag.get_text(strip=True) | |
| # Extract ALL meta tags (not just series) | |
| meta_tags = soup.find_all('meta') | |
| for meta_tag in meta_tags: | |
| # Try different attribute names for the metadata name | |
| name = meta_tag.get('name') or meta_tag.get('property', '') | |
| content = meta_tag.get('content', '') | |
| if name and content: | |
| # Store original name for debugging | |
| original_name = name | |
| # Clean up common prefixes | |
| if name.startswith('calibre:'): | |
| name = name[8:] # Remove 'calibre:' prefix | |
| elif name.startswith('dc:'): | |
| name = name[3:] # Remove 'dc:' prefix | |
| elif name.startswith('opf:'): | |
| name = name[4:] # Remove 'opf:' prefix | |
| # Normalize the field name - replace hyphens with underscores | |
| name = name.replace('-', '_') | |
| # Don't overwrite if already exists (prefer direct tags over meta tags) | |
| if name not in meta: | |
| meta[name] = content | |
| # Debug output for custom fields | |
| if original_name != name: | |
| print(f" • Found custom field: {original_name} → {name}") | |
| # Special handling for series information (maintain compatibility) | |
| if 'series' not in meta: | |
| series_tags = soup.find_all('meta', attrs={'name': lambda x: x and 'series' in x.lower()}) | |
| for series_tag in series_tags: | |
| series_name = series_tag.get('content', '') | |
| if series_name: | |
| meta['series'] = series_name | |
| break | |
| # Extract refines metadata (used by some EPUB creators) | |
| refines_metas = soup.find_all('meta', attrs={'refines': True}) | |
| for refine in refines_metas: | |
| property_name = refine.get('property', '') | |
| content = refine.get_text(strip=True) or refine.get('content', '') | |
| if property_name and content: | |
| # Clean property name | |
| if ':' in property_name: | |
| property_name = property_name.split(':')[-1] | |
| property_name = property_name.replace('-', '_') | |
| if property_name not in meta: | |
| meta[property_name] = content | |
| # Log extraction summary | |
| print(f"📋 Extracted {len(meta)} metadata fields") | |
| # Show standard vs custom fields | |
| standard_keys = {'title', 'creator', 'language', 'subject', 'description', | |
| 'publisher', 'date', 'identifier', 'source', 'rights', | |
| 'contributor', 'type', 'format', 'relation', 'coverage'} | |
| custom_keys = set(meta.keys()) - standard_keys | |
| if custom_keys: | |
| print(f"📋 Standard fields: {len(standard_keys & set(meta.keys()))}") | |
| print(f"📋 Custom fields found: {sorted(custom_keys)}") | |
| # Show sample values for custom fields (truncated) | |
| for key in sorted(custom_keys)[:5]: # Show first 5 custom fields | |
| value = str(meta[key]) | |
| if len(value) > 50: | |
| value = value[:47] + "..." | |
| print(f" • {key}: {value}") | |
| if len(custom_keys) > 5: | |
| print(f" • ... and {len(custom_keys) - 5} more custom fields") | |
| break | |
| except Exception as e: | |
| print(f"[WARNING] Failed to extract metadata: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return meta | |
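| # Usage sketch (illustrative; 'book.epub' is a placeholder path): | |
| #   import zipfile | |
| #   with zipfile.ZipFile('book.epub', 'r') as zf: | |
| #       meta = _extract_epub_metadata(zf) | |
| #       print(meta.get('title'), meta.get('creator'), meta.get('series')) | |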
| def _categorize_resource(file_path, file_name): | |
| """Categorize a file and return (resource_type, target_dir, safe_filename)""" | |
| file_path_lower = file_path.lower() | |
| file_name_lower = file_name.lower() | |
| if file_path_lower.endswith('.css'): | |
| return 'css', 'css', sanitize_resource_filename(file_name) | |
| elif file_path_lower.endswith(('.ttf', '.otf', '.woff', '.woff2', '.eot')): | |
| return 'fonts', 'fonts', sanitize_resource_filename(file_name) | |
| elif file_path_lower.endswith(('.jpg', '.jpeg', '.png', '.gif', '.svg', '.bmp', '.webp')): | |
| return 'images', 'images', sanitize_resource_filename(file_name) | |
| elif (file_path_lower.endswith(('.opf', '.ncx')) or | |
| file_name_lower == 'container.xml' or | |
| 'container.xml' in file_path_lower): | |
| if 'container.xml' in file_path_lower: | |
| safe_filename = 'container.xml' | |
| else: | |
| safe_filename = file_name | |
| return 'epub_structure', None, safe_filename | |
| elif file_path_lower.endswith(('.js', '.xml', '.txt')): | |
| return 'other', None, sanitize_resource_filename(file_name) | |
| # Uncategorized file type: callers must handle a bare None (not a 3-tuple) | |
| return None | |
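| # Illustrative mappings: | |
| #   _categorize_resource('OEBPS/style.css', 'style.css')            -> ('css', 'css', 'style.css') | |
| #   _categorize_resource('OEBPS/cover.jpeg', 'cover.jpeg')          -> ('images', 'images', 'cover.jpeg') | |
| #   _categorize_resource('META-INF/container.xml', 'container.xml') -> ('epub_structure', None, 'container.xml') | |
| #   _categorize_resource('media/clip.mp4', 'clip.mp4')              -> None (uncategorized) | |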
| def _cleanup_old_resources(output_dir): | |
| """Clean up old resource directories and EPUB structure files""" | |
| print("🧹 Cleaning up any existing resource directories...") | |
| cleanup_success = True | |
| for resource_type in ['css', 'fonts', 'images']: | |
| resource_dir = os.path.join(output_dir, resource_type) | |
| if os.path.exists(resource_dir): | |
| try: | |
| shutil.rmtree(resource_dir) | |
| print(f" 🗑️ Removed old {resource_type} directory") | |
| except PermissionError: | |
| print(f" ⚠️ Cannot remove {resource_type} directory (permission denied) - will merge with existing files") | |
| cleanup_success = False | |
| except Exception as e: | |
| print(f" ⚠️ Error removing {resource_type} directory: {e} - will merge with existing files") | |
| cleanup_success = False | |
| epub_structure_files = ['container.xml', 'content.opf', 'toc.ncx'] | |
| for epub_file in epub_structure_files: | |
| input_path = os.path.join(output_dir, epub_file) | |
| if os.path.exists(input_path): | |
| try: | |
| os.remove(input_path) | |
| print(f" 🗑️ Removed old {epub_file}") | |
| except PermissionError: | |
| print(f" ⚠️ Cannot remove {epub_file} (permission denied) - will use existing file") | |
| except Exception as e: | |
| print(f" ⚠️ Error removing {epub_file}: {e}") | |
| try: | |
| for file in os.listdir(output_dir): | |
| if file.lower().endswith(('.opf', '.ncx')): | |
| file_path = os.path.join(output_dir, file) | |
| try: | |
| os.remove(file_path) | |
| print(f" 🗑️ Removed old EPUB file: {file}") | |
| except PermissionError: | |
| print(f" ⚠️ Cannot remove {file} (permission denied)") | |
| except Exception as e: | |
| print(f" ⚠️ Error removing {file}: {e}") | |
| except Exception as e: | |
| print(f"⚠️ Error scanning for EPUB files: {e}") | |
| if not cleanup_success: | |
| print("⚠️ Some cleanup operations failed due to file permissions") | |
| print(" The program will continue and merge with existing files") | |
| return cleanup_success | |
| def _count_existing_resources(output_dir, extracted_resources): | |
| """Count existing resources when skipping extraction""" | |
| for resource_type in ['css', 'fonts', 'images', 'epub_structure']: | |
| if resource_type == 'epub_structure': | |
| epub_files = [] | |
| for file in ['container.xml', 'content.opf', 'toc.ncx']: | |
| if os.path.exists(os.path.join(output_dir, file)): | |
| epub_files.append(file) | |
| try: | |
| for file in os.listdir(output_dir): | |
| if file.lower().endswith(('.opf', '.ncx')) and file not in epub_files: | |
| epub_files.append(file) | |
| except OSError: | |
| pass | |
| extracted_resources[resource_type] = epub_files | |
| else: | |
| resource_dir = os.path.join(output_dir, resource_type) | |
| if os.path.exists(resource_dir): | |
| try: | |
| files = [f for f in os.listdir(resource_dir) if os.path.isfile(os.path.join(resource_dir, f))] | |
| extracted_resources[resource_type] = files | |
| except OSError: | |
| extracted_resources[resource_type] = [] | |
| total_existing = sum(len(files) for files in extracted_resources.values()) | |
| print(f"✅ Found {total_existing} existing resource files") | |
| return extracted_resources | |
| def _validate_critical_files(output_dir, extracted_resources): | |
| """Validate that critical EPUB files were extracted""" | |
| total_extracted = sum(len(files) for files in extracted_resources.values()) | |
| print(f"✅ Extracted {total_extracted} resource files:") | |
| for resource_type, files in extracted_resources.items(): | |
| if files: | |
| if resource_type == 'epub_structure': | |
| print(f" • EPUB Structure: {len(files)} files") | |
| for file in files: | |
| print(f" - {file}") | |
| else: | |
| print(f" • {resource_type.title()}: {len(files)} files") | |
| critical_files = ['container.xml'] | |
| missing_critical = [f for f in critical_files if not os.path.exists(os.path.join(output_dir, f))] | |
| if missing_critical: | |
| print(f"⚠️ WARNING: Missing critical EPUB files: {missing_critical}") | |
| print(" This may prevent proper EPUB reconstruction!") | |
| else: | |
| print("✅ All critical EPUB structure files extracted successfully") | |
| opf_files = [f for f in extracted_resources.get('epub_structure', []) if f.lower().endswith('.opf')] | |
| if not opf_files: | |
| print("⚠️ WARNING: No OPF file found! This will prevent EPUB reconstruction.") | |
| else: | |
| print(f"✅ Found OPF file(s): {opf_files}") | |
| def _create_extraction_report(output_dir, metadata, chapters, extracted_resources): | |
| """Create comprehensive extraction report with HTML file tracking""" | |
| report_path = os.path.join(output_dir, 'extraction_report.txt') | |
| with open(report_path, 'w', encoding='utf-8') as f: | |
| f.write("EPUB Extraction Report\n") | |
| f.write("=" * 50 + "\n\n") | |
| f.write(f"EXTRACTION MODE: {metadata.get('extraction_mode', 'unknown').upper()}\n\n") | |
| f.write("METADATA:\n") | |
| for key, value in metadata.items(): | |
| if key not in ['chapter_titles', 'extracted_resources', 'extraction_mode']: | |
| f.write(f" {key}: {value}\n") | |
| f.write(f"\nCHAPTERS ({len(chapters)}):\n") | |
| text_chapters = [] | |
| image_only_chapters = [] | |
| mixed_chapters = [] | |
| for chapter in chapters: | |
| if chapter.get('has_images') and chapter.get('file_size', 0) < 500: | |
| image_only_chapters.append(chapter) | |
| elif chapter.get('has_images') and chapter.get('file_size', 0) >= 500: | |
| mixed_chapters.append(chapter) | |
| else: | |
| text_chapters.append(chapter) | |
| if text_chapters: | |
| f.write(f"\n TEXT CHAPTERS ({len(text_chapters)}):\n") | |
| for c in text_chapters: | |
| f.write(f" {c['num']:3d}. {c['title']} ({c['detection_method']})\n") | |
| if c.get('original_html_file'): | |
| f.write(f" → {c['original_html_file']}\n") | |
| if image_only_chapters: | |
| f.write(f"\n IMAGE-ONLY CHAPTERS ({len(image_only_chapters)}):\n") | |
| for c in image_only_chapters: | |
| f.write(f" {c['num']:3d}. {c['title']} (images: {c.get('image_count', 0)})\n") | |
| if c.get('original_html_file'): | |
| f.write(f" → {c['original_html_file']}\n") | |
| if 'body' in c: | |
| try: | |
| soup = BeautifulSoup(c['body'], 'html.parser') | |
| images = soup.find_all('img') | |
| for img in images[:3]: | |
| src = img.get('src', 'unknown') | |
| f.write(f" • Image: {src}\n") | |
| if len(images) > 3: | |
| f.write(f" • ... and {len(images) - 3} more images\n") | |
| except Exception: | |
| pass | |
| if mixed_chapters: | |
| f.write(f"\n MIXED CONTENT CHAPTERS ({len(mixed_chapters)}):\n") | |
| for c in mixed_chapters: | |
| f.write(f" {c['num']:3d}. {c['title']} (text: {c.get('file_size', 0)} chars, images: {c.get('image_count', 0)})\n") | |
| if c.get('original_html_file'): | |
| f.write(f" → {c['original_html_file']}\n") | |
| f.write(f"\nRESOURCES EXTRACTED:\n") | |
| for resource_type, files in extracted_resources.items(): | |
| if files: | |
| if resource_type == 'epub_structure': | |
| f.write(f" EPUB Structure: {len(files)} files\n") | |
| for file in files: | |
| f.write(f" - {file}\n") | |
| else: | |
| f.write(f" {resource_type.title()}: {len(files)} files\n") | |
| for file in files[:5]: | |
| f.write(f" - {file}\n") | |
| if len(files) > 5: | |
| f.write(f" ... and {len(files) - 5} more\n") | |
| f.write(f"\nHTML FILES WRITTEN:\n") | |
| html_files_written = metadata.get('html_files_written', 0) | |
| f.write(f" Total: {html_files_written} files\n") | |
| f.write(f" Location: Main directory and 'originals' subdirectory\n") | |
| f.write(f"\nPOTENTIAL ISSUES:\n") | |
| issues = [] | |
| if image_only_chapters: | |
| issues.append(f" • {len(image_only_chapters)} chapters contain only images (may need OCR)") | |
| missing_html = sum(1 for c in chapters if not c.get('original_html_file')) | |
| if missing_html > 0: | |
| issues.append(f" • {missing_html} chapters failed to write HTML files") | |
| if not extracted_resources.get('epub_structure'): | |
| issues.append(" • No EPUB structure files found (may affect reconstruction)") | |
| if not issues: | |
| f.write(" None detected - extraction appears successful!\n") | |
| else: | |
| for issue in issues: | |
| f.write(issue + "\n") | |
| print(f"📄 Saved extraction report to: {report_path}") | |
| def _log_extraction_summary(chapters, extracted_resources, detected_language, html_files_written=0): | |
| """Log final extraction summary with HTML file information""" | |
| extraction_mode = chapters[0].get('extraction_mode', 'unknown') if chapters else 'unknown' | |
| print(f"\n✅ {extraction_mode.capitalize()} extraction complete!") | |
| print(f" 📚 Chapters: {len(chapters)}") | |
| print(f" 📄 HTML files written: {html_files_written}") | |
| print(f" 🎨 Resources: {sum(len(files) for files in extracted_resources.values())}") | |
| print(f" 🌍 Language: {detected_language}") | |
| image_only_count = sum(1 for c in chapters if c.get('has_images') and c.get('file_size', 0) < 500) | |
| if image_only_count > 0: | |
| print(f" 📸 Image-only chapters: {image_only_count}") | |
| epub_files = extracted_resources.get('epub_structure', []) | |
| if epub_files: | |
| print(f" 📋 EPUB Structure: {len(epub_files)} files ({', '.join(epub_files)})") | |
| else: | |
| print(f" ⚠️ No EPUB structure files extracted!") | |
| print(f"\n🔍 Pre-flight check readiness:") | |
| print(f" ✅ HTML files: {'READY' if html_files_written > 0 else 'NOT READY'}") | |
| print(f" ✅ Metadata: READY") | |
| print(f" ✅ Resources: READY") | |
| def _process_single_html_file( | |
| file_path, | |
| file_index, | |
| zip_file_path, | |
| parser, | |
| merge_candidates, | |
| disable_merging, | |
| enhanced_extractor, | |
| extraction_mode, | |
| enhanced_filtering, | |
| preserve_structure, | |
| protect_angle_brackets_func, | |
| pattern_manager, | |
| files_to_process, | |
| is_stop_requested | |
| ): | |
| """Process a single HTML file from an EPUB - standalone function for multiprocessing. | |
| This function is at module level to be picklable for ProcessPoolExecutor. | |
| All needed data must be passed as parameters. | |
| Returns: | |
| tuple: (chapter_info, h1_found, h2_found, file_size, sample_text, skipped_info) | |
| - chapter_info: dict with chapter data, or None if skipped/error | |
| - h1_found: bool indicating if h1 tags were found | |
| - h2_found: bool indicating if h2 tags were found | |
| - file_size: int size of content text | |
| - sample_text: str text sample for language detection | |
| - skipped_info: tuple (file_path, reason, detail) if skipped, else None | |
| """ | |
| from bs4 import BeautifulSoup | |
| import os | |
| import zipfile | |
| # Check stop | |
| if is_stop_requested(): | |
| return None, False, False, 0, '', None | |
| try: | |
| # Open our own ZipFile instance for thread safety | |
| with zipfile.ZipFile(zip_file_path, 'r') as zf: | |
| # Read file data | |
| file_data = zf.read(file_path) | |
| # Decode the file data | |
| html_content = None | |
| detected_encoding = None | |
| for encoding in ['utf-8', 'utf-16', 'gb18030', 'shift_jis', 'euc-kr', 'gbk', 'big5']: | |
| try: | |
| html_content = file_data.decode(encoding) | |
| detected_encoding = encoding | |
| break | |
| except UnicodeDecodeError: | |
| continue | |
| if html_content is None: | |
| print(f"[WARNING] Could not decode {file_path} with any supported encoding") | |
| return None, False, False, 0, '', None | |
| # Check if this file needs merging | |
| if not disable_merging and file_path in merge_candidates: | |
| section_file = merge_candidates[file_path] | |
| print(f"[DEBUG] Processing merge for: {file_path}") | |
| try: | |
| # Open a separate handle for the section file (avoids shadowing the outer 'zf') | |
| with zipfile.ZipFile(zip_file_path, 'r') as section_zf: | |
| section_data = section_zf.read(section_file) | |
| section_html = None | |
| for encoding in ['utf-8', 'utf-16', 'gb18030', 'shift_jis', 'euc-kr', 'gbk', 'big5']: | |
| try: | |
| section_html = section_data.decode(encoding) | |
| break | |
| except UnicodeDecodeError: | |
| continue | |
| if section_html: | |
| # Quick check if section is small enough to merge | |
| section_soup = BeautifulSoup(section_html, parser) | |
| section_text = section_soup.get_text(strip=True) | |
| if len(section_text) < 200: # Merge if section is small | |
| # Extract body content | |
| chapter_soup = BeautifulSoup(html_content, parser) | |
| if section_soup.body: | |
| section_body_content = ''.join(str(child) for child in section_soup.body.children) | |
| else: | |
| section_body_content = section_html | |
| if chapter_soup.body: | |
| chapter_body_content = ''.join(str(child) for child in chapter_soup.body.children) | |
| else: | |
| chapter_body_content = html_content | |
| # Merge content | |
| html_content = section_body_content + "\n<hr/>\n" + chapter_body_content | |
| print(f" → MERGED: Section ({len(section_text)} chars) + Chapter") | |
| else: | |
| print(f" → NOT MERGED: Section too large ({len(section_text)} chars)") | |
| except Exception as e: | |
| print(f"[WARNING] Failed to merge {file_path}: {e}") | |
| # === ENHANCED EXTRACTION POINT === | |
| content_html = None | |
| content_text = None | |
| chapter_title = None | |
| enhanced_extraction_used = False | |
| # Determine whether to use enhanced extractor | |
| use_enhanced = enhanced_extractor and extraction_mode == "enhanced" | |
| force_bs_traditional = False | |
| try: | |
| force_bs = os.getenv('FORCE_BS_FOR_TRADITIONAL', '0') == '1' | |
| model_env = os.getenv('MODEL', '') | |
| # Check for traditional translation API (inline to avoid circular imports) | |
| is_traditional_api = model_env in ['deepl', 'google-translate', 'google-translate-free'] or model_env.startswith('deepl/') or model_env.startswith('google-translate/') | |
| if force_bs and is_traditional_api: | |
| use_enhanced = False | |
| force_bs_traditional = True | |
| except Exception: | |
| pass | |
| # Use enhanced extractor if available and allowed | |
| if use_enhanced: | |
| clean_content, _, chapter_title = enhanced_extractor.extract_chapter_content( | |
| html_content, enhanced_filtering | |
| ) | |
| enhanced_extraction_used = True | |
| content_html = clean_content | |
| content_text = clean_content | |
| # BeautifulSoup method (only for non-enhanced modes) | |
| if not enhanced_extraction_used: | |
| if extraction_mode == "enhanced" and not force_bs_traditional: | |
| print(f"❌ Skipping {file_path} - enhanced extraction required but not available") | |
| return None, False, False, 0, '', None | |
| # Parse the (possibly merged) content | |
| protected_html = protect_angle_brackets_func(html_content) | |
| soup = BeautifulSoup(protected_html, parser) | |
| # Get effective mode for filtering | |
| effective_filtering = enhanced_filtering if extraction_mode == "enhanced" else extraction_mode | |
| # In full mode, keep the entire HTML structure | |
| if effective_filtering == "full": | |
| content_html = html_content | |
| content_text = soup.get_text(strip=True) | |
| else: | |
| # Smart and comprehensive modes extract body content | |
| if soup.body: | |
| content_html = str(soup.body) | |
| content_text = soup.body.get_text(strip=True) | |
| else: | |
| content_html = html_content | |
| content_text = soup.get_text(strip=True) | |
| # Extract title (with ignore settings support) | |
| chapter_title = None | |
| # Check settings for batch translation | |
| batch_translate_active = os.getenv('BATCH_TRANSLATE_HEADERS', '0') == '1' | |
| use_title_tag = os.getenv('USE_TITLE', '0') == '1' or not batch_translate_active | |
| ignore_header_tags = os.getenv('IGNORE_HEADER', '0') == '1' and batch_translate_active | |
| # Extract from title tag if using titles | |
| if use_title_tag and soup.title and soup.title.string: | |
| chapter_title = soup.title.string.strip() | |
| # Extract from header tags if not ignored and no title found | |
| if not chapter_title and not ignore_header_tags: | |
| for header_tag in ['h1', 'h2', 'h3']: | |
| header = soup.find(header_tag) | |
| if header: | |
| chapter_title = header.get_text(strip=True) | |
| break | |
| # Fallback to filename if nothing found | |
| if not chapter_title: | |
| chapter_title = os.path.splitext(os.path.basename(file_path))[0] | |
| # Get the effective extraction mode for processing logic | |
| effective_mode = enhanced_filtering if extraction_mode == "enhanced" else extraction_mode | |
| # Skip truly empty files in smart mode | |
| if effective_mode == "smart" and not disable_merging and len(content_text.strip()) < 10: | |
| skipped_info = (file_path, 'empty', len(content_text)) | |
| return None, False, False, 0, '', skipped_info | |
| # Get actual chapter number based on original position | |
| actual_chapter_num = files_to_process.index(file_path) + 1 | |
| # Mode-specific logic | |
| detection_method = None | |
| h1_found = False | |
| h2_found = False | |
| if effective_mode == "comprehensive" or effective_mode == "full": | |
| # For comprehensive/full mode, use sequential numbering | |
| chapter_num = actual_chapter_num | |
| if not chapter_title: | |
| chapter_title = os.path.splitext(os.path.basename(file_path))[0] | |
| detection_method = f"{extraction_mode}_sequential" if extraction_mode == "enhanced" else f"{effective_mode}_sequential" | |
| elif effective_mode == "smart": | |
| # For smart mode, when merging is disabled, use sequential numbering | |
| if disable_merging: | |
| chapter_num = actual_chapter_num | |
| if not chapter_title: | |
| chapter_title = os.path.splitext(os.path.basename(file_path))[0] | |
| detection_method = f"{extraction_mode}_sequential_no_merge" if extraction_mode == "enhanced" else "sequential_no_merge" | |
| else: | |
| # When merging is enabled, try to extract chapter info | |
| protected_html = protect_angle_brackets_func(html_content) | |
| soup = BeautifulSoup(protected_html, parser) | |
| # Count headers | |
| h1_tags = soup.find_all('h1') | |
| h2_tags = soup.find_all('h2') | |
| h1_found = len(h1_tags) > 0 | |
| h2_found = len(h2_tags) > 0 | |
| # Extract chapter number and title | |
| chapter_num, extracted_title, detection_method = _extract_chapter_info( | |
| soup, file_path, content_text, html_content, pattern_manager | |
| ) | |
| # Use extracted title if we don't have one | |
| if extracted_title and not chapter_title: | |
| chapter_title = extracted_title | |
| # For hash-based filenames, chapter_num might be None | |
| if chapter_num is None: | |
| chapter_num = actual_chapter_num | |
| detection_method = f"{extraction_mode}_sequential_fallback" if extraction_mode == "enhanced" else "sequential_fallback" | |
| print(f"[DEBUG] No chapter number found in {file_path}, assigning: {chapter_num}") | |
| # Filter content_html for title/header settings (before processing) | |
| batch_translate_active = os.getenv('BATCH_TRANSLATE_HEADERS', '0') == '1' | |
| use_title_tag = os.getenv('USE_TITLE', '0') == '1' or not batch_translate_active | |
| ignore_header_tags = os.getenv('IGNORE_HEADER', '0') == '1' and batch_translate_active | |
| remove_duplicate_h1_p = os.getenv('REMOVE_DUPLICATE_H1_P', '0') == '1' | |
| if (not use_title_tag or ignore_header_tags or remove_duplicate_h1_p) and content_html and not enhanced_extraction_used: | |
| # Parse the content HTML to remove unused tags | |
| content_soup = BeautifulSoup(content_html, parser) | |
| # Remove title tags if not using titles | |
| if not use_title_tag: | |
| for title_tag in content_soup.find_all('title'): | |
| title_tag.decompose() | |
| # Remove header tags if ignored | |
| if ignore_header_tags: | |
| for header_tag in content_soup.find_all(['h1', 'h2', 'h3']): | |
| header_tag.decompose() | |
| # Remove duplicate H1+P pairs (where P immediately follows H1 with same text) | |
| if remove_duplicate_h1_p: | |
| for h1_tag in content_soup.find_all('h1'): | |
| # Skip split marker H1 tags | |
| h1_id = h1_tag.get('id', '') | |
| if h1_id and h1_id.startswith('split-'): | |
| continue | |
| h1_text = h1_tag.get_text(strip=True) | |
| if 'SPLIT MARKER' in h1_text: | |
| continue | |
| # Get the next sibling (skipping whitespace/text nodes) | |
| next_sibling = h1_tag.find_next_sibling() | |
| if next_sibling and next_sibling.name == 'p': | |
| # Compare text content (stripped) | |
| p_text = next_sibling.get_text(strip=True) | |
| if h1_text == p_text: | |
| # Remove the duplicate paragraph | |
| next_sibling.decompose() | |
| # Update content_html with filtered version | |
| content_html = str(content_soup) | |
| # Process images and metadata | |
| protected_html = protect_angle_brackets_func(html_content) | |
| soup = BeautifulSoup(protected_html, parser) | |
| images = soup.find_all('img') | |
| has_images = len(images) > 0 | |
| is_image_only_chapter = has_images and len(content_text.strip()) < 500 | |
| if is_image_only_chapter: | |
| print(f"[DEBUG] Image-only chapter detected: {file_path} ({len(images)} images, {len(content_text)} chars)") | |
| # Calculate content hash (hashlib is stdlib; the local import just keeps this worker function self-contained) | |
| import hashlib | |
| content_hash = hashlib.sha256(content_html.encode('utf-8', errors='ignore')).hexdigest() | |
| file_size = len(content_text) | |
| sample_text = content_text[:500] if effective_mode == "smart" else '' | |
| # Ensure chapter_num is always an integer | |
| if isinstance(chapter_num, float): | |
| chapter_num = int(chapter_num) | |
| # Create chapter info | |
| chapter_info = { | |
| "num": chapter_num, | |
| "title": chapter_title or f"Chapter {chapter_num}", | |
| "body": content_html, | |
| "filename": file_path, | |
| # IMPORTANT: For PDFs, we must preserve the original filename including extension | |
| # so that chapter_splitter.py can detect it as PDF content. | |
| # But we also want to preserve the basename for display/logging. | |
| "source_file": os.path.basename(zip_file_path) if zip_file_path else file_path, | |
| "original_filename": os.path.basename(file_path), | |
| "original_basename": os.path.splitext(os.path.basename(file_path))[0], | |
| "content_hash": content_hash, | |
| "detection_method": detection_method if detection_method else "pending", | |
| "file_size": file_size, | |
| "has_images": has_images, | |
| "image_count": len(images), | |
| "is_empty": len(content_text.strip()) == 0, | |
| "is_image_only": is_image_only_chapter, | |
| "extraction_mode": extraction_mode, | |
| "file_index": file_index | |
| } | |
| # Add enhanced extraction info if used | |
| if enhanced_extraction_used: | |
| chapter_info["enhanced_extraction"] = True | |
| chapter_info["enhanced_filtering"] = enhanced_filtering | |
| chapter_info["preserve_structure"] = preserve_structure | |
| # Store original HTML for image restoration | |
| chapter_info["original_html"] = html_content | |
| # Add merge info if applicable | |
| if not disable_merging and file_path in merge_candidates: | |
| chapter_info["was_merged"] = True | |
| chapter_info["merged_with"] = merge_candidates[file_path] | |
| if effective_mode == "smart": | |
| chapter_info["language_sample"] = content_text[:500] | |
| # Debug for section files | |
| if 'section' in chapter_info['original_basename'].lower(): | |
| print(f"[DEBUG] Added section file to candidates: {chapter_info['original_basename']} (size: {chapter_info['file_size']})") | |
| return chapter_info, h1_found, h2_found, file_size, sample_text, None | |
| except Exception as e: | |
| print(f"[ERROR] Failed to process {file_path}: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return None, False, False, 0, '', None | |
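| # Usage sketch (illustrative, not the coordinator's actual call site): because this | |
| # function lives at module level it can be submitted to a ProcessPoolExecutor. Every | |
| # callable argument (the angle-bracket protector, is_stop_requested) must also be a | |
| # module-level function, since lambdas and closures are not picklable. All values | |
| # below are placeholders; 'protect_angle_brackets' is a hypothetical helper name. | |
| #   with ProcessPoolExecutor(max_workers=4) as pool: | |
| #       futures = [pool.submit(_process_single_html_file, path, idx, 'book.epub', | |
| #                              _get_best_parser(), {}, True, None, 'smart', None, | |
| #                              False, protect_angle_brackets, None, files, | |
| #                              is_stop_requested) | |
| #                  for idx, path in enumerate(files)] | |
| #       for fut in as_completed(futures): | |
| #           chapter_info, h1_found, h2_found, size, sample, skipped = fut.result() | |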