File size: 6,004 Bytes
e368cec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import concurrent.futures 
import random
import gradio as gr
# from fal_api_utils import load_fal_model
from .imagenhub_utils import load_imagenhub_model
import spaces
import requests
import io, base64, json
from PIL import Image
import os


# Identifiers of the text-to-image models offered by this module.
# Each identifier has three "_"-separated parts: source, model name, task.
IMAGE_GENERATION_MODELS = [
    'imagenhub_LCM_generation',
    'imagenhub_SDXLTurbo_generation',
    'imagenhub_SDXL_generation',
    'imagenhub_PixArtAlpha_generation',
    'imagenhub_OpenJourney_generation',
    'imagenhub_SDXLLightning_generation',
    'imagenhub_StableCascade_generation',
    'imagenhub_PlayGroundV2_generation',
    'imagenhub_PlayGroundV2.5_generation',
]

# Identifiers of the image-editing models offered by this module.
IMAGE_EDITION_MODELS = [
    'imagenhub_CycleDiffusion_edition',
    'imagenhub_Pix2PixZero_edition',
    'imagenhub_Prompt2prompt_edition',
    'imagenhub_SDEdit_edition',
    'imagenhub_InstructPix2Pix_edition',
    'imagenhub_MagicBrush_edition',
    'imagenhub_PNP_edition',
]

class ModelManager:
    """Load image models on demand and run generation / editing requests.

    Model identifiers are strings of the form ``<source>_<name>_<type>``
    (e.g. ``imagenhub_SDXL_generation``). Loaded pipelines are cached on the
    instance in ``loaded_models`` under the ``<name>`` part.
    """

    def __init__(self):
        self.model_ig_list = IMAGE_GENERATION_MODELS
        self.model_ie_list = IMAGE_EDITION_MODELS
        # Cache of loaded pipelines, keyed by the middle part of the identifier.
        self.loaded_models = {}

    @spaces.GPU
    def load_model_pipe(self, model_name):
        """Return the pipeline for ``model_name``, loading it on first use.

        Args:
            model_name: Identifier of the form ``<source>_<name>_<type>``.

        Returns:
            The object returned by ``load_imagenhub_model`` for ``<name>``
            (cached after the first call).

        Raises:
            ValueError: If the identifier does not split into exactly three
                ``_``-separated parts, or if the source is not ``imagenhub``.
        """
        model_source, model_name, _model_type = model_name.split("_")
        if model_name not in self.loaded_models:
            if model_source == "imagenhub":
                pipe = load_imagenhub_model(model_name)
            # elif model_source == "fal":
            #     pipe = load_fal_model(model_name, model_type)
            else:
                raise ValueError(f"Model source {model_source} not supported")
            self.loaded_models[model_name] = pipe
        return self.loaded_models[model_name]

    def generate_image_playground(self, model_name, prompt):
        """Generate an image through the Playground HTTP API.

        Args:
            model_name: One of the two PlayGround identifiers; it is mapped to
                the API's ``filter_model`` value. Any other value is sent
                unchanged.
            prompt: Text prompt sent to the API.

        Returns:
            A ``PIL.Image`` opened from the first base64-encoded image in the
            JSON response.

        Raises:
            KeyError: If the ``PlaygroundAPI`` environment variable is unset.
            requests.HTTPError: If the API answers with an error status.
        """
        if model_name == "imagenhub_PlayGroundV2_generation":
            model_name = "Playground_v2"
        elif model_name == "imagenhub_PlayGroundV2.5_generation":
            model_name = "Playground_v2.5"

        headers = {
            'Content-Type': 'application/json',
            'Authorization': os.environ['PlaygroundAPI'],
        }
        data = json.dumps({
            "prompt": prompt,
            "filter_model": model_name,
            "scheduler": "DPMPP_2M_K",
            "guidance_scale": 3,
        })

        # NOTE(review): no timeout is set, so a stalled connection blocks the
        # calling thread indefinitely; consider passing `timeout=`.
        response = requests.post('https://playground.com/api/models/external/v1', headers=headers, data=data)
        response.raise_for_status()
        image_base64 = response.json()['images'][0]
        return Image.open(io.BytesIO(base64.decodebytes(image_base64.encode("utf-8"))))

    @spaces.GPU
    def generate_image_ig(self, prompt, model_name):
        """Generate one image for ``prompt`` with the model ``model_name``.

        Identifiers containing "playground" (case-insensitive) are served by
        the Playground HTTP API; all others go through a loaded pipeline
        called as ``pipe(prompt=prompt)``.
        """
        if 'playground' in model_name.lower():
            return self.generate_image_playground(model_name=model_name, prompt=prompt)
        pipe = self.load_model_pipe(model_name)
        return pipe(prompt=prompt)

    def _run_for_each_model(self, generate_fn, leading_args, model_names):
        """Call ``generate_fn(*leading_args, model)`` concurrently per model.

        Results are returned in the same order as ``model_names``. They are
        read from the futures in submission order rather than completion
        order, so result ``i`` always belongs to ``model_names[i]``.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(generate_fn, *leading_args, model)
                for model in model_names
            ]
            return [future.result() for future in futures]

    @staticmethod
    def _pick_models(model_A, model_B, candidates):
        """Return ``[model_A, model_B]``, or two random candidates if both are ""."""
        if model_A == "" and model_B == "":
            return random.sample(candidates, 2)
        return [model_A, model_B]

    # @spaces.GPU
    def generate_image_ig_parallel_anony(self, prompt, model_A, model_B):
        """Generate with two models; pick two at random when both names are "".

        Returns:
            ``(result_A, result_B, name_A, name_B)`` where each result
            corresponds to the name in the same position.
        """
        model_names = self._pick_models(model_A, model_B, self.model_ig_list)
        results = self._run_for_each_model(self.generate_image_ig, (prompt,), model_names)
        return results[0], results[1], model_names[0], model_names[1]

    # @spaces.GPU
    def generate_image_ig_parallel(self, prompt, model_A, model_B):
        """Generate with ``model_A`` and ``model_B`` concurrently.

        Returns:
            ``(result_A, result_B)`` in the order of the model arguments.
        """
        results = self._run_for_each_model(self.generate_image_ig, (prompt,), [model_A, model_B])
        return results[0], results[1]

    @spaces.GPU
    def generate_image_ie(self, textbox_source, textbox_target, textbox_instruct, source_image, model_name):
        """Edit ``source_image`` with the model ``model_name``.

        The loaded pipeline is called with ``src_image``, ``src_prompt``,
        ``target_prompt`` and ``instruct_prompt`` keyword arguments, and its
        return value is passed back unchanged.
        """
        pipe = self.load_model_pipe(model_name)
        return pipe(
            src_image=source_image,
            src_prompt=textbox_source,
            target_prompt=textbox_target,
            instruct_prompt=textbox_instruct,
        )

    # @spaces.GPU
    def generate_image_ie_parallel(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B):
        """Edit with ``model_A`` and ``model_B`` concurrently.

        Returns:
            ``(result_A, result_B)`` in the order of the model arguments.
        """
        edit_args = (textbox_source, textbox_target, textbox_instruct, source_image)
        results = self._run_for_each_model(self.generate_image_ie, edit_args, [model_A, model_B])
        return results[0], results[1]

    # @spaces.GPU
    def generate_image_ie_parallel_anony(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B):
        """Edit with two models; pick two at random when both names are "".

        Returns:
            ``(result_A, result_B, name_A, name_B)`` where each result
            corresponds to the name in the same position.
        """
        model_names = self._pick_models(model_A, model_B, self.model_ie_list)
        edit_args = (textbox_source, textbox_target, textbox_instruct, source_image)
        results = self._run_for_each_model(self.generate_image_ie, edit_args, model_names)
        return results[0], results[1], model_names[0], model_names[1]