import gradio as gr
import cv2
from PIL import Image
import numpy as np
from transformers import pipeline
import os
import torch
import torch.nn.functional as F
from torchvision import transforms
from torchvision.transforms import Compose
import trimesh
from geometry import create_triangles
import tempfile
from functools import partial
import spaces
from zipfile import ZipFile
import json
from depth_anything.dpt import DepthAnything
from depth_anything.util.transform import Resize, NormalizeImage, PrepareForNet
from moviepy.editor import ImageSequenceClip

# Global state shared between the Gradio callbacks below.
edge = []
gradient = None
params = {"fnum": 0, "l": 16}
frame_selected = 0
frames = []
depths = []
masks = []
locations = []
mesh = []
scene = None


def zip_files(files_in, files_out):
    """Pack the original frames and their depth maps into one archive."""
    with ZipFile("depth_result.zip", "w") as zipObj:
        for idx, file in enumerate(files_in):
            zipObj.write(file, file.split("/")[-1])
        for idx, file in enumerate(files_out):
            zipObj.write(file, file.split("/")[-1])
    return "depth_result.zip"


def create_video(frames, fps, type):
    print("building video result")
    clip = ImageSequenceClip(frames, fps=fps)
    clip.write_videofile(type + "_result.mp4", fps=fps)
    return type + "_result.mp4"


@torch.no_grad()
def predict_depth(model, image):
    return model(image)["depth"]


# @spaces.GPU
def make_video(video_path, outdir='./vis_video_depth', encoder='vits'):
    if encoder not in ["vitl", "vitb", "vits"]:
        encoder = "vits"
    mapper = {"vits": "small", "vitb": "base", "vitl": "large"}

    # DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    # model = DepthAnything.from_pretrained('LiheYoung/depth_anything_vitl14').to(DEVICE).eval()

    # Define path for temporary processed frames
    temp_frame_dir = tempfile.mkdtemp()

    margin_width = 50
    to_tensor_transform = transforms.ToTensor()

    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    # depth_anything = DepthAnything.from_pretrained('LiheYoung/depth_anything_{}14'.format(encoder)).to(DEVICE).eval()
    depth_anything = pipeline(task="depth-estimation", model=f"nielsr/depth-anything-{mapper[encoder]}")

    # total_params = sum(param.numel() for param in depth_anything.parameters())
    # print('Total parameters: {:.2f}M'.format(total_params / 1e6))

    # DPT preprocessing transform kept from the original script; the Hugging Face
    # pipeline above does its own preprocessing, so this is only used for the tensor below.
    transform = Compose([
        Resize(
            width=518,
            height=518,
            resize_target=False,
            keep_aspect_ratio=True,
            ensure_multiple_of=14,
            resize_method='lower_bound',
            image_interpolation_method=cv2.INTER_CUBIC,
        ),
        NormalizeImage(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        PrepareForNet(),
    ])

    if os.path.isfile(video_path):
        if video_path.endswith('txt'):
            with open(video_path, 'r') as f:
                filenames = f.read().splitlines()
        else:
            filenames = [video_path]
    else:
        filenames = os.listdir(video_path)
        filenames = [os.path.join(video_path, filename) for filename in filenames if not filename.startswith('.')]
        filenames.sort()

    # os.makedirs(outdir, exist_ok=True)

    for k, filename in enumerate(filenames):
        file_size = os.path.getsize(filename) / 1024 / 1024
        if file_size > 128.0:
            print(f'File size of {filename} larger than 128Mb, sorry!')
            return filename
        print('Progress {:}/{:},'.format(k + 1, len(filenames)), 'Processing', filename)

        raw_video = cv2.VideoCapture(filename)
        frame_width, frame_height = int(raw_video.get(cv2.CAP_PROP_FRAME_WIDTH)), int(raw_video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frame_rate = int(raw_video.get(cv2.CAP_PROP_FPS))
        if frame_rate < 1:
            frame_rate = 1
        cframes = int(raw_video.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f'frames: {cframes}, fps: {frame_rate}')

        # output_width = frame_width * 2 + margin_width
        # filename = os.path.basename(filename)
        # output_path = os.path.join(outdir, filename[:filename.rfind('.')] + '_video_depth.mp4')
        # with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmpfile:
        #     output_path = tmpfile.name
        # out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"avc1"), frame_rate, (output_width, frame_height))
        # fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # out = cv2.VideoWriter(output_path, fourcc, frame_rate, (output_width, frame_height))

        global masks
        count = 0
        depth_frames = []
        orig_frames = []

        while raw_video.isOpened():
            ret, raw_frame = raw_video.read()
            if not ret:
                break
            frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2RGB) / 255.0
            frame_pil = Image.fromarray((frame * 255).astype(np.uint8))
            # The tensor below is unused: the Hugging Face pipeline consumes the PIL image directly.
            frame = transform({'image': frame})['image']
            frame = torch.from_numpy(frame).unsqueeze(0).to(DEVICE)

            depth = to_tensor_transform(predict_depth(depth_anything, frame_pil))
            depth = F.interpolate(depth[None], (frame_height, frame_width), mode='bilinear', align_corners=False)[0, 0]
            depth = (depth - depth.min()) / (depth.max() - depth.min()) * 255.0
            depth = depth.cpu().numpy().astype(np.uint8)
            depth_color = cv2.applyColorMap(depth, cv2.COLORMAP_BONE)
            depth_gray = cv2.cvtColor(depth_color, cv2.COLOR_RGBA2GRAY)
            depth_color = cv2.cvtColor(depth_gray, cv2.COLOR_GRAY2BGR)

            # Remove white border around map:
            # define lower and upper limits of white
            white_lo = np.array([250, 250, 250])
            white_hi = np.array([255, 255, 255])
            # mask image to only select white
            mask = cv2.inRange(depth_color, white_lo, white_hi)
            # change image to black where we found white
            depth_color[mask > 0] = (0, 0, 0)

            # split_region = np.ones((frame_height, margin_width, 3), dtype=np.uint8) * 255
            # combined_frame = cv2.hconcat([raw_frame, split_region, depth_color])
            # out.write(combined_frame)
            # frame_path = os.path.join(temp_frame_dir, f"frame_{count:05d}.png")
            # cv2.imwrite(frame_path, combined_frame)

            cv2.imwrite(f"f{count}.png", raw_frame)
            orig_frames.append(f"f{count}.png")
            cv2.imwrite(f"f{count}_dmap.png", depth_color)
            depth_frames.append(f"f{count}_dmap.png")
            masks.append(f"f{count}_dmap.png")
            count += 1

        final_vid = create_video(depth_frames, frame_rate, "depth")
        final_zip = zip_files(orig_frames, depth_frames)
        raw_video.release()
        # out.release()
        cv2.destroyAllWindows()

        global gradient
        global frame_selected
        global depths
        global frames
        frames = orig_frames
        depths = depth_frames

        # Replace the last depth map with an editable gradient matching the frame height.
        if depth_color.shape[0] == 2048:  # height
            masks[-1] = './gradient_large.png'
            depth_frames[-1] = './gradient_large.png'
            gradient = cv2.imread('./gradient_large.png').astype(np.uint8)
        elif depth_color.shape[0] == 1024:
            masks[-1] = './gradient.png'
            depth_frames[-1] = './gradient.png'
            gradient = cv2.imread('./gradient.png').astype(np.uint8)
        else:
            masks[-1] = './gradient_small.png'
            depth_frames[-1] = './gradient_small.png'
            gradient = cv2.imread('./gradient_small.png').astype(np.uint8)

        return final_vid, final_zip, frames, masks[frame_selected]  # output_path


def depth_edges_mask(depth):
    """Returns a mask of edges in the depth map.

    Args:
        depth: 2D numpy array of shape (H, W) with dtype float32.
    Returns:
        mask: 2D numpy array of shape (H, W) with dtype bool.
    """
    # Compute the x and y gradients of the depth map.
    depth_dx, depth_dy = np.gradient(depth)
    # Compute the gradient magnitude.
    depth_grad = np.sqrt(depth_dx ** 2 + depth_dy ** 2)
    # Compute the edge mask.
    mask = depth_grad > 0.05
    return mask
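
# Illustrative self-check (an addition, not called by the app): depth_edges_mask()
# on a synthetic two-plane depth map. The 0.05 gradient threshold above should fire
# only along the step between the two planes.
def _example_depth_edges():
    depth = np.zeros((8, 8), dtype=np.float32)
    depth[:, 4:] = 1.0                       # step edge between a near and a far plane
    edges = depth_edges_mask(depth)
    assert edges.any() and not edges.all()   # edges found at the step, nowhere else
    return edges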
def pano_depth_to_world_points(depth):
    """
    360 depth to world points,
    given 2D depth is an equirectangular projection of a spherical image.
    Treat depth as radius.

    longitude : -pi to pi
    latitude : -pi/2 to pi/2
    """
    # Clamp far values in the lower part of the panorama and force the last row to the far plane.
    mask = cv2.inRange(depth[int(depth.shape[0] / 8 * 6.5):depth.shape[0] - 1, 0:depth.shape[1]], 160, 255)
    depth[int(depth.shape[0] / 8 * 6.5):depth.shape[0] - 1, 0:depth.shape[1]][mask > 0] = 160
    depth[depth.shape[0] - 1:depth.shape[0], 0:depth.shape[1]] = 255

    # Convert depth to radius
    radius = (255 - depth.flatten())

    lon = np.linspace(0, np.pi * 2, depth.shape[1])
    lat = np.linspace(0, np.pi, depth.shape[0])
    lon, lat = np.meshgrid(lon, lat)
    lon = lon.flatten()
    lat = lat.flatten()

    pts3d = [[255, 255, 255]]
    uv = [[1, 1]]
    for i in range(0, 1):  # (0,2)
        for j in range(0, 1):  # (0,2)
            # rnd_lon = (np.random.rand(depth.shape[0]*depth.shape[1]) - 0.5) / 8
            # rnd_lat = (np.random.rand(depth.shape[0]*depth.shape[1]) - 0.5) / 8
            d_lon = lon + i / 2 * np.pi * 2 / depth.shape[1]
            d_lat = lat + j / 2 * np.pi / depth.shape[0]

            # Convert to cartesian coordinates
            x = radius * np.cos(d_lon) * np.sin(d_lat)
            y = radius * np.cos(d_lat)
            z = radius * np.sin(d_lon) * np.sin(d_lat)

            pts = np.stack([x, y, z], axis=1)
            uvs = np.stack([lon, lat], axis=1)

            pts3d = np.concatenate((pts3d, pts), axis=0)
            uv = np.concatenate((uv, uvs), axis=0)
            # print(f'i: {i}, j: {j}')

    return [pts3d, uv]


def rgb2gray(rgb):
    return np.dot(rgb[..., :3], [0.333, 0.333, 0.333])


def get_mesh(image, depth, blur_data, loadall):
    global locations
    global mesh
    global scene

    if loadall == False:
        mesh = []
    fnum = frame_selected

    # print(image[fnum][0])
    # print(depth["composite"])
    depthc = cv2.cvtColor(depth["background"], cv2.COLOR_RGBA2RGB)
    blur_img = blur_image(image[fnum][0], depthc, blur_data)
    gdepth = rgb2gray(depthc)

    print('depth to gray - ok')
    points = pano_depth_to_world_points(gdepth)
    pts3d = points[0]
    uv = points[1]
    print('radius from depth - ok')

    # Create a trimesh mesh from the points
    # Each pixel is connected to its 4 neighbors
    # colors are the RGB values of the image
    verts = pts3d.reshape(-1, 3)
    # triangles = create_triangles(image.shape[0], image.shape[1])
    # print('triangles - ok')
    rgba = cv2.cvtColor(blur_img, cv2.COLOR_RGB2RGBA)
    colors = rgba.reshape(-1, 4)
    clrs = np.concatenate(([[128, 128, 128, 0]], colors), axis=0)

    # Drop points that collapsed to the origin (depth 255 -> radius 0),
    # keeping vertices and colors aligned.
    keep = ~np.all(verts == 0, axis=1)
    verts = verts[keep]
    clrs = clrs[keep]

    # mesh = trimesh.Trimesh(vertices=verts, faces=triangles, vertex_colors=colors)
    mesh.append(trimesh.PointCloud(verts, colors=clrs))
    # material = trimesh.visual.texture.SimpleMaterial(image=image)
    # texture = trimesh.visual.TextureVisuals(uv=uv, image=image, material=material)
    # mesh.visual = texture
    scene = trimesh.Scene(mesh)
    print('mesh - ok')

    # Save as glb
    glb_file = tempfile.NamedTemporaryFile(suffix='.glb', delete=False)
    glb_path = glb_file.name
    scene.export(glb_path)
    print('file - ok')
    return glb_path


def blur_image(image, depth, blur_data):
    blur_a = blur_data.split()
    print(f'blur data {blur_data}')

    blur_frame = image.copy()
    j = 0
    while j < 256:
        i = 255 - j
        blur_lo = np.array([i, i, i])
        blur_hi = np.array([i + 1, i + 1, i + 1])
        blur_mask = cv2.inRange(depth, blur_lo, blur_hi)

        print(f'kernel size {int(blur_a[j])}')
        blur = cv2.GaussianBlur(image, (int(blur_a[j]), int(blur_a[j])), 0)
        blur_frame[blur_mask > 0] = blur[blur_mask > 0]
        j = j + 1

    return blur_frame


def loadurl(url):
    return url


def select_frame(v, evt: gr.SelectData):
    global frame_selected
    global masks
    global edge

    if evt.index != frame_selected:
        masks[frame_selected] = v
        frame_selected = evt.index
        edge = []
    return masks[frame_selected], frame_selected
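
# Illustrative helper (an addition, not used by the app): the equirectangular ->
# Cartesian mapping that pano_depth_to_world_points() applies per pixel, shown for a
# single (lon, lat, depth) sample. Depth 0 maps to the largest radius (255), depth 255
# collapses to the origin.
def _example_sphere_point(lon, lat, depth_value):
    radius = 255.0 - depth_value
    x = radius * np.cos(lon) * np.sin(lat)   # longitude sweeps around the vertical axis
    y = radius * np.cos(lat)                 # latitude 0 points straight up
    z = radius * np.sin(lon) * np.sin(lat)
    return x, y, z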
def switch_rows(v):
    global frames
    global depths
    if v == True:
        print(depths[0])
        return depths
    else:
        print(frames[0])
        return frames


def apply_mask(d):
    global frame_selected
    global masks
    masks[frame_selected] = d["background"]
    cv2.imwrite(f"f{frame_selected}_dmap.png", masks[frame_selected])
    return masks[frame_selected], f"f{frame_selected}_dmap.png"


def draw_mask(l, v, d, evt: gr.EventData):
    global params
    global frame_selected
    global masks
    global gradient
    global edge

    points = json.loads(v)
    pts = np.array(points, np.int32)
    pts = pts.reshape((-1, 1, 2))

    scale = 1
    delta = 0
    ddepth = cv2.CV_16S

    if len(edge) == 0 or params["fnum"] != frame_selected or params["l"] != l:
        if len(edge) > 0:
            d["background"] = cv2.imread(masks[frame_selected]).astype(np.uint8)
        bg = cv2.cvtColor(d["background"], cv2.COLOR_RGBA2GRAY)
        diff = (bg - cv2.cvtColor(gradient, cv2.COLOR_RGBA2GRAY)).astype(np.uint8)
        mask = cv2.inRange(diff, -1, 1)
        # kernel = np.ones((c,c),np.float32)/(c*c)
        # mask = cv2.filter2D(mask,-1,kernel)
        grad = cv2.convertScaleAbs(cv2.Sobel(mask, ddepth, 1, 1, ksize=3, scale=scale, delta=delta, borderType=cv2.BORDER_DEFAULT))
        mask = mask + cv2.inRange(grad, 1, 255)

        indices = np.arange(0, 256)                                    # List of all colors
        divider = np.linspace(0, 255, l + 1)[1]                        # we get a divider
        quantiz = np.int0(np.linspace(0, 255, l))                      # we get quantization colors
        color_levels = np.clip(np.int0(indices / divider), 0, l - 1)   # color levels 0,1,2..
        palette = quantiz[color_levels]
        # for i in range(l):
        #     bg[(bg >= i*255/l) & (bg < (i+1)*255/l)] = i*255/(l-1)
        bg = cv2.convertScaleAbs(palette[bg]).astype(np.uint8)         # Converting image back to uint
        bg[mask > 0] = 255

        params["fnum"] = frame_selected
        params["l"] = l
        d["layers"][0] = cv2.cvtColor(bg, cv2.COLOR_GRAY2RGBA)
        edge = bg.copy()
    else:
        bg = edge.copy()

    x = points[len(points) - 1][0]
    y = points[len(points) - 1][1]

    mask = cv2.floodFill(bg, None, (x, y), 1, 0, 0)[2]
    # , (4 | cv2.FLOODFILL_FIXED_RANGE | cv2.FLOODFILL_MASK_ONLY | 255 << 8)
    # 255 << 8 tells to fill with the value 255
    mask = mask[1:mask.shape[0] - 1, 1:mask.shape[1] - 1]
    d["layers"][0][mask > 0] = (0, 0, 0, 255)
    d["background"][mask > 0] = (0, 0, 0, 255)

    return gr.ImageEditor(value=d)


css = """
#img-display-container {
    max-height: 100vh;
}
#img-display-input {
    max-height: 80vh;
}
#img-display-output {
    max-height: 80vh;
}
"""
title = "# Depth Anything Video Demo"
description = """Depth Anything on full video files. Please refer to our [paper](https://arxiv.org/abs/2401.10891), [project page](https://depth-anything.github.io), or [github](https://github.com/LiheYoung/Depth-Anything) for more details.
Mesh rendering from [ZoeDepth](https://huggingface.co/spaces/shariqfarooq/ZoeDepth) ([github](https://github.com/isl-org/ZoeDepth/tree/main/ui))."""
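
# Illustrative helper (an addition, not wired into the UI): the same gray-level
# posterization that draw_mask() builds with its indices/divider/quantiz palette,
# packaged as a standalone function so the lookup-table idea is easy to try out.
def _example_posterize_gray(gray, levels=16):
    indices = np.arange(256)
    divider = np.linspace(0, 255, levels + 1)[1]             # width of one band
    quantiz = np.linspace(0, 255, levels).astype(np.intp)    # representative value per band
    bands = np.clip((indices / divider).astype(np.intp), 0, levels - 1)
    palette = quantiz[bands]                                 # 256-entry lookup table
    return palette[gray].astype(np.uint8)                    # e.g. _example_posterize_gray(depth_gray, 16)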
# Module-level copy of the DPT preprocessing transform (kept from the original script;
# the demo itself uses the pipeline created inside make_video).
transform = Compose([
    Resize(
        width=518,
        height=518,
        resize_target=False,
        keep_aspect_ratio=True,
        ensure_multiple_of=14,
        resize_method='lower_bound',
        image_interpolation_method=cv2.INTER_CUBIC,
    ),
    NormalizeImage(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    PrepareForNet(),
])

# @torch.no_grad()
# def predict_depth(model, image):
#     return model(image)

with gr.Blocks(css=css) as demo:
    gr.Markdown(title)
    gr.Markdown(description)
    gr.Markdown("### Video Depth Prediction demo")

    with gr.Row():
        with gr.Column():
            input_url = gr.Textbox(value="./examples/streetview.mp4", label="URL")
            input_video = gr.Video(label="Input Video", format="mp4")
            input_url.change(fn=loadurl, inputs=[input_url], outputs=[input_video])
            output_frame = gr.Gallery(label="Frames", preview=True, columns=8192, type="numpy")
            output_switch = gr.Checkbox(label="Show depths")
            output_switch.input(fn=switch_rows, inputs=[output_switch], outputs=[output_frame])
            output_mask = gr.ImageEditor(layers=False, sources=('upload', 'clipboard'), show_download_button=True, type="numpy", interactive=True, transforms=(None,), eraser=gr.Eraser(), brush=gr.Brush(default_size=0, colors=['black', '#505050', '#a0a0a0', 'white']), elem_id="image_edit")
            depth_file = gr.File(label="Edited depth")
            output_mask.apply(fn=apply_mask, inputs=[output_mask], outputs=[output_mask, depth_file])
            with gr.Accordion(label="Edge", open=False):
                levels = gr.Slider(label="Color levels", value=16, maximum=32, minimum=2, step=1)
                mouse = gr.Textbox(elem_id="mouse", value="""[]""", interactive=False)
                mouse.input(fn=draw_mask, show_progress="minimal", inputs=[levels, mouse, output_mask], outputs=[output_mask])
            selector = gr.HTML(value=""" ⊹ Select point ✕ Clear selection""")
            submit = gr.Button("Submit")
        with gr.Column():
            model_type = gr.Dropdown([("small", "vits"), ("base", "vitb"), ("large", "vitl")], type="value", value="vits", label='Model Type')
            processed_video = gr.Video(label="Output Video", format="mp4")
            processed_zip = gr.File(label="Output Archive")
            result = gr.Model3D(label="3D Mesh", clear_color=[0.5, 0.5, 0.5, 0.0], camera_position=[0, 90, 0], interactive=True, elem_id="model3D")
            svg_in = gr.HTML(value=""" """)
            average = gr.HTML(value="""1""")
            with gr.Accordion(label="Blur levels", open=False):
                # One Gaussian kernel size per gray level (256 values), consumed by blur_image().
                blur_in = gr.Textbox(label="Kernel size", show_label=False, value=" ".join(["1"] * 256))
            with gr.Accordion(label="Locations", open=False):
                offset = gr.HTML(value="""
`  1  2  3  4  5  6  7  8  9  0  -  =  
       W  E     T  Y     I  O     {  }
     A-`S´-D  F-`G´-H  J-`K´-L  ;  '
      Z´ X̀     V´ B̀     M´ `,  .  /
      move    rotate    scale
                
""") selected = gr.Number(elem_id="fnum", value=0, minimum=0, maximum=256, interactive=False) output_frame.select(fn=select_frame, inputs=[output_mask], outputs=[output_mask, selected]) example_coords = """[ {"latLng": { "lat": 50.07379596793083, "lng": 14.437146122950555 } }, {"latLng": { "lat": 50.073799567020004, "lng": 14.437146774240507 } }, {"latLng": { "lat": 50.07377647505558, "lng": 14.437161000659017 } }, {"latLng": { "lat": 50.07379496839027, "lng": 14.437148958238538 } }, {"latLng": { "lat": 50.073823157821664, "lng": 14.437124189538856 } } ]""" coords = gr.JSON(elem_id="coords", value=example_coords, label="Precise coordinates", show_label=False) html = gr.HTML(value="""0.8""") camera = gr.HTML(value="""reset camera""") contrast = gr.HTML(value="""2.0""") exposure = gr.HTML(value="""0.5""") canvas = gr.HTML(value="""snapshot

""") load_all = gr.Checkbox(label="Load all") render = gr.Button("Render") def on_submit(uploaded_video,model_type,coordinates): global locations locations = [] avg = [0, 0] if not coordinates: locations = json.loads(example_coords) for k, location in enumerate(locations): locations[k] = location["latLng"] avg[0] = avg[0] + locations[k]["lat"] avg[1] = avg[1] + locations[k]["lng"] else: locations = json.loads(coordinates) for k, location in enumerate(locations): locations[k] = location["location"]["latLng"] avg[0] = avg[0] + locations[k]["lat"] avg[1] = avg[1] + locations[k]["lng"] avg[0] = avg[0] / len(locations) avg[1] = avg[1] / len(locations) for k, location in enumerate(locations): locations[k]["lat"] = location["lat"] - avg[0] locations[k]["lng"] = location["lng"] - avg[1] print(locations) # Process the video and get the path of the output video output_video_path = make_video(uploaded_video,encoder=model_type) return output_video_path + (locations,) submit.click(on_submit, inputs=[input_video, model_type, coords], outputs=[processed_video, processed_zip, output_frame, output_mask, coords]) render.click(partial(get_mesh), inputs=[output_frame, output_mask, blur_in, load_all], outputs=[result]) example_files = os.listdir('examples') example_files.sort() example_files = [os.path.join('examples', filename) for filename in example_files] examples = gr.Examples(examples=example_files, inputs=[input_video], outputs=[processed_video, processed_zip, output_frame, output_mask, coords], fn=on_submit, cache_examples=True) if __name__ == '__main__': demo.queue().launch()