import itertools import re import warnings import os import sys import copy import pickle as pkl import numpy as np import pandas as pd import skimage from skimage.segmentation import mark_boundaries import matplotlib.pyplot as plt from matplotlib.pyplot import cm import matplotlib.pyplot matplotlib.pyplot.switch_backend('Agg') import seaborn as sns import phenograph # suppress numba deprecation warning # ref: https://github.com/Arize-ai/phoenix/pull/799 with warnings.catch_warnings(): from numba.core.errors import NumbaWarning warnings.simplefilter("ignore", category=NumbaWarning) import umap from umap import UMAP from typing import Union, Optional, Type, Tuple, List, Dict from collections.abc import Callable from scipy import sparse as sp from sklearn.neighbors import kneighbors_graph as skgraph # , DistanceMetric from sklearn.metrics import DistanceMetric from sklearn.cluster import KMeans from itertools import product ## added for test import platform from pathlib import Path FILE = Path(__file__).resolve() ROOT = FILE.parents[0] # cytof root directory if str(ROOT) not in sys.path: sys.path.append(str(ROOT)) # add ROOT to PATH if platform.system() != 'Windows': ROOT = Path(os.path.relpath(ROOT, Path.cwd())) # relative from hyperion_segmentation import cytof_nuclei_segmentation, cytof_cell_segmentation, visualize_segmentation from cytof.utils import (save_multi_channel_img, generate_color_dict, show_color_table, visualize_scatter, visualize_expression, _get_thresholds, _generate_summary) def get_name(dfrow): return os.path.join(dfrow['path'], dfrow['ROI']) class CytofImage(): morphology = ["area", "convex_area", "eccentricity", "extent", "filled_area", "major_axis_length", "minor_axis_length", "orientation", "perimeter", "solidity", "pa_ratio"] def __init__(self, df: Optional[pd.DataFrame] = None, slide: str = "", roi: str = "", filename: str = ""): self.df = df self.slide = slide self.roi = roi self.filename = filename self.columns = None # column names in original cytof data (dataframe) self.markers = None # protein markers self.labels = None # metal isotopes used to tag protein self.image = None self.channels = None # channel names correspond to each channel of self.image self.features = None def copy(self): ''' Creates a deep copy of the current CytofImage object and return it ''' new_instance = type(self)(self.df.copy(), self.slide, self.roi, self.filename) new_instance.columns = copy.deepcopy(self.columns) new_instance.markers = copy.deepcopy(self.markers) new_instance.labels = copy.deepcopy(self.labels) new_instance.image = copy.deepcopy(self.image) new_instance.channels = copy.deepcopy(self.channels) new_instance.features = copy.deepcopy(self.features) return new_instance def __str__(self): return f"CytofImage slide {self.slide}, ROI {self.roi}" def __repr__(self): return f"CytofImage(slide={self.slide}, roi={self.roi})" def save_cytof(self, savename: str): directory = os.path.dirname(savename) if not os.path.exists(directory): os.makedirs(directory) pkl.dump(self, open(savename, "wb")) def get_markers(self, imarker0: Optional[str] = None): """ Get (1) the channel names correspond to each image channel (2) a list of protein markers used to obtain the CyTOF image (3) a list of labels tagged to each of the protein markers """ self.columns = list(self.df.columns) if imarker0 is not None: # if the index of the 1st marker provided self.raw_channels = self.columns[imarker0:] else: # assumption: channel names have the common expression: marker(label*) pattern = "\w+.*\(\w+\)" 
self.raw_channels = [re.findall(pattern, t)[0] for t in self.columns if len(re.findall(pattern, t)) > 0] self.raw_markers = [x.split('(')[0] for x in self.raw_channels] self.raw_labels = [x.split('(')[-1].split(')')[0] for x in self.raw_channels] self.channels = self.raw_channels.copy() self.markers = self.raw_markers.copy() self.labels = self.raw_labels.copy() def export_feature(self, feat_name: str, savename: Optional[str] = None): """ Export a set of specified feature """ savename = savename if savename else f"{feat_name}.csv" savename = savename if savename.endswith(".csv") else f"{feat_name}.csv" df = getattr(self, feat_name) df.to_csv(savename) def preprocess(self): nrow = int(max(self.df['Y'].values)) + 1 ncol = int(max(self.df['X'].values)) + 1 n = len(self.df) if nrow * ncol > n: df2 = pd.DataFrame(np.zeros((nrow * ncol - n, len(self.df.columns)), dtype=int), columns=self.df.columns) self.df = pd.concat([self.df, df2]) def quality_control(self, thres: int = 50) -> None: setattr(self, "keep", False) if (max(self.df['X']) < thres) \ or (max(self.df['Y']) < thres): print("At least one dimension of the image {}-{} is smaller than {}, exclude from analyzing" \ .format(self.slide, self.roi, thres)) self.keep = False def check_channels(self, channels: Optional[List] = None, xlim: Optional[List] = None, ylim: Optional[List] = None, ncols: int = 5, vis_q: float = 0.9, colorbar: bool = False, savedir: Optional[str] = None, savename: str = "check_channels" ):# -> Optional[matplotlib.figure.Figure]: """ xlim = a list of 2 numbers indicating the ylimits to show image (default=None) ylim = a list of 2 numbers indicating the ylimits to show image (default=None) ncols = number of subplots per row (default=5) vis_q = percentile q used to normalize image before visualization (default=0.9) """ show = True if savedir is None else False if channels is not None: if not all([cl.lower() in self.channels for cl in channels]): print("At least one of the channels not available, visualizing all channels instead!") channels = None if channels is None: # if no desired channels specified, check all channels channels = self.channels nrow = max(self.df['Y'].values) + 1 ncol = max(self.df['X'].values) + 1 if len(channels) <= ncols: ax_nrow = 1 ax_ncol = len(channels) else: ax_ncol = ncols ax_nrow = int(np.ceil(len(channels) / ncols)) fig, axes = plt.subplots(ax_nrow, ax_ncol, figsize=(3 * ax_ncol, 3 * ax_nrow)) if ax_nrow == 1: axes = np.array([axes]) if ax_ncol == 1: axes = np.expand_dims(axes, axis=1) for i, _ in enumerate(channels): _ax_nrow = int(np.floor(i / ax_ncol)) _ax_ncol = i % ax_ncol image = self.df[_].values.reshape(nrow, ncol) percentile_q = np.quantile(image, vis_q) if np.quantile(image, vis_q)!= 0 else 1 image = np.clip(image / percentile_q, 0, 1) axes[_ax_nrow, _ax_ncol].set_title(_) if xlim is not None: image = image[:, xlim[0]:xlim[1]] if ylim is not None: image = image[ylim[0]:ylim[1], :] im = axes[_ax_nrow, _ax_ncol].imshow(image, cmap="gray") if colorbar: fig.colorbar(im, ax=axes[_ax_nrow, _ax_ncol]) plt.tight_layout() if show: plt.show() else: plt.savefig(os.path.join(savedir, f"{savename}.png")) return fig def get_image(self, channels: List =None, inplace: bool = True, verbose=False): """ Get channel images based on provided channels. 
By default, get channel images correspond to all channels """ if channels is not None: if not all([cl in self.channels for cl in channels]): print("At least one of the channels not available, using default all channels instead!") channels = self.channels inplace = True else: channels = self.channels inplace = True nc = len(channels) nrow = max(self.df['Y'].values) + 1 ncol = max(self.df['X'].values) + 1 if verbose: print("Output image shape: [{}, {}, {}]".format(nrow, ncol, nc)) target_image = np.zeros([nrow, ncol, nc], dtype=float) for _nc in range(nc): target_image[..., _nc] = self.df[channels[_nc]].values.reshape(nrow, ncol) if inplace: self.image = target_image else: return target_image def visualize_single_channel(self, channel_name: str, color: str, quantile: float = None, visualize: bool = False): """ Visualize one channel of the multi-channel image, with a specified color from red, green, and blue """ channel_id = self.channels.index(channel_name) if quantile is None: # calculate 99th percentile by default quantile = np.quantile(self.image[..., channel_id], 0.99) channel_id_ = ["red", "green", "blue"].index(color) # channel index vis_im = np.zeros((self.image.shape[0], self.image.shape[1], 3)) gs = np.clip(self.image[..., channel_id] / quantile, 0, 1) # grayscale vis_im[..., channel_id_] = gs vis_im = (vis_im * 255).astype(np.uint8) if visualize: fig, ax = plt.subplots(1, 1) ax.imshow(vis_im) plt.show() return vis_im def visualize_channels(self, channel_ids: Optional[List]=None, channel_names: Optional[List]=None, quantiles: Optional[List]=None, visualize: Optional[bool]=False, show_colortable: Optional[bool]=False ): """ Visualize multiple channels simultaneously """ assert channel_ids or channel_names, 'At least one should be provided, either "channel_ids" or "channel_names"!' if channel_ids is None: channel_ids = [self.channels.index(n) for n in channel_names] else: channel_names = [self.channels[i] for i in channel_ids] assert len(channel_ids) <= 7, "No more than 6 channels can be visualized simultaneously!" if len(channel_ids) > 3: warnings.warn( "Visualizing more than 3 channels the same time results in deteriorated visualization. \ It is not recommended!") print("Visualizing channels: {}".format(', '.join(channel_names))) full_colors = ['red', 'green', 'blue', 'cyan', 'magenta', 'yellow', 'white'] color_values = [(1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 1, 1), (1, 0, 1), (1, 1, 0), (1, 1, 1)] info = ["{} in {}\n".format(marker, c) for (marker, c) in \ zip([self.channels[i] for i in channel_ids], full_colors[:len(channel_ids)])] print("Visualizing... 
\n{}".format(''.join(info))) merged_im = np.zeros((self.image.shape[0], self.image.shape[1], 3)) if quantiles is None: quantiles = [np.quantile(self.image[..., _], 0.99) for _ in channel_ids] # max_vals = [] for _ in range(min(len(channel_ids), 3)): # first 3 channels, assign colors R, G, B gs = np.clip(self.image[..., channel_ids[_]] / quantiles[_], 0, 1) # grayscale merged_im[..., _] = gs * 255 max_val = [0, 0, 0] max_val[_] = gs.max() * 255 # max_vals.append(max_val) chs = [[1, 2], [0, 2], [0, 1], [0, 1, 2]] chs_id = 0 while _ < len(channel_ids) - 1: _ += 1 max_val = [0, 0, 0] for j in chs[chs_id]: gs = np.clip(self.image[..., channel_ids[_]] / quantiles[_], 0, 1) merged_im[..., j] += gs * 255 # /2 merged_im[..., j] = np.clip(merged_im[..., j], 0, 255) max_val[j] = gs.max() * 255 chs_id += 1 # max_vals.append(max_val) merged_im = merged_im.astype(np.uint8) if visualize: fig, ax = plt.subplots(1, 1) ax.imshow(merged_im) plt.show() vis_markers = [self.markers[i] if i < len(self.markers) else self.channels[i] for i in channel_ids] color_dict = dict((n, c) for (n, c) in zip(vis_markers, color_values[:len(channel_ids)])) if show_colortable: show_color_table(color_dict=color_dict, title="color dictionary", emptycols=3, sort_names=True) return merged_im, quantiles, color_dict def remove_special_channels(self, channels: List): """ Given a list of channels, remove them from the class. This typically happens when users define certain channels to be the nuclei for special processing. """ for channel in channels: if channel not in self.channels: print("Channel {} not available, escaping...".format(channel)) continue idx = self.channels.index(channel) self.channels.pop(idx) self.markers.pop(idx) self.labels.pop(idx) self.df.drop(columns=channel, inplace=True) def define_special_channels(self, channels_dict: Dict, verbose=False, rm_key: str = 'nuclei'): ''' Special channels (antibodies) commonly found to define cell componenets (e.g. nuclei or membranes) ''' channels_rm = [] for new_name, old_names in channels_dict.items(): if len(old_names) == 0: continue old_nms = [] for i, old_name in enumerate(old_names): if old_name not in self.channels: warnings.warn('{} is not available!'.format(old_name)) continue old_nms.append(old_name) if verbose: print("Defining channel '{}' by summing up channels: {}.".format(new_name, ', '.join(old_nms))) if len(old_nms) > 0: # only add channels to removal list if matching remove key if new_name == rm_key: channels_rm += old_nms for i, old_name in enumerate(old_nms): if i == 0: self.df[new_name] = self.df[old_name] else: self.df[new_name] += self.df[old_name] if new_name not in self.channels: self.channels.append(new_name) self.get_image(verbose=verbose) if hasattr(self, "defined_channels"): for key in channels_dict.keys(): self.defined_channels.add(key) else: setattr(self, "defined_channels", set(list(channels_dict.keys()))) return channels_rm def get_seg( self, use_membrane: bool = True, radius: int = 5, sz_hole: int = 1, sz_obj: int = 3, min_distance: int = 2, fg_marker_dilate: int = 2, bg_marker_dilate: int = 2, show_process: bool = False, verbose: bool = False): channels = [x.lower() for x in self.channels] assert 'nuclei' in channels, "a 'nuclei' channel is required for segmentation!" 
nuclei_img = self.image[..., self.channels.index('nuclei')] if show_process: print("Nuclei segmentation...") # else: # print("Not showing segmentation process") nuclei_seg, color_dict = cytof_nuclei_segmentation(nuclei_img, show_process=show_process, size_hole=sz_hole, size_obj=sz_obj, fg_marker_dilate=fg_marker_dilate, bg_marker_dilate=bg_marker_dilate, min_distance=min_distance) membrane_img = self.image[..., self.channels.index('membrane')] \ if (use_membrane and 'membrane' in self.channels) else None if show_process: print("Cell segmentation...") cell_seg, _ = cytof_cell_segmentation(nuclei_seg, radius, membrane_channel=membrane_img, show_process=show_process, colors=color_dict) self.nuclei_seg = nuclei_seg self.cell_seg = cell_seg return nuclei_seg, cell_seg def visualize_seg(self, segtype: str = "cell", seg=None, show: bool = False, bg_label: int = 1): assert segtype in ["nuclei", "cell"], f"segtype {segtype} not supported. Accepted cell type: ['nuclei', 'cell']" # nuclei in red, membrane in green if "membrane" in self.channels: channel_ids = [self.channels.index(_) for _ in ["nuclei", "membrane"]] else: # visualize one marker channel and nuclei channel channel_ids = [self.channels.index("nuclei"), 0] if seg is None: if segtype == "cell": seg = self.cell_seg '''# membrane in red, nuclei in green channel_ids = [self.channels.index(_) for _ in ["membrane", "nuclei"]]''' else: seg = self.nuclei_seg # mark distinct membrane or nuclei boundary colors if segtype == 'cell': marked_image = visualize_segmentation(self.image, self.channels, seg, channel_ids=channel_ids, bound_color=(1, 1, 1), show=show, bg_label=bg_label) else: # marking nucleus boundaries as blue marked_image = visualize_segmentation(self.image, self.channels, seg, channel_ids=channel_ids, bound_color=(1, 1, 0), show=show, bg_label=bg_label) seg_color = 'yellow' if segtype=='nuclei' else 'white' print(f"{segtype} boundary marked by {seg_color}") return marked_image def extract_features(self, filename, use_parallel=True, show_sample=False): from cytof.utils import extract_feature # channel indices correspond to pure markers '''pattern = "\w+.*\(\w+\)" marker_idx = [i for (i,x) in enumerate(self.channels) if len(re.findall(pattern, x))>0] ''' marker_idx = [i for (i, x) in enumerate(self.channels) if x not in self.defined_channels] marker_channels = [self.channels[i] for i in marker_idx] # pure marker channels marker_image = self.image[..., marker_idx] # channel images correspond to pure markers morphology = self.morphology self.features = { "nuclei_morphology": [_ + '_nuclei' for _ in morphology], # morphology - nuclei level "cell_morphology": [_ + '_cell' for _ in morphology], # morphology - cell level "cell_sum": [_ + '_cell_sum' for _ in marker_channels], "cell_ave": [_ + '_cell_ave' for _ in marker_channels], "nuclei_sum": [_ + '_nuclei_sum' for _ in marker_channels], "nuclei_ave": [_ + '_nuclei_ave' for _ in marker_channels], } self.df_feature = extract_feature(marker_channels, marker_image, self.nuclei_seg, self.cell_seg, filename, use_parallel=use_parallel, show_sample=show_sample) def calculate_quantiles(self, qs: Union[List, int] = 75, savename: Optional[str] = None, verbose: bool = False): """ Calculate the q-quantiles of each marker with cell level summation given the q values """ qs = [qs] if isinstance(qs, int) else qs _expressions_cell_sum = [] quantiles = {} colors = cm.rainbow(np.linspace(0, 1, len(qs))) for feature_name in self.features["cell_sum"]: # all cell sum features except for nuclei_cell_sum and 
membrane_cell_sum if feature_name.startswith("nuclei") or feature_name.startswith("membrane"): continue _expressions_cell_sum.extend(self.df_feature[feature_name]) plt.hist(np.log2(np.array(_expressions_cell_sum) + 0.0001), 100, density=True) for q, c in zip(qs, colors): quantiles[q] = np.quantile(_expressions_cell_sum, q / 100) plt.axvline(np.log2(quantiles[q]), label=f"{q}th percentile", c=c) if verbose: print(f"{q}th percentile: {quantiles[q]}") plt.xlim(-15, 15) plt.xlabel("log2(expression of all markers)") plt.legend() if savename is not None: plt.savefig(savename) plt.show() # attach quantile dictionary to self self.dict_quantiles = quantiles print('dict quantiles:', quantiles) # return quantiles def _vis_normalization(self, savename: Optional[str] = None): """ Compare before and after normalization """ expressions = {} expressions["original"] = [] ## before normalization for key, features in self.features.items(): if key.endswith("morphology"): continue for feature_name in features: if feature_name.startswith('nuclei') or feature_name.startswith('membrane'): continue expressions["original"].extend(self.df_feature[feature_name]) log_exp = np.log2(np.array(expressions['original']) + 0.0001) plt.hist(log_exp, 100, density=True, label='before normalization') for q in self.dict_quantiles.keys(): n_attr = f"df_feature_{q}normed" expressions[f"{q}_normed"] = [] for key, features in self.features.items(): if key.endswith("morphology"): continue for feature_name in features: if feature_name.startswith('nuclei') or feature_name.startswith('membrane'): continue expressions[f"{q}_normed"].extend(getattr(self, n_attr)[feature_name]) plt.hist(expressions[f"{q}_normed"], 100, density=True, label=f"after {q}th percentile normalization") plt.legend() plt.xlabel('log2(expressions of all markers)') plt.ylabel('Frequency') if savename is not None: plt.savefig(savename) plt.show() return expressions def feature_quantile_normalization(self, qs: Union[List[int], int] = 75, vis_compare: bool = True, savedir: Optional[str] = None): """ Normalize all features with given quantiles except for morphology features Args: qs: value (int) or values (list of int) of for q-th percentile normalization vis_compare: a boolean flag indicating whether or not visualize comparison before and after normalization (default=True) savedir: saving directory for comparison and percentiles; if not None, visualizations of percentiles and comparison before and after normalization will be saved in savedir (default=None) """ qs = [qs] if isinstance(qs, int) else qs if savedir is not None: savename_quantile = os.path.join(savedir, "{}_{}_percentiles.png".format(self.slide, self.roi)) savename_compare = os.path.join(savedir, "{}_{}_comparison.png".format(self.slide, self.roi)) else: savename_quantile, savename_compare = None, None self.calculate_quantiles(qs, savename=savename_quantile) for q, quantile_val in self.dict_quantiles.items(): n_attr = f"df_feature_{q}normed" # attribute name log_normed = copy.deepcopy(self.df_feature) for key, features in self.features.items(): if key.endswith("morphology"): continue for feature_name in features: if feature_name.startswith("nuclei") or feature_name.startswith("membrane"): continue # log-quantile normalization log_normed.loc[:, feature_name] = np.log2(log_normed.loc[:, feature_name] / quantile_val + 0.0001) setattr(self, n_attr, log_normed) if vis_compare: _ = self._vis_normalization(savename=savename_compare) def save_channel_images(self, savedir: str, channels: Optional[List] = None, ext: 
str = ".png", quantile_norm: int = 99): """ Save channel images """ if channels is not None: if not all([cl in self.channels for cl in channels]): print("At least one of the channels not available, saving all channels instead!") channels = self.channels else: channels = self.channels '''assert all([x.lower() in channels_temp for x in channels]), "Not all provided channels are available!"''' for chn in channels: savename = os.path.join(savedir, f"{chn}{ext}") # i = channels_temp.index(chn.lower()) i = self.channels.index(chn) im_temp = self.image[..., i] quantile_temp = np.quantile(im_temp, quantile_norm / 100) \ if np.quantile(im_temp, quantile_norm / 100) != 0 else 1 im_temp_ = np.clip(im_temp / quantile_temp, 0, 1) save_multi_channel_img((im_temp_ * 255).astype(np.uint8), savename) def marker_positive(self, feature_type: str = "normed", accumul_type: str = "sum", normq: int = 75): assert feature_type in ["original", "normed", "scaled"], 'accepted feature types are "original", "normed", "scaled"' if feature_type == "original": feat_name = "" elif feature_type == "normed": feat_name = f"_{normq}normed" else: feat_name = f"_{normq}normed_scaled" n_attr = f"df_feature{feat_name}" # class attribute name for feature table count_attr = f"cell_count{feat_name}_{accumul_type}" # class attribute name for feature summary table df_feat = getattr(self, n_attr) df_thres = getattr(self, count_attr) thresholds_cell_marker = dict((x, y) for (x, y) in zip(df_thres["feature"], df_thres["threshold"])) columns = ["id"] + [marker for marker in self.markers] df_marker_positive = pd.DataFrame(columns=columns, data=np.zeros((len(df_feat), len(self.markers) + 1), type=np.int32)) df_marker_positive["id"] = df_feat["id"] for im, marker in enumerate(self.markers): channel_ = f"{self.channels[im]}_cell_{accumul_type}" df_marker_positive.loc[df_feat[channel_] > thresholds_cell_marker[channel_], marker] = 1 setattr(self, f"df_marker_positive{feat_name}", df_marker_positive) def marker_positive_summary(self, thresholds: Dict, feat_type: str = "normed", normq: int = 75, accumul_type: str = "sum" ): """ Generate marker positive summary for CytofImage: Output rendered: f"cell_count_{feat_name}_{aggre}" and f"marker_positive_{feat_name}_{aggre}" """ assert feat_type in ["normed_scaled", "normed", ""], f"feature type {feat_type} not supported!" 
feat_name = f"{feat_type}" if feat_type=="" else f"{normq}{feat_type}" # the attribute name to achieve from cytof_img n_attr = f"df_feature{feat_name}" if feat_type=="" else f"df_feature_{feat_name}" # the attribute name to achieve from cytof_img df_thres = pd.DataFrame({"feature": thresholds.keys(), "threshold": thresholds.values()}) df_marker_pos_sum = getattr(self, n_attr).copy() keep_feat_set = f"cell_{accumul_type}" for key, feat_set in getattr(self, "features").items(): if key == keep_feat_set: marker_set = self.markers df_marker_pos_sum_ = df_marker_pos_sum[feat_set].copy().transpose() comp_cols = list(df_marker_pos_sum_.columns) df_marker_pos_sum_.reset_index(names='feature', inplace=True) merged = df_marker_pos_sum_.merge(df_thres, on="feature", how="left") df_temp = merged[comp_cols].ge(merged["threshold"], axis=0) df_temp.index = merged['feature'] df_marker_pos_sum[feat_set] = df_temp.transpose()[feat_set] map_rename = dict((k, v) for (k,v) in zip(feat_set, marker_set)) df_marker_pos_sum.rename(columns=map_rename, inplace=True) else: df_marker_pos_sum.drop(columns=feat_set, inplace=True) df_thres['total number'] = df_temp.count(axis=1).values df_thres['positive counts'] = df_temp.sum(axis=1).values df_thres['positive ratio'] = df_thres['positive counts'] / df_thres['total number'] attr_cell_count = f"cell_count_{feat_name}_{accumul_type}" attr_marker_pos = f"df_marker_positive_{feat_name}_{accumul_type}" setattr(self, attr_cell_count, df_thres) setattr(self, attr_marker_pos, df_marker_pos_sum) return f"{feat_name}_{accumul_type}" def visualize_marker_positive(self, marker: str, feature_type: str, accumul_type: str = "sum", normq: int = 99, show_boundary: bool = True, color_list: List[Tuple] = [(0,0,1), (0,1,0)], # negative, positive color_bound: Tuple = (0,0,0), show_colortable: bool=False ): assert feature_type in ["original", "normed", "scaled"], 'accepted feature types are "original", "normed", "scaled"' if feature_type == "original": feat_name = "" elif feature_type == "normed": feat_name = f"_{normq}normed" else: feat_name = f"_{normq}normed_scaled" # self.marker_positive(feature_type=feature_type, accumul_type=accumul_type, normq=normq) df_marker_positive_original = getattr(self, f"df_marker_positive{feat_name}_{accumul_type}") df_marker_positive = df_marker_positive_original.copy() # exclude the channels accordingly if 'membrane' in self.channels: channels_wo_special = self.channels[:-2] # excludes nuclei and membrane channel else: channels_wo_special = self.channels[:-1] # excludes nuclei channel only # the original four location info + marker/channel names reconstructed_marker_channel = ['filename', 'id', 'coordinate_x', 'coordinate_y'] + channels_wo_special assert len(reconstructed_marker_channel) == len(df_marker_positive_original.columns) df_marker_positive.columns = reconstructed_marker_channel color_dict = dict((key, v) for (key, v) in zip(['negative', 'positive'], color_list)) if show_colortable: show_color_table(color_dict=color_dict, title="color dictionary", emptycols=3) color_ids = [] stain_nuclei = np.zeros((self.nuclei_seg.shape[0], self.nuclei_seg.shape[1], 3)) + 1 for i in range(2, np.max(self.nuclei_seg) + 1): color_id = df_marker_positive[marker][df_marker_positive['id'] == i].values[0] if color_id not in color_ids: color_ids.append(color_id) stain_nuclei[self.nuclei_seg == i] = color_list[color_id][:3] # add boundary if show_boundary: stain_nuclei = mark_boundaries(stain_nuclei, self.nuclei_seg, mode="inner", color=color_bound) # stained Cell image 
stain_cell = np.zeros((self.cell_seg.shape[0], self.cell_seg.shape[1], 3)) + 1 for i in range(2, np.max(self.cell_seg) + 1): color_id = df_marker_positive[marker][df_marker_positive['id'] == i].values[0] stain_cell[self.cell_seg == i] = color_list[color_id][:3] if show_boundary: stain_cell = mark_boundaries(stain_cell, self.cell_seg, mode="inner", color=color_bound) return stain_nuclei, stain_cell, color_dict def visualize_pheno(self, key_pheno: str, color_dict: Optional[dict] = None, show: bool = False, show_colortable: bool = False): assert key_pheno in self.phenograph, "Pheno-Graph with {} not available!".format(key_pheno) phenograph = self.phenograph[key_pheno] communities = phenograph['communities'] # phenograph clustering community IDs seg_id = self.df_feature['id'] # nuclei / cell segmentation IDs if color_dict is None: color_dict = dict((_, plt.cm.get_cmap('tab20').colors[_ % 20]) \ for _ in np.unique(communities)) # rgba_colors = np.array([color_dict[_] for _ in communities]) if show_colortable: show_color_table(color_dict=color_dict, title="phenograph clusters", emptycols=3, dpi=60) # Create image with nuclei / cells stained by PhenoGraph clustering output # stain rule: same color for same cluster, stain nuclei stain_nuclei = np.zeros((self.nuclei_seg.shape[0], self.nuclei_seg.shape[1], 3)) + 1 stain_cell = np.zeros((self.cell_seg.shape[0], self.cell_seg.shape[1], 3)) + 1 for i in range(2, np.max(self.nuclei_seg) + 1): commu_id = communities[seg_id == i][0] stain_nuclei[self.nuclei_seg == i] = color_dict[commu_id] # rgba_colors[communities[seg_id == i]][:3] # stain_cell[self.cell_seg == i] = color_dict[commu_id] # rgba_colors[communities[seg_id == i]][:3] # if show: fig, axs = plt.subplots(1, 2, figsize=(16, 8)) axs[0].imshow(stain_nuclei) axs[1].imshow(stain_cell) return stain_nuclei, stain_cell, color_dict def get_binary_pos_express_df(self, feature_name, accumul_type): """ returns a dataframe in the form marker1, marker2, ... vs. cell1, cell2; indicating whether each cell is positively expressed in each marker """ df_feature_name = f"df_feature_{feature_name}" # get the feature extraction result df_feature = getattr(self , df_feature_name) # select only markers with desired accumulation type marker_col_all = [x for x in df_feature.columns if f"cell_{accumul_type}" in x] # subset feature df_feature_of_interst = df_feature[marker_col_all] # reports each marker's threshold to be considered positively expressed, number of positive cells, etc df_cell_count_info = getattr(self, f"cell_count_{feature_name}_{accumul_type}") thresholds = df_cell_count_info.threshold # returns a binary dataframe of whether each cell at each marker passes the positive threshold df_binary_pos_exp = df_feature_of_interst.apply(lambda column: apply_threshold_to_column(column, threshold=thresholds[df_feature_of_interst.columns.get_loc(column.name)])) return df_binary_pos_exp def roi_co_expression(self, feature_name, accumul_type, return_components=False): """ Performs the co-expression analysis at the single ROI level. 
Can return components for cohort analysis if needed """ from itertools import product # returns a binary dataframe of whether each cell at each marker passes the positive threshold df_binary_pos_exp = self.get_binary_pos_express_df(feature_name, accumul_type) n_cells, n_markers = df_binary_pos_exp.shape df_pos_exp_val = df_binary_pos_exp.values # list all pair-wise combinations of the markers column_combinations = list(product(range(n_markers), repeat=2)) # step to the numerator of the log odds ratio co_positive_count_matrix = np.zeros((n_markers, n_markers)) # step to the denominator of the log odds ratio expected_count_matrix = np.zeros((n_markers, n_markers)) for combo in column_combinations: marker1, marker2 = combo # count cells that positively expresses in both marker 1 and 2 positive_prob_marker1_and_2 = np.sum(np.logical_and(df_pos_exp_val[:, marker1], df_pos_exp_val[:, marker2])) co_positive_count_matrix[marker1, marker2] = positive_prob_marker1_and_2 # pair (A,B) counts is the same as pair (B,A) counts co_positive_count_matrix[marker2, marker1] = positive_prob_marker1_and_2 # count expected cells if marker 1 and 2 are independently expressed # p(A and B) = p(A) * p(B) = num_pos_a * num_pos_b / (num_cells * num_cells) # p(A) = number of positive cells / number of cells exp_prob_in_marker1_and_2 = np.sum(df_pos_exp_val[:, marker1]) * np.sum(df_pos_exp_val[:, marker2]) expected_count_matrix[marker1, marker2] = exp_prob_in_marker1_and_2 expected_count_matrix[marker2, marker1] = exp_prob_in_marker1_and_2 # theta(i_pos and j_pos) df_co_pos = pd.DataFrame(co_positive_count_matrix, index=df_binary_pos_exp.columns, columns=df_binary_pos_exp.columns) # E(x) df_expected = pd.DataFrame(expected_count_matrix, index=df_binary_pos_exp.columns, columns=df_binary_pos_exp.columns) if return_components: # hold off on calculating probabilites. Need the components from other ROIs to calculate the co-expression return df_co_pos, df_expected, n_cells # otherwise, return the probabilies df_co_pos_prob = df_co_pos / n_cells df_expected_prob = df_expected / n_cells**2 return df_co_pos_prob, df_expected_prob def roi_interaction_graphs(self, feature_name, accumul_type, method: str = "distance", threshold=50, return_components=False): """ Performs spatial interaction at the ROI level. Finds if two positive markers are in proximity with each other. Proximity can be defined either with k-nearest neighbor or distance thresholding. Args: key_pheno: dictionary key for a specific phenograph output method: method to construct the adjacency matrix, choose from "distance" and "kneighbor" threshold: either the number of neighbors or euclidean distance to qualify as neighborhood pairs. Default is 50 for distance and 20 for k-neighbor. **kwargs: used to specify distance threshold (thres) for "distance" method or number of neighbors (k) for "kneighbor" method Output: network: (dict) ROI level network that will be used for cluster interaction analysis """ assert method in ["distance", "k-neighbor"], "Method can be either 'distance' or 'k-neighbor'!" 
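        # Hedged usage sketch (feature/accumulation names follow the attribute convention
        # used elsewhere in this class, e.g. feature_name="75normed"):
        #   df_expected_prob, df_interaction_prob = cytof_img.roi_interaction_graphs(
        #       feature_name="75normed", accumul_type="sum", method="distance", threshold=50)
        # With return_components=True the raw count matrices and the number of neighbor
        # pairs are returned instead, so probabilities can be pooled across ROIs.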
print(f'Calculating spatial interaction with method "{method}" and threshold at {threshold}') df_feature_name = f"df_feature_{feature_name}" # get the feature extraction result df_feature = getattr(self , df_feature_name) # select only markers with desired accumulation type marker_col_all = [x for x in df_feature.columns if f"cell_{accumul_type}" in x] # subset feature df_feature_of_interst = df_feature[marker_col_all] n_cells, n_markers = df_feature_of_interst.shape networks = {} if method == "distance": dist = DistanceMetric.get_metric('euclidean') neighbor_matrix = dist.pairwise(df_feature.loc[:, ['coordinate_x', 'coordinate_y']].values) # returns nonzero elements of the matrix # ref: https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.find.html I, J, V = sp.find(neighbor_matrix) # finds index of values less than the distance threshold v_keep_index = V < threshold elif method == "k-neighbor": neighbor_matrix = skgraph(np.array(df_feature.loc[:, ['coordinate_x', 'coordinate_y']]), n_neighbors=threshold, mode='distance') # returns nonzero elements of the matrix # ref: https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.find.html I, J, V = sp.find(neighbor_matrix) v_keep_index = V > 0 # any non-zero distance neighbor qualifies # finds index of values less than the distance threshold i_keep, j_keep = I[v_keep_index], J[v_keep_index] assert len(i_keep) == len(j_keep) # these are paired indexes for the cell. must equal in length. n_neighbor_pairs = len(i_keep) # (i,j) now tells you the index of the two cells that are in close proximity (within {thres} distance of each other) # now we need a list that tells you the positive expressed marker index in each cell # returns a binary dataframe of whether each cell at each marker passes the positive threshold df_binary_pos_exp = self.get_binary_pos_express_df(feature_name, accumul_type) df_pos_exp_val = df_binary_pos_exp.values # convert to matrix operation # cell-marker positive list, 1-D. len = n_cells. Each element indicates the positively expressed marker of that cell index # only wants where the x condition is True. x refers to the docs x, not the actual array direction # ref: https://numpy.org/doc/stable/reference/generated/numpy.where.html cell_marker_pos_list = [np.where(cell)[0] for cell in df_pos_exp_val] cell_interaction_in_markers_counts = np.zeros((n_markers, n_markers)) # used to calculate E(x) expected_marker_count_1d = np.zeros(n_markers) # go through each close proxmity cell pair for i, j in zip(i_keep, j_keep): # locate the cell via index, then marker_index_neighbor_pair1 = cell_marker_pos_list[i] marker_index_neighbor_pair2 = cell_marker_pos_list[j] # within each neighbor pair (i.e. 
pairs of cells) contains the positively expressed markers index in that cell # the product of these markers index from each cell indicates interaction pair marker_matrix_update_coords = list(product(marker_index_neighbor_pair1, marker_index_neighbor_pair2)) # update the counts between each marker interaction pair # example coords: (pos_marker_index_in_cell1, pos_marker_index_in_cell2) for coords in marker_matrix_update_coords: cell_interaction_in_markers_counts[coords] += 1 # find the marker index that appeared in both pairs of the neighbor cells markers_index_both_neighbor_pair = np.union1d(marker_index_neighbor_pair1, marker_index_neighbor_pair2) expected_marker_count_1d[markers_index_both_neighbor_pair] += 1 # increase the markers that appears in either neighborhood pair # expected counts # expected_marker_count_1d = np.sum(df_pos_exp_val, axis=0) # ref: https://numpy.org/doc/stable/reference/generated/numpy.outer.html expected_counts = np.outer(expected_marker_count_1d, expected_marker_count_1d) # expected and observed needs to match dimension to perform element-wise operation assert expected_counts.shape == cell_interaction_in_markers_counts.shape df_expected_counts = pd.DataFrame(expected_counts, index=df_feature_of_interst.columns, columns=df_feature_of_interst.columns) df_cell_interaction_counts = pd.DataFrame(cell_interaction_in_markers_counts, index=df_feature_of_interst.columns, columns=df_feature_of_interst.columns) if return_components: return df_expected_counts, df_cell_interaction_counts, n_neighbor_pairs # calculates percentage within function if not return compoenents # df_expected_prob = df_expected_counts / n_cells**2 df_expected_prob = df_expected_counts / n_neighbor_pairs**2 # theta(i_pos and j_pos) df_cell_interaction_prob = df_cell_interaction_counts / n_neighbor_pairs return df_expected_prob, df_cell_interaction_prob class CytofImageTiff(CytofImage): """ CytofImage for Tiff images, inherit from Cytofimage """ def __init__(self, image, slide="", roi="", filename=""): self.image = image self.markers = None # markers self.labels = None # labels self.slide = slide self.roi = roi self.filename = filename self.channels = None # ["{}({})".format(marker, label) for (marker, label) in zip(self.markers, self.labels)] def copy(self): ''' Creates a deep copy of the current CytofImageTIFF object and return it ''' new_instance = type(self)(self.image.copy(), self.slide, self.roi, self.filename) new_instance.markers = copy.deepcopy(self.markers) new_instance.labels = copy.deepcopy(self.labels) new_instance.channels = copy.deepcopy(self.channels) return new_instance def quality_control(self, thres: int = 50) -> None: setattr(self, "keep", False) if any([x < thres for x in self.image.shape]): print(f"At least one dimension of the image {self.slide}-{self.roi} is smaller than {thres}, \ hence exclude from analyzing" ) self.keep = False def set_channels(self, markers: List, labels: List): self.markers = markers self.labels = labels self.channels = ["{}({})".format(marker, label) for (marker, label) in zip(self.markers, self.labels)] def set_markers(self, markers: list, labels: list, channels: Optional[list] = None ): """This deprecates set_channels """ self.raw_markers = markers self.raw_labels = labels if channels is not None: self.raw_channels = channels else: self.raw_channels = [f"{marker}-{label}" for (marker, label) in zip(markers, labels)] self.channels = self.raw_channels.copy() self.markers = self.raw_markers.copy() self.labels = self.raw_labels.copy() def 
check_channels(self, channels: Optional[List] = None, xlim: Optional[List] = None, ylim: Optional[List] = None, ncols: int = 5, vis_q: int = 0.9, colorbar: bool = False, savedir: Optional[str] = None, savename: str = "check_channels"): """ xlim = a list of 2 numbers indicating the ylimits to show image (default=None) ylim = a list of 2 numbers indicating the ylimits to show image (default=None) ncols = number of subplots per row (default=5) vis_q = percentile q used to normalize image before visualization (default=0.9) """ show = True if savedir is None else False if channels is not None: if not all([cl in self.channels for cl in channels]): print("At least one of the channels not available, visualizing all channels instead!") channels = None if channels is None: # if no desired channels specified, check all channels channels = self.channels if len(channels) <= ncols: ax_nrow = 1 ax_ncol = len(channels) else: ax_ncol = ncols ax_nrow = int(np.ceil(len(channels) / ncols)) fig, axes = plt.subplots(ax_nrow, ax_ncol, figsize=(3 * ax_ncol, 3 * ax_nrow)) # fig, axes = plt.subplots(ax_nrow, ax_ncol) if ax_nrow == 1: axes = np.array([axes]) if ax_ncol == 1: axes = np.expand_dims(axes, axis=1) for i, _ in enumerate(channels): _ax_nrow = int(np.floor(i / ax_ncol)) _ax_ncol = i % ax_ncol _i = self.channels.index(_) image = self.image[..., _i] percentile_q = np.quantile(image, vis_q) if np.quantile(image, vis_q) != 0 else 1 image = np.clip(image / percentile_q, 0, 1) axes[_ax_nrow, _ax_ncol].set_title(_) if xlim is not None: image = image[:, xlim[0]:xlim[1]] if ylim is not None: image = image[ylim[0]:ylim[1], :] im = axes[_ax_nrow, _ax_ncol].imshow(image, cmap="gray") if colorbar: fig.colorbar(im, ax=axes[_ax_nrow, _ax_ncol]) plt.tight_layout(pad=1.2) # axes.axis('scaled') if show: plt.show() else: # plt.savefig(os.path.join(savedir, f"{savename}.png")) return fig def remove_special_channels(self, channels: List): for channel in channels: if channel not in self.channels: print("Channel {} not available, escaping...".format(channel)) continue idx = self.channels.index(channel) self.channels.pop(idx) self.markers.pop(idx) self.labels.pop(idx) self.image = np.delete(self.image, idx, axis=2) if hasattr(self, "df"): self.df.drop(columns=channel, inplace=True) def define_special_channels( self, channels_dict: Dict, q: float = 0.95, overwrite: bool = False, verbose: bool = False, rm_key: str = 'nuclei'): channels_rm = [] # new_name is the key from channels_dict, old_names contains a list of existing channel names for new_name, old_names in channels_dict.items(): if len(old_names) == 0: continue if new_name in self.channels and (not overwrite): print("Warning: {} is already present, skipping...".format(new_name)) continue if new_name in self.channels and overwrite: print("Warning: {} is already present, overwriting...".format(new_name)) idx = self.channels.index(new_name) self.image = np.delete(self.image, idx, axis=2) self.channels.pop(idx) old_nms = [] for i, old_name in enumerate(old_names): if old_name not in self.channels: # warnings.warn('{} is not available!'.format(old_name['marker_name'])) warnings.warn('{} is not available!'.format(old_name)) continue old_nms.append(old_name) if verbose: print("Defining channel '{}' by summing up channels: {}.".format(new_name, ', '.join(old_nms))) if len(old_nms) > 0: # only add channels to removal list if matching remove key if new_name == rm_key: channels_rm += old_nms for i, old_name in enumerate(old_nms): _i = self.channels.index(old_name) _image = 
self.image[..., _i] percentile_q = np.quantile(_image, q) if np.quantile(_image, q) != 0 else 1 _image = np.clip(_image / percentile_q, 0, 1) # quantile normalization if i == 0: image = _image else: image += _image if verbose: print(f"Original image shape: {self.image.shape}") self.image = np.dstack([self.image, image[:, :, None]]) if verbose: print(f"Image shape after defining special channel(s) {self.image.shape}") if new_name not in self.channels: self.channels.append(new_name) if hasattr(self, "defined_channels"): for key in channels_dict.keys(): self.defined_channels.add(key) else: setattr(self, "defined_channels", set(list(channels_dict.keys()))) return channels_rm # Define a function to apply the threshold and convert to binary def apply_threshold_to_column(column, threshold): """ Apply a threshold to a column of data and convert it to binary. @param column: The input column of data to be thresholded. @param threshold: The threshold value to compare the elements in the column. @return: A binary array where True represents values meeting or exceeding the threshold, and False represents values below the threshold. """ return (column >= threshold) class CytofCohort(): def __init__(self, cytof_images: Optional[dict] = None, df_cohort: Optional[pd.DataFrame] = None, dir_out: str = "./", cohort_name: str = "cohort1"): """ cytof_images: df_cohort: Slide | ROI | input file """ self.cytof_images = cytof_images or {} self.df_cohort = df_cohort# or None# pd.read_csv(file_cohort) # the slide-ROI self.feat_sets = { "all": ["cell_sum", "cell_ave", "cell_morphology"], "cell_sum": ["cell_sum", "cell_morphology"], "cell_ave": ["cell_ave", "cell_morphology"], "cell_sum_only": ["cell_sum"], "cell_ave_only": ["cell_ave"] } self.name = cohort_name self.dir_out = os.path.join(dir_out, self.name) if not os.path.exists(self.dir_out): os.makedirs(self.dir_out) def __getitem__(self, key): 'Extracts a particular cytof image from the cohort' return self.cytof_images[key] def __str__(self): return f"CytofCohort {self.name}" def __repr__(self): return f"CytofCohort(name={self.name})" def save_cytof_cohort(self, savename): directory = os.path.dirname(savename) if not os.path.exists(directory): os.makedirs(directory) pkl.dump(self, open(savename, "wb")) def batch_process_feature(self): """ Batch process: if the CytofCohort is initialized by a dictionary of CytofImages """ slides, rois, fs_input = [], [], [] for n, cytof_img in self.cytof_images.items(): if not hasattr(self, "dict_feat"): setattr(self, "dict_feat", cytof_img.features) if not hasattr(self, "markers"): setattr(self, "markers", cytof_img.markers) print('dict quantiles in batch process:', cytof_img.dict_quantiles) try: qs &= set(list(cytof_img.dict_quantiles.keys())) except: qs = set(list(cytof_img.dict_quantiles.keys())) slides.append(cytof_img.slide) rois.append(cytof_img.roi) fs_input.append(cytof_img.filename) #df_feature['filename'].unique()[0]) setattr(self, "normqs", qs) # scale feature (in a batch) df_scale_params = self.scale_feature() setattr(self, "df_scale_params", df_scale_params) if self.df_cohort is None: self.df_cohort = pd.DataFrame({"Slide": slides, "ROI": rois, "input file": fs_input}) def batch_process(self, params: Dict): sys.path.append("../CLIscripts") from process_single_roi import process_single, SetParameters for i, (slide, roi, fname) in self.df_cohort.iterrows(): paramsi = SetParameters(filename=fname, outdir=self.dir_out, label_marker_file=params.get('label_marker_file', None), slide=slide, roi=roi, 
                                    quality_control_thres=params.get("quality_control_thres", 50),
                                    channels_remove=params.get("channels_remove", None),
                                    channels_dict=params.get("channels_dict", None),
                                    use_membrane=params.get("use_membrane", True),
                                    cell_radius=params.get("cell_radius", 5),
                                    normalize_qs=params.get("normalize_qs", 75),
                                    iltype=params.get('iltype', None))
            cytof_img = process_single(paramsi, downstream_analysis=False, verbose=False)
            self.cytof_images[f"{slide}_{roi}"] = cytof_img
        self.batch_process_feature()

    def get_feature(self, normq: int = 75, feat_type: str = "normed_scaled", verbose: bool = False):
        """ Get a specific set of features for the cohort.
        The set is defined by `normq` and `feat_type`.
        """
        assert feat_type in ["normed_scaled", "normed", ""], f"feature type {feat_type} not supported!"
        if feat_type != "" and not hasattr(self, "df_feature"):
            orig_dfs = {}
            for f_roi, cytof_img in self.cytof_images.items():
                orig_dfs[f_roi] = getattr(cytof_img, "df_feature")
            setattr(self, "df_feature", pd.concat([_ for key, _ in orig_dfs.items()]).reset_index(drop=True))
        feat_name = feat_type if feat_type == "" else f"_{normq}{feat_type}"
        n_attr = f"df_feature{feat_name}"
        dfs = {}
        for f_roi, cytof_img in self.cytof_images.items():
            dfs[f_roi] = getattr(cytof_img, n_attr)
        setattr(self, n_attr, pd.concat([_ for key, _ in dfs.items()]).reset_index(drop=True))
        if verbose:
            print("The attribute name of the feature: {}".format(n_attr))

    def scale_feature(self):
        """Scale features for all normalization q values"""
        cytof_img = list(self.cytof_images.values())[0]
        # features to be scaled
        s_features = [col for key, features in cytof_img.features.items()
                      for f in features
                      for col in cytof_img.df_feature.columns if col.startswith(f)]
        for normq in self.normqs:
            n_attr = f"df_feature_{normq}normed"
            n_attr_scaled = f"df_feature_{normq}normed_scaled"
            if not hasattr(self, n_attr):
                self.get_feature(normq=normq, feat_type="normed")
            df_feature = getattr(self, n_attr)
            # calculate scaling parameters
            df_scale_params = df_feature[s_features].mean().to_frame(name="mean").transpose()
            df_scale_params = pd.concat([df_scale_params,
                                         df_feature[s_features].std().to_frame(name="std").transpose()])
            m = df_scale_params[df_scale_params.columns].iloc[0]  # mean
            s = df_scale_params[df_scale_params.columns].iloc[1]  # std.dev
            df_feature_scale = copy.deepcopy(df_feature)
            # every column to be scaled must exist in the feature table
            assert len([x for x in df_scale_params.columns if x not in df_feature_scale.columns]) == 0
            # scale
            df_feature_scale[df_scale_params.columns] = (df_feature_scale[df_scale_params.columns] - m) / s
            setattr(self, n_attr_scaled, df_feature_scale)
        return df_scale_params

    def _get_feature_subset(self, normq: int = 75, feat_type: str = "normed_scaled", feat_set: str = "all",
                            markers: Union[str, List] = "all", verbose: bool = False):
        assert feat_type in ["normed_scaled", "normed", ""], f"feature type {feat_type} not supported!"
        assert (markers == "all" or isinstance(markers, list))
        assert feat_set in self.feat_sets.keys(), f"feature set {feat_set} not supported!"
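        # Attribute-naming recap (mirrors get_feature/scale_feature above):
        #   feat_type=""              -> self.df_feature
        #   feat_type="normed"        -> getattr(self, f"df_feature_{normq}normed")
        #   feat_type="normed_scaled" -> getattr(self, f"df_feature_{normq}normed_scaled")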
description = "original" if feat_type=="" else f"{normq}{feat_type}" n_attr = f"df_feature{feat_type}" if feat_type=="" else f"df_feature_{normq}{feat_type}" # the attribute name to achieve from cytof_img if not hasattr(self, n_attr): self.get_feature(normq, feat_type) if verbose: print("\nThe attribute name of the feature: {}".format(n_attr)) feat_names = [] # a list of feature names for y in self.feat_sets[feat_set]: if "morphology" in y: feat_names += self.dict_feat[y] else: if markers == "all": # features extracted from all markers are kept feat_names += self.dict_feat[y] markers = self.markers else: # only features correspond to markers kept (markers are a subset of self.markers) ids = [self.markers.index(x) for x in markers] # TODO: the case where marker in markers not in self.markers??? feat_names += [self.dict_feat[y][x] for x in ids] df_feature = getattr(self, n_attr)[feat_names] return df_feature, markers, feat_names, description, n_attr ############################################################### ################## PhenoGraph Clustering ###################### ############################################################### def clustering_phenograph(self, normq:int = 75, feat_type:str = "normed_scaled", feat_set: str = "all", pheno_markers: Union[str, List] = "all", k: int = None, save_vis: bool = False, verbose:bool = True): if pheno_markers == "all": pheno_markers_ = "_all" else: pheno_markers_ = "_subset1" assert feat_type in ["normed_scaled", "normed", ""], f"feature type {feat_type} not supported!" df_feature, pheno_markers, feat_names, description, n_attr = self._get_feature_subset(normq=normq, feat_type=feat_type, feat_set=feat_set, markers=pheno_markers, verbose=verbose) # set number of nearest neighbors k and run PhenoGraph for phenotype clustering k = k if k else int(df_feature.shape[0] / 100) if k < 10: k = min(df_feature.shape[0]-1, 10) # perform k-means algorithm for small k kmeans = KMeans(n_clusters=k, random_state=42).fit(df_feature) communities = kmeans.labels_ else: communities, graph, Q = phenograph.cluster(df_feature, k=k, n_jobs=-1) # run PhenoGraph # project to 2D using UMAP umap_2d = umap.UMAP(n_components=2, init='random', random_state=0) proj_2d = umap_2d.fit_transform(df_feature) if not hasattr(self, "phenograph"): setattr(self, "phenograph", {}) key_pheno = f"{description}_{feat_set}_feature_{k}" key_pheno += f"{pheno_markers_}_markers" N = len(np.unique(communities)) self.phenograph[key_pheno] = { "data": df_feature, "markers": pheno_markers, "features": feat_names, "description": {"normalization": description, "feature_set": feat_set}, # normalization and/or scaling | set of feature (in self.feat_sets) "communities": communities, "proj_2d": proj_2d, "N": N, "feat_attr": n_attr } if verbose: print(f"\n{N} communities found. 
The dictionary key for phenograph: {key_pheno}.") return key_pheno def _gather_roi_pheno(self, key_pheno): """Split whole df into df for each ROI""" df_slide_roi = self.df_cohort pheno_out = self.phenograph[key_pheno] df_feat_all = getattr(self, pheno_out['feat_attr']) # original feature (to use the slide/ roi /filename info) data df_pheno_all = pheno_out['data'] # phenograph data proj_2d_all = pheno_out['proj_2d'] communities_all = pheno_out['communities'] df_feature_roi, proj_2d_roi, communities_roi = {}, {}, {} for i in self.df_cohort.index: # Slide | ROI | input file # path_i = df_slide_roi.loc[i, "path"] roi_i = df_slide_roi.loc[i, "ROI"] f_in = df_slide_roi.loc[i, "input file"]# os.path.join(path_i, roi_i) cond = df_feat_all["filename"] == f_in df_feature_roi[roi_i] = df_pheno_all.loc[cond, :] proj_2d_roi[roi_i] = proj_2d_all[cond, :] communities_roi[roi_i] = communities_all[cond] return df_feature_roi, proj_2d_roi, communities_roi def vis_phenograph(self, key_pheno: str, level: str = "cohort", accumul_type: Union[List[str], str] = "cell_sum", # ["cell_sum", "cell_ave"] normalize: bool = False, save_vis: bool = False, show_plots: bool = False, plot_together: bool = True, fig_width: int = 5 # only when plot_together is True ): assert level.upper() in ["COHORT", "SLIDE", "ROI"], "Only 'cohort', 'slide' and 'roi' are accetable values for level" this_pheno = self.phenograph[key_pheno] feat_names = this_pheno['features'] descrip = this_pheno['description'] n_community = this_pheno['N'] markers = this_pheno['markers'] feat_set = self.feat_sets[descrip['feature_set']] if save_vis: vis_savedir = os.path.join(self.dir_out, "phenograph", key_pheno + f"-{n_community}clusters") if not os.path.exists(vis_savedir): os.makedirs(vis_savedir) else: vis_savedir = None if accumul_type is None: # by default, visualize all accumulation types accumul_type = [_ for _ in feat_set if "morphology" not in _] if isinstance(accumul_type, str): accumul_type = [accumul_type] proj_2d = this_pheno['proj_2d'] df_feature = this_pheno['data'] communities = this_pheno['communities'] if level.upper() == "COHORT": proj_2ds = {"cohort": proj_2d} df_feats = {"cohort": df_feature} commus = {"cohort": communities} else: df_feats, proj_2ds, commus = self._gather_roi_pheno(key_pheno) if level.upper() == "SLIDE": for slide in self.df_cohort["Slide"].unique(): # for each slide f_rois = [roi_i.replace(".txt", "") for roi_i in self.df_cohort.loc[self.df_cohort["Slide"] == slide, "ROI"]] df_feats[slide] = pd.concat([df_feats[f_roi] for f_roi in f_rois]) proj_2ds[slide] = np.concatenate([proj_2ds[f_roi] for f_roi in f_rois]) commus[slide] = np.concatenate([commus[f_roi] for f_roi in f_rois]) for f_roi in f_rois: df_feats.pop(f_roi) proj_2ds.pop(f_roi) commus.pop(f_roi) figs = {} # if plot_together figs_scatter = {} # if not plot_together figs_exps = {} cluster_protein_exps = {} for key, df_feature in df_feats.items(): if plot_together: ncol = len(accumul_type)+1 fig, axs = plt.subplots(1,ncol, figsize=(ncol*fig_width, fig_width)) proj_2d = proj_2ds[key] commu = commus[key] # Visualize 1: plot 2d projection together print("Visualization in 2d - {}-{}".format(level, key)) savename = os.path.join(vis_savedir, f"cluster_scatter_{level}_{key}.png") if (save_vis and not plot_together) else None ax = axs[0] if plot_together else None fig_scatter = visualize_scatter(data=proj_2d, communities=commu, n_community=n_community, title=key, savename=savename, show=show_plots, ax=ax) figs_scatter[key] = fig_scatter figs_exps[key] = {} # Visualize 2: 
    def attach_individual_roi_pheno(self, key_pheno, override=False):
        """Attach PhenoGraph outputs to each individual CytofImage (ROI) and update each saved CytofImage."""
        assert key_pheno in self.phenograph.keys(), "PhenoGraph with {} not available!".format(key_pheno)
        phenograph = self.phenograph[key_pheno]  # data, markers, features, description, communities, proj_2d, N
        for n, cytof_img in self.cytof_images.items():
            if not hasattr(cytof_img, "phenograph"):
                setattr(cytof_img, "phenograph", {})
            if key_pheno in cytof_img.phenograph and not override:
                print("\n{} already attached for {}-{}, skipping ...".format(key_pheno, cytof_img.slide, cytof_img.roi))
                continue
            cond = self.df_feature['filename'] == cytof_img.filename  # cytof_img.filename: original file name
            data = phenograph['data'].loc[cond, :]
            communities = phenograph['communities'][cond.values]
            proj_2d = phenograph['proj_2d'][cond.values]
            # phenograph output for this image
            this_phenograph = {"data": data,
                               "markers": phenograph["markers"],
                               "features": phenograph["features"],
                               "description": phenograph["description"],
                               "communities": communities,
                               "proj_2d": proj_2d,
                               "N": phenograph["N"]
                               }
            cytof_img.phenograph[key_pheno] = this_phenograph

    def _gather_roi_kneighbor_graphs(self, key_pheno: str, method: str = "distance", **kwargs) -> dict:
        """ Define an adjacency graph over the cells of each ROI, based on either k-nearest neighbors or a distance cut-off

        Args:
            key_pheno: dictionary key for a specific phenograph output
            method: method to construct the adjacency matrix, choose from "distance" and "kneighbor"
            **kwargs: used to specify the distance threshold (thres) for the "distance" method or the number of
                neighbors (k) for the "kneighbor" method
        Output:
            networks: (dict) ROI-level networks that will be used for cluster interaction analysis
        """
        assert method in ["distance", "kneighbor"], "Method can be either 'distance' or 'kneighbor'!"
        default_thres = {
            "thres": 50,
            "k": 8
        }
        _ = "k" if method == "kneighbor" else "thres"
        thres = kwargs.get(_, default_thres[_])
        print("{}: {}".format(_, thres))

        df_pheno_feat = getattr(self, self.phenograph[key_pheno]['feat_attr'])
        n_cluster = self.phenograph[key_pheno]['N']
        cluster = self.phenograph[key_pheno]['communities']
        df_slide_roi = getattr(self, "df_cohort")

        networks = {}
        if method == "kneighbor":  # construct a k-neighbor graph
            for i, row in df_slide_roi.iterrows():  # Slide | ROI | input file
                slide, roi, f_in = row["Slide"], row["ROI"], row["input file"]
                cond = df_pheno_feat['filename'] == f_in
                if cond.sum() == 0:
                    continue
                _cluster = cluster[cond.values]
                df_sub = df_pheno_feat.loc[cond, :]
                graph = skgraph(np.array(df_sub.loc[:, ['coordinate_x', 'coordinate_y']]),
                                n_neighbors=thres, mode='distance')
                I, J, V = sp.find(graph)
                networks[roi] = dict()
                networks[roi]['I'] = I  # from cell
                networks[roi]['J'] = J  # to cell
                networks[roi]['V'] = V  # distance value
                networks[roi]['network'] = graph

                # edge type summary
                edge_nums = np.zeros((n_cluster, n_cluster))
                for _i, _j in zip(I, J):
                    edge_nums[_cluster[_i], _cluster[_j]] += 1
                networks[roi]['edge_nums'] = edge_nums

                expected_percentage = np.zeros((n_cluster, n_cluster))
                for _i in range(n_cluster):
                    for _j in range(n_cluster):
                        expected_percentage[_i, _j] = sum(_cluster == _i) * sum(_cluster == _j)  # / len(df_sub)**2
                networks[roi]['expected_percentage'] = expected_percentage
                networks[roi]['num_cell'] = len(df_sub)
        else:  # construct the neighborhood matrix using a distance cut-off
            cal_dist = DistanceMetric.get_metric('euclidean')
            for i, row in df_slide_roi.iterrows():  # Slide | ROI | input file
                slide, roi, f_in = row["Slide"], row["ROI"], row["input file"]
                cond = df_pheno_feat['filename'] == f_in
                if cond.sum() == 0:
                    continue
                networks[roi] = dict()
                _cluster = cluster[cond.values]
                df_sub = df_pheno_feat.loc[cond, :]
                dist = cal_dist.pairwise(df_sub.loc[:, ['coordinate_x', 'coordinate_y']].values)
                networks[roi]['dist'] = dist

                # expected percentage
                expected_percentage = np.zeros((n_cluster, n_cluster))
                for _i in range(n_cluster):
                    for _j in range(n_cluster):
                        expected_percentage[_i, _j] = sum(_cluster == _i) * sum(_cluster == _j)  # / len(df_sub)**2
                networks[roi]['expected_percentage'] = expected_percentage

                n_cells = len(df_sub)
                # edge counts
                edge_nums = np.zeros_like(expected_percentage)
                for _i in range(n_cells):
                    for _j in range(n_cells):
                        if 0 < dist[_i, _j] < thres:
                            edge_nums[_cluster[_i], _cluster[_j]] += 1
                networks[roi]['edge_nums'] = edge_nums
                networks[roi]['num_cell'] = n_cells
        return networks

    def cluster_interaction_analysis(self, key_pheno, method="distance", level="slide",
                                     clustergrid=None, viz=False, **kwargs):
        """Interaction analysis for clusters."""
        assert method in ["distance", "kneighbor"], "Method can be either 'distance' or 'kneighbor'!"
        assert level in ["slide", "roi"], "Level can be either 'slide' or 'roi'!"
        default_thres = {
            "thres": 50,
            "k": 8
        }
        _ = "k" if method == "kneighbor" else "thres"
        thres = kwargs.get(_, default_thres[_])
        # print("{}: {}".format(_, thres))
        networks = self._gather_roi_kneighbor_graphs(key_pheno, method=method, **{_: thres})

        if level == "slide":
            keys = ['edge_nums', 'expected_percentage', 'num_cell']
            for slide in self.df_cohort['Slide'].unique():
                cond = self.df_cohort['Slide'] == slide
                df_slide = self.df_cohort.loc[cond, :]
                rois = df_slide['ROI'].values
                # keys = list(networks.values())[0].keys()
                networks[slide] = {}
                for key in keys:
                    networks[slide][key] = sum([networks[roi][key] for roi in rois if roi in networks])
                for roi in rois:
                    if roi in networks:
                        networks.pop(roi)

        interacts = {}
        epsilon = 1e-6
        for key, item in networks.items():
            edge_percentage = item['edge_nums'] / np.sum(item['edge_nums'])
            expected_percentage = item['expected_percentage'] / item['num_cell'] ** 2
            # normalize: log10 of observed over expected edge fractions
            interact_norm = np.log10(edge_percentage / (expected_percentage + epsilon) + epsilon)
            interact_norm[interact_norm == np.log10(epsilon)] = 0
            interacts[key] = interact_norm

        # plot
        for f_key, interact in interacts.items():
            plt.figure(figsize=(6, 6))
            ax = sns.heatmap(interact, center=np.log10(1 + epsilon), cmap='RdBu_r', vmin=-1, vmax=1)
            ax.set_aspect('equal')
            plt.title(f_key)
            plt.show()

            if clustergrid is None:
                plt.figure()
                clustergrid = sns.clustermap(interact, center=np.log10(1 + epsilon), cmap='RdBu_r', vmin=-1, vmax=1,
                                             xticklabels=np.arange(interact.shape[0]),
                                             yticklabels=np.arange(interact.shape[0]),
                                             figsize=(6, 6))
                plt.title(f_key)
                plt.show()

            plt.figure()
            sns.clustermap(interact[clustergrid.dendrogram_row.reordered_ind, :]
                           [:, clustergrid.dendrogram_row.reordered_ind],
                           center=np.log10(1 + 0.1), cmap='RdBu_r', vmin=-1, vmax=1,
                           xticklabels=clustergrid.dendrogram_row.reordered_ind,
                           yticklabels=clustergrid.dendrogram_row.reordered_ind,
                           figsize=(6, 6), row_cluster=False, col_cluster=False)
            plt.title(f_key)
            plt.show()

        # IMPORTANT: attach to individual ROIs
        self.attach_individual_roi_pheno(key_pheno, override=True)
        return interacts, clustergrid
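    # Worked example of the interaction score computed above (numbers are illustrative only).
    # The per-ROI graphs come from sklearn's kneighbors_graph (or a pairwise-distance cut-off), and
    # scipy.sparse.find unpacks the sparse graph into (from-cell, to-cell, distance) edge triplets,
    # which are tallied per cluster pair into edge_nums. For a cluster pair (i, j), suppose the
    # observed edge fraction is 0.02 while the fraction expected under independence is 0.01; with
    # epsilon = 1e-6,
    #
    #   interact_norm[i, j] = log10(0.02 / (0.01 + 1e-6) + 1e-6) ≈ log10(2) ≈ 0.30   # enriched contact
    #
    # values near 0 mean the clusters touch roughly as often as expected, negative values
    # (e.g. 0.005 observed vs 0.01 expected -> log10(0.5) ≈ -0.30) suggest avoidance, and pairs with
    # no observed edges are reset to 0 rather than left at log10(epsilon).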
    ###############################################################
    ###################### Marker Level ###########################
    ###############################################################
    def generate_summary(self,
                         feat_type: str = "normed",
                         normq: int = 75,
                         vis_thres: bool = False,
                         accumul_type: Union[List[str], str] = "sum",
                         verbose: bool = False,
                         get_thresholds: Callable = _get_thresholds,
                         ) -> List:
        """Generate marker-positive summaries and attach them to each individual CyTOF image in the cohort."""
        accumul_type = [accumul_type] if isinstance(accumul_type, str) else accumul_type
        assert feat_type in ["normed_scaled", "normed", ""], f"feature type {feat_type} not supported!"
        feat_name = f"{feat_type}" if feat_type == "" else f"{normq}{feat_type}"
        # the attribute name to retrieve from cytof_img
        n_attr = f"df_feature{feat_name}" if feat_type == "" else f"df_feature_{feat_name}"
        df_feat = getattr(self, n_attr)

        # get thresholds
        thres = getattr(self, "marker_thresholds", {})
        thres[f"{normq}_{feat_type}"] = {}
        for _ in accumul_type:  # for either marker sum or marker average
            print(f"Getting thresholds for cell {_} of all markers.")
            thres[f"{normq}_{feat_type}"][f"cell_{_}"] = get_thresholds(df_feature=df_feat,
                                                                        features=self.dict_feat[f"cell_{_}"],
                                                                        visualize=vis_thres,
                                                                        verbose=verbose)
        setattr(self, "marker_thresholds", thres)

        # split to each ROI
        _attr_marker_pos, seen = [], 0
        self.df_cohort['Slide_ROI'] = self.df_cohort[['Slide', 'ROI']].agg('_'.join, axis=1)
        for n, cytof_img in self.cytof_images.items():  # ({slide}_{roi}, CytofImage)
            if not hasattr(cytof_img, n_attr):  # the cytof_img instance may not contain the _scaled feature
                cond = self.df_cohort['Slide_ROI'] == n
                input_file = self.df_cohort.loc[cond, 'input file'].values[0]
                _df_feat = df_feat.loc[df_feat['filename'] == input_file].reset_index(drop=True)
                setattr(cytof_img, n_attr, _df_feat)
            else:
                _df_feat = getattr(cytof_img, n_attr)
            for _ in accumul_type:  # ["sum", "ave"]: marker sum or marker average accumulation
                attr_marker_pos = cytof_img.marker_positive_summary(
                    thresholds=thres[f"{normq}_{feat_type}"][f"cell_{_}"],
                    feat_type=feat_type,
                    normq=normq,
                    accumul_type=_
                )
                if seen == 0:
                    _attr_marker_pos.append(attr_marker_pos)
            seen += 1
        return _attr_marker_pos
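    # Hedged usage sketch for generate_summary (the variable name `cohort` is an assumption):
    # thresholds are estimated once on the cohort-level feature table, then every CytofImage in
    # cohort.cytof_images gets a marker-positive summary attached via marker_positive_summary.
    #
    #   attrs = cohort.generate_summary(feat_type="normed", normq=75,
    #                                   accumul_type=["sum", "ave"], vis_thres=False)
    #   # `attrs` lists the attribute names added to each CytofImage (one per accumulation type)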
    def co_expression_analysis(self, normq: int = 75, feat_type: str = "normed",
                               co_exp_markers: Union[str, List] = "all",
                               accumul_type: Union[str, List[str]] = "sum",
                               verbose: bool = False, clustergrid=None):
        """Marker co-expression analysis based on binarized marker-positive calls, aggregated per slide."""
        # parameter checks and preprocessing for the analysis
        assert feat_type in ["original", "normed", "scaled"]
        if feat_type == "original":
            feat_name = ""
        elif feat_type == "normed":
            feat_name = f"{normq}normed"
        else:
            feat_name = f"{normq}normed_scaled"

        # go through each ROI and get its binary marker-cell expression
        roi_binary_express_dict = dict()
        for i, cytof_img in enumerate(self.cytof_images.values()):
            slide, roi = cytof_img.slide, cytof_img.roi
            df_binary_pos_exp = cytof_img.get_binary_pos_express_df(feat_name, accumul_type)
            roi_binary_express_dict[roi] = df_binary_pos_exp

        df_slide_roi = self.df_cohort
        # In cohort analysis, co-expression is always analyzed per slide.
        # Per-ROI analysis can be done by calling the cytof_img individually.
        slide_binary_express_dict = dict()
        # concatenate all ROIs into one, for each slide
        for slide in df_slide_roi["Slide"].unique():
            rois_of_one_slide = df_slide_roi.loc[df_slide_roi["Slide"] == slide, "ROI"]
            for i, filename_roi in enumerate(rois_of_one_slide):
                ind_roi = filename_roi.replace('.txt', '')
                if ind_roi not in roi_binary_express_dict:
                    print(f'ROI {ind_roi} in self.df_cohort, but not found in co-expression dicts')
                    continue
                try:
                    # adding to an existing slide key:
                    # append the dataframe row-wise, then perform co-expression analysis at the slide level
                    slide_binary_express_dict[slide] = pd.concat([slide_binary_express_dict[slide],
                                                                  roi_binary_express_dict[ind_roi]],
                                                                 ignore_index=True)
                except KeyError:
                    # first iteration writing to this slide: the slide key does not exist yet
                    slide_binary_express_dict[slide] = roi_binary_express_dict[ind_roi].copy()

        slide_co_expression_dict = dict()
        # for each slide, perform co-expression analysis
        for slide_key, large_binary_express in slide_binary_express_dict.items():
            n_cells, n_markers = large_binary_express.shape
            df_pos_exp_val = large_binary_express.values
            # list all pair-wise combinations of the markers
            column_combinations = list(product(range(n_markers), repeat=2))
            # numerator of the log odds ratio
            co_positive_prob_matrix = np.zeros((n_markers, n_markers))
            # denominator of the log odds ratio
            expected_prob_matrix = np.zeros((n_markers, n_markers))
            for combo in column_combinations:
                marker1, marker2 = combo
                # count cells that are positive for both marker 1 and marker 2
                positive_prob_marker1_and_2 = np.sum(np.logical_and(df_pos_exp_val[:, marker1],
                                                                    df_pos_exp_val[:, marker2])) / n_cells
                co_positive_prob_matrix[marker1, marker2] = positive_prob_marker1_and_2
                # pair (A, B) counts are the same as pair (B, A) counts
                co_positive_prob_matrix[marker2, marker1] = positive_prob_marker1_and_2

                # expected probability if marker 1 and marker 2 were expressed independently
                # p(A and B) = p(A) * p(B) = num_pos_a * num_pos_b / (num_cells * num_cells)
                # p(A) = number of positive cells / number of cells
                exp_prob_in_marker1_and_2 = np.sum(df_pos_exp_val[:, marker1]) * np.sum(df_pos_exp_val[:, marker2]) / n_cells ** 2
                expected_prob_matrix[marker1, marker2] = exp_prob_in_marker1_and_2
                expected_prob_matrix[marker2, marker1] = exp_prob_in_marker1_and_2

            # theta(i_pos and j_pos)
            df_co_pos = pd.DataFrame(co_positive_prob_matrix,
                                     index=large_binary_express.columns,
                                     columns=large_binary_express.columns)
            # E(x)
            df_expected = pd.DataFrame(expected_prob_matrix,
                                       index=large_binary_express.columns,
                                       columns=large_binary_express.columns)

            epsilon = 1e-6  # avoid divide-by-0 or log(0)
            # normalize and fix NaN
            edge_percentage_norm = np.log10(df_co_pos.values / (df_expected.values + epsilon) + epsilon)
            # if observed/expected = 0, the log odds ratio would be log10(epsilon);
            # no observation means the co-expression cannot be determined, not strong negative co-expression
            edge_percentage_norm[edge_percentage_norm == np.log10(epsilon)] = 0
            slide_co_expression_dict[slide_key] = (edge_percentage_norm, df_expected.columns)
        return slide_co_expression_dict
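    # Worked example of the co-expression score computed above (illustrative numbers only):
    # suppose one slide has 1000 cells, 300 positive for marker A, 200 positive for marker B,
    # and 90 positive for both. Then
    #
    #   observed p(A and B) = 90 / 1000                          = 0.09
    #   expected p(A) * p(B) = (300/1000) * (200/1000)            = 0.06
    #   score = log10(0.09 / (0.06 + 1e-6) + 1e-6) ≈ log10(1.5) ≈ 0.18   # more co-expression than chance
    #
    # entries whose observed probability is exactly 0 are reset to 0: an unobserved pair means the
    # co-expression cannot be determined, not that the markers are strongly anti-correlated.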