# Copyright (c) OpenMMLab. All rights reserved.
import json

import numpy as np
import torch
# triton_python_backend_utils is available in every Triton Python model. You
# need to use this module to create inference requests and responses. It also
# contains some utility functions for extracting information from model_config
# and converting Triton input/output types to numpy types.
import triton_python_backend_utils as pb_utils
from diffusers import (AutoencoderKL, ControlNetModel, StableDiffusionPipeline,
                       StableDiffusionXLControlNetImg2ImgPipeline,
                       StableDiffusionXLControlNetPipeline,
                       StableDiffusionXLImg2ImgPipeline,
                       StableDiffusionXLPipeline)
from diffusers.utils import load_image
from PIL import Image


def prepare_tpose_image(img):
    """Pre-compute the T-pose control image for every supported aspect ratio.

    Each entry is resized and padded with black so that it matches the output
    resolution selected for the same ratio index in `execute`.
    """
    tpose_img_ratio = {}
    padding_color = (0, 0, 0)

    # ratio 0: 1024x768, square 768 image centered horizontally
    padded_image = Image.new(img.mode, (1024, 768), padding_color)
    img768 = img.resize((768, 768))
    padded_image.paste(img768, ((1024 - 768) // 2, 0))
    tpose_img_ratio[0] = padded_image

    # ratio 1: 800x800, no padding needed
    tpose_img_ratio[1] = img.resize((800, 800))

    # ratio 2: 600x800, square 600 image centered vertically
    padded_image = Image.new(img.mode, (600, 800), padding_color)
    img600 = img.resize((600, 600))
    padded_image.paste(img600, (0, (800 - 600) // 2))
    tpose_img_ratio[2] = padded_image

    # ratio 3: 1024x576, square 576 image centered horizontally
    padded_image = Image.new(img.mode, (1024, 576), padding_color)
    img576 = img.resize((576, 576))
    padded_image.paste(img576, ((1024 - 576) // 2, 0))
    tpose_img_ratio[3] = padded_image

    # ratio 4: 448x800, square 448 image centered vertically
    padded_image = Image.new(img.mode, (448, 800), padding_color)
    img448 = img.resize((448, 448))
    padded_image.paste(img448, (0, (800 - 448) // 2))
    tpose_img_ratio[4] = padded_image

    # ratio 5: 1024x680, square 680 image centered horizontally
    padded_image = Image.new(img.mode, (1024, 680), padding_color)
    img680 = img.resize((680, 680))
    padded_image.paste(img680, ((1024 - 680) // 2, 0))
    tpose_img_ratio[5] = padded_image

    # ratio 6: 528x800, square 528 image centered vertically
    padded_image = Image.new(img.mode, (528, 800), padding_color)
    img528 = img.resize((528, 528))
    padded_image.paste(img528, (0, (800 - 528) // 2))
    tpose_img_ratio[6] = padded_image

    return tpose_img_ratio


class TritonPythonModel:
    """Your Python model must use the same class name.

    Every Python model that is created must have "TritonPythonModel" as the
    class name.
    """

    def initialize(self, args):
        """`initialize` is called only once when the model is being loaded.

        Implementing `initialize` function is optional. This function allows
        the model to initialize any state associated with this model.

        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing model instance kind
          * model_instance_device_id: A string containing model instance device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name
        """
        print(args)
        # You must parse model_config. JSON string is not parsed here.
        self.model_config = json.loads(args['model_config'])
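        # A minimal sketch (kept as a comment so it is not executed) of how
        # the parsed config could be queried with the pb_utils helpers instead
        # of hard-coding the output dtype; it assumes the config declares an
        # output named 'OUTPUT':
        #
        #   output_config = pb_utils.get_output_config_by_name(
        #       self.model_config, 'OUTPUT')
        #   output_dtype = pb_utils.triton_string_to_numpy(
        #       output_config['data_type'])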

        weight_dtype = torch.float16
        device = f"cuda:{args['model_instance_device_id']}"

        # pose control
        self.controlnet = ControlNetModel.from_pretrained(
            '/nvme/shared/huggingface_hub/models/controlnet-openpose-sdxl-1.0',
            torch_dtype=weight_dtype)
        self.controlnet = self.controlnet.to(device)
        self.tpose_image = load_image(
            '/nvme/liuwenran/repos/magicmaker2-image-generation/data/t-pose.jpg')

        # anime style
        anime_ckpt_dir = '/nvme/shared/civitai_models/ckpts/models--gsdf--CounterfeitXL/snapshots/4708675873bd09833aabc3fd4cb2de5fcd1726ac'
        self.pipeline_anime = StableDiffusionXLPipeline.from_pretrained(
            anime_ckpt_dir, torch_dtype=weight_dtype)
        self.pipeline_anime = self.pipeline_anime.to(device)

        # realistic style
        realistic_ckpt_dir = '/nvme/shared/civitai_models/ckpt_save_pretrained/copaxTimelessxlSDXL1_v8'
        self.pipeline_realistic = StableDiffusionXLPipeline.from_pretrained(
            realistic_ckpt_dir, torch_dtype=weight_dtype)
        self.pipeline_realistic = self.pipeline_realistic.to(device)

        # dim3 for oil painting style and sketch
        dim3_ckpt_dir = '/nvme/shared/civitai_models/ckpt_save_pretrained/protovisionXLHighFidelity3D_release0630Bakedvae'
        self.pipeline_oil_painting = StableDiffusionXLPipeline.from_pretrained(
            dim3_ckpt_dir, torch_dtype=weight_dtype)
        oil_painting_lora_dir = '/nvme/shared/civitai_models/loras/ClassipeintXL1.9.safetensors'
        self.pipeline_oil_painting.load_lora_weights(oil_painting_lora_dir)
        self.pipeline_oil_painting = self.pipeline_oil_painting.to(device)

        # SD XL base
        # pretrained_model_name_or_path = "stabilityai/stable-diffusion-xl-base-1.0"
        pretrained_model_name_or_path = '/nvme/shared/huggingface_hub/huggingface/hub/models--stabilityai--stable-diffusion-xl-base-1.0/snapshots/76d28af79639c28a79fa5c6c6468febd3490a37e'
        # vae_path = "madebyollin/sdxl-vae-fp16-fix"
        vae_path = '/nvme/shared/huggingface_hub/huggingface/hub/models--madebyollin--sdxl-vae-fp16-fix/snapshots/4df413ca49271c25289a6482ab97a433f8117d15'
        vae = AutoencoderKL.from_pretrained(vae_path, torch_dtype=weight_dtype)

        # guofeng style
        guofeng_lora_dir = '/nvme/shared/civitai_models/loras/minimalism.safetensors'
        self.pipeline_guofeng = StableDiffusionXLPipeline.from_pretrained(
            pretrained_model_name_or_path, vae=vae, torch_dtype=weight_dtype)
        self.pipeline_guofeng.load_lora_weights(guofeng_lora_dir)
        self.pipeline_guofeng = self.pipeline_guofeng.to(device)

        # manghe style
        manghe_lora_dir = '/nvme/shared/civitai_models/loras/mengwa.safetensors'
        self.pipeline_manghe = StableDiffusionXLPipeline.from_pretrained(
            pretrained_model_name_or_path, vae=vae, torch_dtype=weight_dtype)
        self.pipeline_manghe.load_lora_weights(manghe_lora_dir)
        self.pipeline_manghe = self.pipeline_manghe.to(device)

        # output resolution (width, height) for each supported ratio index
        self.ratio_dict = {
            0: (1024, 768),
            1: (800, 800),
            2: (600, 800),
            3: (1024, 576),
            4: (448, 800),
            5: (1024, 680),
            6: (528, 800)
        }
        self.tpose_image_ratio = prepare_tpose_image(self.tpose_image)

        # SD 1.5 pipeline, used only for its safety checker
        sd15_dir = '/nvme/shared/stable-diffusion-v1-5'
        self.sd15 = StableDiffusionPipeline.from_pretrained(sd15_dir)
        self.sd15 = self.sd15.to(device)
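        # Sketch (not executed): every prepared T-pose canvas matches the
        # output resolution registered for the same ratio index, e.g.
        #
        #   assert self.tpose_image_ratio[3].size == self.ratio_dict[3]  # (1024, 576)
        #   assert self.tpose_image_ratio[6].size == self.ratio_dict[6]  # (528, 800)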

    def execute(self, requests):
        """`execute` must be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference is requested
        for this model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you
        can set the error argument when creating a pb_utils.InferenceResponse.

        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest

        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`
        """
        responses = []

        # Every Python backend must iterate over every one of the requests
        # and create a pb_utils.InferenceResponse for each of them.
        for request in requests:
            # Get INPUT tensors
            prompt = pb_utils.get_input_tensor_by_name(request, 'PROMPT').as_numpy()
            prompt = prompt.item().decode('utf-8')
            style = pb_utils.get_input_tensor_by_name(request, 'STYLE').as_numpy()
            style = style.item().decode('utf-8')
            ref_img = pb_utils.get_input_tensor_by_name(request, 'REFIMAGE').as_numpy()
            tpose = pb_utils.get_input_tensor_by_name(request, 'TPOSE').as_numpy()
            ratio = pb_utils.get_input_tensor_by_name(request, 'RATIO').as_numpy()
            print(f'prompt:{prompt} style:{style} ref_img:{ref_img.shape} '
                  f'tpose:{tpose} ratio:{ratio}')
            tpose = tpose[0]

            # pick the checkpoint (and LoRA trigger words) for the requested style
            pipeline_infer = self.pipeline_anime
            if style == 'manghe':
                pipeline_infer = self.pipeline_manghe
                prompt = 'chibi,' + prompt
            elif style == 'guofeng':
                pipeline_infer = self.pipeline_guofeng
                prompt = 'minimalist style, Flat illustration, Chinese style,' + prompt
            elif style == 'xieshi':
                pipeline_infer = self.pipeline_realistic
            elif style == 'youhua':
                pipeline_infer = self.pipeline_oil_painting
                prompt = 'oil painting,' + prompt
            elif style == 'chahua':
                pipeline_infer = self.pipeline_realistic
                prompt = 'sketch, sketch painting,' + prompt

            prompt_to_append = ', best quality, extremely detailed, perfect, 8k, masterpiece'
            prompt = prompt + prompt_to_append
            negative_prompt = 'nude'

            # A (1, 1, 3) REFIMAGE is the "no reference image" placeholder;
            # otherwise reuse the loaded components in an img2img pipeline.
            if ref_img.shape != (1, 1, 3):
                if tpose:
                    pipeline_infer = StableDiffusionXLControlNetImg2ImgPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder,
                        pipeline_infer.text_encoder_2, pipeline_infer.tokenizer,
                        pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        self.controlnet, pipeline_infer.scheduler)
                else:
                    pipeline_infer = StableDiffusionXLImg2ImgPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder,
                        pipeline_infer.text_encoder_2, pipeline_infer.tokenizer,
                        pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        pipeline_infer.scheduler)
            else:
                if tpose:
                    pipeline_infer = StableDiffusionXLControlNetPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder,
                        pipeline_infer.text_encoder_2, pipeline_infer.tokenizer,
                        pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        self.controlnet, pipeline_infer.scheduler)
                else:
                    pipeline_infer = StableDiffusionXLPipeline(
                        pipeline_infer.vae, pipeline_infer.text_encoder,
                        pipeline_infer.text_encoder_2, pipeline_infer.tokenizer,
                        pipeline_infer.tokenizer_2, pipeline_infer.unet,
                        pipeline_infer.scheduler)

            ratio_type = ratio[0]
            width, height = self.ratio_dict[ratio_type]
            controlnet_conditioning_scale = 1.0

            if ref_img.shape != (1, 1, 3):
                init_image = Image.fromarray(ref_img)
                if tpose:
                    image = pipeline_infer(
                        prompt, negative_prompt=negative_prompt,
                        controlnet_conditioning_scale=controlnet_conditioning_scale,
                        image=init_image.resize((width, height)),
                        control_image=self.tpose_image_ratio[ratio_type],
                        strength=0.5).images[0]
                else:
                    image = pipeline_infer(
                        prompt, negative_prompt=negative_prompt,
                        image=init_image, width=width, height=height,
                        strength=0.5).images[0]
            else:
                if tpose:
                    image = pipeline_infer(
                        prompt, negative_prompt=negative_prompt,
                        controlnet_conditioning_scale=controlnet_conditioning_scale,
                        image=self.tpose_image_ratio[ratio_type]).images[0]
                else:
                    image = pipeline_infer(
                        prompt, negative_prompt=negative_prompt,
                        num_inference_steps=25, width=width, height=height).images[0]

            # run the SD 1.5 safety checker and black out NSFW results
            image_np = np.array(image).astype(np.float32) / 255.0
            image_pt = torch.from_numpy(image_np.transpose(2, 0, 1)).unsqueeze(0)
            image_pt = image_pt.to('cuda')
            check_res, nsfw = self.sd15.run_safety_checker(image_pt, 'cuda', torch.float32)
            if nsfw[0]:
                image = Image.new('RGB', image.size, (0, 0, 0))

            image = np.array(image).astype(np.uint8)
            print(f'final result: {image.shape}, [{np.min(image)}-{np.max(image)}]')

            # Create output tensors. You need pb_utils.Tensor
            # objects to create pb_utils.InferenceResponse.
            out_tensor = pb_utils.Tensor('OUTPUT', image)

            # Create InferenceResponse. You can set an error here in case
            # there was a problem with handling this inference request.
            # Below is an example of how you can set errors in inference
            # response:
            #
            # pb_utils.InferenceResponse(
            #     output_tensors=..., TritonError("An error occurred"))
            inference_response = pb_utils.InferenceResponse(
                output_tensors=[out_tensor])
            responses.append(inference_response)

        # You should return a list of pb_utils.InferenceResponse. Length
        # of this list must match the length of `requests` list.
        return responses

    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.

        Implementing `finalize` function is optional. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print('Cleaning up...')
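

# A minimal client-side sketch of how this model could be exercised over
# Triton's HTTP endpoint. The tensor names match the ones read in `execute`;
# the datatypes (BYTES for the string inputs, UINT8/INT32 for the image and
# flags), the shapes, the model name 'image_generation', and the `_bytes_input`
# helper are assumptions for illustration and must mirror this model's
# config.pbtxt. Guarded by `__main__` so it never runs inside Triton.
if __name__ == '__main__':
    import tritonclient.http as httpclient

    client = httpclient.InferenceServerClient(url='localhost:8000')

    def _bytes_input(name, value):
        # Triton string tensors are sent as object arrays of UTF-8 bytes.
        tensor = httpclient.InferInput(name, [1], 'BYTES')
        tensor.set_data_from_numpy(np.array([value.encode('utf-8')], dtype=np.object_))
        return tensor

    # A (1, 1, 3) REFIMAGE means "no reference image" (pure text-to-image).
    ref_img = np.zeros((1, 1, 3), dtype=np.uint8)
    inputs = [
        _bytes_input('PROMPT', 'a girl walking in a garden'),
        _bytes_input('STYLE', 'guofeng'),
        httpclient.InferInput('REFIMAGE', list(ref_img.shape), 'UINT8'),
        httpclient.InferInput('TPOSE', [1], 'INT32'),
        httpclient.InferInput('RATIO', [1], 'INT32'),
    ]
    inputs[2].set_data_from_numpy(ref_img)
    inputs[3].set_data_from_numpy(np.array([0], dtype=np.int32))  # no T-pose control
    inputs[4].set_data_from_numpy(np.array([0], dtype=np.int32))  # ratio index 0 -> 1024x768

    result = client.infer('image_generation', inputs,
                          outputs=[httpclient.InferRequestedOutput('OUTPUT')])
    print(result.as_numpy('OUTPUT').shape)  # (H, W, 3) uint8 image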