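"""Prior2DSM (src/streamlit_app.py)

Streamlit demo that refines the relative depth predicted by Depth Anything V2
into a DSM: a DINOv3 ViT-L/16 backbone is adapted at test time with LoRA and a
small per-token scale/bias head, supervised by a LiDAR prior everywhere outside
a user-selected ROI. The result can be downloaded as a georeferenced GeoTIFF.
"""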
import os
import gc
import copy
from io import BytesIO
import cv2
import numpy as np
import rasterio
import matplotlib.pyplot as plt
import streamlit as st
import torch
import torch.nn as nn
import torch.nn.functional as F
from huggingface_hub import hf_hub_download
from torchvision.transforms.functional import normalize
# ============================================================
# CONFIG
# ============================================================
st.set_page_config(layout="wide", page_title="Prior2DSM | LoRA")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.backends.cudnn.benchmark = True
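# PATCH_SIZE is the ViT token size of the DINOv3 backbone; STRIDE is the
# sub-patch shift used by the sliding-window inference to densify the
# per-token scale/bias predictions.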
PATCH_SIZE = 16
STRIDE = 4
# Per-channel normalization used during LoRA training. Note: despite the
# variable names these are dataset-specific statistics, not the standard
# ImageNet mean/std.
IMAGENET_MEAN = (0.430, 0.411, 0.296)
IMAGENET_STD = (0.213, 0.156, 0.143)
# Example files inside the HF Space repo
EXAMPLE_RGB_FILENAME = "examples/example_rgb.tif"
EXAMPLE_PRIOR_FILENAME = "examples/example_prior.tif"
# ============================================================
# HELPERS
# ============================================================
def normalize_01(arr, valid_mask=None):
a = np.asarray(arr, dtype=np.float32)
if valid_mask is None:
valid_mask = np.isfinite(a)
else:
valid_mask = np.asarray(valid_mask, dtype=bool) & np.isfinite(a)
out = np.zeros_like(a, dtype=np.float32)
if not valid_mask.any():
return out
vmin = float(np.nanmin(a[valid_mask]))
vmax = float(np.nanmax(a[valid_mask]))
denom = max(1e-8, (vmax - vmin))
out[valid_mask] = (a[valid_mask] - vmin) / denom
return np.clip(out, 0.0, 1.0)
def preview_rgb(rgb_raw):
rgb = rgb_raw.transpose(1, 2, 0).astype(np.float32)
if rgb.max() > 1.5:
rgb = rgb / (np.percentile(rgb, 98) + 1e-6)
return np.clip(rgb, 0, 1)
def draw_roi_preview(viz_rgb, x1, y1, x2, y2):
preview = (np.clip(viz_rgb, 0, 1) * 255).astype(np.uint8).copy()
cv2.rectangle(preview, (x1, y1), (x2, y2), (255, 0, 0), 2)
return preview
@st.cache_data(show_spinner=False)
def load_tiff_from_hf(repo_id, filename, repo_type="space"):
return hf_hub_download(repo_id=repo_id, filename=filename, repo_type=repo_type)
def read_rgb_tiff(path_or_bytes):
if isinstance(path_or_bytes, (str, os.PathLike)):
with rasterio.open(path_or_bytes) as src:
rgb_raw = src.read([1, 2, 3])
h_f, w_f = src.height, src.width
meta = src.meta.copy()
else:
with rasterio.open(BytesIO(path_or_bytes)) as src:
rgb_raw = src.read([1, 2, 3])
h_f, w_f = src.height, src.width
meta = src.meta.copy()
return rgb_raw, h_f, w_f, meta
def read_prior_tiff(path_or_bytes):
if isinstance(path_or_bytes, (str, os.PathLike)):
with rasterio.open(path_or_bytes) as src:
prior_raw = src.read(1).astype(np.float32)
meta = src.meta.copy()
else:
with rasterio.open(BytesIO(path_or_bytes)) as src:
prior_raw = src.read(1).astype(np.float32)
meta = src.meta.copy()
return prior_raw, meta
def init_roi_state(h_f, w_f):
if "x_center" not in st.session_state:
st.session_state["x_center"] = w_f // 2
if "y_center" not in st.session_state:
st.session_state["y_center"] = h_f // 2
if "bbox_size" not in st.session_state:
st.session_state["bbox_size"] = min(200, min(h_f, w_f))
if "use_normalized_rel" not in st.session_state:
st.session_state["use_normalized_rel"] = True
if "loaded_shape" not in st.session_state:
st.session_state["loaded_shape"] = (h_f, w_f)
prev_shape = st.session_state.get("loaded_shape", None)
if prev_shape != (h_f, w_f):
st.session_state["x_center"] = w_f // 2
st.session_state["y_center"] = h_f // 2
st.session_state["bbox_size"] = min(200, min(h_f, w_f))
st.session_state["use_normalized_rel"] = True
st.session_state["loaded_shape"] = (h_f, w_f)
# ============================================================
# MODELS
# ============================================================
class MLPDecoder(nn.Module):
def __init__(self, in_dim=1024):
super().__init__()
self.net = nn.Sequential(
nn.Linear(in_dim, 256),
nn.LayerNorm(256),
nn.GELU(),
nn.Linear(256, 128),
nn.GELU(),
nn.Linear(128, 2) # [scale, bias]
)
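        # Zero the last layer's weights and set its bias to [1, 0] so the head
        # starts as the identity mapping (scale = 1, bias = 0).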
nn.init.zeros_(self.net[-1].weight)
self.net[-1].bias.data = torch.tensor([1.0, 0.0])
def forward(self, x):
return self.net(x)
class LoRALinear(nn.Module):
def __init__(self, base_linear, r=8, alpha=16.0):
super().__init__()
self.base = base_linear
# freeze original linear
self.base.weight.requires_grad_(False)
if getattr(self.base, "bias", None) is not None:
self.base.bias.requires_grad_(False)
self.r = r
self.alpha = alpha
self.scaling = alpha / r if r > 0 else 1.0
self.A = nn.Linear(base_linear.in_features, r, bias=False)
self.B = nn.Linear(r, base_linear.out_features, bias=False)
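        # Kaiming-init A and zero-init B: the low-rank update starts as a
        # no-op, so the wrapped layer initially matches the frozen base.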
nn.init.kaiming_uniform_(self.A.weight, a=np.sqrt(5))
nn.init.zeros_(self.B.weight)
@property
def in_features(self):
return self.base.in_features
@property
def out_features(self):
return self.base.out_features
@property
def weight(self):
return self.base.weight
@property
def bias(self):
return self.base.bias
def forward(self, x):
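        # Frozen base projection plus the scaled low-rank correction B(A(x)).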
return self.base(x) + self.scaling * self.B(self.A(x))
def inject_lora(model, r=8, alpha=16.0):
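    # Wrap the qkv and output-projection linears of every attention block.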
for blk in model.modules():
if hasattr(blk, "attn"):
if hasattr(blk.attn, "qkv") and not isinstance(blk.attn.qkv, LoRALinear):
blk.attn.qkv = LoRALinear(blk.attn.qkv, r, alpha)
if hasattr(blk.attn, "proj") and not isinstance(blk.attn.proj, LoRALinear):
blk.attn.proj = LoRALinear(blk.attn.proj, r, alpha)
return model
def get_lora_params(model):
params = []
for module in model.modules():
if isinstance(module, LoRALinear):
params.extend(list(module.A.parameters()))
params.extend(list(module.B.parameters()))
return params
# ============================================================
# MODEL LOADING
# ============================================================
@st.cache_resource
def load_models(repo_id, dav_file, dino_file):
    # 1. Load the Depth Anything V2 (ViT-L encoder) relative-depth model
dav_path = hf_hub_download(repo_id=repo_id, filename=dav_file)
from depth_anything_v2.dpt import DepthAnythingV2
dav_model = DepthAnythingV2(
encoder="vitl",
features=256,
out_channels=[256, 512, 1024, 1024]
)
dav_model.load_state_dict(torch.load(dav_path, map_location="cpu", weights_only=True))
dav_model = dav_model.to(DEVICE).eval()
    # 2. Wrap torch._dynamo.config so writes to accumulated_cache_size_limit
    #    are ignored (needed before loading DINOv3)
if hasattr(torch, "_dynamo") and hasattr(torch._dynamo, "config"):
orig_config = torch._dynamo.config
class ConfigWrapper:
def __getattr__(self, name):
return getattr(orig_config, name)
def __setattr__(self, name, value):
if name == "accumulated_cache_size_limit":
return
setattr(orig_config, name, value)
torch._dynamo.config = ConfigWrapper()
    # 3. Load the DINOv3 ViT-L/16 backbone
dino_path = hf_hub_download(repo_id=repo_id, filename=dino_file)
from dinov3.models.vision_transformer import DinoVisionTransformer
dino_model = DinoVisionTransformer(
img_size=1024,
patch_size=16,
embed_dim=1024,
depth=24,
num_heads=16,
qkv_bias=True
).to(DEVICE).eval()
ckpt = torch.load(dino_path, map_location="cpu")
if "state_dict" in ckpt:
ckpt = ckpt["state_dict"]
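    # Strip DDP / backbone / teacher prefixes so the keys match the plain
    # DinoVisionTransformer; strict=False tolerates any remaining mismatches.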
clean_ckpt = {
k.replace("module.", "").replace("backbone.", "").replace("teacher.backbone.", ""): v
for k, v in ckpt.items()
}
dino_model.load_state_dict(clean_ckpt, strict=False)
return dav_model, dino_model
# ============================================================
# DEPTH ANYTHING INFERENCE
# ============================================================
@st.cache_data(show_spinner=False)
def run_dav_inference(_dav, rgb_raw, h_f, w_f):
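    # The leading underscore keeps st.cache_data from trying to hash the model.
    # The RGB image is resized to 448x448 for inference and the predicted
    # relative depth is upsampled back to the full (h_f, w_f) resolution.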
img_448 = cv2.resize(rgb_raw.transpose(1, 2, 0), (448, 448))
dav_in = torch.tensor(img_448, device=DEVICE).permute(2, 0, 1).unsqueeze(0).float() / 255.0
with torch.no_grad():
raw_depth = _dav(dav_in)
if isinstance(raw_depth, (list, tuple)):
raw_depth = raw_depth[-1]
raw_depth = F.interpolate(
raw_depth.unsqueeze(1),
size=(h_f, w_f),
mode="bilinear",
align_corners=False
).squeeze(1)
raw_depth_map = raw_depth[0].detach().float().cpu().numpy()
valid = np.isfinite(raw_depth_map)
raw_depth_01 = normalize_01(raw_depth_map, valid)
raw_depth_01[~valid] = np.nan
return raw_depth_map, raw_depth_01
# ============================================================
# MAIN LORA PIPELINE
# ============================================================
def run_lora_pipeline(
rgb_raw,
prior_raw,
rel_map,
bbox_mask,
dino_base,
lora_r,
lora_alpha,
tto_steps,
tto_lr
):
rgb_cpu = torch.tensor(rgb_raw.astype(np.float32) / 255.0)
prior_raw_t = torch.tensor(prior_raw.astype(np.float32))
    rel_cpu = torch.tensor(rel_map.astype(np.float32), device=DEVICE)  # kept on DEVICE despite the "cpu" name
H, W = prior_raw.shape
    # Anchors: pixels outside the ROI bbox with a finite, non-zero prior value
anchor_mask_cpu = (~torch.tensor(bbox_mask)) & torch.isfinite(prior_raw_t) & (prior_raw_t != 0)
anchor_mask = anchor_mask_cpu.to(DEVICE)
prior_gpu = prior_raw_t.to(DEVICE)
dino = copy.deepcopy(dino_base)
dino = inject_lora(dino, r=lora_r, alpha=lora_alpha).to(DEVICE).train()
mlp_head = MLPDecoder(in_dim=1024).to(DEVICE).train()
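    # Only the LoRA adapters and the scale/bias head are trainable;
    # the DINOv3 backbone itself stays frozen.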
for p in dino.parameters():
p.requires_grad_(False)
for p in get_lora_params(dino):
p.requires_grad_(True)
for p in mlp_head.parameters():
p.requires_grad_(True)
params = list(mlp_head.parameters()) + get_lora_params(dino)
opt = torch.optim.AdamW(params, lr=tto_lr)
rgb_tto = normalize(rgb_cpu.unsqueeze(0), IMAGENET_MEAN, IMAGENET_STD).to(DEVICE)
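    # Downsample the prior, the relative depth and the anchor mask to the ViT
    # patch grid so that each token gets a single supervision target.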
Hp, Wp = H // PATCH_SIZE, W // PATCH_SIZE
prior_p = F.interpolate(prior_gpu.view(1, 1, H, W), size=(Hp, Wp), mode="bilinear").flatten()
rel_p = F.interpolate(rel_cpu.view(1, 1, H, W), size=(Hp, Wp), mode="bilinear").flatten()
mask_p = F.interpolate(anchor_mask.float().view(1, 1, H, W), size=(Hp, Wp), mode="area").flatten() > 0.5
loss_hist = []
prog = st.progress(0, text="Running LoRA TTO...")
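    # Test-time optimization: each token predicts a (scale, bias) pair applied
    # to the relative depth, with a Huber loss against the prior at anchor
    # tokens only (valid prior outside the ROI).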
for step in range(tto_steps):
opt.zero_grad(set_to_none=True)
tokens = dino.forward_features(rgb_tto)["x_norm_patchtokens"].squeeze(0)
sb = mlp_head(tokens)
s, b = sb[:, 0], sb[:, 1]
pred_p = s * rel_p + b
loss = F.huber_loss(pred_p[mask_p], prior_p[mask_p], delta=1.0)
loss.backward()
opt.step()
loss_hist.append(float(loss.item()))
prog.progress((step + 1) / tto_steps, text=f"Running LoRA TTO... {step + 1}/{tto_steps}")
prog.empty()
dino.eval()
mlp_head.eval()
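    # Dense inference: slide the ViT over the reflect-padded image at a
    # sub-patch stride and average the overlapping scale/bias predictions.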
with torch.no_grad():
p, stride = PATCH_SIZE, STRIDE
rgb_pad = F.pad(rgb_cpu.unsqueeze(0), (p, p, p, p), mode="reflect")
Hp_pad, Wp_pad = rgb_pad.shape[-2:]
sb_acc = torch.zeros((2, Hp_pad // stride, Wp_pad // stride), device=DEVICE)
cnt_acc = torch.zeros((1, Hp_pad // stride, Wp_pad // stride), device=DEVICE)
rgb_norm = normalize(rgb_pad, IMAGENET_MEAN, IMAGENET_STD).to(DEVICE)
for dy in range(0, p, stride):
for dx in range(0, p, stride):
hc = ((Hp_pad - dy) // p) * p
wc = ((Wp_pad - dx) // p) * p
if hc <= 0 or wc <= 0:
continue
patch = rgb_norm[:, :, dy:dy + hc, dx:dx + wc]
t = dino.forward_features(patch)["x_norm_patchtokens"].squeeze(0)
sb_local = mlp_head(t).t().reshape(2, hc // p, wc // p)
sb_acc[:, dy // stride:dy // stride + (hc // p) * (p // stride):p // stride,
dx // stride:dx // stride + (wc // p) * (p // stride):p // stride] += sb_local
cnt_acc[:, dy // stride:dy // stride + (hc // p) * (p // stride):p // stride,
dx // stride:dx // stride + (wc // p) * (p // stride):p // stride] += 1
sb_dense = sb_acc / (cnt_acc + 1e-8)
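        # Crop the averaged grid back to the unpadded extent, then upsample the
        # scale/bias maps to full image resolution.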
offset = (p - (p // 2)) // stride + 1
sb_final = sb_dense[:, offset:offset + (H // stride), offset:offset + (W // stride)]
sb_hr = F.interpolate(
sb_final.unsqueeze(0),
size=(H, W),
mode="bilinear",
align_corners=False
).squeeze(0)
s_hr, b_hr = sb_hr[0], sb_hr[1]
final_dsm = (s_hr * rel_cpu + b_hr).detach().cpu().numpy()
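    # Release the adapted model and optimizer so repeated runs do not
    # accumulate GPU memory.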
del dino, mlp_head, opt
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
return final_dsm, loss_hist, anchor_mask_cpu.cpu().numpy()
# ============================================================
# UI
# ============================================================
st.title("Prior2DSM | LoRA")
st.markdown(
f"""
**Example TIFFs**
- [Download example RGB TIFF](https://huggingface.co/spaces/osherr/Prior2DSM/resolve/main/{EXAMPLE_RGB_FILENAME})
- [Download example Prior TIFF](https://huggingface.co/spaces/osherr/Prior2DSM/resolve/main/{EXAMPLE_PRIOR_FILENAME})
"""
)
with st.sidebar:
st.header("📂 Data")
data_mode = st.radio(
"Data source",
["Upload TIFFs", "Use example TIFFs"],
index=0
)
rgb_file = None
prior_file = None
rgb_example_path = None
prior_example_path = None
if data_mode == "Upload TIFFs":
rgb_file = st.file_uploader("RGB Image", type=["tif", "tiff"])
prior_file = st.file_uploader("LiDAR Prior", type=["tif", "tiff"])
else:
st.caption("Load demo RGB/Prior TIFFs from the Hugging Face Space.")
if st.button("Load example TIFFs"):
st.session_state["use_examples"] = True
if st.session_state.get("use_examples", False):
rgb_example_path = load_tiff_from_hf(
repo_id="osherr/Prior2DSM",
filename=EXAMPLE_RGB_FILENAME,
repo_type="space"
)
prior_example_path = load_tiff_from_hf(
repo_id="osherr/Prior2DSM",
filename=EXAMPLE_PRIOR_FILENAME,
repo_type="space"
)
st.success("Example TIFFs loaded.")
st.divider()
st.write("#### LoRA / TTO")
lora_r = st.slider("LoRA rank", 2, 32, 8, step=2)
lora_alpha = st.slider("LoRA alpha", 4.0, 64.0, 16.0, step=4.0)
tto_steps = st.slider("TTO steps", 10, 300, 100, step=10)
tto_lr = st.select_slider("TTO LR", options=[1e-4, 3e-4, 1e-3, 3e-3], value=1e-3)
has_uploaded = (rgb_file is not None and prior_file is not None)
has_examples = (
data_mode == "Use example TIFFs"
and st.session_state.get("use_examples", False)
and rgb_example_path is not None
and prior_example_path is not None
)
if has_uploaded or has_examples:
dav_m, dino_base = load_models(
repo_id="osherr/Prior2DSM",
dav_file="depth_anything_v2_vitl.pth",
dino_file="dinov3_vitl16_pretrain_sat493m-eadcf0ff.pth"
)
if has_uploaded:
rgb_raw, h_f, w_f, _ = read_rgb_tiff(rgb_file.read())
prior_raw, prior_meta = read_prior_tiff(prior_file.read())
else:
rgb_raw, h_f, w_f, _ = read_rgb_tiff(rgb_example_path)
prior_raw, prior_meta = read_prior_tiff(prior_example_path)
init_roi_state(h_f, w_f)
with st.spinner("Generating relative depth with Depth Anything V2..."):
rel_depth_map, rel_depth_01 = run_dav_inference(dav_m, rgb_raw, h_f, w_f)
st.subheader("1. ROI Selection")
viz_rgb = preview_rgb(rgb_raw)
col_img, col_ctrl = st.columns([1.2, 0.8])
with col_ctrl:
with st.form("roi_form", clear_on_submit=False):
x_center_form = st.slider(
"X center",
0, w_f - 1,
int(st.session_state["x_center"])
)
y_center_form = st.slider(
"Y center",
0, h_f - 1,
int(st.session_state["y_center"])
)
bbox_size_form = st.slider(
"BBox Size (px)",
50, min(400, min(h_f, w_f)),
int(st.session_state["bbox_size"])
)
use_normalized_rel_form = st.checkbox(
"Use normalized relative depth for LoRA",
value=bool(st.session_state["use_normalized_rel"])
)
c1, c2 = st.columns(2)
with c1:
update_roi = st.form_submit_button("Update ROI")
with c2:
run_btn = st.form_submit_button("🚀 Run LoRA Pipeline", type="primary")
if update_roi or run_btn:
st.session_state["x_center"] = x_center_form
st.session_state["y_center"] = y_center_form
st.session_state["bbox_size"] = bbox_size_form
st.session_state["use_normalized_rel"] = use_normalized_rel_form
x_center = int(st.session_state["x_center"])
y_center = int(st.session_state["y_center"])
bbox_size = int(st.session_state["bbox_size"])
use_normalized_rel = bool(st.session_state["use_normalized_rel"])
half_s = bbox_size // 2
x1, x2 = max(0, x_center - half_s), min(w_f, x_center + half_s)
y1, y2 = max(0, y_center - half_s), min(h_f, y_center + half_s)
bbox_mask = np.zeros((h_f, w_f), dtype=bool)
bbox_mask[y1:y2, x1:x2] = True
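    # Pixels inside the ROI are withheld from supervision; the model has to
    # reconstruct the DSM there from the RGB image and the relative depth.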
with col_img:
roi_preview = draw_roi_preview(viz_rgb, x1, y1, x2, y2)
st.image(roi_preview, caption="ROI Preview", use_container_width=True)
if run_btn:
rel_for_lora = rel_depth_01 if use_normalized_rel else rel_depth_map
with st.spinner("Running LoRA adaptation..."):
final_dsm, loss_hist, anchor_mask_np = run_lora_pipeline(
rgb_raw=rgb_raw,
prior_raw=prior_raw,
rel_map=rel_for_lora,
bbox_mask=bbox_mask,
dino_base=dino_base,
lora_r=lora_r,
lora_alpha=lora_alpha,
tto_steps=tto_steps,
tto_lr=tto_lr
)
st.subheader("Results")
tab1, tab2, tab3, tab4 = st.tabs(
["Final Result", "Relative Depth", "Loss", "Masks"]
)
with tab1:
fig, ax = plt.subplots(1, 3, figsize=(18, 6))
masked_prior = prior_raw.copy()
masked_prior[bbox_mask] = np.nan
ax[0].imshow(viz_rgb)
ax[0].add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1, fill=False, edgecolor="red", lw=2))
ax[0].set_title("Input RGB")
ax[0].axis("off")
ax[1].set_facecolor("black")
im1 = ax[1].imshow(masked_prior, cmap="terrain")
ax[1].add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1, fill=False, edgecolor="red", lw=2))
ax[1].set_title("Input LiDAR (BBox Masked)")
ax[1].axis("off")
plt.colorbar(im1, ax=ax[1], fraction=0.046)
im2 = ax[2].imshow(final_dsm, cmap="terrain")
ax[2].add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1, fill=False, edgecolor="red", lw=2))
ax[2].set_title("LoRA Refined DSM")
ax[2].axis("off")
plt.colorbar(im2, ax=ax[2], fraction=0.046)
st.pyplot(fig)
with tab2:
fig_rel, ax_rel = plt.subplots(1, 2, figsize=(12, 5))
im0 = ax_rel[0].imshow(rel_depth_map, cmap="viridis")
ax_rel[0].set_title("Depth Anything Raw Relative Depth")
ax_rel[0].axis("off")
plt.colorbar(im0, ax=ax_rel[0], fraction=0.046)
im1 = ax_rel[1].imshow(rel_depth_01, cmap="viridis")
ax_rel[1].set_title("Normalized Relative Depth")
ax_rel[1].axis("off")
plt.colorbar(im1, ax=ax_rel[1], fraction=0.046)
st.pyplot(fig_rel)
with tab3:
fig_loss, ax_loss = plt.subplots(figsize=(8, 3))
ax_loss.plot(loss_hist)
ax_loss.set_title("TTO Huber Loss")
ax_loss.set_yscale("log")
ax_loss.grid(True, alpha=0.3)
st.pyplot(fig_loss)
with tab4:
fig_mask, axm = plt.subplots(1, 2, figsize=(10, 4))
axm[0].imshow(bbox_mask, cmap="gray")
axm[0].set_title("Target BBox Mask")
axm[0].axis("off")
axm[1].imshow(anchor_mask_np, cmap="gray")
axm[1].set_title("Anchor Mask")
axm[1].axis("off")
st.pyplot(fig_mask)
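        # Export the refined DSM as a single-band float32 GeoTIFF, reusing the
        # georeferencing (CRS and transform) from the prior's metadata.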
out_buf = BytesIO()
prior_meta.update({
"driver": "GTiff",
"height": h_f,
"width": w_f,
"dtype": "float32",
"count": 1
})
with rasterio.open(out_buf, "w", **prior_meta) as dst:
dst.write(final_dsm.astype(np.float32), 1)
st.download_button(
"Download Georeferenced DSM",
out_buf.getvalue(),
file_name="lora_refined_dsm_georef.tif"
)
else:
st.info("Upload RGB and Prior TIFFs, or switch to example TIFFs in the sidebar.")