stella-inpaint / modules /engine_inpaint.py
ikmalsaid's picture
added/updated app files
13f952f
raw
history blame
3.42 kB
import gradio as ui; import requests; from requests.exceptions import Timeout, ConnectionError
from io import BytesIO; from PIL import Image; import concurrent.futures, time
###################################################################################################################
# Import from modules
###################################################################################################################
from modules.service_endpoints import *
from modules.service_configs import *
###################################################################################################################
# Image inpaint process
###################################################################################################################
def inpaint(input_image, input_mask, prompt, negative):
    """Send one inpainting request and return the generated image.

    Both arrays are encoded as in-memory PNG files and posted, together with
    the prompts and a fixed set of generation settings, as a multipart form to
    the ``mode['inpaint']`` endpoint.

    Args:
        input_image: Array accepted by ``Image.fromarray``; uploaded as the
            ``image`` part.
        input_mask: Array accepted by ``Image.fromarray``; uploaded as the
            ``mask`` part.
        prompt: Text sent as the ``prompt`` field.
        negative: Text placed in parentheses at the front of the
            ``neg_prompt`` field, ahead of a fixed list of extra terms.

    Returns:
        The ``PIL.Image.Image`` decoded from the response body, or ``None``
        when the body is empty, the request times out, or any other error
        occurs while sending the request or decoding the answer.
    """
    image_bytes = BytesIO()
    Image.fromarray(input_image).save(image_bytes, format='PNG')
    mask_bytes = BytesIO()
    Image.fromarray(input_mask).save(mask_bytes, format='PNG')

    print(f"{receive()} -> {prompt}")

    # Plain form fields; the (None, value) tuples are kept exactly as the
    # service has been receiving them.
    form_fields = {
        'model_version': (None, '1'),
        'prompt': (None, prompt),
        'neg_prompt': (None, f"({negative}), hands, face, eyes, legs"),
        'inpaint_strength': (None, '0.75'),
        'cfg': (None, '9.5'),
        'priority': (None, '1'),
    }
    # File parts: (filename, raw bytes, content type).
    uploads = {
        'image': ('input_image.png', image_bytes.getvalue(), 'image/png'),
        'mask': ('input_mask.png', mask_bytes.getvalue(), 'image/png'),
    }

    try:
        # NOTE(review): both the connect and read timeouts are disabled here,
        # so the Timeout handler below is not expected to fire until a finite
        # timeout is configured.
        response = requests.post(
            mode['inpaint'],
            headers=head,
            data=form_fields,
            files=uploads,
            timeout=(None, None),
        )
        # The previous guard compared the body length against 0 * 1024, which
        # can never be true; an empty body is the case it can actually reject.
        if not response.content:
            print(reject())
            return None
        print(done())
        return Image.open(BytesIO(response.content))
    except Timeout:
        print(timeout())
        return None
    except Exception as e:
        # Boundary handler: report the failure to the console and the UI
        # instead of letting it escape the worker.
        print(f"An error occurred: {e}")
        ui.Warning(message=single_error)
        return None
###################################################################################################################
# Generate several images per run (quantity chosen by the caller)
###################################################################################################################
def quads_inpaint(a, b, c, d, progress=ui.Progress()):
    """Run several inpaint requests concurrently and collect the successes.

    Args:
        a: Mapping with a ``"background"`` entry (the image) and a
            ``"layers"`` sequence whose first element is used as the mask.
        b: Prompt forwarded to ``inpaint``.
        c: Negative prompt forwarded to ``inpaint``.
        d: Number of images to request; converted with ``int``.
        progress: Callable invoked as ``progress(fraction, desc=...)`` after
            each job is submitted.

    Returns:
        A list with every non-``None`` result of ``inpaint``, in submission
        order. A UI warning is raised when fewer images than requested came
        back.
    """
    quantities = int(d)
    if quantities <= 0:
        # Nothing to submit; this also keeps the step below from dividing by zero.
        return []

    # Every job works on the same image and mask, so look them up once.
    image, mask = a["background"], a["layers"][0]
    # The bar advances by an equal share per submitted job, topping out at 0.99.
    step = 0.99 / quantities
    percent = 0

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for i in range(quantities):
            futures.append(executor.submit(inpaint, image, mask, b, c))
            percent += step
            progress(percent, desc=f"Processing image {i + 1} of {quantities}")
            # Short pause between consecutive submissions.
            time.sleep(0.25)
        # result() blocks until each job is finished; order follows submission.
        results = [future.result() for future in futures]

    successful_results = [result for result in results if result is not None]
    if len(successful_results) < quantities:
        if quantities == 1:
            ui.Warning(message=single_error)
        else:
            ui.Warning(message=quads_error)
    return successful_results
def inpaint2(a, b, c, d):
    """Run the batch inpaint and build the values handed back to the UI.

    Returns:
        A 3-tuple: a ``ui.Tabs`` update selecting tab 1, ``a`` unchanged, and
        the list returned by ``quads_inpaint(a, b, c, d)``.
    """
    # The tab update is created before the generation starts.
    tabs_update = ui.Tabs(selected=1)
    generated = quads_inpaint(a, b, c, d)
    return tabs_update, a, generated
def reset():
    """Return a ``ui.Tabs`` update that selects tab 0."""
    first_tab = ui.Tabs(selected=0)
    return first_tab