import os import socket import requests from fastapi import FastAPI, File, UploadFile, Form from fastapi.responses import FileResponse from PIL import Image import torch from diffusers import ( DiffusionPipeline, AutoencoderKL, StableDiffusionControlNetPipeline, ControlNetModel, StableDiffusionLatentUpscalePipeline, StableDiffusionImg2ImgPipeline, StableDiffusionControlNetImg2ImgPipeline, DPMSolverMultistepScheduler, EulerDiscreteScheduler ) import random import time import tempfile app = FastAPI() BASE_MODEL = "SG161222/Realistic_Vision_V5.1_noVAE" # Initialize both pipelines vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=torch.float16) controlnet = ControlNetModel.from_pretrained("monster-labs/control_v1p_sd15_qrcode_monster", torch_dtype=torch.float16) main_pipe = StableDiffusionControlNetPipeline.from_pretrained( BASE_MODEL, controlnet=controlnet, vae=vae, safety_checker=None, torch_dtype=torch.float16, ).to("cuda") image_pipe = StableDiffusionControlNetImg2ImgPipeline(**main_pipe.components) # Sampler map SAMPLER_MAP = { "DPM++ Karras SDE": lambda config: DPMSolverMultistepScheduler.from_config(config, use_karras=True, algorithm_type="sde-dpmsolver++"), "Euler": lambda config: EulerDiscreteScheduler.from_config(config), } def center_crop_resize(img, output_size=(512, 512)): width, height = img.size # Calculate dimensions to crop to the center new_dimension = min(width, height) left = (width - new_dimension)/2 top = (height - new_dimension)/2 right = (width + new_dimension)/2 bottom = (height + new_dimension)/2 # Crop and resize img = img.crop((left, top, right, bottom)) img = img.resize(output_size) return img def common_upscale(samples, width, height, upscale_method, crop=False): if crop == "center": old_width = samples.shape[3] old_height = samples.shape[2] old_aspect = old_width / old_height new_aspect = width / height x = 0 y = 0 if old_aspect > new_aspect: x = round((old_width - old_width * (new_aspect / old_aspect)) / 2) elif old_aspect < new_aspect: y = round((old_height - old_height * (old_aspect / new_aspect)) / 2) s = samples[:,:,y:old_height-y,x:old_width-x] else: s = samples return torch.nn.functional.interpolate(s, size=(height, width), mode=upscale_method) def upscale(samples, upscale_method, scale_by): #s = samples.copy() width = round(samples["images"].shape[3] * scale_by) height = round(samples["images"].shape[2] * scale_by) s = common_upscale(samples["images"], width, height, upscale_method, "disabled") return (s) # def convert_to_pil(base64_image): pil_image = processing_utils.decode_base64_to_image(base64_image) return pil_image def convert_to_base64(pil_image): base64_image = processing_utils.encode_pil_to_base64(pil_image) return base64_image # Inference function def inference( control_image: Image.Image, prompt: str, negative_prompt: str, guidance_scale: float = 8.0, controlnet_conditioning_scale: float = 1, control_guidance_start: float = 1, control_guidance_end: float = 1, upscaler_strength: float = 0.5, seed: int = -1, sampler = "DPM++ Karras SDE", #profile: gr.OAuthProfile | None = None, ): start_time = time.time() start_time_struct = time.localtime(start_time) start_time_formatted = time.strftime("%H:%M:%S", start_time_struct) print(f"Inference started at {start_time_formatted}") # Generate the initial image #init_image = init_pipe(prompt).images[0] # Rest of your existing code control_image_small = center_crop_resize(control_image) control_image_large = center_crop_resize(control_image, (1024, 1024)) main_pipe.scheduler = SAMPLER_MAP[sampler](main_pipe.scheduler.config) my_seed = random.randint(0, 2**32 - 1) if seed == -1 else seed generator = torch.Generator(device="cuda").manual_seed(my_seed) out = main_pipe( prompt=prompt, negative_prompt=negative_prompt, image=control_image_small, guidance_scale=float(guidance_scale), controlnet_conditioning_scale=float(controlnet_conditioning_scale), generator=generator, control_guidance_start=float(control_guidance_start), control_guidance_end=float(control_guidance_end), num_inference_steps=15, output_type="latent" ) upscaled_latents = upscale(out, "nearest-exact", 2) out_image = image_pipe( prompt=prompt, negative_prompt=negative_prompt, control_image=control_image_large, image=upscaled_latents, guidance_scale=float(guidance_scale), generator=generator, num_inference_steps=20, strength=upscaler_strength, control_guidance_start=float(control_guidance_start), control_guidance_end=float(control_guidance_end), controlnet_conditioning_scale=float(controlnet_conditioning_scale) ) end_time = time.time() end_time_struct = time.localtime(end_time) end_time_formatted = time.strftime("%H:%M:%S", end_time_struct) print(f"Inference ended at {end_time_formatted}, taking {end_time-start_time}s") # Save image + metadata user_history.save_image( label=prompt, image=out_image["images"][0], profile=profile, metadata={ "prompt": prompt, "negative_prompt": negative_prompt, "guidance_scale": guidance_scale, "controlnet_conditioning_scale": controlnet_conditioning_scale, "control_guidance_start": control_guidance_start, "control_guidance_end": control_guidance_end, "upscaler_strength": upscaler_strength, "seed": seed, "sampler": sampler, }, ) return out_image["images"][0], my_seed import os def generate_image_from_parameters(prompt, guidance_scale, controlnet_scale, controlnet_end, upscaler_strength, seed, sampler_type, image): try: # Save the uploaded image to a temporary file temp_image_path = f"/tmp/{int(time.time())}_{image.filename}" with open(temp_image_path, "wb") as temp_image: temp_image.write(image.file.read()) # Open the uploaded image using PIL control_image = Image.open(temp_image_path) # Call existing inference function with the provided parameters generated_image, _, _, _ = inference(control_image, prompt, "", guidance_scale, controlnet_scale, 0, controlnet_end, upscaler_strength, seed, sampler_type) # Specify the desired output directory for saving generated images output_directory = "/home/user/app/generated_files" # Create the output directory if it doesn't exist os.makedirs(output_directory, exist_ok=True) # Generate a unique filename for the saved image filename = f"generated_image_{int(time.time())}.png" # Save the generated image to the permanent location output_path = os.path.join(output_directory, filename) generated_image.save(output_path, format="PNG") # Return the generated image path return output_path except Exception as e: # Handle exceptions and return an error message if something goes wrong return str(e) @app.post("/generate_image") async def generate_image( prompt: str = Form(...), guidance_scale: float = Form(...), controlnet_scale: float = Form(...), controlnet_end: float = Form(...), upscaler_strength: float = Form(...), seed: int = Form(...), sampler_type: str = Form(...), image: UploadFile = File(...) ): try: # Save the uploaded image to a temporary file temp_image_path = f"/tmp/{int(time.time())}_{image.filename}" with open(temp_image_path, "wb") as temp_image: temp_image.write(image.file.read()) # Open the uploaded image using PIL control_image = Image.open(temp_image_path) # Call existing inference function with the provided parameters generated_image, _, _, _ = inference(control_image, prompt, "", guidance_scale, controlnet_scale, 0, controlnet_end, upscaler_strength, seed, sampler_type) # Specify the desired output directory for saving generated images output_directory = "/home/user/app/generated_files" # Create the output directory if it doesn't exist os.makedirs(output_directory, exist_ok=True) # Generate a unique filename for the saved image filename = f"generated_image_{int(time.time())}.png" # Save the generated image to the permanent location output_path = os.path.join(output_directory, filename) generated_image.save(output_path, format="PNG") # Return the generated image path return output_path except Exception as e: # Handle exceptions and return an error message if something goes wrong return str(e) if __name__ == "__main__": import uvicorn # Get internal IP address internal_ip = socket.gethostbyname(socket.gethostname()) # Get public IP address using a public API (this may not work if you are behind a router/NAT) try: public_ip = requests.get("http://api.ipify.org").text except requests.RequestException: public_ip = "Not Available" print(f"Internal URL: http://{internal_ip}:8000") print(f"Public URL: http://{public_ip}:8000") uvicorn.run(app, host="0.0.0.0", port=8000, reload=True) if __name__ == "__main__": import uvicorn uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)