testing_space / app.py
Imrao's picture
fix 1.1
36e496a verified
import os
import logging
import json
import tempfile
import zipfile
import shapely.geometry
import pyprt
import glob
import shutil
import pyproj
from shapely.ops import transform
from shapely.affinity import translate
import functools
import math
from fastapi import FastAPI, HTTPException, Body, Request
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from fastapi.staticfiles import StaticFiles
import uuid
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins for now
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["X-Centroid-Lat", "X-Centroid-Lon", "X-Convergence-Angle"]
)
# Mount static directory for I3S layers
# Ensure directory exists
LAYERS_DIR = os.path.join(os.path.dirname(__file__), "static", "layers")
if not os.path.exists(LAYERS_DIR):
os.makedirs(LAYERS_DIR)
app.mount("/layers", StaticFiles(directory=LAYERS_DIR), name="layers")
RPK_DIR = os.path.join(os.path.dirname(__file__), "rpk")
# Ensure RPK directory exists
if not os.path.exists(RPK_DIR):
os.makedirs(RPK_DIR)
# Assets directory β€” textures resolved from Material API are cached here so
# PRT can reference them via absolute filesystem paths.
ASSETS_DIR = os.path.join(os.path.dirname(__file__), "assets")
if not os.path.exists(ASSETS_DIR):
os.makedirs(ASSETS_DIR)
# Material API base URL β€” set via MATERIAL_API_URL env var / HF Space secret
MATERIAL_API_BASE = os.environ.get("MATERIAL_API_URL", "").rstrip("/")
# Local Materials directory β€” co-located in the same container/repo.
# Textures are resolved from here first (no HTTP needed), then HTTP fallback.
MATERIAL_API_ROOT = os.path.join(os.path.dirname(__file__), "material_api", "Materials")
class GenerateRequest(BaseModel):
rpk_name: str
geometry: Dict[str, Any] # GeoJSON Feature or Geometry
attributes: Dict[str, Any] = {}
class GenerateI3SRequest(BaseModel):
rpk_name: str
coordinates: List[float] # [lon, lat, alt, lon, lat, alt, ...]
attributes: Dict[str, Any] = {}
def get_rpk_path(filename: str):
path = os.path.join(RPK_DIR, filename)
if not os.path.exists(path):
return None
return path
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _clean_attributes(raw: Dict[str, Any]) -> Dict[str, Any]:
"""
Sanitise attribute values for PyPRT.
PyPRT's CGA engine only accepts ``float | bool | str`` per attribute.
Any value that is ``None``, a complex type (list/dict/set), or otherwise
un-serialisable is dropped so PyPRT falls back to the CGA default value
rather than raising an internal ``AttributeError: 'NoneType' …``.
"""
cleaned: Dict[str, Any] = {}
for k, v in raw.items():
if v is None:
continue # drop – PyPRT will use the CGA rule default
if isinstance(v, bool):
cleaned[k] = bool(v)
elif isinstance(v, float):
cleaned[k] = float(v)
elif isinstance(v, int):
cleaned[k] = float(v) # PyPRT requires float, not int
elif isinstance(v, str):
if v == '':
continue # drop empty strings – PyPRT can't handle them
cleaned[k] = str(v)
else:
# Skip lists, dicts, sets, etc. β€” not valid CGA attribute types
logger.warning(f"Dropping attribute '{k}' with unsupported type {type(v).__name__}")
return cleaned
def _resolve_texture_bytes(rel_path: str) -> Optional[bytes]:
"""
Load the raw bytes for a Material API texture path (``/texture/...``).
Resolution order:
1. Local filesystem: ``MATERIAL_API_ROOT / <path-without-/texture/-prefix>``
Handles .tif/.tiff β†’ .jpg/.png fallback (same logic as Material API).
2. HTTP: ``MATERIAL_API_BASE + rel_path`` (only if MATERIAL_API_BASE is set).
Returns raw bytes on success, or ``None`` if the texture cannot be found.
"""
if not rel_path.startswith("/texture/"):
return None
tex_rel = rel_path[len("/texture/"):] # e.g. "Architectural/Cladding/Aluminium/Textures/file.jpg"
local_path = os.path.join(MATERIAL_API_ROOT, tex_rel.replace("/", os.sep))
# .tif/.tiff β†’ look for browser-compatible sibling (same as Material API tex_url())
_, ext = os.path.splitext(local_path)
if ext.lower() in (".tif", ".tiff"):
for alt_ext in (".jpg", ".jpeg", ".png", ".webp"):
alt = os.path.splitext(local_path)[0] + alt_ext
if os.path.isfile(alt):
local_path = alt
break
else:
local_path = "" # no alternative
if local_path and os.path.isfile(local_path):
try:
with open(local_path, "rb") as f:
data = f.read()
logger.info(f"Texture resolved locally: {local_path}")
return data
except Exception as exc:
logger.warning(f"Local texture read failed ({local_path}): {exc}")
# HTTP fallback β€” used when Material API runs as a separate service
if MATERIAL_API_BASE:
url = f"{MATERIAL_API_BASE}{rel_path}"
try:
import requests as _requests
resp = _requests.get(url, timeout=20)
resp.raise_for_status()
logger.info(f"Texture downloaded via HTTP: {url}")
return resp.content
except Exception as exc:
logger.warning(f"Texture HTTP download failed ({url}): {exc}")
return None
def _inject_textures_into_rpk(rpk_path: str, attrs: Dict[str, Any]) -> tuple:
"""
For every texture attribute (value starts with '/texture/'):
1. Fetch the texture bytes via _resolve_texture_bytes.
2. Write them into a per-request subdirectory of ASSETS_DIR.
3. Replace the attribute value with the absolute filesystem path so PRT
loads it directly.
Supports multiple texture attributes in one call β€” each is written to the
same per-request subdirectory.
Returns ``(rpk_path, modified_attrs, req_assets_dir)``.
``req_assets_dir`` is a subdirectory of ASSETS_DIR that the caller must
delete with shutil.rmtree() after generate_model() completes.
RPK is not modified.
"""
texture_attrs = {
k: v for k, v in attrs.items()
if isinstance(v, str) and v.startswith("/texture/")
}
if not texture_attrs:
return rpk_path, attrs, None
# Isolated per-request subdirectory β€” keeps concurrent requests separate
# and lets us clean up with a single shutil.rmtree() call.
req_id = str(uuid.uuid4())
req_assets_dir = os.path.join(ASSETS_DIR, req_id)
os.makedirs(req_assets_dir, exist_ok=True)
modified = dict(attrs)
for key, rel_path in texture_attrs.items():
tex_data = _resolve_texture_bytes(rel_path)
if tex_data is None:
logger.warning(f"Texture '{key}' not found β€” dropping, CGA will use default")
modified.pop(key, None)
continue
filename = os.path.basename(rel_path)
dest = os.path.join(req_assets_dir, filename)
try:
with open(dest, "wb") as f:
f.write(tex_data)
# PRT's CGA URI resolver supports file:// URIs for external textures.
# Coordinate-based InitialShape has no base URI, so relative paths
# only resolve inside the RPK. file:// is the only reliable external format.
# On Linux: dest="/app/assets/uuid/file.jpg" β†’ "file:///app/assets/uuid/file.jpg"
file_uri = "file://" + dest
modified[key] = file_uri
logger.info(f"Texture '{key}' β†’ {dest} (CGA attr: '{file_uri}')")
except Exception as exc:
logger.warning(f"Texture write failed for '{key}': {exc} β€” dropping")
modified.pop(key, None)
return rpk_path, modified, req_assets_dir
# RPKs that control report emission via boolean attributes β€” always force them ON.
_RPK_REPORT_FORCE_ATTRS: Dict[str, Dict[str, Any]] = {
"BLDG_Units.rpk": {"Reports": True, "report": True},
}
# RPKs that are geometry-only and crash PyEncoder / ignore user attributes.
# For these: skip the report-extraction pass and always generate with {} attrs.
GEOMETRY_ONLY_RPKS: set = {
"translateModel.rpk",
}
def _ensure_report_attrs(rpk_name: str, attrs: Dict[str, Any]) -> Dict[str, Any]:
"""Merge any mandatory report-enabling attributes for the given RPK.
Some RPKs gate their ``report()`` calls behind boolean attributes
(``Reports``, ``report``). This helper ensures those are always
``True`` so the PyEncoder pass reliably returns data.
"""
forced = _RPK_REPORT_FORCE_ATTRS.get(rpk_name, {})
if not forced:
return attrs
merged = dict(attrs) # shallow copy
merged.update(forced) # override / inject
return merged
def _extract_reports(
initial_shape: "pyprt.InitialShape",
clean_attrs: Dict[str, Any],
rpk_path: str,
) -> Dict[str, Any]:
"""
Run a dedicated PyEncoder pass (emitGeometry=False, emitReport=True) to
collect the CGA report dictionary.
Background
----------
``GeneratedModel.get_report()`` is **only** populated when the
``com.esri.pyprt.PyEncoder`` is used. File-based encoders such as
``GLTFEncoder`` and ``I3SEncoder`` write geometry to disk and return an
*empty* list from ``generate_model()``, so iterating over that list
never reaches ``get_report()``.
Reference: https://esri.github.io/pyprt/apidoc/pyprt.pyprt.html
"""
reports: Dict[str, Any] = {}
try:
mg = pyprt.ModelGenerator([initial_shape])
report_models = mg.generate_model(
[clean_attrs],
rpk_path,
"com.esri.pyprt.PyEncoder",
{"emitReport": True, "emitGeometry": False},
)
for m in report_models:
rep = m.get_report()
if rep:
reports.update(rep)
logger.info(f"CGA reports extracted: {list(reports.keys())}")
except Exception as exc:
logger.warning(f"Report extraction failed (non-fatal): {exc}")
return reports
@app.get("/rpks")
async def list_rpks():
"""List available RPK files."""
files = [f for f in os.listdir(RPK_DIR) if f.endswith(".rpk")]
return {"rpks": files}
@app.get("/rpk/{filename}/info")
async def get_rpk_info(filename: str):
"""Get attribute information for a specific RPK."""
rpk_path = get_rpk_path(filename)
if not rpk_path:
raise HTTPException(status_code=404, detail="RPK not found")
try:
# Get RPK attributes info using PyPRT
attrs_info = pyprt.get_rpk_attributes_info(rpk_path)
formatted_attrs = []
# Handle List of Objects return type (standard PyPRT)
if hasattr(attrs_info, '__iter__'):
for attr in attrs_info:
if hasattr(attr, 'get_name'):
name = attr.get_name()
attr_type = str(attr.get_type())
default_val = attr.get_default_value()
annotations = []
if hasattr(attr, 'get_annotations'):
try:
py_annotations = attr.get_annotations()
logger.info(f"Attr {name} annotations: {py_annotations}")
for anno in py_annotations:
key = None
args = []
if hasattr(anno, 'get_key'):
key = anno.get_key()
elif hasattr(anno, 'key'):
key = anno.key
if hasattr(anno, 'get_arguments'):
args = anno.get_arguments()
elif hasattr(anno, 'arguments'):
args = anno.arguments
if key:
annotations.append({"key": key, "arguments": args})
except Exception as e:
logger.error(f"Error fetching annotations for {name}: {e}")
formatted_attrs.append({
"name": name,
"type": attr_type,
"defaultValue": default_val,
"annotations": annotations
})
elif isinstance(attr, str):
# Fallback if list of strings
formatted_attrs.append({
"name": attr,
"type": "string",
"defaultValue": ""
})
return {"attributes": formatted_attrs}
except Exception as e:
logger.error(f"Error inspecting RPK: {e}")
# Return empty attributes instead of 500 if inspection fails
return {"attributes": []}
@app.post("/generate")
async def generate_model(request: GenerateRequest):
"""Generate a 3D model from geometry and RPK."""
rpk_path = get_rpk_path(request.rpk_name)
if not rpk_path:
raise HTTPException(status_code=404, detail="RPK not found")
try:
# Parse Geometry
# Expected input is a GeoJSON Feature or Geometry
geom_dict = request.geometry
if geom_dict.get("type") == "Feature":
geom_dict = geom_dict.get("geometry")
shape = shapely.geometry.shape(geom_dict)
# Create Initial Shape for PyPRT
# PyPRT expects a list of InitialShapes.
# For a polygon, we pass definitions.
# Helper to convert Shapely polygon to PyPRT InitialShape
if geom_dict.get("type") != "Polygon":
raise HTTPException(status_code=400, detail="Only Polygons are supported")
# 1. Validate geometry (repair if needed)
if not shape.is_valid:
shape = shape.buffer(0)
# NOTE: Do NOT call shapely.ops.orient() here.
# The frontend guarantees CCW winding via ensureCCW() (Shoelace formula).
# Previously orient(sign=-1.0) forced CW, which caused mirroring when
# the frontend was already sending CW or CCW depending on draw direction.
# 2. Re-center Geometry
# PyPRT generates at (0,0,0). We need to shift the polygon so its centroid is at (0,0).
centroid = shape.centroid
logger.info(f"Geometry Centroid: {centroid.x}, {centroid.y}")
# Translate shape so centroid is at (0,0)
shape_centered = translate(shape, xoff=-centroid.x, yoff=-centroid.y)
coords = list(shape_centered.exterior.coords)
# Remove last point if duplicate (closed loop)
if coords[0] == coords[-1]:
coords = coords[:-1]
# 3. Flatten coordinates for PyPRT InitialShape
# Frontend ENU convention: x = east, y = north
# PyPRT/CGA convention: X = east, Y = height (up), Z = south (depth, negated)
# CGA Z increases southward, but ENU Y increases northward β€” negate to align.
# Mapping: [enu_x, 0, -enu_y] β†’ [CGA_X, CGA_Y=0, CGA_Z]
flattened_coords = []
for p in coords:
flattened_coords.extend([p[0], 0, -p[1]])
indices = list(range(len(coords)))
face_counts = [len(coords)]
# Sanitise attributes
clean_attributes = _clean_attributes(request.attributes)
# Inject any Material-API textures into a temp RPK copy so CGA resolves them
rpk_path, clean_attributes, tex_temp_dir = _inject_textures_into_rpk(rpk_path, clean_attributes)
logger.info(f"Generating with RPK: {rpk_path}")
logger.info(f"Clean attributes: {clean_attributes}")
initial_shape = pyprt.InitialShape(flattened_coords, indices, face_counts)
# Setup Model Generator
export_options = {
"outputPath": tempfile.mkdtemp(),
"outputFilename": "model",
"emitReport": True,
"emitGeometry": True
}
is_geometry_only = request.rpk_name in GEOMETRY_ONLY_RPKS
try:
if is_geometry_only:
# Geometry-only RPKs (e.g. translateModel) crash PyEncoder.
# Skip report extraction entirely and always use empty attrs.
reports_dict = {}
model_generator = pyprt.ModelGenerator([initial_shape])
model_generator.generate_model(
[{}],
rpk_path,
"com.esri.prt.codecs.GLTFEncoder",
export_options,
)
else:
# Pass 1 – extract CGA reports via PyEncoder
report_attrs = _ensure_report_attrs(request.rpk_name, clean_attributes)
reports_dict = _extract_reports(initial_shape, report_attrs, rpk_path)
# Pass 2 – generate the actual GLB geometry
model_generator = pyprt.ModelGenerator([initial_shape])
models = model_generator.generate_model(
[clean_attributes],
rpk_path,
"com.esri.prt.codecs.GLTFEncoder",
export_options,
)
except Exception as e:
# Any PyPRT error (AttributeError, TypeError, RuntimeError…)
# Retry with empty attrs so the CGA rule uses its defaults.
logger.warning(
f"PyPRT error with supplied attrs ({type(e).__name__}: {e}). "
"Retrying with CGA defaults (empty attrs)."
)
try:
reports_dict = {} if is_geometry_only else _extract_reports(
initial_shape,
_ensure_report_attrs(request.rpk_name, {}),
rpk_path,
)
model_generator = pyprt.ModelGenerator([initial_shape])
model_generator.generate_model(
[{}],
rpk_path,
"com.esri.prt.codecs.GLTFEncoder",
export_options,
)
except Exception as e2:
logger.error(f"PyPRT Generation Error (retry failed): {e2}")
import traceback; traceback.print_exc()
raise HTTPException(status_code=500, detail=f"PyPRT Generation Failed: {str(e2)}")
output_path = export_options["outputPath"]
# PyPRT GLTFEncoder usually spits out .glb by default, or .gltf
generated_files = glob.glob(os.path.join(output_path, "*.glb"))
if not generated_files:
generated_files = glob.glob(os.path.join(output_path, "*.gltf"))
if generated_files:
glb_path = generated_files[0]
logger.info(f"Found GLTF/GLB at {glb_path}")
# Return JSONResponse where we provide the file download link and the reports
# But the requirement from frontend is typically:
# - if we want URL and reports together, we need a JSON response
# - we can statically serve the generated file if it's placed in a static dir.
# Right now, FileResponse is returned directly.
uuid_folder = str(uuid.uuid4())
serve_dir = os.path.join(LAYERS_DIR, uuid_folder)
os.makedirs(serve_dir, exist_ok=True)
final_glb_path = os.path.join(serve_dir, "model.glb")
shutil.copy(glb_path, final_glb_path)
# Cleanup downloaded textures now that generation is done
if tex_temp_dir and os.path.exists(tex_temp_dir):
shutil.rmtree(tex_temp_dir, ignore_errors=True)
return JSONResponse(content={
"url": f"/layers/{uuid_folder}/model.glb",
"reports": reports_dict,
"message": "GLB Generated Natively"
})
else:
logger.error(f"No GLB/GLTF file found in {output_path}")
raise HTTPException(status_code=500, detail="PyPRT Generation failed: No GLB file created")
except Exception as e:
# Cleanup downloaded textures on error path too
if 'tex_temp_dir' in locals() and tex_temp_dir and os.path.exists(tex_temp_dir):
shutil.rmtree(tex_temp_dir, ignore_errors=True)
logger.error(f"Generation error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/generate_i3s")
async def generate_i3s(request: GenerateRequest):
"""Generate an I3S Layer (SLPK unpacked) using same logic as GLB generation."""
# 0. Cleanup Old Layers
try:
if os.path.exists(LAYERS_DIR):
for item in os.listdir(LAYERS_DIR):
item_path = os.path.join(LAYERS_DIR, item)
if os.path.isdir(item_path):
shutil.rmtree(item_path)
except Exception as e:
logger.warning(f"Cleanup failed: {e}")
rpk_path = get_rpk_path(request.rpk_name)
if not rpk_path:
raise HTTPException(status_code=404, detail="RPK not found")
try:
# 1. Parse Geometry β€” extract ECEF center before stripping the Feature wrapper.
# geometry.properties.center = {x,y,z} ECEF Cartesian3 set by DrawTools.tsx.
geom_dict = request.geometry
ecef_center = None
if geom_dict.get("type") == "Feature":
props = geom_dict.get("properties") or {}
c = props.get("center") or {}
if c and "x" in c and "y" in c and "z" in c:
ecef_center = (float(c["x"]), float(c["y"]), float(c["z"]))
geom_dict = geom_dict.get("geometry")
shape = shapely.geometry.shape(geom_dict)
if geom_dict.get("type") != "Polygon":
raise HTTPException(status_code=400, detail="Only Polygons are supported")
if not shape.is_valid:
shape = shape.buffer(0)
lon, lat = None, None
if ecef_center:
ecef_to_wgs84 = pyproj.Transformer.from_crs("EPSG:4978", "EPSG:4326", always_xy=True)
lon, lat, _ = ecef_to_wgs84.transform(ecef_center[0], ecef_center[1], ecef_center[2])
logger.info(f"I3S centroid: lon={lon:.6f} lat={lat:.6f}")
else:
logger.warning("No ECEF center in geometry properties β€” I3S placement will be incorrect")
# 2. Re-center geometry identical to /generate (ENU local metres, centroid at origin).
# The I3S encoder places the model geographically via globalOffset (ECEF center).
centroid = shape.centroid
shape_centered = translate(shape, xoff=-centroid.x, yoff=-centroid.y)
coords = list(shape_centered.exterior.coords)
if coords[0] == coords[-1]:
coords = coords[:-1]
# CGA convention: X=east, Y=up(0 for footprint), Z=south (-north)
flattened_coords = []
for p in coords:
flattened_coords.extend([p[0], 0, -p[1]])
indices = list(range(len(coords)))
face_counts = [len(coords)]
# Sanitise attributes
clean_attributes = _clean_attributes(request.attributes)
rpk_path, clean_attributes, tex_temp_dir = _inject_textures_into_rpk(rpk_path, clean_attributes)
initial_shape = pyprt.InitialShape(flattened_coords, indices, face_counts)
# 3. Encoder Options β€” Global scene, WGS84 (EPSG:4326).
# globalOffset [lon, lat, 0] anchors the locally-generated model geographically.
layer_id = str(uuid.uuid4())
output_dir = os.path.join(LAYERS_DIR, layer_id)
os.makedirs(output_dir, exist_ok=True)
enc_options = {
'sceneType': 'Global',
'sceneWkid': '4326',
'baseName': 'SceneLayer',
'sceneName': 'SceneLayer',
'writePackage': False, # write unpacked folder structure, not .slpk ZIP
'outputPath': output_dir,
}
if lon is not None and lat is not None:
enc_options['globalOffset'] = [lon, lat, 0.0]
logger.info(f"Generating I3S to {output_dir}")
# Pass 1 – extract CGA reports via PyEncoder
# (I3SEncoder returns an empty list, so get_report() never fires)
report_attrs = _ensure_report_attrs(request.rpk_name, clean_attributes)
reports_dict = _extract_reports(initial_shape, report_attrs, rpk_path)
# Pass 2 – generate the actual I3S scene layer
model_generator = pyprt.ModelGenerator([initial_shape])
models = model_generator.generate_model(
[clean_attributes],
rpk_path,
'com.esri.prt.codecs.I3SEncoder',
enc_options,
)
# (models is empty for file encoders β€” that is expected)
# Verify output
# The encoder usually creates a subfolder based on sceneName or baseName, or just dumps in outputPath?
# With layerType="Path", it usually creates a folder structure like:
# output_dir/SceneLayer.slpk (if file) OR output_dir/nodepages/... (if Path)
# Let's check what's inside output_dir
# Usually for 'Path' it produces 'scenelayer.json' directly in outputPath?
# Or inside a subdirectory 'baseName'?
# We'll return the URL.
# Construction: /layers/{layer_id}/...
# We might need to find the .json file.
# Search for the I3S entry point (3dSceneLayer.json)
json_file_path = None
debug_files = []
for root, dirs, files in os.walk(output_dir):
for file in files:
rel_path = os.path.relpath(os.path.join(root, file), output_dir)
debug_files.append(rel_path)
if file == "3dSceneLayer.json":
# Found it!
# Construct URL path relative to /layers mount
# e.g. root is .../static/layers/<uuid>/SceneLayer
# we need /layers/<uuid>/SceneLayer/3dSceneLayer.json
# rel_path_from_layers_dir
rel_from_layers = os.path.relpath(os.path.join(root, file), LAYERS_DIR)
# replace backslashes if on windows (though usually linux on HF)
rel_from_layers = rel_from_layers.replace("\\", "/")
json_file_path = f"/layers/{rel_from_layers}"
if not json_file_path:
# writePackage:False may be unsupported in this PyPRT build β€” the
# encoder falls back to writing a .slpk ZIP. Extract it so we can
# serve the individual files via StaticFiles.
slpk_files = glob.glob(os.path.join(output_dir, "*.slpk"))
if slpk_files:
extract_dir = os.path.join(output_dir, "extracted")
os.makedirs(extract_dir, exist_ok=True)
logger.info(f"Extracting SLPK: {slpk_files[0]} β†’ {extract_dir}")
with zipfile.ZipFile(slpk_files[0], "r") as zf:
zf.extractall(extract_dir)
debug_files.clear()
for root, dirs, files in os.walk(extract_dir):
for file in files:
rel_path = os.path.relpath(os.path.join(root, file), extract_dir)
debug_files.append("extracted/" + rel_path.replace("\\", "/"))
if file == "3dSceneLayer.json":
rel_from_layers = os.path.relpath(
os.path.join(root, file), LAYERS_DIR
).replace("\\", "/")
json_file_path = f"/layers/{rel_from_layers}"
if not json_file_path:
logger.error(f"Could not find 3dSceneLayer.json in {output_dir}")
raise HTTPException(status_code=500, detail="I3S generation produced no 3dSceneLayer.json")
# Cleanup downloaded textures
if tex_temp_dir and os.path.exists(tex_temp_dir):
shutil.rmtree(tex_temp_dir, ignore_errors=True)
return {
"layerUrl": json_file_path,
"layerId": layer_id,
"reports": reports_dict,
"message": "I3S Layer Generated",
"debug_files": debug_files[:20] # Return first 20 files for debugging
}
except Exception as e:
if 'tex_temp_dir' in locals() and tex_temp_dir and os.path.exists(tex_temp_dir):
shutil.rmtree(tex_temp_dir, ignore_errors=True)
logger.error(f"I3S Generation error: {e}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
@app.middleware("http")
async def i3s_smart_middleware(request: Request, call_next):
# Intercept requests to /layers/
if request.url.path.startswith("/layers"):
path = request.url.path
# Determine local path
# URL: /layers/<uuid>/...
# Local: .../static/layers/<uuid>/...
rel_path = path[len("/layers"):]
if rel_path.startswith("/"): rel_path = rel_path[1:]
local_path = os.path.join(LAYERS_DIR, rel_path.replace("/", os.sep))
# Logic to map REST-style I3S requests to File System
target_file = None
# 1. Check if exact file exists
if os.path.isfile(local_path):
pass # Let static handler take it
else:
# 2. Handle I3S Conventions
# Case A: Layer Root (e.g. .../layers/0r or .../layers/0r/)
# If directory exists, check for 3dSceneLayer.json
if os.path.isdir(local_path):
possible_json = os.path.join(local_path, "3dSceneLayer.json")
if os.path.isfile(possible_json):
target_file = possible_json
# Case B: Nodes (e.g. .../nodes/root or .../nodes/15)
# Expects: .../nodes/root/3dNodeIndexDocument.json
elif "/nodes/" in path:
# Handle sub-resources of nodes
clean_path = local_path.rstrip(os.sep)
if "/geometries/" in path:
# .../geometries/0 -> .../geometries/0.bin
possible_bin = clean_path + ".bin"
if os.path.isfile(possible_bin):
target_file = possible_bin
media_type="application/octet-stream"
elif "/features/" in path:
# .../features/0 -> .../features/0.json
possible_json = clean_path + ".json"
if os.path.isfile(possible_json):
target_file = possible_json
elif "/textures/" in path:
# Textures are tricky, often .jpg or .bin.dds
# Just try appending extensions
for ext in [".jpg", ".png", ".bin.dds", ".dds"]:
possible_tex = clean_path + ext
if os.path.isfile(possible_tex):
target_file = possible_tex
break
else:
# It is a Node itself (e.g. .../nodes/1)
# If directory, look for 3dNodeIndexDocument.json
if os.path.isdir(clean_path):
possible_doc = os.path.join(clean_path, "3dNodeIndexDocument.json")
if os.path.isfile(possible_doc):
target_file = possible_doc
# Case C: NodePages (e.g. .../nodepages/0 or .../nodepages/0/)
# Expects: .../nodepages/0.json
elif "/nodepages/" in path:
# Strip trailing slash if present to cleanly append .json
clean_local_path = local_path.rstrip(os.sep)
# Check if adding .json helps
possible_json_page = clean_local_path + ".json"
if os.path.isfile(possible_json_page):
target_file = possible_json_page
if target_file:
logger.info(f"Serving I3S Resource: {path} -> {target_file}")
return FileResponse(target_file, media_type="application/json")
response = await call_next(request)
return response
@app.get("/")
async def root():
return {"message": "CityPyPRT 3D Generation API"}
# ---------------------------------------------------------------------------
# Dedicated report endpoint
# ---------------------------------------------------------------------------
@app.post("/report")
async def get_model_report(request: GenerateRequest):
"""
Return **only** the CGA report dict for a given geometry + RPK, without
writing any geometry files to disk.
This uses ``com.esri.pyprt.PyEncoder`` with ``emitReport=True`` and
``emitGeometry=False`` β€” the only encoder that populates
``GeneratedModel.get_report()``.
Request body (same as /generate)
---------------------------------
.. code-block:: json
{
"rpk_name": "Building.rpk",
"geometry": { "type": "Polygon", "coordinates": [...] },
"attributes": { "buildingHeight": 30.0 }
}
Response
--------
.. code-block:: json
{
"report": { "Ground Floor Area": 250.0, "Building Volume": 3200.0 },
"rpk_name": "Building.rpk"
}
"""
rpk_path = get_rpk_path(request.rpk_name)
if not rpk_path:
raise HTTPException(status_code=404, detail=f"RPK '{request.rpk_name}' not found")
try:
# --- Parse geometry (same logic as /generate) ---
geom_dict = request.geometry
if geom_dict.get("type") == "Feature":
geom_dict = geom_dict.get("geometry")
if geom_dict.get("type") != "Polygon":
raise HTTPException(status_code=400, detail="Only Polygon geometries are supported")
shape = shapely.geometry.shape(geom_dict)
if not shape.is_valid:
shape = shape.buffer(0)
shape = shapely.ops.orient(shape, sign=-1.0) # CW, consistent with /generate
centroid = shape.centroid
shape_centered = translate(shape, xoff=-centroid.x, yoff=-centroid.y)
coords = list(shape_centered.exterior.coords)
if coords[0] == coords[-1]:
coords = coords[:-1]
# Same CGA Z-axis negation as /generate: ENU Y = north, CGA Z = south
flattened_coords = []
for p in coords:
flattened_coords.extend([p[0], 0, -p[1]])
indices = list(range(len(coords)))
face_counts = [len(coords)]
# --- Sanitise attributes ---
clean_attributes = _clean_attributes(request.attributes)
# Inject any Material-API textures into a temp RPK copy so CGA resolves them
rpk_path, clean_attributes, tex_temp_dir = _inject_textures_into_rpk(rpk_path, clean_attributes)
initial_shape = pyprt.InitialShape(flattened_coords, indices, face_counts)
# --- Run PyEncoder report pass ---
try:
report_attrs = _ensure_report_attrs(request.rpk_name, clean_attributes)
reports_dict = _extract_reports(initial_shape, report_attrs, rpk_path)
except (AttributeError, TypeError) as e:
logger.warning(
f"Report endpoint: attribute error with supplied attrs ({e}). "
"Retrying with CGA defaults."
)
reports_dict = _extract_reports(initial_shape, _ensure_report_attrs(request.rpk_name, {}), rpk_path)
if tex_temp_dir and os.path.exists(tex_temp_dir):
shutil.rmtree(tex_temp_dir, ignore_errors=True)
return JSONResponse(content={
"report": reports_dict,
"rpk_name": request.rpk_name,
})
except HTTPException:
raise
except Exception as exc:
if 'tex_temp_dir' in locals() and tex_temp_dir and os.path.exists(tex_temp_dir):
shutil.rmtree(tex_temp_dir, ignore_errors=True)
logger.error(f"Report endpoint error: {exc}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(exc))