# -*- coding: utf-8 -*- """Copy of compose_glide.ipynb Automatically generated by Colaboratory. Original file is located at https://colab.research.google.com/drive/19xx6Nu4FeiGj-TzTUFxBf-15IkeuFx_F """ import gradio as gr import torch as th from composable_diffusion.download import download_model from composable_diffusion.model_creation import create_model_and_diffusion as create_model_and_diffusion_for_clevr from composable_diffusion.model_creation import model_and_diffusion_defaults as model_and_diffusion_defaults_for_clevr from composable_diffusion.composable_stable_diffusion.pipeline_composable_stable_diffusion import \ ComposableStableDiffusionPipeline import os import shutil import time import glob import numpy as np import open3d as o3d import open3d.visualization.rendering as rendering import plotly.graph_objects as go from PIL import Image from tqdm.auto import tqdm from point_e.diffusion.configs import DIFFUSION_CONFIGS, diffusion_from_config from point_e.diffusion.sampler import PointCloudSampler from point_e.models.download import load_checkpoint from point_e.models.configs import MODEL_CONFIGS, model_from_config from point_e.util.pc_to_mesh import marching_cubes_mesh has_cuda = th.cuda.is_available() device = th.device('cpu' if not th.cuda.is_available() else 'cuda') print(has_cuda) # init stable diffusion model pipe = ComposableStableDiffusionPipeline.from_pretrained( "CompVis/stable-diffusion-v1-4", ).to(device) # uncomment to disable safety_checker # pipe.safety_checker = None # create model for CLEVR Objects clevr_options = model_and_diffusion_defaults_for_clevr() flags = { "image_size": 128, "num_channels": 192, "num_res_blocks": 2, "learn_sigma": True, "use_scale_shift_norm": False, "raw_unet": True, "noise_schedule": "squaredcos_cap_v2", "rescale_learned_sigmas": False, "rescale_timesteps": False, "num_classes": '2', "dataset": "clevr_pos", "use_fp16": has_cuda, "timestep_respacing": '100' } for key, val in flags.items(): clevr_options[key] = val clevr_model, clevr_diffusion = create_model_and_diffusion_for_clevr(**clevr_options) clevr_model.eval() if has_cuda: clevr_model.convert_to_fp16() clevr_model.to(device) clevr_model.load_state_dict(th.load(download_model('clevr_pos'), device)) device = th.device('cpu' if not th.cuda.is_available() else 'cuda') print('creating base model...') base_name = 'base40M-textvec' base_model = model_from_config(MODEL_CONFIGS[base_name], device) base_model.eval() base_diffusion = diffusion_from_config(DIFFUSION_CONFIGS[base_name]) print('creating upsample model...') upsampler_model = model_from_config(MODEL_CONFIGS['upsample'], device) upsampler_model.eval() upsampler_diffusion = diffusion_from_config(DIFFUSION_CONFIGS['upsample']) print('downloading base checkpoint...') base_model.load_state_dict(load_checkpoint(base_name, device)) print('downloading upsampler checkpoint...') upsampler_model.load_state_dict(load_checkpoint('upsample', device)) print('creating SDF model...') name = 'sdf' model = model_from_config(MODEL_CONFIGS[name], device) model.eval() print('loading SDF model...') model.load_state_dict(load_checkpoint(name, device)) def compose_pointe(prompt, weights, version): weight_list = [float(x.strip()) for x in weights.split('|')] sampler = PointCloudSampler( device=device, models=[base_model, upsampler_model], diffusions=[base_diffusion, upsampler_diffusion], num_points=[1024, 4096 - 1024], aux_channels=['R', 'G', 'B'], guidance_scale=[weight_list, 0.0], model_kwargs_key_filter=('texts', ''), # Do not condition the upsampler at all ) def generate_pcd(prompt_list): # Produce a sample from the model. samples = None for x in tqdm(sampler.sample_batch_progressive(batch_size=1, model_kwargs=dict(texts=prompt_list))): samples = x return samples def generate_fig(samples): pc = sampler.output_to_point_clouds(samples)[0] return pc def generate_mesh(pc): mesh = marching_cubes_mesh( pc=pc, model=model, batch_size=4096, grid_size=128, # increase to 128 for resolution used in evals progress=True, ) return mesh def generate_video(mesh_path): render = rendering.OffscreenRenderer(640, 480) mesh = o3d.io.read_triangle_mesh(mesh_path) mesh.compute_vertex_normals() mat = o3d.visualization.rendering.MaterialRecord() mat.shader = 'defaultLit' render.scene.camera.look_at([0, 0, 0], [1, 1, 1], [0, 0, 1]) render.scene.add_geometry('mesh', mesh, mat) timestr = time.strftime("%Y%m%d-%H%M%S") os.makedirs(timestr, exist_ok=True) def update_geometry(): render.scene.clear_geometry() render.scene.add_geometry('mesh', mesh, mat) def generate_images(): for i in range(64): # Rotation R = mesh.get_rotation_matrix_from_xyz((0, 0, np.pi / 32)) mesh.rotate(R, center=(0, 0, 0)) # Update geometry update_geometry() img = render.render_to_image() o3d.io.write_image(os.path.join(timestr + "/{:05d}.jpg".format(i)), img, quality=100) time.sleep(0.05) generate_images() image_list = [] for filename in sorted(glob.glob(f'{timestr}/*.jpg')): # assuming gif im = Image.open(filename) image_list.append(im) # remove the folder shutil.rmtree(timestr) return image_list prompt_list = [x.strip() for x in prompt.split("|")] pcd = generate_pcd(prompt_list) pc = generate_fig(pcd) fig = go.Figure( data=[ go.Scatter3d( x=pc.coords[:, 0], y=pc.coords[:, 1], z=pc.coords[:, 2], mode='markers', marker=dict( size=2, color=['rgb({},{},{})'.format(r, g, b) for r, g, b in zip(pc.channels["R"], pc.channels["G"], pc.channels["B"])], ) ) ], layout=dict( scene=dict( xaxis=dict(visible=False), yaxis=dict(visible=False), zaxis=dict(visible=False) ) ), ) return fig # huggingface failed to render, so we only visualize pointclouds # mesh = generate_mesh(pc) # timestr = time.strftime("%Y%m%d-%H%M%S") # mesh_path = os.path.join(f'{timestr}.ply') # with open(mesh_path, 'wb') as f: # mesh.write_ply(f) # image_frames = generate_video(mesh_path) # gif_path = os.path.join(f'{timestr}.gif') # image_frames[0].save(gif_path, save_all=True, optimizer=False, duration=5, append_images=image_frames[1:], loop=0) # return f'{timestr}.gif' def compose_clevr_objects(prompt, weights, steps): weights = [float(x.strip()) for x in weights.split('|')] weights = th.tensor(weights, device=device).reshape(-1, 1, 1, 1) coordinates = [ [ float(x.split(',')[0].strip()), float(x.split(',')[1].strip())] for x in prompt.split('|') ] coordinates += [[-1, -1]] # add unconditional score label batch_size = 1 clevr_options['timestep_respacing'] = str(int(steps)) _, clevr_diffusion = create_model_and_diffusion_for_clevr(**clevr_options) def model_fn(x_t, ts, **kwargs): half = x_t[:1] combined = th.cat([half] * kwargs['y'].size(0), dim=0) model_out = clevr_model(combined, ts, **kwargs) eps, rest = model_out[:, :3], model_out[:, 3:] masks = kwargs.get('masks') cond_eps = eps[masks] uncond_eps = eps[~masks] half_eps = uncond_eps + (weights * (cond_eps - uncond_eps)).sum(dim=0, keepdims=True) eps = th.cat([half_eps] * x_t.size(0), dim=0) return th.cat([eps, rest], dim=1) def sample(coordinates): masks = [True] * (len(coordinates) - 1) + [False] model_kwargs = dict( y=th.tensor(coordinates, dtype=th.float, device=device), masks=th.tensor(masks, dtype=th.bool, device=device) ) samples = clevr_diffusion.p_sample_loop( model_fn, (len(coordinates), 3, clevr_options["image_size"], clevr_options["image_size"]), device=device, clip_denoised=True, progress=True, model_kwargs=model_kwargs, cond_fn=None, )[:batch_size] return samples samples = sample(coordinates) out_img = samples[0].permute(1, 2, 0) out_img = (out_img + 1) / 2 out_img = (out_img.detach().cpu() * 255.).to(th.uint8) out_img = out_img.numpy() return out_img def stable_diffusion_compose(prompt, steps, weights, seed): generator = th.Generator("cuda").manual_seed(int(seed)) image = pipe(prompt, guidance_scale=7.5, num_inference_steps=steps, weights=weights, generator=generator).images[0] image.save(f'{"_".join(prompt.split())}.png') return image def compose_2D_diffusion(prompt, weights, version, steps, seed): try: with th.no_grad(): if version == 'Stable_Diffusion_1v_4': res = stable_diffusion_compose(prompt, steps, weights, seed) return res else: return compose_clevr_objects(prompt, weights, steps) except Exception as e: return None examples_1 = "A castle in a forest | grainy, fog" examples_3 = '0.1, 0.5 | 0.3, 0.5 | 0.5, 0.5 | 0.7, 0.5 | 0.9, 0.5' examples_5 = 'a white church | lightning in the background' examples_6 = 'mystical trees | A dark magical pond | dark' examples_7 = 'A lake | A mountain | Cherry Blossoms next to the lake' image_examples = [ [examples_6, "7.5 | 7.5 | -7.5", 'Stable_Diffusion_1v_4', 50, 8], [examples_6, "7.5 | 7.5 | 7.5", 'Stable_Diffusion_1v_4', 50, 8], [examples_1, "7.5 | -7.5", 'Stable_Diffusion_1v_4', 50, 0], [examples_7, "7.5 | 7.5 | 7.5", 'Stable_Diffusion_1v_4', 50, 3], [examples_5, "7.5 | 7.5", 'Stable_Diffusion_1v_4', 50, 0], [examples_3, "7.5 | 7.5 | 7.5 | 7.5 | 7.5", 'CLEVR Objects', 100, 0] ] pointe_examples = [["a cake | a house", "7.5 | 7.5", 'Point-E'], ["a chair | chair legs", "7.5 | -7.5", 'Point-E'], ["a green avocado | a chair", "7.5 | 3", 'Point-E'], ["a toilet | a chair", "7 | 5", 'Point-E']] with gr.Blocks() as demo: gr.Markdown( """
Compositional visual generation by composing pre-trained diffusion models using compositional operators, AND and NOT.
""") gr.Markdown( """When composing multiple inputs, please use “|” to separate them
""") gr.Markdown( """( Note: For composing CLEVR objects, we recommend using x in range [0.1, 0.9] and y in range [0.25, 0.7], since the training dataset labels are in given ranges.)