import torch
from bayes_opt import BayesianOptimization, SequentialDomainReductionTransformer
from lpips import LPIPS
from scipy.stats import beta as beta_distribution

from utils import compute_lpips, compute_smoothness_and_consistency


def bayesian_prior_selection(
    interpolation_pipe,
    latent1: torch.FloatTensor,
    latent2: torch.FloatTensor,
    prompt1: str,
    prompt2: str,
    lpips_model: LPIPS,
    guide_prompt: str | None = None,
    negative_prompt: str = "",
    size: int = 3,
    num_inference_steps: int = 25,
    warmup_ratio: float = 1,
    early: str = "vfused",
    late: str = "self",
    target_score: float = 0.9,
    n_iter: int = 15,
    p_min: float | None = None,
    p_max: float | None = None,
) -> tuple:
    """
    Select the alpha and beta parameters for the interpolation using Bayesian optimization.

    Args:
        interpolation_pipe (any): The interpolation pipeline.
        latent1 (torch.FloatTensor): The first source latent vector.
        latent2 (torch.FloatTensor): The second source latent vector.
        prompt1 (str): The first source prompt.
        prompt2 (str): The second source prompt.
        lpips_model (LPIPS): The LPIPS model used to compute perceptual distances.
        guide_prompt (str | None, optional): The guide prompt for the interpolation, if any. Defaults to None.
        negative_prompt (str, optional): The negative prompt for the interpolation. Defaults to "".
        size (int, optional): The size of the interpolation sequence. Defaults to 3.
        num_inference_steps (int, optional): The number of inference steps. Defaults to 25.
        warmup_ratio (float, optional): The warmup ratio. Defaults to 1.
        early (str, optional): The early fusion method. Defaults to "vfused".
        late (str, optional): The late fusion method. Defaults to "self".
        target_score (float, optional): The target smoothness score; optimization stops early once it is reached. Defaults to 0.9.
        n_iter (int, optional): The maximum number of optimization iterations. Defaults to 15.
        p_min (float | None, optional): The minimum value of alpha and beta. Defaults to None (resolved to 1).
        p_max (float | None, optional): The maximum value of alpha and beta. Defaults to None (resolved to the number of warmup steps).

    Returns:
        tuple: A tuple containing the selected alpha and beta parameters.
    """

    def get_smoothness(alpha, beta):
        """
        Black-box objective function of Bayesian Optimization.
        Get the smoothness of the interpolated sequence with the given alpha and beta.
        """
        if alpha < beta and large_alpha_prior:
            return 0
        if alpha > beta and not large_alpha_prior:
            return 0
        if alpha == beta:
            return init_smoothness
        interpolation_sequence = interpolation_pipe.interpolate_save_gpu(
            latent1,
            latent2,
            prompt1,
            prompt2,
            guide_prompt=guide_prompt,
            negative_prompt=negative_prompt,
            size=size,
            num_inference_steps=num_inference_steps,
            warmup_ratio=warmup_ratio,
            early=early,
            late=late,
            alpha=alpha,
            beta=beta,
        )
        smoothness, _, _ = compute_smoothness_and_consistency(
            interpolation_sequence, lpips_model
        )
        return smoothness

    # Add a prior to the selection of alpha and beta:
    # first, compute the interpolated image at t=0.5
    images = interpolation_pipe.interpolate_single(
        0.5,
        latent1,
        latent2,
        prompt1,
        prompt2,
        guide_prompt=guide_prompt,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        warmup_ratio=warmup_ratio,
        early=early,
        late=late,
    )
    # Compute the perceptual distances from the interpolated image (t=0.5) to the two source images
    distances = compute_lpips(images, lpips_model)
    # We compute the init_smoothness as the smoothness when alpha=beta to avoid recomputation
    init_smoothness, _, _ = compute_smoothness_and_consistency(images, lpips_model)
    # If perceptual distance to the first source image is smaller, alpha should be larger than beta
    large_alpha_prior = distances[0] < distances[1]

    # Bayesian optimization configuration
    num_warmup_steps = warmup_ratio * num_inference_steps
    if p_min is None:
        p_min = 1
    if p_max is None:
        p_max = num_warmup_steps
    pbounds = {"alpha": (p_min, p_max), "beta": (p_min, p_max)}
    bounds_transformer = SequentialDomainReductionTransformer(minimum_window=0.1)
    optimizer = BayesianOptimization(
        f=get_smoothness,
        pbounds=pbounds,
        random_state=1,
        bounds_transformer=bounds_transformer,
        allow_duplicate_points=True,
    )
    alpha_init = [p_min, (p_min + p_max) / 2, p_max]
    beta_init = [p_min, (p_min + p_max) / 2, p_max]

    # Initial probing
    for alpha in alpha_init:
        for beta in beta_init:
            optimizer.probe(params={"alpha": alpha, "beta": beta}, lazy=False)
            latest_result = optimizer.res[-1]  # Get the last result
            latest_score = latest_result["target"]
            if latest_score >= target_score:
                return alpha, beta

    # Start optimization
    for _ in range(n_iter):  # Max iterations
        optimizer.maximize(init_points=0, n_iter=1)  # One iteration at a time
        max_score = optimizer.max["target"]  # Get the highest score so far
        if max_score >= target_score:
            print(f"Stopping early, target of {target_score} reached.")
            break  # Exit the loop if target is reached or exceeded

    results = optimizer.max
    alpha = results["params"]["alpha"]
    beta = results["params"]["beta"]
    return alpha, beta
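

# Minimal usage sketch for `bayesian_prior_selection`, assuming a pipeline
# object that exposes `interpolate_single` / `interpolate_save_gpu` with the
# signatures used above; `pipe`, `latent1`, `latent2` and the prompts below are
# illustrative placeholders, and `LPIPS(net="vgg")` is one way to build the
# perceptual model:
#
#     lpips_model = LPIPS(net="vgg").cuda()
#     alpha, beta = bayesian_prior_selection(
#         pipe, latent1, latent2,
#         "a photo of a cat", "a photo of a dog",
#         lpips_model, size=3, num_inference_steps=25, target_score=0.9,
#     )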


def generate_beta_tensor(
    size: int, alpha: float = 3, beta: float = 3
) -> torch.FloatTensor:
    """
    Let n = size. Generates a PyTorch tensor of values [x_0, x_1, ..., x_{n-1}]
    where each x_i satisfies F(x_i) = i / (n - 1) for the CDF F of the Beta distribution,
    i.e. the quantiles that divide the Beta distribution into equal-probability steps.

    Args:
        size (int): The number of values to generate.
        alpha (float): The alpha parameter of the Beta distribution.
        beta (float): The beta parameter of the Beta distribution.

    Returns:
        torch.FloatTensor: A tensor of the inverse CDF values of the Beta distribution.
    """
    # Generating the inverse CDF values
    prob_values = [i / (size - 1) for i in range(size)]
    inverse_cdf_values = beta_distribution.ppf(prob_values, alpha, beta)

    # Converting to a PyTorch tensor
    return torch.tensor(inverse_cdf_values, dtype=torch.float32)
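

if __name__ == "__main__":
    # Minimal sketch of `generate_beta_tensor` with illustrative values:
    # alpha == beta gives interpolation coefficients symmetric around 0.5,
    # while an asymmetric pair skews them toward one endpoint.
    print(generate_beta_tensor(size=7, alpha=3, beta=3))
    print(generate_beta_tensor(size=7, alpha=6, beta=2))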