import io
from inspect import cleandoc

import numpy as np
import torch
from PIL import Image

from comfy.comfy_types.node_typing import IO, ComfyNodeABC, InputTypeDict
from comfy_api_nodes.apis import (
    OpenAIImageGenerationRequest,
    OpenAIImageEditRequest,
    OpenAIImageGenerationResponse,
)
from comfy_api_nodes.apis.client import (
    ApiEndpoint,
    HttpMethod,
    SynchronousOperation,
)
from comfy_api_nodes.apinode_utils import (
    downscale_image_tensor,
    validate_and_cast_response,
    validate_string,
)


class OpenAIDalle2(ComfyNodeABC):
    """
    Generates images synchronously via OpenAI's DALL·E 2 endpoint.
    """

    def __init__(self):
        pass
    @classmethod
    def INPUT_TYPES(cls) -> InputTypeDict:
        return {
            "required": {
                "prompt": (
                    IO.STRING,
                    {
                        "multiline": True,
                        "default": "",
                        "tooltip": "Text prompt for DALL·E",
                    },
                ),
            },
            "optional": {
                "seed": (
                    IO.INT,
                    {
                        "default": 0,
                        "min": 0,
                        "max": 2**31 - 1,
                        "step": 1,
                        "display": "number",
                        "control_after_generate": True,
                        "tooltip": "Not implemented yet in the backend.",
                    },
                ),
                "size": (
                    IO.COMBO,
                    {
                        "options": ["256x256", "512x512", "1024x1024"],
                        "default": "1024x1024",
                        "tooltip": "Image size",
                    },
                ),
                "n": (
                    IO.INT,
                    {
                        "default": 1,
                        "min": 1,
                        "max": 8,
                        "step": 1,
                        "display": "number",
                        "tooltip": "How many images to generate",
                    },
                ),
                "image": (
                    IO.IMAGE,
                    {
                        "default": None,
                        "tooltip": "Optional reference image for image editing.",
                    },
                ),
                "mask": (
                    IO.MASK,
                    {
                        "default": None,
                        "tooltip": "Optional mask for inpainting (white areas will be replaced)",
                    },
                ),
            },
            "hidden": {
                "auth_token": "AUTH_TOKEN_COMFY_ORG",
                "comfy_api_key": "API_KEY_COMFY_ORG",
                "unique_id": "UNIQUE_ID",
            },
        }
    RETURN_TYPES = (IO.IMAGE,)
    FUNCTION = "api_call"
    CATEGORY = "api node/image/OpenAI"
    DESCRIPTION = cleandoc(__doc__ or "")
    API_NODE = True
    def api_call(
        self,
        prompt,
        seed=0,
        image=None,
        mask=None,
        n=1,
        size="1024x1024",
        unique_id=None,
        **kwargs,
    ):
        validate_string(prompt, strip_whitespace=False)
        model = "dall-e-2"
        path = "/proxy/openai/images/generations"
        content_type = "application/json"
        request_class = OpenAIImageGenerationRequest
        img_binary = None

        if image is not None and mask is not None:
            # Editing requires the multipart endpoint and the edit request model.
            path = "/proxy/openai/images/edits"
            content_type = "multipart/form-data"
            request_class = OpenAIImageEditRequest

            input_tensor = image.squeeze().cpu()
            height, width, channels = input_tensor.shape
            rgba_tensor = torch.ones(height, width, 4, device="cpu")
            rgba_tensor[:, :, :channels] = input_tensor

            if mask.shape[1:] != image.shape[1:-1]:
                raise Exception("Mask and Image must be the same size")
            # The inverted mask becomes the alpha channel: masked (white)
            # areas turn transparent, marking the region to regenerate.
            rgba_tensor[:, :, 3] = 1 - mask.squeeze().cpu()

            rgba_tensor = downscale_image_tensor(rgba_tensor.unsqueeze(0)).squeeze()

            image_np = (rgba_tensor.numpy() * 255).astype(np.uint8)
            img = Image.fromarray(image_np)
            img_byte_arr = io.BytesIO()
            img.save(img_byte_arr, format="PNG")
            img_byte_arr.seek(0)
            # Keep the BytesIO object itself (not .getvalue()) so it can be
            # streamed as a multipart upload; give it a filename for the form.
            img_binary = img_byte_arr
            img_binary.name = "image.png"
        elif image is not None or mask is not None:
            raise Exception("DALL·E 2 image editing requires an image AND a mask")

        # Build the operation
        operation = SynchronousOperation(
            endpoint=ApiEndpoint(
                path=path,
                method=HttpMethod.POST,
                request_model=request_class,
                response_model=OpenAIImageGenerationResponse,
            ),
            request=request_class(
                model=model,
                prompt=prompt,
                n=n,
                size=size,
                seed=seed,
            ),
            files=(
                {
                    "image": img_binary,
                }
                if img_binary
                else None
            ),
            content_type=content_type,
            auth_kwargs=kwargs,
        )

        response = operation.execute()

        img_tensor = validate_and_cast_response(response, node_id=unique_id)
        return (img_tensor,)
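
# A minimal standalone sketch (not used by the node above) of the RGBA packing
# OpenAIDalle2.api_call performs for the edits path. It assumes ComfyUI tensor
# conventions: image is (1, H, W, C) with values in [0, 1], mask is (1, H, W).
# The helper name is illustrative only.
def _example_pack_rgba(image: torch.Tensor, mask: torch.Tensor) -> bytes:
    rgb = image.squeeze(0).cpu()               # (H, W, C)
    h, w, c = rgb.shape
    rgba = torch.ones(h, w, 4)                 # alpha defaults to opaque
    rgba[:, :, :c] = rgb
    rgba[:, :, 3] = 1 - mask.squeeze(0).cpu()  # inverted mask -> alpha channel
    arr = (rgba.numpy() * 255).astype(np.uint8)
    buf = io.BytesIO()
    Image.fromarray(arr).save(buf, format="PNG")
    return buf.getvalue()
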
class OpenAIDalle3(ComfyNodeABC):
    """
    Generates images synchronously via OpenAI's DALL·E 3 endpoint.
    """

    def __init__(self):
        pass
    @classmethod
    def INPUT_TYPES(cls) -> InputTypeDict:
        return {
            "required": {
                "prompt": (
                    IO.STRING,
                    {
                        "multiline": True,
                        "default": "",
                        "tooltip": "Text prompt for DALL·E",
                    },
                ),
            },
            "optional": {
                "seed": (
                    IO.INT,
                    {
                        "default": 0,
                        "min": 0,
                        "max": 2**31 - 1,
                        "step": 1,
                        "display": "number",
                        "control_after_generate": True,
                        "tooltip": "Not implemented yet in the backend.",
                    },
                ),
                "quality": (
                    IO.COMBO,
                    {
                        "options": ["standard", "hd"],
                        "default": "standard",
                        "tooltip": "Image quality",
                    },
                ),
                "style": (
                    IO.COMBO,
                    {
                        "options": ["natural", "vivid"],
                        "default": "natural",
                        "tooltip": "Vivid causes the model to lean towards generating hyper-real and dramatic images. Natural causes the model to produce more natural, less hyper-real looking images.",
                    },
                ),
                "size": (
                    IO.COMBO,
                    {
                        "options": ["1024x1024", "1024x1792", "1792x1024"],
                        "default": "1024x1024",
                        "tooltip": "Image size",
                    },
                ),
            },
            "hidden": {
                "auth_token": "AUTH_TOKEN_COMFY_ORG",
                "comfy_api_key": "API_KEY_COMFY_ORG",
                "unique_id": "UNIQUE_ID",
            },
        }
    RETURN_TYPES = (IO.IMAGE,)
    FUNCTION = "api_call"
    CATEGORY = "api node/image/OpenAI"
    DESCRIPTION = cleandoc(__doc__ or "")
    API_NODE = True
    def api_call(
        self,
        prompt,
        seed=0,
        style="natural",
        quality="standard",
        size="1024x1024",
        unique_id=None,
        **kwargs,
    ):
        validate_string(prompt, strip_whitespace=False)
        model = "dall-e-3"

        # Build the operation
        operation = SynchronousOperation(
            endpoint=ApiEndpoint(
                path="/proxy/openai/images/generations",
                method=HttpMethod.POST,
                request_model=OpenAIImageGenerationRequest,
                response_model=OpenAIImageGenerationResponse,
            ),
            request=OpenAIImageGenerationRequest(
                model=model,
                prompt=prompt,
                quality=quality,
                size=size,
                style=style,
                seed=seed,
            ),
            auth_kwargs=kwargs,
        )

        response = operation.execute()

        img_tensor = validate_and_cast_response(response, node_id=unique_id)
        return (img_tensor,)
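
# Hedged sketch of the JSON body the DALL·E 3 path posts through the proxy.
# Field names are taken from the request construction above; the exact
# serialization is defined by OpenAIImageGenerationRequest, and the values
# here are illustrative only:
#
#   {
#     "model": "dall-e-3",
#     "prompt": "a watercolor fox",
#     "quality": "standard",
#     "style": "natural",
#     "size": "1024x1024",
#     "seed": 0
#   }
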
class OpenAIGPTImage1(ComfyNodeABC):
    """
    Generates images synchronously via OpenAI's GPT Image 1 endpoint.
    """

    def __init__(self):
        pass
    @classmethod
    def INPUT_TYPES(cls) -> InputTypeDict:
        return {
            "required": {
                "prompt": (
                    IO.STRING,
                    {
                        "multiline": True,
                        "default": "",
                        "tooltip": "Text prompt for GPT Image 1",
                    },
                ),
            },
            "optional": {
                "seed": (
                    IO.INT,
                    {
                        "default": 0,
                        "min": 0,
                        "max": 2**31 - 1,
                        "step": 1,
                        "display": "number",
                        "control_after_generate": True,
                        "tooltip": "Not implemented yet in the backend.",
                    },
                ),
                "quality": (
                    IO.COMBO,
                    {
                        "options": ["low", "medium", "high"],
                        "default": "low",
                        "tooltip": "Image quality, affects cost and generation time.",
                    },
                ),
                "background": (
                    IO.COMBO,
                    {
                        "options": ["opaque", "transparent"],
                        "default": "opaque",
                        "tooltip": "Return image with or without background",
                    },
                ),
                "size": (
                    IO.COMBO,
                    {
                        "options": ["auto", "1024x1024", "1024x1536", "1536x1024"],
                        "default": "auto",
                        "tooltip": "Image size",
                    },
                ),
                "n": (
                    IO.INT,
                    {
                        "default": 1,
                        "min": 1,
                        "max": 8,
                        "step": 1,
                        "display": "number",
                        "tooltip": "How many images to generate",
                    },
                ),
                "image": (
                    IO.IMAGE,
                    {
                        "default": None,
                        "tooltip": "Optional reference image for image editing.",
                    },
                ),
                "mask": (
                    IO.MASK,
                    {
                        "default": None,
                        "tooltip": "Optional mask for inpainting (white areas will be replaced)",
                    },
                ),
            },
            "hidden": {
                "auth_token": "AUTH_TOKEN_COMFY_ORG",
                "comfy_api_key": "API_KEY_COMFY_ORG",
                "unique_id": "UNIQUE_ID",
            },
        }
    RETURN_TYPES = (IO.IMAGE,)
    FUNCTION = "api_call"
    CATEGORY = "api node/image/OpenAI"
    DESCRIPTION = cleandoc(__doc__ or "")
    API_NODE = True
    def api_call(
        self,
        prompt,
        seed=0,
        quality="low",
        background="opaque",
        image=None,
        mask=None,
        n=1,
        size="1024x1024",
        unique_id=None,
        **kwargs,
    ):
        validate_string(prompt, strip_whitespace=False)
        model = "gpt-image-1"
        path = "/proxy/openai/images/generations"
        content_type = "application/json"
        request_class = OpenAIImageGenerationRequest
        img_binaries = []
        mask_binary = None
        files = []

        if image is not None:
            # Editing requires the multipart endpoint and the edit request model.
            path = "/proxy/openai/images/edits"
            request_class = OpenAIImageEditRequest
            content_type = "multipart/form-data"

            batch_size = image.shape[0]

            for i in range(batch_size):
                single_image = image[i : i + 1]
                scaled_image = downscale_image_tensor(single_image).squeeze()

                image_np = (scaled_image.numpy() * 255).astype(np.uint8)
                img = Image.fromarray(image_np)
                img_byte_arr = io.BytesIO()
                img.save(img_byte_arr, format="PNG")
                img_byte_arr.seek(0)
                img_binary = img_byte_arr
                img_binary.name = f"image_{i}.png"

                img_binaries.append(img_binary)
                # A single reference image is posted as "image"; batches use
                # the repeated "image[]" field name.
                if batch_size == 1:
                    files.append(("image", img_binary))
                else:
                    files.append(("image[]", img_binary))

        if mask is not None:
            if image is None:
                raise Exception("Cannot use a mask without an input image")
            if image.shape[0] != 1:
                raise Exception("Cannot use a mask with multiple images")
            if mask.shape[1:] != image.shape[1:-1]:
                raise Exception("Mask and Image must be the same size")
            batch, height, width = mask.shape
            rgba_mask = torch.zeros(height, width, 4, device="cpu")
            # The inverted mask becomes the alpha channel: masked (white)
            # areas turn transparent, marking the region to regenerate.
            rgba_mask[:, :, 3] = 1 - mask.squeeze().cpu()

            scaled_mask = downscale_image_tensor(rgba_mask.unsqueeze(0)).squeeze()

            mask_np = (scaled_mask.numpy() * 255).astype(np.uint8)
            mask_img = Image.fromarray(mask_np)
            mask_img_byte_arr = io.BytesIO()
            mask_img.save(mask_img_byte_arr, format="PNG")
            mask_img_byte_arr.seek(0)
            mask_binary = mask_img_byte_arr
            mask_binary.name = "mask.png"
            files.append(("mask", mask_binary))

        # Build the operation
        operation = SynchronousOperation(
            endpoint=ApiEndpoint(
                path=path,
                method=HttpMethod.POST,
                request_model=request_class,
                response_model=OpenAIImageGenerationResponse,
            ),
            request=request_class(
                model=model,
                prompt=prompt,
                quality=quality,
                background=background,
                n=n,
                seed=seed,
                size=size,
            ),
            files=files if files else None,
            content_type=content_type,
            auth_kwargs=kwargs,
        )

        response = operation.execute()

        img_tensor = validate_and_cast_response(response, node_id=unique_id)
        return (img_tensor,)
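
# A hedged illustration (not called by the node above) of the multipart
# payload shape OpenAIGPTImage1.api_call builds: one reference image goes
# under "image", a batch under repeated "image[]" entries, and the optional
# inpainting mask under "mask". The helper name is illustrative only.
def _example_files_payload(binaries, mask_binary=None):
    field = "image" if len(binaries) == 1 else "image[]"
    files = [(field, b) for b in binaries]
    if mask_binary is not None:
        files.append(("mask", mask_binary))
    return files
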
# A dictionary that contains all nodes you want to export with their names
# NOTE: names should be globally unique
NODE_CLASS_MAPPINGS = {
    "OpenAIDalle2": OpenAIDalle2,
    "OpenAIDalle3": OpenAIDalle3,
    "OpenAIGPTImage1": OpenAIGPTImage1,
}

# A dictionary that contains the friendly/human-readable titles for the nodes
NODE_DISPLAY_NAME_MAPPINGS = {
    "OpenAIDalle2": "OpenAI DALL·E 2",
    "OpenAIDalle3": "OpenAI DALL·E 3",
    "OpenAIGPTImage1": "OpenAI GPT Image 1",
}
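
# Hedged usage sketch: ComfyUI imports this module and registers the nodes
# from the two mappings above. For a direct (non-ComfyUI) smoke test, the
# auth keyword below mirrors the hidden "auth_token" input and reaches the
# operation via **kwargs / auth_kwargs; the call is illustrative only.
#
#   node = OpenAIDalle3()
#   (images,) = node.api_call(
#       "a watercolor fox",
#       size="1024x1024",
#       auth_token="<token>",
#   )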