| """ZIP archive parser - extracts and delegates to txt/epub parsers.""" |
|
|
| import logging |
| import tempfile |
| import zipfile |
| from pathlib import Path |
|
|
| from .txt_parser import parse_txt |
| from .epub_parser import parse_epub |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Dispatch table: file extension (including the leading dot) -> parser
# function. parse_zip calls the selected parser with the Path of each
# extracted file that has that extension.
PARSERS = {
    ".txt": parse_txt,
    ".epub": parse_epub,
}
|
|
|
|
def _parser_for(path: Path):
    """Return the parser registered for ``path``, or ``None`` to skip it.

    Names starting with a dot are always skipped. The extension is
    lowercased before the ``PARSERS`` lookup (whose keys are lowercase), so
    ``BOOK.TXT`` is treated like ``book.txt`` on every platform.
    """
    if path.name.startswith("."):
        return None
    return PARSERS.get(path.suffix.lower())


def parse_zip(filepath: Path) -> str:
    """Extract a ZIP archive and parse all supported files inside.

    Only members whose extension has an entry in ``PARSERS`` are written to
    a temporary directory; everything else in the archive is left
    unextracted. Each extracted file is passed to its parser, and the
    non-blank results are joined with a blank line between them.

    Files are processed in order of file name; files sharing a name (in
    different directories) are ordered by their full path so the output is
    the same on every run.

    Args:
        filepath: Path to the .zip file.

    Returns:
        Combined text from all supported files in the archive. An empty
        string is returned when the file is not a valid ZIP, extraction
        fails, or no supported file is found. A file whose parser raises is
        logged and skipped.
    """
    if not zipfile.is_zipfile(str(filepath)):
        logger.error("Not a valid ZIP file: %s", filepath.name)
        return ""

    texts = []

    with tempfile.TemporaryDirectory() as tmpdir:
        tmppath = Path(tmpdir)
        try:
            with zipfile.ZipFile(str(filepath), "r") as zf:
                # Extract only what can actually be parsed, so unrelated
                # (and possibly huge) members never touch the disk.
                members = [
                    info
                    for info in zf.infolist()
                    if not info.is_dir()
                    and _parser_for(Path(info.filename)) is not None
                ]
                zf.extractall(tmppath, members=members)
        except Exception as e:
            # Best-effort boundary: a corrupt, encrypted or otherwise
            # unreadable archive yields no text rather than an exception.
            logger.error("Failed to extract ZIP %s: %s", filepath.name, e)
            return ""

        # Discover files from the filesystem (not the member list) so that
        # duplicate archive entries collapse to the single file on disk.
        supported_files = []
        for f in tmppath.rglob("*"):
            parser = _parser_for(f)
            if parser is not None and f.is_file():
                supported_files.append((f, parser))

        if not supported_files:
            logger.warning("No supported files found in ZIP: %s", filepath.name)
            return ""

        # Primary key is the bare file name; the full path breaks ties so
        # the order does not depend on directory traversal order.
        supported_files.sort(key=lambda x: (x[0].name, x[0].as_posix()))
        logger.info("Found %d supported files in ZIP %s", len(supported_files), filepath.name)

        for f, parser in supported_files:
            try:
                text = parser(f)
                if text.strip():
                    texts.append(text)
                    logger.info(" Parsed %s (%d chars)", f.name, len(text))
            except Exception as e:
                # One unreadable file must not discard the rest.
                logger.error(" Failed to parse %s: %s", f.name, e)

    return "\n\n".join(texts)
|
|