import numpy as np from sklearn.svm import SVC from sklearn.linear_model import LogisticRegression from sklearn.model_selection import train_test_split import torch from umap import UMAP import PIL from tqdm import tqdm import random from PIL import Image, ImageColor from .color_annotations import extract_color def get_separation_space(type_bin, annotations, df, samples=200, method='LR', C=0.1, latent_space='Z'): """ The get_separation_space function takes in a type_bin, annotations, and df. It then samples 100 of the most representative abstracts for that type_bin and 100 of the least representative abstracts for that type_bin. It then trains an SVM or logistic regression model on these 200 samples to find a separation space between them. The function returns this separation space as well as how many nodes are important in this separation space. :param type_bin: Select the type of abstracts to be used for training :param annotations: Access the z_vectors :param df: Get the abstracts that are used for training :param samples: Determine how many samples to take from the top and bottom of the distribution :param method: Specify the classifier to use :param C: Control the regularization strength :return: The weights of the linear classifier :doc-author: Trelent """ if latent_space == 'Z': col = 'z_vectors' else: col = 'w_vectors' if len(type_bin) == 1: type_bin = type_bin[0] if type(type_bin) == str: abstracts = np.array([float(ann) for ann in df[type_bin]]) abstract_idxs = list(np.argsort(abstracts))[:samples] repr_idxs = list(np.argsort(abstracts))[-samples:] X = np.array([annotations[col][i] for i in abstract_idxs+repr_idxs]) elif len(type_bin) == 2: print('Using two concepts for separation space') first_concept = np.array([float(ann) for ann in df[type_bin[0]]]) second_concept = np.array([float(ann) for ann in df[type_bin[1]]]) first_idxs = list(np.argsort(first_concept))[:samples] second_idxs = list(np.argsort(second_concept))[:samples] X = np.array([annotations[col][i] for i in first_idxs+second_idxs]) else: print('Error: type_bin must be either a string or a list of strings of len 2') return X = X.reshape((2*samples, 512)) y = np.array([1]*samples + [0]*samples) x_train, x_val, y_train, y_val = train_test_split(X, y, test_size=0.2) if method == 'SVM': svc = SVC(gamma='auto', kernel='linear', random_state=0, C=C) svc.fit(x_train, y_train) print('Val performance SVM', svc.score(x_val, y_val)) imp_features = (np.abs(svc.coef_) > 0.2).sum() imp_nodes = np.where(np.abs(svc.coef_) > 0.2)[1] return svc.coef_ / np.linalg.norm(clf.coef_), imp_features, imp_nodes, np.round(clf.score(x_val, y_val),2) elif method == 'LR': clf = LogisticRegression(random_state=0, C=C) clf.fit(x_train, y_train) print('Val performance logistic regression', clf.score(x_val, y_val)) imp_features = (np.abs(clf.coef_) > 0.15).sum() imp_nodes = np.where(np.abs(clf.coef_) > 0.15)[1] return clf.coef_ / np.linalg.norm(clf.coef_), imp_features, imp_nodes, np.round(clf.score(x_val, y_val),2) def regenerate_images(model, z, decision_boundary, min_epsilon=-3, max_epsilon=3, count=5, latent_space='Z', layers=None, number=3): """ The regenerate_images function takes a model, z, and decision_boundary as input. It then constructs an inverse rotation/translation matrix and passes it to the generator. The generator expects this matrix as an inverse to avoid potentially failing numerical operations in the network. The function then generates images using G(z_0, label) where z_0 is a linear combination of z and the decision boundary. :param model: Pass in the model to be used for image generation :param z: Generate the starting point of the line :param decision_boundary: Generate images along the direction of the decision boundary :param min_epsilon: Set the minimum value of lambda :param max_epsilon: Set the maximum distance from the original image to generate :param count: Determine the number of images that are generated :return: A list of images and a list of lambdas :doc-author: Trelent """ device = torch.device('cpu') G = model.to(device) # type: ignore if False: decision_boundary = z - (np.dot(z, decision_boundary.T) / np.dot(decision_boundary, decision_boundary.T)) * decision_boundary # Labels. label = torch.zeros([1, G.c_dim], device=device) z = torch.from_numpy(z.copy()).to(device) decision_boundary = torch.from_numpy(decision_boundary.copy()).to(device) repetitions = 16 if number == 3 else 14 lambdas = np.linspace(min_epsilon, max_epsilon, count) images = [] # Generate images. for _, lambda_ in enumerate(tqdm(lambdas)): z_0 = z + lambda_ * decision_boundary if latent_space == 'Z': W_0 = G.mapping(z_0, label, truncation_psi=1).to(torch.float32) W = G.mapping(z, label, truncation_psi=1).to(torch.float32) else: W_0 = z_0.expand((repetitions, -1)).unsqueeze(0).to(torch.float32) W = z.expand((repetitions, -1)).unsqueeze(0).to(torch.float32) if layers: W_f = torch.empty_like(W).copy_(W).to(torch.float32) W_f[:, layers, :] = W_0[:, layers, :] img = G.synthesis(W_f, noise_mode='const') else: img = G.synthesis(W_0, noise_mode='const') img = (img.permute(0, 2, 3, 1) * 127.5 + 128).clamp(0, 255).to(torch.uint8) images.append(PIL.Image.fromarray(img[0].cpu().numpy(), 'RGB')) return images, lambdas def generate_joint_effect(model, z, decision_boundaries, min_epsilon=-3, max_epsilon=3, count=5, latent_space='Z'): decision_boundary_joint = np.sum(decision_boundaries, axis=0) print(decision_boundary_joint.shape) return regenerate_images(model, z, decision_boundary_joint, min_epsilon=min_epsilon, max_epsilon=max_epsilon, count=count, latent_space=latent_space) def generate_original_image(z, model, latent_space='Z', number=3): """ The generate_original_image function takes in a latent vector and the model, and returns an image generated from that latent vector. :param z: Generate the image :param model: Generate the image :return: A pil image :doc-author: Trelent """ repetitions = 16 if number == 3 else 14 device = torch.device('cpu') G = model.to(device) # type: ignore # Labels. label = torch.zeros([1, G.c_dim], device=device) if latent_space == 'Z': z = torch.from_numpy(z.copy()).to(device) img = G(z, label, truncation_psi=1, noise_mode='const') else: W = torch.from_numpy(np.repeat(z, repetitions, axis=0).reshape(1, repetitions, z.shape[1]).copy()).to(device) print(W.shape) img = G.synthesis(W, noise_mode='const') img = (img.permute(0, 2, 3, 1) * 127.5 + 128).clamp(0, 255).to(torch.uint8) return PIL.Image.fromarray(img[0].cpu().numpy(), 'RGB') def get_concepts_vectors(concepts, annotations, df, samples=100, method='LR', C=0.1, latent_space='Z'): """ The get_concepts_vectors function takes in a list of concepts, a dictionary of annotations, and the dataframe containing all the images. It returns two things: 1) A numpy array with shape (len(concepts), 512) where each row is an embedding vector for one concept. 2) A set containing all nodes that are important in this separation space. :param concepts: Specify the concepts to be used in the analysis :param annotations: Get the annotations for each concept :param df: Get the annotations for each concept :param samples: Determine the number of samples to use in training the logistic regression model :param method: Choose the method used to train the model :param C: Control the regularization of the logistic regression :return: The vectors of the concepts and the nodes that are in common for all concepts :doc-author: Trelent """ important_nodes = [] performances = [] vectors = np.zeros((len(concepts), 512)) for i, conc in enumerate(concepts): vec, _, imp_nodes, performance = get_separation_space(conc, annotations, df, samples=samples, method=method, C=C, latent_space=latent_space) vectors[i,:] = vec performances.append(performance) important_nodes.append(set(imp_nodes)) # reducer = UMAP(n_neighbors=3, # default 15, The size of local neighborhood (in terms of number of neighboring sample points) used for manifold approximation. # n_components=3, # default 2, The dimension of the space to embed into. # min_dist=0.1, # default 0.1, The effective minimum distance between embedded points. # spread=2.0, # default 1.0, The effective scale of embedded points. In combination with ``min_dist`` this determines how clustered/clumped the embedded points are. # random_state=0, # default: None, If int, random_state is the seed used by the random number generator; # ) # projection = reducer.fit_transform(vectors) nodes_in_common = set.intersection(*important_nodes) return vectors, nodes_in_common, performances def get_verification_score(color_id, decision_boundary, model, annotations, samples=100, latent_space='W'): listlen = len(annotations['fname']) items = random.sample(range(listlen), samples) hue_low = color_id * 256 / 12 hue_high = (color_id + 1) * 256 / 12 hue_mean = (hue_low + hue_high) / 2 print(int(hue_low), int(hue_high), int(hue_mean)) distances = [] distances_orig = [] for iterator in tqdm(items): if latent_space == 'Z': z = annotations['z_vectors'][iterator] else: z = annotations['w_vectors'][iterator] images, lambdas = regenerate_images(model, z, decision_boundary, min_epsilon=0, max_epsilon=1, count=2, latent_space=latent_space) colors_orig = extract_color(images[0], 5, 1, None) h_old, s_old, v_old = ImageColor.getcolor(colors_orig[0], 'HSV') colors_new = extract_color(images[1], 5, 1, None) h_new, s_new, v_new = ImageColor.getcolor(colors_new[0], 'HSV') print(h_old, h_new) distance = np.abs(hue_mean - h_new) distances.append(distance) distance_orig = np.abs(hue_mean - h_old) distances_orig.append(distance_orig) return np.round(np.mean(np.array(distances)), 4), np.round(np.mean(np.array(distances_orig)), 4) def get_verification_score_clip(concept, decision_boundary, model, annotations, samples=100, latent_space='Z'): import open_clip import os import random from tqdm import tqdm os.environ["CUDA_VISIBLE_DEVICES"] = "" model_clip, _, preprocess = open_clip.create_model_and_transforms('ViT-L-14', pretrained='laion2b_s32b_b82k') tokenizer = open_clip.get_tokenizer('ViT-L-14') # Prepare the text queries #@markdown _in the form pre_prompt {label}_: pre_prompt = "Artwork, " #@param {type:"string"} text_descriptions = [f"{pre_prompt}{label}" for label in [concept]] text_tokens = tokenizer(text_descriptions) listlen = len(annotations['fname']) items = random.sample(range(listlen), samples) changes = [] for iterator in tqdm(items): chunk_imgs = [] chunk_ids = [] if latent_space == 'Z': z = annotations['z_vectors'][iterator] else: z = annotations['w_vectors'][iterator] images, lambdas = regenerate_images(model, z, decision_boundary, min_epsilon=0, max_epsilon=1, count=2, latent_space=latent_space) for im,l in zip(images, lambdas): chunk_imgs.append(preprocess(im.convert("RGB"))) chunk_ids.append(l) image_input = torch.tensor(np.stack(chunk_imgs)) with torch.no_grad(), torch.cuda.amp.autocast(): text_features = model_clip.encode_text(text_tokens).float() image_features = model_clip.encode_image(image_input).float() # Rescale features image_features /= image_features.norm(dim=-1, keepdim=True) text_features /= text_features.norm(dim=-1, keepdim=True) # Analyze featues text_probs = (100.0 * image_features.cpu().numpy() @ text_features.cpu().numpy().T)#.softmax(dim=-1) change = max(text_probs[1][0].item() - text_probs[0][0].item(), 0) changes.append(change) return np.round(np.mean(np.array(changes)), 4) def tohsv(df): df['H1'] = df['top1col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[0]) df['H2'] = df['top2col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[0]) df['H3'] = df['top3col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[0]) df['S1'] = df['top1col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[1]) df['S2'] = df['top2col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[1]) df['S3'] = df['top3col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[1]) df['V1'] = df['top1col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[2]) df['V2'] = df['top2col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[2]) df['V3'] = df['top3col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[2]) return df def rest_from_style(x, styles, layer): dtype = torch.float16 if (getattr(model.synthesis, layer).use_fp16 and device=='cuda') else torch.float32 if getattr(model.synthesis, layer).is_torgb: print(layer, getattr(model.synthesis, layer).is_torgb) weight_gain = 1 / np.sqrt(getattr(model.synthesis, layer).in_channels * (getattr(model.synthesis, layer).conv_kernel ** 2)) styles = styles * weight_gain input_gain = getattr(model.synthesis, layer).magnitude_ema.rsqrt().to(dtype) # Execute modulated conv2d. x = modulated_conv2d(x=x.to(dtype), w=getattr(model.synthesis, layer).weight.to(dtype), s=styles.to(dtype), padding=getattr(model.synthesis, layer).conv_kernel-1, demodulate=(not getattr(model.synthesis, layer).is_torgb), input_gain=input_gain.to(dtype)) # Execute bias, filtered leaky ReLU, and clamping. gain = 1 if getattr(model.synthesis, layer).is_torgb else np.sqrt(2) slope = 1 if getattr(model.synthesis, layer).is_torgb else 0.2 x = filtered_lrelu.filtered_lrelu(x=x, fu=getattr(model.synthesis, layer).up_filter, fd=getattr(model.synthesis, layer).down_filter, b=getattr(model.synthesis, layer).bias.to(x.dtype), up=getattr(model.synthesis, layer).up_factor, down=getattr(model.synthesis, layer).down_factor, padding=getattr(model.synthesis, layer).padding, gain=gain, slope=slope, clamp=getattr(model.synthesis, layer).conv_clamp) return x def getS(w): w_torch = torch.from_numpy(w).to('cpu') W = w_torch.expand((16, -1)).unsqueeze(0) s = [] s.append(model.synthesis.input.affine(W[0, 0].unsqueeze(0)).numpy()) s.append(model.synthesis.L0_36_512.affine(W[0, 1].unsqueeze(0)).numpy()) s.append(model.synthesis.L1_36_512.affine(W[0, 2].unsqueeze(0)).numpy()) s.append(model.synthesis.L2_36_512.affine(W[0, 3].unsqueeze(0)).numpy()) s.append(model.synthesis.L3_52_512.affine(W[0, 4].unsqueeze(0)).numpy()) s.append(model.synthesis.L4_52_512.affine(W[0, 5].unsqueeze(0)).numpy()) s.append(model.synthesis.L5_84_512.affine(W[0, 6].unsqueeze(0)).numpy()) s.append(model.synthesis.L6_84_512.affine(W[0, 7].unsqueeze(0)).numpy()) s.append(model.synthesis.L7_148_512.affine(W[0, 8].unsqueeze(0)).numpy()) s.append(model.synthesis.L8_148_512.affine(W[0, 9].unsqueeze(0)).numpy()) s.append(model.synthesis.L9_148_362.affine(W[0, 10].unsqueeze(0)).numpy()) s.append(model.synthesis.L10_276_256.affine(W[0, 11].unsqueeze(0)).numpy()) s.append(model.synthesis.L11_276_181.affine(W[0, 12].unsqueeze(0)).numpy()) s.append(model.synthesis.L12_276_128.affine(W[0, 13].unsqueeze(0)).numpy()) s.append(model.synthesis.L13_256_128.affine(W[0, 14].unsqueeze(0)).numpy()) s.append(model.synthesis.L14_256_3.affine(W[0, 15].unsqueeze(0)).numpy()) return s def detect_attribute_specific_channels(positives, all, sign=False): """ Formula from StyleSpace Analysis """ mp = np.mean(all, axis=0) sp = np.std(all, axis=0) de = (positives - mp) / sp meu = np.mean(de, axis=0) seu = np.std(de, axis=0) if sign: thetau = meu / seu else: thetau = np.abs(meu) / seu return thetau def all_variance_based_disentanglements(labels, x, y, k=10, sign=False, cutout=0.28): seps = [] sorted_vals = [] for lbl in labels: positives = x[np.where(y == lbl)] variations = detect_attribute_specific_channels(positives, x, sign=sign) if sign: argsorted_vars_pos = np.argsort(variations)[-k//2:] # print(argsorted_vars_pos) argsorted_vars_neg = np.argsort(variations)[:k//2] if cutout: beyond_cutout = np.where(np.abs(variations) > cutout) # print(beyond_cutout) argsorted_vars_pos_int = np.intersect1d(argsorted_vars_pos, beyond_cutout) argsorted_vars_neg_int = np.intersect1d(argsorted_vars_neg, beyond_cutout) # print(argsorted_vars_pos) if len(argsorted_vars_neg_int) > 0: argsorted_vars_neg = np.array(argsorted_vars_neg_int) if len(argsorted_vars_pos_int) > 0: argsorted_vars_pos = np.array(argsorted_vars_pos_int) else: argsorted_vars = np.argsort(variations)[-k:] sorted_vals.append(np.sort(variations)) separation_vector_onehot /= np.linalg.norm(separation_vector_onehot) seps.append(separation_vector_onehot) return seps, sorted_vals def generate_flexible_images(w, change_vectors, lambdas=1, device='cpu'): w_torch = torch.from_numpy(w).to('cpu') if len(change_vectors) != 17: w_torch = w_torch + lambdas * change_vectors[0] W = w_torch.expand((16, -1)).unsqueeze(0) x = model.synthesis.input(W[0,0].unsqueeze(0)) for i, layer in enumerate(layers): if i < 2: continue style = getattr(model.synthesis, layer).affine(W[0, i-1].unsqueeze(0)) if len(change_vectors) != 17: change = torch.from_numpy(change_vectors[i].copy()).unsqueeze(0).to(device) style = torch.add(style, change, alpha=lambdas) x = rest_from_style(x, style, layer) if model.synthesis.output_scale != 1: x = x * model.synthesis.output_scale img = (x.permute(0, 2, 3, 1) * 127.5 + 128).clamp(0, 255).to(torch.uint8) img = PIL.Image.fromarray(img[0].cpu().numpy(), 'RGB') return img def get_original_pos(top_positions, bottom_positions=None, space='s', sign=True, shapes=[[512, 4, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 362, 256, 181, 128, 128]], layers=['w', 'input', 'L0_36_512', 'L1_36_512', 'L2_36_512', 'L3_52_512', 'L4_52_512', 'L5_84_512', 'L6_84_512', 'L7_148_512', 'L8_148_512', 'L9_148_362', 'L10_276_256', 'L11_276_181', 'L12_276_128', 'L13_256_128', 'L14_256_3'], ): if space == 's': current_idx = 0 vectors = [] for i, (leng, layer) in enumerate(zip(shapes, layers)): arr = np.zeros(leng) for top_position in top_positions: if top_position >= current_idx and top_position < current_idx + leng: arr[top_position - current_idx] = 1 for bottom_position in bottom_positions: if sign: if bottom_position >= current_idx and bottom_position < current_idx + leng: arr[bottom_position - current_idx] = 1 arr = arr / (np.linalg.norm(arr) + 0.000001) vectors.append(arr) current_idx += leng else: if sign: vectors = np.zeros(512) vectors[top_positions] = 1 vectors[bottom_positions] = -1 else: vectors = np.zeros(512) vectors[top_positions] = 1 return vectors def getX(annotations, space='s'): if space == 'x': X = np.array(annotations['w_vectors']).reshape((len(annotations['w_vectors']), 512)) elif space == 's': concat_v = [] for i in range(len(annotations['w_vectors'])): concat_v.append(np.concatenate([annotations['w_vectors'][i]] + annotations['s_vectors'][i], axis=1)) X = np.array(concat_v) X = X[:, 0, :] print(X.shape) return X