|
import json |
|
import requests |
|
import io |
|
import base64 |
|
from PIL import Image, PngImagePlugin |
|
|
|
import asyncio |
|
import httpx |
|
|
|
|
|
from io import BytesIO |
|
import prompt_shortcut |
|
|
|
|
|
def img_2_b64(image): |
|
buff = BytesIO() |
|
image.save(buff, format="PNG") |
|
img_byte = base64.b64encode(buff.getvalue()) |
|
img_str = img_byte.decode("utf-8") |
|
return img_str |
|
|
|
import time |
|
import serverHelper |
|
import metadata_to_json |
|
|
|
def b64_2_img(base64_image): |
|
image = Image.open(io.BytesIO(base64.b64decode(base64_image.split(",",1)[0]))) |
|
return image |
|
|
|
def reserveBorderPixels(img,dilation_img): |
|
pixels = img.load() |
|
width, height = img.size |
|
dilation_pixels = dilation_img.load() |
|
all_pixels = [] |
|
depth = 20 |
|
for x in range(width): |
|
for d in range(depth): |
|
dilation_pixels[x,d] = pixels[x, d] |
|
dilation_pixels[x,height-(d+1)] = pixels[x, height-(d+1)] |
|
|
|
for y in range(height): |
|
for d in range(depth): |
|
dilation_pixels[d,y] = pixels[d,y] |
|
dilation_pixels[width-(d+1),y] = pixels[width-(d+1), y] |
|
return dilation_img |
|
|
|
def maskExpansion(mask_img,mask_expansion): |
|
|
|
|
|
|
|
|
|
|
|
|
|
iteration = mask_expansion |
|
dilated_img = applyDilation(mask_img,iteration) |
|
mask_with_border = reserveBorderPixels(mask_img,dilated_img) |
|
mask_with_border = mask_with_border.filter(ImageFilter.GaussianBlur(radius = 10)) |
|
return mask_with_border |
|
|
|
async def base64ToPng(base64_image,image_path): |
|
base64_img_bytes = base64_image.encode('utf-8') |
|
with open(image_path, 'wb') as file_to_save: |
|
decoded_image_data = base64.decodebytes(base64_img_bytes) |
|
file_to_save.write(decoded_image_data) |
|
|
|
|
|
|
|
|
|
from PIL import Image, ImageFilter |
|
def applyDilation(img,iteration=20,max_filter=3): |
|
|
|
dilation_img = img.copy() |
|
|
|
|
|
for i in range(iteration): |
|
dilation_img = dilation_img.filter(ImageFilter.MaxFilter(max_filter)) |
|
return dilation_img |
|
|
|
|
|
|
|
async def img2ImgRequest(sd_url,payload): |
|
|
|
print("payload debug:",payload) |
|
|
|
if(payload['use_prompt_shortcut']): |
|
|
|
prompt_shortcut_dict = prompt_shortcut.load() |
|
prompt_shortcut_dict.update(payload["prompt_shortcut_ui_dict"]) |
|
payload['prompt'] = prompt_shortcut.replaceShortcut(payload['prompt'],prompt_shortcut_dict) |
|
|
|
payload['negative_prompt'] = prompt_shortcut.replaceShortcut(payload['negative_prompt'],prompt_shortcut_dict) |
|
|
|
init_img_dir = "./init_images" |
|
init_img_name = payload['init_image_name'] |
|
init_img = Image.open(f"{init_img_dir}/{init_img_name}") |
|
init_img_str = img_2_b64(init_img) |
|
payload['init_images'] = [init_img_str] |
|
|
|
|
|
|
|
init_img_mask_name = payload.get('init_image_mask_name',"") |
|
|
|
|
|
|
|
if(len(init_img_mask_name) > 0): |
|
init_img_mask = Image.open(f"{init_img_dir}/{init_img_mask_name}") |
|
|
|
if(payload['use_sharp_mask'] == False): |
|
iteration = payload['mask_expansion'] |
|
init_img_mask = applyDilation(init_img_mask,iteration) |
|
|
|
init_img_mask_str = img_2_b64(init_img_mask) |
|
payload['mask'] = init_img_mask_str |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(type(init_img_str)) |
|
|
|
async with httpx.AsyncClient() as client: |
|
response = await client.post(url=f'{sd_url}/sdapi/v1/img2img', json=payload, timeout=None) |
|
|
|
r = response.json() |
|
|
|
|
|
|
|
|
|
uniqueDocumentId = payload['uniqueDocumentId'] |
|
dir_fullpath,dirName = serverHelper.getUniqueDocumentDirPathName(uniqueDocumentId) |
|
serverHelper.createFolder(dir_fullpath) |
|
image_paths = [] |
|
metadata = [] |
|
images_info = [] |
|
|
|
for i in r['images']: |
|
image = Image.open(io.BytesIO(base64.b64decode(i.split(",",1)[0]))) |
|
|
|
png_payload = { |
|
"image": "data:image/png;base64," + i |
|
} |
|
response2 = await client.post(url=f'{sd_url}/sdapi/v1/png-info', json=png_payload, timeout=None) |
|
pnginfo = PngImagePlugin.PngInfo() |
|
pnginfo.add_text("parameters", response2.json().get("info")) |
|
image_name = f'output- {time.time()}.png' |
|
image_path = f'output/{dirName}/{image_name}' |
|
image_paths.append(image_path) |
|
image.save(f'./{image_path}', pnginfo=pnginfo) |
|
|
|
metadata_info = response2.json().get("info") |
|
metadata_json = metadata_to_json.convertMetadataToJson(metadata_info) |
|
metadata.append(metadata_json) |
|
images_info.append({"base64":i,"path":image_path}) |
|
print("metadata_json: ", metadata_json) |
|
|
|
return dirName,images_info,metadata |
|
|
|
if __name__=="__main__": |
|
img2ImgRequest() |