# NOTE(review): the three lines originally here ("Spaces:", "Sleeping",
# "Sleeping") were non-Python extraction residue from a web UI; kept as this
# comment so the module parses.
| from __future__ import annotations | |
| import copy | |
| import json | |
| import re | |
| import shutil | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from threading import Lock | |
| from typing import Iterable, List, Optional | |
| from uuid import uuid4 | |
| from fastapi import UploadFile | |
| from PIL import Image, ImageOps, UnidentifiedImageError | |
| from ..core.config import get_settings | |
# File-extension buckets used by _category_for to route uploads.
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
DOC_EXTS = {".pdf", ".doc", ".docx"}
DATA_EXTS = {".csv", ".xls", ".xlsx"}
# Formats that _normalize_uploaded_photo re-encodes in place (EXIF orientation
# baked in, oversized images downscaled).
EXIF_NORMALIZE_EXTS = {".jpg", ".jpeg", ".png", ".webp"}
# Uploaded photos are downscaled so the longer edge never exceeds this.
UPLOAD_IMAGE_MAX_LONG_EDGE_PX = 2400
# Re-encode quality settings for normalized uploads.
UPLOAD_JPEG_QUALITY = 82
UPLOAD_WEBP_QUALITY = 80
# Session ids are uuid4().hex: exactly 32 lowercase hex characters. Also acts
# as a path-traversal guard in _session_dir.
SESSION_ID_RE = re.compile(r"^[0-9a-f]{32}$")
# Built-in page templates always available to every session; custom templates
# may extend but never shadow these ids (see _normalize_page_templates).
BUILTIN_PAGE_TEMPLATES = [
    {
        "id": "repex:standard",
        "name": "Standard Job Sheet",
        "description": "Observations + up to two photos.",
        "blank": False,
        "variant": "full",
        "photo_layout": "auto",
        "source": "builtin",
    },
    {
        "id": "repex:photos",
        "name": "Photo Continuation",
        "description": "Photo-only continuation page.",
        "blank": False,
        "variant": "photos",
        "photo_layout": "auto",
        "source": "builtin",
    },
    {
        "id": "repex:blank",
        "name": "Blank Canvas",
        "description": "Blank white page.",
        "blank": True,
        "variant": "full",
        "photo_layout": "auto",
        "source": "builtin",
    },
]
# Fast id -> definition lookup for the builtins above.
BUILTIN_PAGE_TEMPLATE_MAP = {item["id"]: item for item in BUILTIN_PAGE_TEMPLATES}
@dataclass
class StoredFile:
    """Metadata record for one file persisted under a session's uploads dir.

    BUG FIX: the ``@dataclass`` decorator was missing, leaving a plain class
    with bare annotations and no generated ``__init__`` — yet
    ``SessionStore.save_upload`` constructs it with keyword arguments
    (``StoredFile(id=..., name=..., ...)``), which would raise ``TypeError``.
    The ``dataclass`` import already present at module top confirms the
    decorator was intended.
    """

    id: str            # uuid4().hex identifier, also the stored filename stem
    name: str          # sanitized original filename (see _safe_name)
    size: int          # size in bytes after any photo normalization
    content_type: str  # client-supplied MIME type, or application/octet-stream
    category: str      # "photos" | "documents" | "data_files"
    path: str          # path relative to the session dir, e.g. "uploads/<id>.jpg"
| def _now_iso() -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
| def _safe_name(name: str) -> str: | |
| name = Path(name).name | |
| name = re.sub(r"[^a-zA-Z0-9._-]", "_", name) | |
| return name or "upload" | |
def _category_for(filename: str) -> str:
    """Map a filename to its upload bucket by extension.

    Unknown extensions land in ``"documents"``, the same bucket as DOC_EXTS.
    """
    suffix = Path(filename).suffix.lower()
    for extensions, category in (
        (IMAGE_EXTS, "photos"),
        (DOC_EXTS, "documents"),
        (DATA_EXTS, "data_files"),
    ):
        if suffix in extensions:
            return category
    return "documents"
def _normalize_uploaded_photo(path: Path) -> None:
    """Re-encode an uploaded photo in place: bake in EXIF orientation,
    downscale oversized images, and re-save with sane compression.

    Only runs for extensions in EXIF_NORMALIZE_EXTS. Best-effort: if Pillow
    cannot decode or re-encode the file, the original bytes are left intact.
    """
    ext = path.suffix.lower()
    if ext not in EXIF_NORMALIZE_EXTS:
        return
    try:
        with Image.open(path) as image:
            # Apply the EXIF orientation tag to the pixel data so viewers
            # that ignore EXIF still show the photo the right way up.
            normalized = ImageOps.exif_transpose(image)
            # Image.Resampling exists on newer Pillow; fall back to the
            # legacy module-level constant otherwise.
            resampling = getattr(Image, "Resampling", None)
            lanczos = resampling.LANCZOS if resampling is not None else Image.LANCZOS
            long_edge = max(normalized.width, normalized.height)
            if long_edge > UPLOAD_IMAGE_MAX_LONG_EDGE_PX:
                # Scale both edges by the same ratio; round but never go below 1px.
                ratio = UPLOAD_IMAGE_MAX_LONG_EDGE_PX / float(long_edge)
                normalized = normalized.resize(
                    (
                        max(1, int(round(normalized.width * ratio))),
                        max(1, int(round(normalized.height * ratio))),
                    ),
                    lanczos,
                )
            if ext in {".jpg", ".jpeg"}:
                # JPEG has no alpha channel; flatten palette/alpha modes first.
                if normalized.mode in ("RGBA", "LA", "P"):
                    normalized = normalized.convert("RGB")
                normalized.save(
                    path,
                    format="JPEG",
                    quality=UPLOAD_JPEG_QUALITY,
                    optimize=True,
                    progressive=True,
                    exif=b"",  # strip EXIF metadata from the re-encoded file
                )
            elif ext == ".webp":
                if normalized.mode not in ("RGB", "RGBA"):
                    normalized = normalized.convert("RGB")
                normalized.save(
                    path,
                    format="WEBP",
                    quality=UPLOAD_WEBP_QUALITY,
                    method=6,  # slowest/best WebP encoding effort
                )
            else:  # png
                normalized.save(path, format="PNG", optimize=True)
    except (UnidentifiedImageError, OSError, ValueError, TypeError):
        # Keep original bytes if the file cannot be decoded by Pillow.
        return
def _validate_session_id(session_id: str) -> str:
    """Validate a session id and return its lowercase canonical form.

    Raises ValueError for anything that is not 32 hex characters (case is
    folded before matching).
    """
    if not session_id:
        raise ValueError("Invalid session id.")
    candidate = session_id.lower()
    if SESSION_ID_RE.match(candidate) is None:
        raise ValueError("Invalid session id.")
    return candidate
| def _merge_text(primary: str, secondary: str) -> str: | |
| primary = (primary or "").strip() | |
| secondary = (secondary or "").strip() | |
| if not secondary: | |
| return primary | |
| if not primary: | |
| return secondary | |
| if secondary in primary: | |
| return primary | |
| return f"{primary} - {secondary}" | |
def _normalize_template_fields(template: Optional[dict]) -> dict:
    """Migrate legacy per-page template fields onto the current schema.

    - ``condition_description`` folds into ``item_description``
    - ``action_type`` folds into ``required_action`` (legacy text first)
    - ``figure_description`` folds into ``figure_caption``
    - ``accompanied_by`` / ``project`` / ``client_site`` are dropped
    Keys whose merged value ends up empty are removed rather than stored
    as empty strings. Non-dict input yields an empty dict.
    """
    if not isinstance(template, dict):
        return {}
    fields = dict(template)

    def _apply(key: str, merged: str) -> None:
        # Store the merged text, or drop the key entirely when empty.
        if merged:
            fields[key] = merged
        else:
            fields.pop(key, None)

    _apply(
        "item_description",
        _merge_text(
            fields.get("item_description", ""),
            fields.pop("condition_description", ""),
        ),
    )
    _apply(
        "required_action",
        _merge_text(fields.pop("action_type", ""), fields.get("required_action", "")),
    )
    _apply(
        "figure_caption",
        _merge_text(
            fields.get("figure_caption", ""),
            fields.pop("figure_description", ""),
        ),
    )
    for legacy_key in ("accompanied_by", "project", "client_site"):
        fields.pop(legacy_key, None)
    return fields
| def _infer_template_id(page: dict) -> str: | |
| template_id = str(page.get("page_template") or "").strip() | |
| if template_id: | |
| return template_id | |
| if page.get("blank"): | |
| return "repex:blank" | |
| if str(page.get("variant") or "").strip().lower() == "photos": | |
| return "repex:photos" | |
| return "repex:standard" | |
def _normalize_page_templates(templates: Optional[List[dict]]) -> List[dict]:
    """Sanitize user-supplied page-template definitions.

    Pydantic models are converted to plain dicts (``model_dump`` or the v1
    ``dict`` method). Entries are dropped when they are not dicts, have no
    id, duplicate an earlier id, or shadow a builtin id. ``variant`` and
    ``photo_layout`` are coerced into their allowed vocabularies, and every
    surviving template is tagged ``source: "custom"``.
    """
    result: List[dict] = []
    accepted_ids: set[str] = set()
    for raw in templates or []:
        if hasattr(raw, "model_dump"):
            raw = raw.model_dump()
        elif hasattr(raw, "dict"):
            raw = raw.dict()
        if not isinstance(raw, dict):
            continue
        template_id = str(raw.get("id") or "").strip()
        rejected = (
            not template_id
            or template_id in BUILTIN_PAGE_TEMPLATE_MAP
            or template_id in accepted_ids
        )
        if rejected:
            continue
        accepted_ids.add(template_id)
        variant = str(raw.get("variant") or "full").strip().lower()
        photo_layout = str(raw.get("photo_layout") or "auto").strip().lower()
        result.append(
            {
                "id": template_id,
                "name": str(raw.get("name") or template_id).strip() or template_id,
                "description": str(raw.get("description") or "").strip(),
                "blank": bool(raw.get("blank")),
                "variant": variant if variant in {"full", "photos"} else "full",
                "photo_layout": (
                    photo_layout
                    if photo_layout in {"auto", "two-column", "stacked"}
                    else "auto"
                ),
                "source": "custom",
            }
        )
    return result
class SessionStore:
    """Filesystem-backed store for report-building sessions.

    Layout: ``<base_dir>/sessions/<32-hex-id>/session.json`` plus an
    ``uploads/`` subdirectory per session. Session ids are ``uuid4().hex``
    strings validated against SESSION_ID_RE before any path is built, which
    doubles as a path-traversal guard. Writes to ``session.json`` and
    session deletion are serialized through one per-store ``threading.Lock``.
    """

    def __init__(self, base_dir: Optional[Path] = None) -> None:
        """Root the store at *base_dir* (defaults to the configured storage
        directory) and migrate any previously stored sessions."""
        settings = get_settings()
        self.base_dir = (base_dir or settings.storage_dir).resolve()
        self.sessions_dir = self.base_dir / "sessions"
        self.sessions_dir.mkdir(parents=True, exist_ok=True)
        # Configured in megabytes; held here as a byte count.
        self.max_upload_bytes = settings.max_upload_mb * 1024 * 1024
        self._lock = Lock()
        self._migrate_storage()

    def _migrate_storage(self) -> None:
        """Best-effort schema migration: rewrite each stored session whose
        normalized form differs from what is on disk. Unreadable or
        unwritable files are skipped silently."""
        for session_file in self.sessions_dir.glob("*/session.json"):
            try:
                raw = json.loads(session_file.read_text(encoding="utf-8"))
            except Exception:
                continue
            # Deep-copy so normalization cannot mutate `raw` before the
            # changed-content comparison below.
            normalized = self._normalize_session(copy.deepcopy(raw))
            if normalized != raw:
                try:
                    session_file.write_text(
                        json.dumps(normalized, indent=2), encoding="utf-8"
                    )
                except Exception:
                    continue

    def _template_index(self, session: dict) -> dict:
        """Return an id -> template-definition map: builtins overlaid with
        the session's custom templates. Side effect: re-normalizes and
        stores the session's ``page_templates`` list."""
        custom_templates = _normalize_page_templates(session.get("page_templates") or [])
        session["page_templates"] = custom_templates
        # Copy each builtin dict so later mutation cannot corrupt the
        # module-level builtin table.
        merged = {key: dict(value) for key, value in BUILTIN_PAGE_TEMPLATE_MAP.items()}
        for template in custom_templates:
            merged[template["id"]] = template
        return merged

    def _normalize_page(self, page: dict, template_index: dict) -> dict:
        """Return *page* with legacy template fields migrated and its
        template resolved against *template_index*; unknown template ids
        fall back to the standard builtin."""
        template = _normalize_template_fields(page.get("template"))
        normalized = {**page, "template": template}
        template_id = _infer_template_id(normalized)
        definition = template_index.get(template_id) or BUILTIN_PAGE_TEMPLATE_MAP["repex:standard"]
        normalized["page_template"] = definition["id"]
        normalized["blank"] = bool(definition.get("blank"))
        normalized["variant"] = (
            str(definition.get("variant") or normalized.get("variant") or "full")
            .strip()
            .lower()
        )
        if normalized["variant"] not in {"full", "photos"}:
            normalized["variant"] = "full"
        # Only inherit the template's photo_layout when the page has none.
        if normalized.get("photo_layout") is None and definition.get("photo_layout"):
            normalized["photo_layout"] = definition["photo_layout"]
        return normalized

    def list_sessions(self) -> List[dict]:
        """Load and normalize every stored session, skipping unreadable ones.

        NOTE(review): ordering is reverse-lexicographic by directory name;
        ids are random hex, so this is not chronological — confirm callers
        do not rely on recency ordering.
        """
        sessions: List[dict] = []
        for session_file in sorted(self.sessions_dir.glob("*/session.json"), reverse=True):
            try:
                session = json.loads(session_file.read_text(encoding="utf-8"))
                session = self._normalize_session(session)
                sessions.append(session)
            except Exception:
                continue
        return sessions

    def create_session(self, document_no: str, inspection_date: str) -> dict:
        """Create, persist, and return a fresh empty session document."""
        session_id = uuid4().hex
        now = _now_iso()
        session = {
            "id": session_id,
            "status": "ready",
            "created_at": now,
            "updated_at": now,
            "document_no": document_no,
            "inspection_date": inspection_date,
            "uploads": {"photos": [], "documents": [], "data_files": []},
            "selected_photo_ids": [],
            "page_count": 0,
            "pages": [],
            "jobsheet_sections": [],
            "headings": [],
            "page_templates": [],
        }
        self._save_session(session)
        return session

    def validate_session_id(self, session_id: str) -> str:
        """Public wrapper around the module-level id validator; raises
        ValueError on malformed ids."""
        return _validate_session_id(session_id)

    def get_session(self, session_id: str) -> Optional[dict]:
        """Load one session by id; returns None when missing or unreadable."""
        session_path = self._session_file(session_id)
        if not session_path.exists():
            return None
        try:
            session = json.loads(session_path.read_text(encoding="utf-8"))
            return self._normalize_session(session)
        except Exception:
            return None

    def update_session(self, session: dict) -> None:
        """Normalize *session*, refresh its ``updated_at`` stamp, persist."""
        session = self._normalize_session(session)
        session["updated_at"] = _now_iso()
        self._save_session(session)

    def delete_session(self, session_id: str) -> bool:
        """Remove a session directory and everything in it.

        Returns False when the session does not exist; removal errors
        propagate (``ignore_errors=False``).
        """
        session_dir = self._session_dir(session_id)
        if not session_dir.exists():
            return False
        with self._lock:
            shutil.rmtree(session_dir, ignore_errors=False)
        return True

    def add_uploads(self, session: dict, uploads: Iterable[StoredFile]) -> dict:
        """Append stored-file records to the session's upload buckets and
        persist. While no explicit pages exist, ``page_count`` tracks one
        page per uploaded photo (minimum 1)."""
        for item in uploads:
            session["uploads"].setdefault(item.category, [])
            session["uploads"][item.category].append(
                {
                    "id": item.id,
                    "name": item.name,
                    "size": item.size,
                    "content_type": item.content_type,
                    "category": item.category,
                    "path": item.path,
                }
            )
        if not session.get("pages"):
            photo_count = len(session.get("uploads", {}).get("photos", []) or [])
            session["page_count"] = max(1, photo_count)
        self.update_session(session)
        return session

    def set_selected_photos(self, session: dict, selected_ids: List[str]) -> dict:
        """Persist the user's photo selection; while no explicit pages
        exist, ``page_count`` follows the selection size (minimum 1)."""
        session["selected_photo_ids"] = selected_ids
        if not session.get("pages"):
            session["page_count"] = max(1, len(selected_ids))
        self.update_session(session)
        return session

    def set_pages(self, session: dict, pages: List[dict]) -> dict:
        """Replace the session's pages, stored as a single legacy section.

        Empty input yields one empty page; non-dict entries become empty
        placeholder pages.
        """
        if not pages:
            pages = [{"items": []}]
        template_index = self._template_index(session)
        normalized_pages = []
        for page in pages:
            if not isinstance(page, dict):
                normalized_pages.append(
                    self._normalize_page({"items": []}, template_index)
                )
                continue
            normalized_pages.append(self._normalize_page(page, template_index))
        # Legacy compatibility: store as a single section.
        session["jobsheet_sections"] = [
            {"id": uuid4().hex, "title": "Section 1", "pages": normalized_pages}
        ]
        session["pages"] = []
        session["page_count"] = len(normalized_pages)
        self.update_session(session)
        return session

    def ensure_pages(self, session: dict) -> List[dict]:
        """Legacy accessor: flatten all section pages into one list and
        refresh ``page_count`` to match."""
        # Legacy compatibility: flatten sections to pages.
        sections = self.ensure_sections(session)
        pages: List[dict] = []
        for section in sections:
            pages.extend(section.get("pages") or [])
        session["page_count"] = len(pages)
        return pages

    def set_sections(self, session: dict, sections: List[dict]) -> dict:
        """Replace the session's jobsheet sections with normalized copies.

        Accepts Pydantic models or dicts for sections and pages; each
        section gets an id/title fallback and at least one (empty) page.
        An empty input collapses to a single default section.
        """
        template_index = self._template_index(session)
        normalized: List[dict] = []
        for section in sections or []:
            if hasattr(section, "model_dump"):
                section = section.model_dump()
            elif hasattr(section, "dict"):
                section = section.dict()
            pages = section.get("pages") or []
            if pages:
                # First pass: convert any Pydantic page models to dicts.
                normalized_pages = []
                for page in pages:
                    if hasattr(page, "model_dump"):
                        normalized_pages.append(page.model_dump())
                    elif hasattr(page, "dict"):
                        normalized_pages.append(page.dict())
                    else:
                        normalized_pages.append(page)
                pages = normalized_pages
            # Second pass: normalize each page against the template index.
            normalized_pages = []
            for page in pages:
                if not isinstance(page, dict):
                    normalized_pages.append(
                        self._normalize_page({"items": []}, template_index)
                    )
                    continue
                normalized_pages.append(self._normalize_page(page, template_index))
            normalized.append(
                {
                    "id": section.get("id") or uuid4().hex,
                    "title": section.get("title") or "Section",
                    "pages": normalized_pages if normalized_pages else [{"items": []}],
                }
            )
        if not normalized:
            normalized = [{"id": uuid4().hex, "title": "Section 1", "pages": [{"items": []}]}]
        session["jobsheet_sections"] = normalized
        session["pages"] = []
        session["page_count"] = sum(len(section.get("pages") or []) for section in normalized)
        self.update_session(session)
        return session

    def set_headings(self, session: dict, headings: List[dict]) -> dict:
        """Replace the session's headings with trimmed {number, name}
        records; Pydantic models are converted, non-dicts dropped."""
        normalized: List[dict] = []
        for heading in headings or []:
            if hasattr(heading, "model_dump"):
                heading = heading.model_dump()
            elif hasattr(heading, "dict"):
                heading = heading.dict()
            if not isinstance(heading, dict):
                continue
            number = str(heading.get("number") or "").strip()
            name = str(heading.get("name") or "").strip()
            normalized.append({"number": number, "name": name})
        session["headings"] = normalized
        self.update_session(session)
        return session

    def set_page_templates(self, session: dict, templates: List[dict]) -> dict:
        """Replace the session's custom templates, then re-normalize every
        existing section page against the refreshed template index so pages
        referencing removed templates fall back to builtins."""
        session["page_templates"] = _normalize_page_templates(templates)
        template_index = self._template_index(session)
        sections = session.get("jobsheet_sections") or []
        normalized_sections = []
        for section in sections:
            if not isinstance(section, dict):
                continue
            pages = section.get("pages") or []
            normalized_pages = []
            for page in pages:
                if not isinstance(page, dict):
                    normalized_pages.append(
                        self._normalize_page({"items": []}, template_index)
                    )
                    continue
                normalized_pages.append(self._normalize_page(page, template_index))
            normalized_sections.append(
                {
                    "id": section.get("id") or uuid4().hex,
                    "title": section.get("title") or "Section",
                    "pages": normalized_pages if normalized_pages else [{"items": []}],
                }
            )
        if normalized_sections:
            session["jobsheet_sections"] = normalized_sections
            session["page_count"] = sum(
                len(section.get("pages") or []) for section in normalized_sections
            )
        self.update_session(session)
        return session

    def ensure_sections(self, session: dict) -> List[dict]:
        """Return the session's sections, creating them if necessary.

        Existing sections are re-normalized in place. Otherwise, legacy
        top-level ``pages`` (or, failing that, a count derived from selected
        photos / uploaded photos / ``page_count``) are wrapped into a single
        default section. Persists the session either way.
        """
        template_index = self._template_index(session)
        sections = session.get("jobsheet_sections") or []
        if sections:
            normalized_sections: List[dict] = []
            for section in sections:
                if not isinstance(section, dict):
                    continue
                pages = section.get("pages") or []
                normalized_pages = []
                for page in pages:
                    if not isinstance(page, dict):
                        normalized_pages.append(
                            self._normalize_page({"items": []}, template_index)
                        )
                        continue
                    normalized_pages.append(self._normalize_page(page, template_index))
                normalized_sections.append(
                    {
                        "id": section.get("id") or uuid4().hex,
                        "title": section.get("title") or "Section",
                        "pages": normalized_pages if normalized_pages else [{"items": []}],
                    }
                )
            session["jobsheet_sections"] = normalized_sections
            session["page_count"] = sum(
                len(section.get("pages") or []) for section in normalized_sections
            )
            self.update_session(session)
            return normalized_sections
        pages = session.get("pages") or []
        if not pages:
            # Derive a page count: selected photos, else all photos, else the
            # stored count, else 1.
            selected_count = len(session.get("selected_photo_ids") or [])
            photo_count = len(session.get("uploads", {}).get("photos", []) or [])
            count = selected_count or photo_count or session.get("page_count", 1) or 1
            pages = [{"items": []} for _ in range(count)]
        pages = [
            self._normalize_page(page if isinstance(page, dict) else {"items": []}, template_index)
            for page in pages
        ]
        sections = [{"id": uuid4().hex, "title": "Section 1", "pages": pages}]
        session["jobsheet_sections"] = sections
        session["pages"] = []
        session["page_count"] = len(pages)
        self.update_session(session)
        return sections

    def _normalize_session(self, session: dict) -> dict:
        """Migrate a session dict (possibly from an older schema) in place.

        Folds the legacy ``project_name`` into ``document_no``, drops
        ``notes``, coerces ``headings`` from dict/model/list forms into a
        list of {number, name}, re-normalizes custom templates, and
        normalizes every page in both the legacy ``pages`` list and the
        ``jobsheet_sections`` tree. Non-dict input is returned untouched.
        """
        if not isinstance(session, dict):
            return session
        document_no = _merge_text(
            session.get("document_no", ""),
            session.get("project_name", ""),
        )
        if document_no:
            session["document_no"] = document_no
        session.pop("project_name", None)
        session.pop("notes", None)
        headings = session.get("headings")
        if isinstance(headings, dict):
            # Very old schema stored headings as {number: name}.
            session["headings"] = [
                {"number": str(key).strip(), "name": str(value).strip()}
                for key, value in headings.items()
            ]
        elif isinstance(headings, list):
            normalized_headings = []
            for heading in headings:
                if hasattr(heading, "model_dump"):
                    heading = heading.model_dump()
                elif hasattr(heading, "dict"):
                    heading = heading.dict()
                if not isinstance(heading, dict):
                    continue
                number = str(heading.get("number") or "").strip()
                name = str(heading.get("name") or "").strip()
                normalized_headings.append({"number": number, "name": name})
            session["headings"] = normalized_headings
        else:
            session["headings"] = []
        session["page_templates"] = _normalize_page_templates(
            session.get("page_templates") or []
        )
        template_index = self._template_index(session)
        pages = session.get("pages") or []
        if pages:
            normalized_pages = []
            for page in pages:
                if not isinstance(page, dict):
                    normalized_pages.append(
                        self._normalize_page({"items": []}, template_index)
                    )
                    continue
                normalized_pages.append(self._normalize_page(page, template_index))
            session["pages"] = normalized_pages
        sections = session.get("jobsheet_sections") or []
        if sections:
            normalized_sections = []
            for section in sections:
                if not isinstance(section, dict):
                    continue
                pages = section.get("pages") or []
                normalized_pages = []
                for page in pages:
                    if not isinstance(page, dict):
                        normalized_pages.append(
                            self._normalize_page({"items": []}, template_index)
                        )
                        continue
                    normalized_pages.append(self._normalize_page(page, template_index))
                normalized_sections.append(
                    {
                        "id": section.get("id") or uuid4().hex,
                        "title": section.get("title") or "Section",
                        "pages": normalized_pages if normalized_pages else [{"items": []}],
                    }
                )
            session["jobsheet_sections"] = normalized_sections
        return session

    def save_upload(self, session_id: str, upload: UploadFile) -> StoredFile:
        """Stream an uploaded file to disk and return its metadata record.

        Reads in 1 MiB chunks, enforcing the configured size cap as it goes;
        on overflow the partial file is deleted and ValueError is raised.
        Photos are re-encoded afterwards and their size re-read from disk.
        """
        filename = _safe_name(upload.filename or "upload")
        ext = Path(filename).suffix
        file_id = uuid4().hex
        stored_name = f"{file_id}{ext}"
        session_dir = self._session_dir(session_id)
        uploads_dir = session_dir / "uploads"
        uploads_dir.mkdir(parents=True, exist_ok=True)
        dest = uploads_dir / stored_name
        size = 0
        with dest.open("wb") as handle:
            while True:
                chunk = upload.file.read(1024 * 1024)
                if not chunk:
                    break
                size += len(chunk)
                if size > self.max_upload_bytes:
                    # Close before unlinking so the delete succeeds on
                    # platforms that forbid removing open files; the
                    # with-block's second close() is a no-op.
                    handle.close()
                    dest.unlink(missing_ok=True)
                    raise ValueError("File exceeds maximum upload size.")
                handle.write(chunk)
        category = _category_for(filename)
        if category == "photos":
            _normalize_uploaded_photo(dest)
            # Normalization rewrites the file; report the on-disk size.
            size = dest.stat().st_size
        return StoredFile(
            id=file_id,
            name=filename,
            size=size,
            content_type=upload.content_type or "application/octet-stream",
            category=category,
            path=f"uploads/{stored_name}",
        )

    def _session_dir(self, session_id: str) -> Path:
        """Resolve a session's directory, rejecting malformed ids.

        The regex validation already confines ids to hex; the startswith
        check is defense-in-depth against path escape.
        """
        safe_id = _validate_session_id(session_id)
        path = (self.sessions_dir / safe_id).resolve()
        if not str(path).startswith(str(self.sessions_dir.resolve())):
            raise ValueError("Invalid session id.")
        return path

    def session_dir(self, session_id: str) -> Path:
        """Public accessor for a session's directory path."""
        return self._session_dir(session_id)

    def _session_file(self, session_id: str) -> Path:
        """Path of the session's JSON document."""
        return self._session_dir(session_id) / "session.json"

    def _save_session(self, session: dict) -> None:
        """Write the session document under the store lock.

        NOTE(review): the write is not atomic (no temp-file + rename), so a
        crash mid-write can leave a truncated session.json — confirm whether
        that risk is acceptable here.
        """
        session_dir = self._session_dir(session["id"])
        session_dir.mkdir(parents=True, exist_ok=True)
        session_path = self._session_file(session["id"])
        with self._lock:
            session_path.write_text(json.dumps(session, indent=2), encoding="utf-8")

    def resolve_upload_path(self, session: dict, file_id: str) -> Optional[Path]:
        """Return the absolute path of an uploaded file by id, or None if
        the id is unknown or has no stored path. Existence on disk is not
        checked."""
        uploads = session.get("uploads") or {}
        for items in uploads.values():
            for item in items:
                if item.get("id") == file_id:
                    relative = item.get("path")
                    if relative:
                        return self._session_dir(session["id"]) / relative
        return None