import gradio
import torch
import numpy
from PIL import Image
from torchvision import transforms
from diffusers import StableDiffusionInpaintPipeline
from diffusers import DPMSolverMultistepScheduler

print("Initializing View Diffusion")

deviceStr = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(deviceStr)

# Module-level state shared between the Gradio callbacks below.
latents = None          # target end of the latent walk
latentsOld = None
latentsSize = (1, 4, 64, 64)
imageSize = (512, 512)
lastImage = Image.new(mode="RGB", size=imageSize)   # returned when no new frame is generated
lastSeed = 4096
generator = torch.Generator(device).manual_seed(lastSeed)
modelNames = ["stabilityai/stable-diffusion-2-inpainting",
              "runwayml/stable-diffusion-inpainting"]
modelIndex = 0
outpaintPipeline = None
oldLatentWalk = None    # last walk position for which activeLatents was computed
activeLatents = None    # latents actually passed to the pipeline
oldLatents = None       # start end of the latent walk


def GenerateNewLatentsForInference():
    """Start a new latent walk and return the slider position for it.

    The currently active latents (or, if there are none yet, the current
    target latents) become the start of the walk, and a fresh random tensor
    of shape ``latentsSize`` becomes the new target. Half precision is used
    on CUDA, the torch default dtype otherwise.

    Returns:
        int: always 0; the click handler writes this into the "Latent Walk"
        slider so the walk restarts from its beginning.
    """
    global latents, oldLatents
    oldLatents = activeLatents if activeLatents is not None else latents
    if deviceStr == "cuda":
        latents = torch.randn(latentsSize, device=device, dtype=torch.float16)
    else:
        latents = torch.randn(latentsSize, device=device)
    return 0


def InitializeOutpainting():
    """Load the inpainting pipeline selected by ``modelIndex`` into ``outpaintPipeline``.

    On CUDA the weights are loaded in float16 and moved to the GPU; on CPU the
    pipeline defaults are used. In both cases the scheduler is replaced by a
    DPM-Solver multistep scheduler built from the pipeline's own scheduler
    config, and the per-call progress bar is disabled.
    """
    print("Initializing Outpainting")
    global outpaintPipeline
    if deviceStr == "cuda":
        outpaintPipeline = StableDiffusionInpaintPipeline.from_pretrained(
            modelNames[modelIndex], torch_dtype=torch.float16)
        #safety_checker=lambda images, **kwargs: (images, False))
        outpaintPipeline.to(device)
        try:
            outpaintPipeline.enable_xformers_memory_efficient_attention()
        except ImportError as err:
            # NOTE(review): xformers is an optional package; diffusers is
            # expected to raise ModuleNotFoundError when it is missing.
            # Fall back to the default attention instead of aborting start-up.
            print(f"xformers is unavailable, using default attention: {err}")
    else:
        outpaintPipeline = StableDiffusionInpaintPipeline.from_pretrained(modelNames[modelIndex])
        #safety_checker=lambda images, **kwargs: (images, False))
    outpaintPipeline.scheduler = DPMSolverMultistepScheduler.from_config(
        outpaintPipeline.scheduler.config)
    outpaintPipeline.set_progress_bar_config(disable=True)


# Based on: https://discuss.pytorch.org/t/help-regarding-slerp-function-for-generative-model-sampling/32475/4
# Further optimized to trade a divide operation for a multiply
def Slerp(start, end, alpha):
    """Spherically interpolate between two tensors along dimension 1.

    The angle between ``start`` and ``end`` is computed from their vectors
    along dim 1, and the two inputs are blended with the usual
    sin-weighted coefficients.

    Where the interpolation is ill-conditioned (the vectors are parallel,
    anti-parallel, or one of them has zero norm) the sine of the angle is
    zero or NaN and the sin-weighted coefficients would be NaN. Those
    positions fall back to plain linear weights ``(1 - alpha, alpha)``.

    Args:
        start: tensor returned for ``alpha == 0``.
        end: tensor returned for ``alpha == 1``; same shape as ``start``.
        alpha: interpolation position, a scalar in [0, 1].

    Returns:
        Tensor with the same shape as ``start``.
    """
    start_norm = torch.norm(start, dim=1, keepdim=True)
    end_norm = torch.norm(end, dim=1, keepdim=True)
    cosOmega = (start * end / (start_norm * end_norm)).sum(1)
    # Rounding (notably in float16) can push the cosine slightly outside
    # [-1, 1], where acos returns NaN.
    omega = torch.acos(cosOmega.clamp(-1.0, 1.0))
    sinOmega = torch.sin(omega)
    first = torch.sin((1.0 - alpha) * omega) / sinOmega
    second = torch.sin(alpha * omega) / sinOmega
    # Written as "not greater than" so that NaN sines are treated as degenerate too.
    degenerate = ~(sinOmega.abs() > 1e-4)
    first = torch.where(degenerate, torch.zeros_like(first) + (1.0 - alpha), first)
    second = torch.where(degenerate, torch.zeros_like(second) + alpha, second)
    return first.unsqueeze(1) * start + second.unsqueeze(1) * end


def Diffuse(latentWalk, generatorSeed, inputImage, mask, prompt, negativePrompt,
            guidanceScale, numInferenceSteps, pauseInference=False):
    """Run one outpainting inference for the current camera frame.

    Args:
        latentWalk: position in [0, 1] between ``oldLatents`` and ``latents``;
            the active latents are only recomputed when this value changes.
        generatorSeed: seed for the torch generator; the generator is only
            rebuilt when the seed changes.
        inputImage: camera frame handed to the pipeline as ``image``.
        mask: mask handed to the pipeline as ``mask_image``.
        prompt: text prompt.
        negativePrompt: negative text prompt.
        guidanceScale: classifier-free guidance scale.
        numInferenceSteps: number of denoising steps.
        pauseInference: when true, skip inference and return the last
            generated image. This is a parameter so that the name no longer
            resolves to the module-level checkbox *component*, which is not
            the checkbox value and made the previous pause checks ineffective.

    Returns:
        The newly generated image, or the last generated image when there is
        no frame, no mask, or inference is paused.
    """
    global lastImage, lastSeed, generator, oldLatentWalk, activeLatents
    if inputImage is None or mask is None or pauseInference:
        return lastImage
    #if staticLatents is False:
    #    GenerateNewLatentsForInference()
    if oldLatentWalk != latentWalk:
        activeLatents = Slerp(oldLatents, latents, latentWalk)
        oldLatentWalk = latentWalk
    if lastSeed != generatorSeed:
        # Slider values may arrive as floats; manual_seed needs an int.
        generator = torch.Generator(device).manual_seed(int(generatorSeed))
        lastSeed = generatorSeed
    newImage = outpaintPipeline(prompt=prompt,
                                negative_prompt=negativePrompt,
                                image=inputImage,
                                mask_image=mask,
                                guidance_scale=guidanceScale,
                                num_inference_steps=int(numInferenceSteps),
                                latents=activeLatents,
                                generator=generator).images[0]
    lastImage = newImage
    return newImage


InitializeOutpainting()

print("Generating Latents")
# Two calls are needed: the first only creates the target latents, the second
# shifts them into oldLatents and creates a new target, so both ends of the
# walk exist before the first frame arrives.
GenerateNewLatentsForInference()
GenerateNewLatentsForInference()
activeLatents = oldLatents

print("Initializing Gradio Interface")
defaultMask = Image.open("assets/masks/diamond.png")
numInfStepsDesc = "A higher value generally increases quality, but reduces the frames per second of the output stream."
#staticLatentsDesc = "This setting increases the frame to frame determisn of the generation. If this is disabled, then the inference will take continuous large walks across the latent space between frames."
generatorSeedDesc = "Identical seeds allow for persistent scene generation between runs, and changing the seed will take a static large walk across the latent space to better control and alter the generation of scene scene content especially when large abberations exist in the reconstruction."
# Help texts shown next to the corresponding input components.
promptDesc = "This text will condition the generation of the scene to help guide the content creation."
negPromptDesc = "This text will help deter the generation from converging towards reconstructing the elements described in the text."
outputText = "This inferred imagery expands the field of view from the masked area of the input camera feed."
latentWalkDesc = "This allows you to walk short spans across the latent space with relatively continuous gradients."

# Prompts used by the example rows; the first one doubles as the prompt placeholder.
examplePrompt1 = "A person in a room" #A person in a room with colored hair"
examplePrompt2 = "A person with colored hair" #"People in a room with colored hair"
examplePrompt3 = "A person on a beach with long hair" #"A man on a beach with long hair"
examplePrompt4 = "A person in a field under a night sky" #"A woman on a beach with long hair"
examplePrompt5 = "A panda eating bamboo" #"A panda eating bamboo"
examplePrompt6 = "A bird flying in the sky" #"A family together in a room"
examplePrompt7 = "A Koala bear" #"A family together outside with colored hair"


def _DiffuseUnlessPaused(paused, *diffuseArgs):
    """Forward a camera frame to ``Diffuse`` unless inference is paused.

    The "Pause Inference" checkbox is not part of ``inferenceInputs`` (that
    list has to line up with the columns of the example rows), so its value
    is received here as a separate leading argument.

    Args:
        paused: current value of the "Pause Inference" checkbox.
        *diffuseArgs: the ``inferenceInputs`` values, passed on unchanged.

    Returns:
        A no-op ``gradio.update()`` while paused, which leaves the output
        image as it is; otherwise whatever ``Diffuse`` returns.
    """
    if paused:
        return gradio.update()
    return Diffuse(*diffuseArgs)


with gradio.Blocks(live=True) as ux:
    gradio.Markdown("This generative machine learning demonstration streams stable diffusion outpainting inference live from your camera on your computer or phone to expand your local reality and create an alternate world. High quality frame to frame determinism is a hard problem to solve for latent diffusion models as the generation is inherently relative to input noise distributions for the latents, and many factors such as the inherent Bayer noise from the camera images as well as anything that is altered between camera images (such as focus, white balance, etc) causes non-determinism between frames. Some methods apply spationtemporal attention, but this demonstration focuses on the control over the input latents to navigate the latent space. **Increase the lighting of your physical scene from your camera's perspective, and avoid self shadows of scene content, to improve the quality and consistency of the scene generation.**")
    with gradio.Row():
        # Left column: camera feed, mask and generation parameters.
        with gradio.Column():
            #staticLatents = gradio.Checkbox(label="Static Latents", info=staticLatentsDesc, value=True, interactive=True)
            inputImage = gradio.Image(label="Input Feed", source="webcam", shape=[512,512], streaming=True)
            mask = gradio.Image(label="Mask", type="pil", value=defaultMask)
            prompt = gradio.Textbox(label="Prompt", info=promptDesc, placeholder=examplePrompt1, lines=3)
            negativePrompt = gradio.Textbox(label="Negative Prompt", info=negPromptDesc, placeholder="Facial hair", lines=3)
            guidanceScale = gradio.Slider(label="Guidance Scale", info="A higher value causes the generation to be more relative to the text prompt conditioning.", maximum=100, minimum=1, value=7.5, step=0.1)
            numInferenceSteps = gradio.Slider(label="Number of Inference Steps", info=numInfStepsDesc, maximum=100, minimum=1, value=20, step=1)
            generatorSeed = gradio.Slider(label="Generator Seed", info=generatorSeedDesc, maximum=10000, minimum=1, value=lastSeed, step=1)
            #modelIndex = gradio.Dropdown(modelNames, label="Model", value="runwayml/stable-diffusion-inpainting")
            #inputImage.style(full_width=True)

        # Right column: latent-walk controls and the generated output.
        with gradio.Column():
            gradio.Markdown("The navigation will attempt to continously loiter in its current location in the embedded space if no input variables change. If you click **Generate New Latents**, then it will preserve the current active latents in the walk,create a new set of random latents, and reset the **Latent Walk** value so that you can walk to a new location.")
            generateLatents = gradio.Button(value="Generate New Latents")
            latentWalk = gradio.Slider(label="Latent Walk", info=latentWalkDesc, maximum=1.0, minimum=0.0, value=0.0, interactive=True)
            outputImage = gradio.Image(label="Extrapolated Field of View")
            pauseInference = gradio.Checkbox(label="Pause Inference", value=False)

    # Order must match both the positional parameters of Diffuse and the
    # columns of the example rows below.
    inferenceInputs = [latentWalk, generatorSeed, inputImage, mask, prompt, negativePrompt, guidanceScale, numInferenceSteps]

    # The handler's return value resets the "Latent Walk" slider.
    generateLatents.click(GenerateNewLatentsForInference, outputs=latentWalk)

    # Every new camera frame triggers an inference; the checkbox value is
    # passed first so that pausing actually takes effect.
    inputImage.change(fn=_DiffuseUnlessPaused,
                      inputs=[pauseInference] + inferenceInputs,
                      outputs=outputImage,
                      show_progress=False)

    examples = [
        [1.0, 1234, "assets/input/man.png", "assets/masks/diamond.png", examplePrompt1, "", 7.5, 20],
        [0.5, 2048, "assets/input/people.jpg", "assets/masks/star.png", examplePrompt2, "", 7.5, 15],
        [0.3, 8192, "assets/input/man.png", "assets/masks/sphere.png", examplePrompt3, "", 7.5, 25],
        [0.7, 1024, "assets/input/woman.jpg", "assets/masks/spiral.png", examplePrompt4, "", 7.5, 15],
        [1.0, 512, "assets/input/man.png", "assets/masks/square.png", examplePrompt5, "", 7.5, 10],
        [0.1, 256, "assets/input/family.jpg", "assets/masks/wave.png", examplePrompt6, "", 11.5, 30],
        [0.9, 9999, "assets/input/family.jpg", "assets/masks/maze.png", examplePrompt7, "", 17.5, 35],
    ]
    inputExamples = gradio.Examples(examples, inputs=inferenceInputs, outputs=outputImage, fn=Diffuse)

    gradio.Markdown("This demonstration should initialize automatically from the default values, and run relatively well, but if the output is not an ideal reconstruction of your physical local space from your camera's perspective, then you should adjust the generator seed to take large walks across the latent space. In addition, the static latents can be disable to continously walk the latent space, and then it can be set to static again when a better region of the embedded space is found, but this will increase frame to fram non-determinism. You can also condition the generation using prompts to re-enforce or change aspects of the scene. **If you see a black image instead of a generated output image, then you are running into the safety checker.** This can trigger inconsistently even when the generated content is purely PG. If this happens, then increase the lighting of the scene and also increase the number of inference steps to improve the generated predicition to reduce the likelihood of the saftey checker triggering a false positive.")

#inputs=[latentWalk, staticLatents, generatorSeed, inputImage, mask, pauseInference, prompt, negativePrompt, guidanceScale, numInferenceSteps]
#ux = gradio.Interface(fn=diffuse, title="View Diffusion", article=article, description=description, inputs=inputs, outputs=outputImage, examples=inputExamples, live=True)

print("Launching Demo")
ux.launch()