Spaces:
Building
on
A10G
Building
on
A10G
#This is an example that uses the websockets api to know when a prompt execution is done | |
#Once the prompt execution is done it downloads the images using the /history endpoint | |
import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client) | |
import uuid | |
import json | |
import urllib.request | |
import urllib.parse | |
server_address = "127.0.0.1:8188" | |
client_id = str(uuid.uuid4()) | |
def queue_prompt(prompt): | |
p = {"prompt": prompt, "client_id": client_id} | |
data = json.dumps(p).encode('utf-8') | |
req = urllib.request.Request("http://{}/prompt".format(server_address), data=data) | |
return json.loads(urllib.request.urlopen(req).read()) | |
def get_image(filename, subfolder, folder_type): | |
data = {"filename": filename, "subfolder": subfolder, "type": folder_type} | |
url_values = urllib.parse.urlencode(data) | |
with urllib.request.urlopen("http://{}/view?{}".format(server_address, url_values)) as response: | |
return response.read() | |
def get_history(prompt_id): | |
with urllib.request.urlopen("http://{}/history/{}".format(server_address, prompt_id)) as response: | |
return json.loads(response.read()) | |
def get_images(ws, prompt): | |
prompt_id = queue_prompt(prompt)['prompt_id'] | |
output_images = {} | |
while True: | |
out = ws.recv() | |
if isinstance(out, str): | |
message = json.loads(out) | |
if message['type'] == 'executing': | |
data = message['data'] | |
if data['node'] is None and data['prompt_id'] == prompt_id: | |
break #Execution is done | |
else: | |
continue #previews are binary data | |
history = get_history(prompt_id)[prompt_id] | |
for o in history['outputs']: | |
for node_id in history['outputs']: | |
node_output = history['outputs'][node_id] | |
if 'images' in node_output: | |
images_output = [] | |
for image in node_output['images']: | |
image_data = get_image(image['filename'], image['subfolder'], image['type']) | |
images_output.append(image_data) | |
output_images[node_id] = images_output | |
return output_images | |
prompt_text = """ | |
{ | |
"3": { | |
"class_type": "KSampler", | |
"inputs": { | |
"cfg": 8, | |
"denoise": 1, | |
"latent_image": [ | |
"5", | |
0 | |
], | |
"model": [ | |
"4", | |
0 | |
], | |
"negative": [ | |
"7", | |
0 | |
], | |
"positive": [ | |
"6", | |
0 | |
], | |
"sampler_name": "euler", | |
"scheduler": "normal", | |
"seed": 8566257, | |
"steps": 20 | |
} | |
}, | |
"4": { | |
"class_type": "CheckpointLoaderSimple", | |
"inputs": { | |
"ckpt_name": "v1-5-pruned-emaonly.ckpt" | |
} | |
}, | |
"5": { | |
"class_type": "EmptyLatentImage", | |
"inputs": { | |
"batch_size": 1, | |
"height": 512, | |
"width": 512 | |
} | |
}, | |
"6": { | |
"class_type": "CLIPTextEncode", | |
"inputs": { | |
"clip": [ | |
"4", | |
1 | |
], | |
"text": "masterpiece best quality girl" | |
} | |
}, | |
"7": { | |
"class_type": "CLIPTextEncode", | |
"inputs": { | |
"clip": [ | |
"4", | |
1 | |
], | |
"text": "bad hands" | |
} | |
}, | |
"8": { | |
"class_type": "VAEDecode", | |
"inputs": { | |
"samples": [ | |
"3", | |
0 | |
], | |
"vae": [ | |
"4", | |
2 | |
] | |
} | |
}, | |
"9": { | |
"class_type": "SaveImage", | |
"inputs": { | |
"filename_prefix": "ComfyUI", | |
"images": [ | |
"8", | |
0 | |
] | |
} | |
} | |
} | |
""" | |
prompt = json.loads(prompt_text) | |
#set the text prompt for our positive CLIPTextEncode | |
prompt["6"]["inputs"]["text"] = "masterpiece best quality man" | |
#set the seed for our KSampler node | |
prompt["3"]["inputs"]["seed"] = 5 | |
ws = websocket.WebSocket() | |
ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id)) | |
images = get_images(ws, prompt) | |
#Commented out code to display the output images: | |
# for node_id in images: | |
# for image_data in images[node_id]: | |
# from PIL import Image | |
# import io | |
# image = Image.open(io.BytesIO(image_data)) | |
# image.show() | |