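"""Gradio demo that turns a single image into a camera-orbit video.

The app estimates depth with DepthAnythingV2, warps the input image to a
sequence of novel camera poses, conditions Stable Video Diffusion on the
warped frames, and streams intermediate results to an embedded Rerun viewer.
It also exports the generated frames as a Nerfstudio-format dataset.
"""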
# Standard library
import subprocess
import threading
from pathlib import Path
from queue import SimpleQueue
from typing import Final, Literal
from uuid import uuid4

# Third-party
import gradio as gr
import mmcv
import numpy as np
import PIL.Image
import rerun as rr
import rerun.blueprint as rrb
import torch
import trimesh
from gradio_rerun import Rerun
from jaxtyping import Float32, Float64, UInt8
from monopriors.relative_depth_models import get_relative_predictor
from PIL.Image import Image

# Local modules
from src.camera_parameters import PinholeParameters
from src.custom_diffusers_pipeline.scheduler import EulerDiscreteScheduler
from src.custom_diffusers_pipeline.svd import StableVideoDiffusionPipeline
from src.depth_utils import image_to_depth
from src.image_warping import image_depth_warping
from src.nerfstudio_data import frames_to_nerfstudio
from src.pose_utils import generate_camera_parameters
from src.rr_logging_utils import create_svd_blueprint, log_camera
from src.sigma_utils import load_lambda_ts
# Hugging Face Spaces provides the `spaces` package when running on ZeroGPU.
try:
    import spaces  # type: ignore

    IN_SPACES = True
except ImportError:
    print("Not running on Zero")
    IN_SPACES = False
SVD_HEIGHT: Final[int] = 576
SVD_WIDTH: Final[int] = 1024
NEAR: Final[float] = 0.0001
FAR: Final[float] = 500.0
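# Code under `if gr.NO_RELOAD:` is skipped on Gradio dev-mode reloads, so the
# heavyweight models below are loaded only once per process.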
if gr.NO_RELOAD:
    # Monocular relative-depth predictor used to lift the input image to 3D.
    DepthAnythingV2Predictor = get_relative_predictor("DepthAnythingV2Predictor")(
        device="cuda"
    )
    # Stable Video Diffusion pipeline (custom fork) in fp16 to save GPU memory.
    SVD_PIPE = StableVideoDiffusionPipeline.from_pretrained(
        "stabilityai/stable-video-diffusion-img2vid-xt",
        torch_dtype=torch.float16,
        variant="fp16",
    )
    SVD_PIPE.to("cuda")
    # Swap in the custom Euler scheduler, configured from the pipeline defaults.
    scheduler = EulerDiscreteScheduler.from_config(SVD_PIPE.scheduler.config)
    SVD_PIPE.scheduler = scheduler
def svd_render_threaded(
    image_o: PIL.Image.Image,
    masks: Float64[torch.Tensor, "b 72 128"],
    cond_image: list[PIL.Image.Image],
    lambda_ts: Float64[torch.Tensor, "n b"],
    num_denoise_iters: Literal[2, 25, 50, 100],
    weight_clamp: float,
    log_queue: SimpleQueue | None = None,
):
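    """Run the SVD pipeline and stream results back through ``log_queue``.

    Runs in a worker thread (wrapped with ``spaces.GPU`` on ZeroGPU) so that
    intermediate diffusion steps can be logged to Rerun while the main thread
    keeps yielding updates to Gradio. The final list of generated frames is
    put on the queue as the termination message.
    """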
    frames: list[PIL.Image.Image] = SVD_PIPE(
        [image_o],
        log_queue=log_queue,
        temp_cond=cond_image,
        mask=masks,
        lambda_ts=lambda_ts,
        weight_clamp=weight_clamp,
        num_frames=25,
        decode_chunk_size=8,
        num_inference_steps=num_denoise_iters,
    ).frames[0]
    if log_queue is not None:
        log_queue.put(frames)
# On ZeroGPU, GPU work must run inside a spaces.GPU-decorated function.
if IN_SPACES:
    svd_render_threaded = spaces.GPU(svd_render_threaded)
def gradio_warped_image(
    image_path: str,
    num_denoise_iters: Literal[2, 25, 50, 100],
    direction: Literal["left", "right"],
    degrees_per_frame: int | float,
    major_radius: float = 60.0,
    minor_radius: float = 70.0,
    num_frames: int = 25,  # Stable Video Diffusion generates 25 frames
    progress=gr.Progress(track_tqdm=True),
):
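    """Generator backing the "Warp Images" button.

    Yields ``(rerun_stream_bytes, video_path, file_list, status_text)`` tuples
    so the Rerun viewer updates live while depth estimation, image warping,
    diffusion, and the Nerfstudio export run.
    """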
    # Ensure that the degrees per frame is a float.
    degrees_per_frame = float(degrees_per_frame)
    image_path: Path = Path(image_path) if isinstance(image_path, str) else image_path
    assert image_path.exists(), f"Image file not found: {image_path}"
    save_path: Path = image_path.parent / f"{image_path.stem}_{uuid4()}"

    # Set up Rerun logging: stream binary data to the embedded viewer.
    stream = rr.binary_stream()
    parent_log_path = Path("world")
    rr.log(f"{parent_log_path}", rr.ViewCoordinates.LDB, static=True)
    blueprint: rrb.Blueprint = create_svd_blueprint(parent_log_path)
    rr.send_blueprint(blueprint)

    # Load the image and resize it to the SVD input dimensions.
    rgb_original: Image = PIL.Image.open(image_path)
    rgb_resized: Image = rgb_original.resize(
        (SVD_WIDTH, SVD_HEIGHT), PIL.Image.Resampling.NEAREST
    )
    rgb_np_original: UInt8[np.ndarray, "h w 3"] = np.array(rgb_original)
    rgb_np_hw3: UInt8[np.ndarray, "h w 3"] = np.array(rgb_resized)
    # Generate the camera trajectory for the video: an orbit defined by the
    # major/minor radii, per-frame rotation, and direction.
    camera_list: list[PinholeParameters] = generate_camera_parameters(
        num_frames=num_frames,
        image_width=SVD_WIDTH,
        image_height=SVD_HEIGHT,
        degrees_per_frame=degrees_per_frame,
        major_radius=major_radius,
        minor_radius=minor_radius,
        direction=direction,
    )
    assert len(camera_list) == num_frames, "Number of camera parameters mismatch"

    # Estimate a depth map and point cloud for the input image, both at SVD
    # resolution and at the original resolution.
    depth: Float32[np.ndarray, "h w"]
    trimesh_pc: trimesh.PointCloud
    depth_original: Float32[np.ndarray, "original_h original_w"]
    trimesh_pc_original: trimesh.PointCloud
    depth, trimesh_pc, depth_original, trimesh_pc_original = image_to_depth(
        rgb_np_original=rgb_np_original,
        rgb_np_hw3=rgb_np_hw3,
        cam_params=camera_list[0],
        near=NEAR,
        far=FAR,
        depth_predictor=DepthAnythingV2Predictor,
    )
    rr.log(
        f"{parent_log_path}/point_cloud",
        rr.Points3D(
            positions=trimesh_pc.vertices,
            colors=trimesh_pc.colors,
        ),
        static=True,
    )

    start_cam: PinholeParameters = camera_list[0]
    cond_image: list[PIL.Image.Image] = []
    masks: list[Float64[torch.Tensor, "1 72 128"]] = []
    # Warp the input image and depth to each generated camera pose. Frame 0 is
    # the original view; the later warped frames condition the diffusion model.
    cam_log_path: Path = parent_log_path / "warped_camera"
    current_cam: PinholeParameters
    for frame_id, current_cam in enumerate(camera_list):
        rr.set_time_sequence("frame_id", frame_id)
        if frame_id == 0:
            log_camera(cam_log_path, current_cam, rgb_np_hw3, depth)
        else:
            # Clear the logged depth from the previous frame.
            rr.log(f"{cam_log_path}/pinhole/depth", rr.Clear(recursive=False))
            # Warp the first frame into the current camera, producing the
            # warped image plus an eroded occlusion mask.
            warped_frame2, mask_erosion_tensor = image_depth_warping(
                image=rgb_np_hw3,
                depth=depth,
                cam_T_world_44_s=start_cam.extrinsics.cam_T_world,
                cam_T_world_44_t=current_cam.extrinsics.cam_T_world,
                K=current_cam.intrinsics.k_matrix,
            )
            cond_image.append(warped_frame2)
            masks.append(mask_erosion_tensor)
            log_camera(cam_log_path, current_cam, np.asarray(warped_frame2))
        yield stream.read(), None, [], ""
    masks: Float64[torch.Tensor, "b 72 128"] = torch.cat(masks)

    # Load sigmas used to optimize the diffusion timesteps.
    progress(0.1, desc="Optimizing timesteps for diffusion")
    lambda_ts: Float64[torch.Tensor, "n b"] = load_lambda_ts(num_denoise_iters)
    progress(0.15, desc="Starting diffusion")

    # Queue that lets the render thread log back to this generator.
    log_queue: SimpleQueue = SimpleQueue()
    handle = threading.Thread(
        target=svd_render_threaded,
        kwargs={
            "image_o": rgb_resized,
            "masks": masks,
            "cond_image": cond_image,
            "lambda_ts": lambda_ts,
            "num_denoise_iters": num_denoise_iters,
            "weight_clamp": 0.2,
            "log_queue": log_queue,
        },
    )
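    # The diffusion run happens in a worker thread because (on ZeroGPU) it is
    # wrapped in spaces.GPU, while this generator must keep yielding to Gradio.
    # The pipeline pushes (entity_path, entity, times) tuples onto the queue
    # for each intermediate step, then the final list of PIL frames to signal
    # completion.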
    handle.start()
    i = 0
    while True:
        msg = log_queue.get()
        match msg:
            # The final message is the list of generated frames.
            case frames if all(isinstance(frame, PIL.Image.Image) for frame in frames):
                break
            # Intermediate messages are Rerun log calls from the render thread.
            case entity_path, entity, times:
                i += 1
                rr.reset_time()
                for timeline, time in times:
                    if isinstance(time, int):
                        rr.set_time_sequence(timeline, time)
                    else:
                        rr.set_time_seconds(timeline, time)
                static = False
                if entity_path == "diffusion_step":
                    static = True
                rr.log(entity_path, entity, static=static)
                yield stream.read(), None, [], f"{i} out of {num_denoise_iters}"
            case _:
                assert False, f"Unexpected message on log queue: {msg!r}"
    handle.join()
    # Log each generated frame with its camera pose (frame 0 corresponds to
    # the original view).
    frame: PIL.Image.Image
    for frame_id, (frame, cam_params) in enumerate(zip(frames, camera_list)):
        rr.set_time_sequence("frame_id", frame_id)
        cam_log_path = parent_log_path / "generated_camera"
        generated_rgb_np: UInt8[np.ndarray, "h w 3"] = np.array(frame)
        log_camera(cam_log_path, cam_params, generated_rgb_np, depth=None)
        yield stream.read(), None, [], "finished"

    # Export the frames, point cloud, and cameras as a Nerfstudio dataset.
    frames_to_nerfstudio(
        rgb_np_original, frames, trimesh_pc_original, camera_list, save_path
    )
    # Zip up the Nerfstudio data.
    zip_file_path = save_path / "nerfstudio.zip"
    progress(0.95, desc="Zipping up camera data in nerfstudio format")
    subprocess.run(["zip", "-r", str(zip_file_path), str(save_path)], check=True)

    # Render the generated frames to a video.
    video_file_path = save_path / "output.mp4"
    mmcv.frames2video(str(save_path), str(video_file_path), fps=7)
    print(f"Video saved to {video_file_path}")
    yield stream.read(), video_file_path, [str(zip_file_path)], "finished"
with gr.Blocks() as demo:
    with gr.Tab("Streaming"):
        with gr.Row():
            img = gr.Image(interactive=True, label="Image", type="filepath")
            with gr.Tab(label="Settings"):
                with gr.Column():
                    warp_img_btn = gr.Button("Warp Images")
                    num_iters = gr.Radio(
                        choices=[2, 25, 50, 100],
                        value=2,
                        label="Number of iterations",
                        type="value",
                    )
                    cam_direction = gr.Radio(
                        choices=["left", "right"],
                        value="left",
                        label="Camera direction",
                        type="value",
                    )
                    degrees_per_frame = gr.Slider(
                        minimum=0.25,
                        maximum=1.0,
                        step=0.05,
                        value=0.3,
                        label="Degrees per frame",
                    )
                    iteration_num = gr.Textbox(
                        value="",
                        label="Current Diffusion Step",
                    )
            with gr.Tab(label="Outputs"):
                video_output = gr.Video(interactive=False)
                image_files_output = gr.File(interactive=False, file_count="multiple")
        # Rerun 0.16 has issues when embedded in a Gradio tab, so we share a
        # single viewer between all the tabs. In 0.17 we can instead scope each
        # viewer to its own tab to clean up these examples further.
        with gr.Row():
            viewer = Rerun(
                streaming=True,
            )
    warp_img_btn.click(
        gradio_warped_image,
        inputs=[img, num_iters, cam_direction, degrees_per_frame],
        outputs=[viewer, video_output, image_files_output, iteration_num],
    )

    gr.Examples(
        [
            [
                "/home/pablo/0Dev/docker/.per/repos/NVS_Solver/example_imgs/single/000001.jpg",
            ],
        ],
        fn=gradio_warped_image,
        inputs=[img, num_iters, cam_direction, degrees_per_frame],
        outputs=[viewer, video_output, image_files_output],
    )
if __name__ == "__main__":
    demo.queue().launch()