"""Helpers for recognising settings files and unpacking queue zip bundles."""

from __future__ import annotations

import json
import re
import shutil
import zipfile
from pathlib import Path
from typing import Any

from shared.utils import files_locator as fl

# File suffixes (lower-case, with the dot) accepted as settings files.
WAN_GP_SETTINGS_SUFFIXES = {".json", ".zip"}

# Settings keys whose values may name files stored inside a bundle.
SETTINGS_BUNDLE_ATTACHMENT_KEYS = (
    "image_start",
    "image_end",
    "image_refs",
    "image_guide",
    "image_mask",
    "video_guide",
    "video_mask",
    "video_source",
    "audio_guide",
    "audio_guide2",
    "audio_source",
    "seedvc_voice_sample",
    "seedvc_voice_sample2",
    "custom_guide",
)

# Every run of characters outside this set is collapsed to one underscore
# when a zip member name is turned into a flat cache file name.
_UNSAFE_NAME_CHARS = re.compile(r"[^A-Za-z0-9._-]+")


def is_wangp_settings_filename(value: Any) -> bool:
    """Return True when *value*, read as a path, has a settings suffix.

    Falsy values are treated as an empty string; the suffix comparison is
    case-insensitive.
    """
    suffix = Path(str(value or "").strip()).suffix.lower()
    return suffix in WAN_GP_SETTINGS_SUFFIXES


def _cache_root() -> Path:
    """Return the directory under which extracted bundle files are cached.

    The path is ``settings/_settings_bundle_cache`` below the directory two
    levels above the one containing this file.
    """
    return Path(__file__).resolve().parents[2] / "settings" / "_settings_bundle_cache"


def _safe_zip_name(name: Any) -> str:
    """Normalise a zip member name, or return ``""`` when it is rejected.

    Backslashes are converted to forward slashes before the check.
    """
    text = str(name or "").strip().replace("\\", "/")
    # NOTE(review): presumably this rejects absolute paths and ``..``
    # traversal -- confirm against files_locator.is_relative_down_path.
    if not fl.is_relative_down_path(text):
        return ""
    return text


def _extract_bundle_file(zf: zipfile.ZipFile, member_name: str, cache_dir: Path) -> str:
    """Copy one archive member into *cache_dir* and return its absolute path.

    When the name is rejected by ``_safe_zip_name`` or is not present in the
    archive, *member_name* is returned unchanged and nothing is written.
    """
    safe_name = _safe_zip_name(member_name)
    if not safe_name:
        return member_name
    try:
        # Dictionary lookup; avoids rebuilding the name list per attachment.
        member = zf.getinfo(safe_name)
    except KeyError:
        return member_name

    # Flatten the member path into a single file name, so the target always
    # lands directly inside cache_dir.
    target_path = cache_dir / _UNSAFE_NAME_CHARS.sub("_", safe_name)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    with zf.open(member) as source, target_path.open("wb") as target:
        # Stream in chunks so large attachments are never held fully in memory.
        shutil.copyfileobj(source, target)
    return str(target_path.resolve())


def _extract_attachment_value(zf: zipfile.ZipFile, value: Any, cache_dir: Path) -> Any:
    """Resolve an attachment setting against the archive.

    Strings holding an absolute path are returned as-is; other strings are
    passed to ``_extract_bundle_file``. Lists are processed element by
    element (recursively). Any other value is returned unchanged.
    """
    if isinstance(value, str):
        if Path(value).is_absolute():
            return value
        return _extract_bundle_file(zf, value, cache_dir)
    if isinstance(value, list):
        return [_extract_attachment_value(zf, item, cache_dir) for item in value]
    return value


def load_first_settings_from_queue_zip(
    zip_path: str | Path,
    attachment_keys: list[str] | tuple[str, ...],
) -> tuple[dict[str, Any] | None, int]:
    """Load the settings of the first task stored in a queue zip.

    Args:
        zip_path: Path of the zip archive to read.
        attachment_keys: Settings keys whose values should be resolved
            against the archive and extracted into the cache directory.

    Returns:
        A ``(settings, task_count)`` tuple. ``(None, 0)`` when the archive
        has no ``queue.json`` or its content is not a non-empty list.
        ``(None, task_count)`` when the first entry, or its ``"params"``
        value, is not a dict. Otherwise a shallow copy of the first task's
        parameters, with attachment values replaced by extracted paths.

    Errors raised while stat-ing the file, opening the archive or decoding
    the JSON are not caught here and propagate to the caller.
    """
    source_path = Path(zip_path).resolve()
    stat = source_path.stat()
    # The cache directory is named after the archive stem and its
    # modification time in nanoseconds.
    mtime_ns = int(getattr(stat, "st_mtime_ns", int(stat.st_mtime * 1_000_000_000)))
    cache_dir = _cache_root() / f"{source_path.stem}_{mtime_ns}"

    with zipfile.ZipFile(source_path, "r") as zf:
        if "queue.json" not in zf.namelist():
            return None, 0
        manifest = json.loads(zf.read("queue.json").decode("utf-8"))
        if not isinstance(manifest, list) or not manifest:
            return None, 0

        task = manifest[0]
        if not isinstance(task, dict):
            return None, len(manifest)
        # An entry without a "params" key is itself used as the parameters.
        params = task.get("params", task)
        if not isinstance(params, dict):
            return None, len(manifest)

        payload = dict(params)
        cache_dir.mkdir(parents=True, exist_ok=True)
        for key in attachment_keys:
            if key in payload:
                payload[key] = _extract_attachment_value(zf, payload[key], cache_dir)
        return payload, len(manifest)