| | import os |
| | import numpy as np |
| | import pandas as pd |
| | import torch |
| | import torch.nn.functional as F |
| | from PIL import Image |
| | import json |
| | import cv2 |
| | from sklearn.decomposition import PCA |
| | from open_clip import create_model_from_pretrained, get_tokenizer |
| |
|
| |
|
| |
|
def load_model(model_path, device):
    """Load a CoCa ViT-L/14 checkpoint plus its preprocessing transform and tokenizer.

    :param model_path: Path (or hub tag) of the pretrained weights.
    :param device: Device the model is created on.
    :return: ``(model, preprocess, tokenizer)`` triple.
    """
    model, preprocess = create_model_from_pretrained(
        "coca_ViT-L-14", device=device, pretrained=model_path
    )
    return model, preprocess, get_tokenizer("coca_ViT-L-14")
| |
|
| |
|
| |
|
def encode_image(model, preprocess, image):
    """Embed one PIL image and return its L2-normalized feature tensor.

    :param model: Model exposing ``encode_image``.
    :param preprocess: Transform mapping a PIL image to a model-ready tensor.
    :param image: PIL image to encode.
    :return: Unit-norm embedding of shape ``(1, D)``.
    """
    # Build a batch of one; stacking adds the leading batch dimension.
    batch = torch.stack([preprocess(image)])
    with torch.no_grad():
        features = model.encode_image(batch)
    return F.normalize(features, p=2, dim=-1)
| |
|
| |
|
| |
|
def encode_image_patches(model, preprocess, data_dir, img_list):
    """Encode every patch image found under ``<data_dir>/demo_data/patch``.

    :param model: Model exposing ``encode_image``.
    :param preprocess: Transform mapping a PIL image to a model-ready tensor.
    :param data_dir: Root data directory containing ``demo_data/patch``.
    :param img_list: Iterable of patch image file names.
    :return: Tensor of shape ``(len(img_list), 1, D)`` of L2-normalized embeddings.
    """
    image_embeddings = []
    for img_name in img_list:
        image_path = os.path.join(data_dir, 'demo_data', 'patch', img_name)
        image = Image.open(image_path)
        image_embeddings.append(encode_image(model, preprocess, image))
    # Guard the empty case explicitly; torch.stack raises on an empty list.
    if not image_embeddings:
        return torch.empty(0)
    # Stack directly in torch. The old numpy round-trip
    # (torch.from_numpy(np.array(...))) failed for CUDA tensors and made an
    # extra host-side copy; torch.stack yields the same (N, 1, D) layout.
    image_embeddings = torch.stack(image_embeddings)
    # Each embedding is already unit-norm, so this renormalization is a no-op
    # kept for parity with the original behavior.
    return F.normalize(image_embeddings, p=2, dim=-1)
| |
|
| |
|
| |
|
def encode_text(model, tokenizer, text):
    """Embed a text string and return its L2-normalized feature tensor.

    :param model: Model exposing ``encode_text``.
    :param tokenizer: Tokenizer matching the model.
    :param text: Text to encode.
    :return: Unit-norm embedding tensor.
    """
    tokens = tokenizer(text)
    with torch.no_grad():
        features = model.encode_text(tokens)
    return F.normalize(features, p=2, dim=-1)
| |
|
| |
|
| |
|
def encode_text_df(model, tokenizer, df, col_name):
    """Encode one text per row of ``df[col_name]`` into L2-normalized embeddings.

    :param model: Model exposing ``encode_text``.
    :param tokenizer: Tokenizer matching the model.
    :param df: pandas DataFrame holding the texts.
    :param col_name: Name of the column containing the text to encode.
    :return: Tensor of shape ``(len(df), 1, D)`` of L2-normalized embeddings.
    """
    text_embeddings = []
    for idx in df.index:
        # Label-safe lookup. The old ``df[df.index==idx][col_name][0]`` treated
        # 0 as an index *label*, raising KeyError for any index whose labels
        # are not 0 (positional int fallback was removed in pandas 2).
        text = df.loc[idx, col_name]
        text_embeddings.append(encode_text(model, tokenizer, text))
    # Guard the empty case explicitly; torch.stack raises on an empty list.
    if not text_embeddings:
        return torch.empty(0)
    # Stack directly in torch instead of the numpy round-trip, which failed
    # for CUDA tensors and made an extra copy; the (N, 1, D) layout matches.
    text_embeddings = torch.stack(text_embeddings)
    # Entries are already unit-norm; renormalizing is a no-op kept for parity.
    return F.normalize(text_embeddings, p=2, dim=-1)
| |
|
| |
|
| |
|
def get_pca_by_fit(tar_features, src_features):
    """
    Fit a 3-component PCA on the transposed target features and project both
    the target and source features through it, then stack the projections.

    :param tar_features: Numpy array of target features (samples by features).
    :param src_features: Numpy array of source features (samples by features).
    :return:
        - pca_comb_features: PCA-projected target and source features, concatenated.
        - pca_comb_features_batch: per-row labels, 0 for target and 1 for source.
    """
    # NOTE(review): both inputs are transposed before fit/transform, so PCA is
    # computed over the transposed orientation — confirm this matches the
    # caller's expected (samples x features) layout.
    reducer = PCA(n_components=3).fit(tar_features.T)
    tar_proj = reducer.transform(tar_features.T)
    src_proj = reducer.transform(src_features.T)
    pca_comb_features = np.concatenate((tar_proj, src_proj))
    pca_comb_features_batch = np.array([0] * len(tar_proj) + [1] * len(src_proj))
    return pca_comb_features, pca_comb_features_batch
| |
|
| |
|
| |
|
def cap_quantile(weight, cap_max=None, cap_min=None):
    """
    Cap the values in ``weight`` at quantile-based thresholds.

    :param weight: Numpy array of weights to be capped.
    :param cap_max: Quantile (in [0, 1]) above which values are capped.
                    If None, no maximum capping is applied.
    :param cap_min: Quantile (in [0, 1]) below which values are capped.
                    If None, no minimum capping is applied.
    :return: Numpy array with values capped at the requested quantiles.
    """
    # Compute both thresholds from the *original* weights first, so the lower
    # threshold is not affected by the upper capping (matches prior intent).
    hi = np.quantile(weight, cap_max) if cap_max is not None else None
    lo = np.quantile(weight, cap_min) if cap_min is not None else None

    # Bug fix: the old code applied np.minimum/np.maximum unconditionally,
    # so weight was compared against None (TypeError) whenever a cap was omitted.
    if hi is not None:
        weight = np.minimum(weight, hi)
    if lo is not None:
        weight = np.maximum(weight, lo)

    return weight
| |
|
| |
|
| |
|
def read_polygons(file_path, slide_id):
    """
    Read polygon annotations for one slide from a JSON configuration file.

    :param file_path: Path to the JSON file containing polygon configurations.
    :param slide_id: Identifier of the slide whose polygons are requested.
    :return:
        - polygons: list of numpy arrays of polygon coordinates,
        - polygon_colors: list of per-polygon color values,
        - polygon_thickness: list of per-polygon border thickness values,
        or ``(None, None, None)`` when the slide has no entry in the file.
    """
    with open(file_path, 'r') as f:
        configs = json.load(f)

    if slide_id not in configs:
        return None, None, None

    polygons, polygon_colors, polygon_thickness = [], [], []
    for entry in configs[slide_id]:
        polygons.append(np.array(entry['coords']))
        polygon_colors.append(entry['color'])
        polygon_thickness.append(entry['thickness'])

    return polygons, polygon_colors, polygon_thickness
| |
|
| |
|
| |
|