Spaces:
Running
on
Zero
Running
on
Zero
File size: 3,898 Bytes
e368cec 8752733 e368cec bd0bce1 e368cec bd0bce1 e368cec bd0bce1 e368cec 8752733 e368cec bd0bce1 e368cec 5777088 e368cec 5777088 e368cec bd0bce1 8752733 e368cec 5777088 e368cec 5777088 e368cec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
import concurrent.futures
import random
import gradio as gr
import requests
import io, base64, json
import spaces
from PIL import Image
from .models import IMAGE_GENERATION_MODELS, IMAGE_EDITION_MODELS, load_pipeline
class ModelManager:
    """Load model pipelines on demand and run two of them on the same input.

    Attributes:
        model_ig_list: Names of the available image-generation models
            (``IMAGE_GENERATION_MODELS``).
        model_ie_list: Names of the available image-edition models
            (``IMAGE_EDITION_MODELS``).
        loaded_models: Cache mapping a model name to its loaded pipeline.
    """

    def __init__(self):
        self.model_ig_list = IMAGE_GENERATION_MODELS
        self.model_ie_list = IMAGE_EDITION_MODELS
        # Pipelines are loaded on first use and then reused; see load_model_pipe.
        self.loaded_models = {}

    def load_model_pipe(self, model_name):
        """Return the pipeline for *model_name*, loading it on first request.

        Args:
            model_name: Name passed to ``load_pipeline``.

        Returns:
            The pipeline object, shared across calls for the same name.
        """
        if model_name not in self.loaded_models:
            self.loaded_models[model_name] = load_pipeline(model_name)
        return self.loaded_models[model_name]

    def _pick_model_names(self, candidates, model_A, model_B):
        """Return the two model names to compare.

        When both names are empty strings, two distinct names are drawn at
        random from *candidates*; otherwise the given names are used as-is.
        """
        if model_A == "" and model_B == "":
            return random.sample(list(candidates), 2)
        return [model_A, model_B]

    def _run_for_models(self, generate, model_names, *inputs):
        """Call ``generate(*inputs, name)`` concurrently for each model name.

        Returns:
            A list of results in the same order as *model_names*.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(generate, *inputs, name) for name in model_names
            ]
            # Read results in submission order. Iterating with as_completed()
            # would yield them in finish order, so a faster second model could
            # land in the first slot and be attributed to the wrong name.
            return [future.result() for future in futures]

    @spaces.GPU(duration=60)
    def generate_image_ig(self, prompt, model_name):
        """Run the image-generation pipeline *model_name* on *prompt*.

        Args:
            prompt: Passed to the pipeline as ``prompt=``.
            model_name: Name of the pipeline to load and call.

        Returns:
            Whatever the pipeline call returns.
        """
        pipe = self.load_model_pipe(model_name)
        return pipe(prompt=prompt)

    def generate_image_ig_parallel_anony(self, prompt, model_A, model_B):
        """Generate with two models, picking a random pair if none is named.

        Args:
            prompt: Prompt forwarded to both models.
            model_A: First model name, or ``""``.
            model_B: Second model name, or ``""``.

        Returns:
            ``(result_a, result_b, name_a, name_b)``, where each result comes
            from the model whose name occupies the matching position.
        """
        model_names = self._pick_model_names(self.model_ig_list, model_A, model_B)
        result_a, result_b = self._run_for_models(
            self.generate_image_ig, model_names, prompt
        )
        return result_a, result_b, model_names[0], model_names[1]

    def generate_image_ig_parallel(self, prompt, model_A, model_B):
        """Generate with *model_A* and *model_B* concurrently.

        Returns:
            ``(result_a, result_b)``: the output of *model_A*, then the
            output of *model_B*.
        """
        result_a, result_b = self._run_for_models(
            self.generate_image_ig, [model_A, model_B], prompt
        )
        return result_a, result_b

    @spaces.GPU(duration=150)
    def generate_image_ie(self, textbox_source, textbox_target, textbox_instruct, source_image, model_name):
        """Run the image-edition pipeline *model_name* on one source image.

        Args:
            textbox_source: Passed to the pipeline as ``src_prompt=``.
            textbox_target: Passed to the pipeline as ``target_prompt=``.
            textbox_instruct: Passed to the pipeline as ``instruct_prompt=``.
            source_image: Passed to the pipeline as ``src_image=``.
            model_name: Name of the pipeline to load and call.

        Returns:
            Whatever the pipeline call returns.
        """
        pipe = self.load_model_pipe(model_name)
        return pipe(
            src_image=source_image,
            src_prompt=textbox_source,
            target_prompt=textbox_target,
            instruct_prompt=textbox_instruct,
        )

    def generate_image_ie_parallel(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B):
        """Edit *source_image* with *model_A* and *model_B* concurrently.

        Returns:
            ``(result_a, result_b)``: the output of *model_A*, then the
            output of *model_B*.
        """
        result_a, result_b = self._run_for_models(
            self.generate_image_ie,
            [model_A, model_B],
            textbox_source,
            textbox_target,
            textbox_instruct,
            source_image,
        )
        return result_a, result_b

    def generate_image_ie_parallel_anony(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B):
        """Edit with two models, picking a random pair if none is named.

        Returns:
            ``(result_a, result_b, name_a, name_b)``, where each result comes
            from the model whose name occupies the matching position.
        """
        model_names = self._pick_model_names(self.model_ie_list, model_A, model_B)
        result_a, result_b = self._run_for_models(
            self.generate_image_ie,
            model_names,
            textbox_source,
            textbox_target,
            textbox_instruct,
            source_image,
        )
        return result_a, result_b, model_names[0], model_names[1]