# nightfury's picture
# Update src/util.py
# eeef25d verified
import concurrent.futures
import io
import os
import numpy as np
import oss2
import requests
from PIL import Image, ImageDraw, ImageFont
from .log import logger
# OSS (object storage) client configuration, built with the oss2 SDK.
# Credentials, bucket name and endpoint come from environment variables;
# os.getenv returns None for any variable that is not set.
access_key_id = os.getenv("ACCESS_KEY_ID")
access_key_secret = os.getenv("ACCESS_KEY_SECRET")
bucket_name = os.getenv("BUCKET_NAME")
endpoint = os.getenv("ENDPOINT")
# NOTE: the bucket client is constructed at import time from the values above.
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
# Key prefixes used by the upload helpers: regular uploads vs. gallery uploads.
oss_path = "nightfury.abc/ImageSynthesizerHF"
oss_path_img_gallery = "nightfury.abc/ImageSynthesizerHF_img_gallery"
def download_img_pil(index, img_url):
    """Download one image and return it together with its position.

    Args:
        index: Position of ``img_url`` in the caller's list. It is returned
            unchanged so results that complete out of order can be put back
            in place.
        img_url: URL of the image to fetch.

    Returns:
        ``(index, image)`` where ``image`` is the opened PIL image when the
        server answers with HTTP 200, or ``(index, None)`` for any other
        status (the failure is logged). A tuple is always returned so callers
        can unpack the result unconditionally.
    """
    # The context manager releases the streamed connection on every path,
    # including the failure path where the body is never read.
    with requests.get(img_url, stream=True) as response:
        if response.status_code != 200:
            logger.error(f"Fail to download: {img_url}")
            return (index, None)
        # The body is fully read into memory here, so the image stays usable
        # after the response has been closed.
        payload = response.content
    return (index, Image.open(io.BytesIO(payload)))
def download_images(img_urls, batch_size):
    """Download several images concurrently, keeping the order of ``img_urls``.

    Args:
        img_urls: Iterable of image URLs.
        batch_size: Length of the returned list.

    Returns:
        A list of length ``batch_size`` whose slot ``i`` holds the image
        downloaded from the ``i``-th URL, or ``None`` when there was no such
        URL or its download failed.

    Raises:
        IndexError: If a successful download's index is ``>= batch_size``.
    """
    imgs_pil = [None] * batch_size
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        to_do = [
            executor.submit(download_img_pil, i, url)
            for i, url in enumerate(img_urls)
        ]
        for future in concurrent.futures.as_completed(to_do):
            ret = future.result()
            if ret is None:
                # A worker that reports failure with a bare None has already
                # logged it; leave the slot empty instead of crashing on the
                # tuple unpacking below.
                continue
            index, img_pil = ret
            # Results arrive in completion order; the index restores URL order.
            imgs_pil[index] = img_pil
    return imgs_pil
def upload_np_2_oss(input_image, name="cache.png", gallery=False):
    """Encode an image array as PNG or JPEG, upload it and return a signed URL.

    Args:
        input_image: Array accepted by ``Image.fromarray``.
        name: Object name; its extension selects the encoding
            (``.png`` -> PNG, ``.jpg``/``.jpeg`` -> JPEG at quality 95).
        gallery: When true, store under the gallery prefix instead of the
            regular one.

    Returns:
        A signed ``GET`` URL for the uploaded object, requested with an
        expiry of 24 hours (``60 * 60 * 24`` seconds).

    Raises:
        AssertionError: If ``name`` has an unsupported extension.
    """
    lowered = name.lower()
    if not lowered.endswith((".png", ".jpg", ".jpeg")):
        # Raised explicitly (same exception type as the former ``assert``) so
        # the check is not stripped when Python runs with -O.
        raise AssertionError(name)

    pil_image = Image.fromarray(input_image)
    buffer = io.BytesIO()
    if lowered.endswith(".png"):
        pil_image.save(buffer, format="PNG")
    else:
        # JPEG has no alpha channel; drop it instead of failing in save().
        if pil_image.mode == "RGBA":
            pil_image = pil_image.convert("RGB")
        elif pil_image.mode == "LA":
            pil_image = pil_image.convert("L")
        pil_image.save(buffer, format="JPEG", quality=95)

    path = oss_path_img_gallery if gallery else oss_path
    object_key = path + "/" + name
    bucket.put_object(object_key, buffer.getvalue())
    # Arguments: HTTP method, object key on OSS, expiration time in seconds.
    return bucket.sign_url('GET', object_key, 60 * 60 * 24)
def upload_json_string_2_oss(jsonStr, name="cache.txt", gallery=False):
    """Upload a string as a UTF-8 encoded object and return a signed URL.

    Args:
        jsonStr: Text to store.
        name: Object name appended to the chosen prefix.
        gallery: When true, store under the gallery prefix instead of the
            regular one.

    Returns:
        A signed ``GET`` URL for the object, requested with an expiry of
        ``60 * 60 * 24`` seconds.
    """
    prefix = oss_path_img_gallery if gallery else oss_path
    object_key = prefix + "/" + name
    payload = bytes(jsonStr, "utf-8")
    bucket.put_object(object_key, payload)
    # Arguments: HTTP method, object key on OSS, expiration time in seconds.
    signed_url = bucket.sign_url('GET', object_key, 60 * 60 * 24)
    return signed_url
def _blend_toward_blue(np_rgb, np_mask):
    """Tint the unmasked part of an image 50% toward ``[0, 0, 255]``.

    ``np_mask`` holds values in [0, 1] with a trailing channel axis. Where it
    is 1 the pixel is kept as is; where it is 0 the pixel becomes an equal mix
    of the original colour and ``[0, 0, 255]``. The result is rounded, clipped
    to [0, 255] and returned as ``uint8``.
    """
    weight = np_mask * 0.5 + 0.5
    blended = np_rgb * weight + (1 - weight) * np.array([0, 0, 255])
    return blended.round().clip(0, 255).astype(np.uint8)


def _preprocess_image_dict(pil_image_dict):
    """Build the preview array for an ``{"image", "mask"}`` dict.

    A pixel is kept at full strength when its alpha is above 127 and the
    ``"mask"`` value (converted to mode "L") is below 127; every other pixel
    is tinted toward blue.
    """
    np_rgba = np.array(pil_image_dict["image"].convert("RGBA"))
    np_rgb, np_alpha = np_rgba[..., :3], np_rgba[..., 3]
    np_mask = np.array(pil_image_dict["mask"].convert("L"))
    np_keep = ((np_alpha > 127) * (np_mask < 127)).astype(float)[..., None]
    return _blend_toward_blue(np_rgb, np_keep)


def upload_preprocess(pil_base_image_rgba, pil_layout_image_dict, pil_style_image_dict, pil_color_image_dict,
                      pil_fg_mask):
    """Turn the optional inputs into ``uint8`` preview arrays.

    Args:
        pil_base_image_rgba: Base image or ``None``; only its first three
            channels are used.
        pil_layout_image_dict: ``None`` or a dict with ``"image"`` and
            ``"mask"`` entries that support ``.convert``.
        pil_style_image_dict: Same structure as the layout dict.
        pil_color_image_dict: Same structure as the layout dict.
        pil_fg_mask: Foreground mask for the base image with values in
            0..255; only read when the base image is given.

    Returns:
        ``(base, layout, style, color)``; each entry is a ``uint8`` array or
        ``None`` when the corresponding input was ``None``.
    """
    np_out_base_image = np_out_layout_image = np_out_style_image = np_out_color_image = None

    if pil_base_image_rgba is not None:
        np_fg_image = np.array(pil_base_image_rgba)[..., :3]
        np_fg_mask = np.expand_dims(np.array(pil_fg_mask).astype(float), axis=-1) / 255.
        np_out_base_image = _blend_toward_blue(np_fg_image, np_fg_mask)

    if pil_layout_image_dict is not None:
        np_out_layout_image = _preprocess_image_dict(pil_layout_image_dict)

    if pil_style_image_dict is not None:
        np_out_style_image = _preprocess_image_dict(pil_style_image_dict)

    if pil_color_image_dict is not None:
        np_out_color_image = _preprocess_image_dict(pil_color_image_dict)

    return np_out_base_image, np_out_layout_image, np_out_style_image, np_out_color_image
def pad_image(image, target_size):
    """Fit ``image`` inside ``target_size`` and centre it on a white canvas.

    The image is scaled uniformly (aspect ratio preserved) by the largest
    factor at which it still fits, then pasted in the middle of a white RGB
    canvas of exactly ``target_size``.

    Args:
        image: PIL image to fit.
        target_size: ``(width, height)`` of the returned image.

    Returns:
        A new RGB image of size ``target_size``.
    """
    iw, ih = image.size  # original size
    w, h = target_size  # canvas size
    scale = min(w / iw, h / ih)  # largest uniform scale that still fits
    # Adding 0.5 before truncation rounds to nearest. Clamp to one pixel so
    # very elongated images do not produce a zero-sized resize.
    nw = max(1, int(iw * scale + 0.5))
    nh = max(1, int(ih * scale + 0.5))
    resized = image.resize((nw, nh), Image.BICUBIC)  # bicubic interpolation
    canvas = Image.new('RGB', target_size, (255, 255, 255))  # white background
    # Centre the resized image; the white canvas shows as bars on two sides.
    canvas.paste(resized, ((w - nw) // 2, (h - nh) // 2))
    return canvas
def add_text(image, text):
    """Write ``text`` in the top-left corner over a half-blended white box.

    The font size is one tenth of the image height. A white rectangle sized
    from the text bounding box is blended at 50% with the image, then the
    text is drawn in black on top.

    Args:
        image: PIL image to annotate (not modified).
        text: Caption to draw.

    Returns:
        A new image converted to mode "RGB".
    """
    _, height = image.size
    font = ImageFont.truetype("assets/ttf/FreeMonoBold.ttf", int(height / 10))
    left, top, right, bottom = font.getbbox(text)

    # White box on a copy, then a 50% blend so the picture shows through.
    boxed = image.copy()
    ImageDraw.Draw(boxed).rectangle((0, 0, right + left, bottom + top), fill=(255, 255, 255))
    blended = Image.blend(image, boxed, 0.5)

    # Black text anchored at the upper-left corner.
    ImageDraw.Draw(blended).text((0, 0), text, font=font, fill=(0, 0, 0, 255))
    return blended.convert("RGB")
def compose_image(image_list, text_list, pil_size, nrow, ncol):
    """Arrange images on a white ``nrow`` x ``ncol`` grid, row by row.

    ``None`` images are dropped together with their captions. If exactly one
    image remains, the grid collapses to a single cell. Each remaining image
    is resized to ``pil_size`` and, when its caption is not ``None``,
    captioned with ``add_text``.

    Args:
        image_list: PIL images or ``None`` placeholders.
        text_list: Captions (or ``None``), one per entry of ``image_list``.
        pil_size: ``(width, height)`` of every grid cell.
        nrow: Number of grid rows.
        ncol: Number of grid columns.

    Returns:
        The composed RGB image.

    Raises:
        ValueError: If there are more entries than grid cells.
        AssertionError: If the two lists differ in length.
    """
    cell_w, cell_h = pil_size
    if len(image_list) > nrow * ncol:
        raise ValueError("The parameters of the composite image and the required quantity do not match!")
    assert len(image_list) == len(text_list)

    present = [(img, caption) for img, caption in zip(image_list, text_list) if img is not None]
    if len(present) == 1:
        ncol = nrow = 1

    canvas = Image.new('RGB', (ncol * cell_w, nrow * cell_h), (255, 255, 255))
    for slot, (img, caption) in enumerate(present):
        # Slots are filled left to right, then top to bottom.
        row, col = divmod(slot, ncol)
        tile = img.resize((cell_w, cell_h), Image.BICUBIC)
        if caption is not None:
            tile = add_text(tile, caption)
        canvas.paste(tile, (col * cell_w, row * cell_h))
    return canvas
def split_text_lines(text, max_w, ttf):
    """Greedily break ``text`` into lines narrower than ``max_w``.

    Each line is extended one character at a time while the measured width
    of the line plus the next character stays below ``max_w``. Every line
    holds at least one character, so a ``max_w`` narrower than a single
    glyph still terminates.

    Args:
        text: Text to wrap; an empty string yields no lines.
        max_w: Width limit, in the units returned by ``ttf.getbbox``.
        ttf: Font-like object whose ``getbbox(str)`` returns
            ``(left, top, right, bottom)``.

    Returns:
        ``(lines, total_height)`` where ``total_height`` is the sum of the
        ``bottom`` value last measured for each line.
    """
    text_split_lines = []
    text_h = 0
    line_start = 0
    while line_start < len(text):
        remaining = len(text) - line_start
        line_count = 0
        _, _, right, bottom = ttf.getbbox(text[line_start:line_start + 1])
        # line_count is the number of characters known to fit; each probe
        # measures one character more. Stop once the rest of the text fits.
        while right < max_w and line_count < remaining:
            line_count += 1
            _, _, right, bottom = ttf.getbbox(text[line_start:line_start + line_count + 1])
        # Always consume at least one character, otherwise a glyph wider
        # than max_w would leave line_start stuck and loop forever.
        line_count = max(line_count, 1)
        text_split_lines.append(text[line_start:line_start + line_count])
        text_h += bottom
        line_start += line_count
    return text_split_lines, text_h
def add_prompt(image, prompt, negative_prompt):
    """Append the prompt texts as a white caption strip below ``image``.

    Non-empty prompts are prefixed with "Prompt: " / "Negative prompt: ",
    wrapped to the image width with ``split_text_lines`` and drawn in black
    using a font sized at one twentieth of the image height.

    Args:
        image: PIL image to extend.
        prompt: Prompt text, may be empty.
        negative_prompt: Negative prompt text, may be empty.

    Returns:
        ``image`` itself when both texts are empty, otherwise a new image of
        the same mode and width with the caption strip added at the bottom.
    """
    if prompt == "" and negative_prompt == "":
        return image

    labelled_prompt = ("Prompt: " + prompt) if prompt != "" else prompt
    labelled_negative = ("Negative prompt: " + negative_prompt) if negative_prompt != "" else negative_prompt

    width, height = image.size
    font = ImageFont.truetype("assets/ttf/AlibabaPuHuiTi-2-55-Regular.ttf", int(height / 20))

    # Wrap both texts; the prompt lines come first.
    caption_lines = []
    caption_h = 0
    for part in (labelled_prompt, labelled_negative):
        part_lines, part_h = split_text_lines(part, width, font)
        caption_lines = caption_lines + part_lines
        caption_h = caption_h + part_h

    caption = Image.new(image.mode, (width, caption_h), color=(255, 255, 255))
    ImageDraw.Draw(caption).text((0, 0), "\n".join(caption_lines), font=font, fill=(0, 0, 0))

    # Stack the original image on top and the caption strip underneath.
    stacked = Image.new(image.mode, (width, height + caption_h), color=(255, 255, 255))
    stacked.paste(image, (0, 0))
    stacked.paste(caption, (0, height))
    return stacked
def merge_images(np_fg_image, np_layout_image, np_style_image, np_color_image, np_res_image, prompt, negative_prompt):
    """Build a side-by-side summary: inputs grid on the left, result on the right.

    Each available input is padded to the result's size and placed on a 2x2
    grid labelled 'Layout', 'Style', 'Color' and 'Subject'. The grid is
    shrunk to the result's size, put next to the result, and the prompt texts
    are appended underneath with ``add_prompt``.

    Args:
        np_fg_image: Subject image array or ``None``.
        np_layout_image: Layout image array or ``None``.
        np_style_image: Style image array or ``None``.
        np_color_image: Color image array or ``None``.
        np_res_image: Result image array; defines the cell size.
        prompt: Prompt text, may be empty.
        negative_prompt: Negative prompt text, may be empty.

    Returns:
        The composed image as a NumPy array.
    """
    result_pil = Image.fromarray(np_res_image)
    w, h = result_pil.size

    def padded_or_none(np_image):
        # Missing inputs stay None so compose_image can skip them.
        if np_image is None:
            return None
        return pad_image(Image.fromarray(np_image), (w, h))

    subject_pil = padded_or_none(np_fg_image)
    layout_pil = padded_or_none(np_layout_image)
    style_pil = padded_or_none(np_style_image)
    color_pil = padded_or_none(np_color_image)

    inputs_grid = compose_image(
        [layout_pil, style_pil, color_pil, subject_pil],
        ['Layout', 'Style', 'Color', 'Subject'],
        (w, h),
        nrow=2,
        ncol=2,
    )
    # Shrink the grid to a single cell so it matches the result's size.
    inputs_grid = inputs_grid.resize((w, h), Image.BICUBIC)

    side_by_side = compose_image([inputs_grid, result_pil], [None, None], (w, h), nrow=1, ncol=2)
    captioned = add_prompt(side_by_side, prompt, negative_prompt)
    return np.array(captioned)