"""Helpers for reading, preprocessing and visualizing CyTOF / IMC image data."""
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pathlib
import skimage.io as skio
import warnings
from typing import Union, Optional, Type, Tuple, List
# from readimc import MCDFile
# from cytof.classes import CytofImage, CytofImageTiff

import sys
import platform
from pathlib import Path

FILE = Path(__file__).resolve()
ROOT = FILE.parents[0]  # cytof root directory
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))  # add ROOT to PATH
if platform.system() != 'Windows':
    ROOT = Path(os.path.relpath(ROOT, Path.cwd()))  # relative
from classes import CytofImage, CytofImageTiff


def _grid_shape(df) -> Tuple[int, int]:
    """Return (nrow, ncol) of the pixel grid implied by the 'Y' and 'X' columns of df."""
    nrow = int(df['Y'].values.max()) + 1
    ncol = int(df['X'].values.max()) + 1
    return nrow, ncol


# ####################### Read data ########################
def cytof_read_data_roi(filename, slide="", roi=None, iltype="hwd", **kwargs) -> Tuple[CytofImage, list]:
    """
    Read one ROI of cytof data from a table (.txt / .csv) or an image (.tif / .tiff / .qptiff)

    Inputs:
        filename = full filename of the cytof data (path-name-ext)
        slide    = slide identifier forwarded to the image object (default="")
        roi      = ROI identifier; for table inputs it defaults to the file name without extension
        iltype   = not used by this function; kept for interface compatibility
        kwargs   = optional "X" and "Y": names of the columns to be renamed to "X" and "Y"
                   (table inputs only, both must be given)
    Returns:
        cytof_img = CytofImage (table input) or CytofImageTiff (image input)
        cols      = column names of the dataframe, an empty list returned if not reading data from a dataframe

    :param filename: str
    :return cytof_img: CytofImage
    :return cols: pandas.Index or list
    """
    ext = pathlib.Path(filename).suffix
    assert len(ext) > 0, "Please provide a full file name with extension!"
    assert ext.upper() in ['.TXT', '.TIFF', '.TIF', '.CSV', '.QPTIFF'], "filetypes other than '.txt', '.tiff' or '.csv' are not (yet) supported."

    if ext.upper() in ['.TXT', '.CSV']:  # the case with a dataframe
        # .txt exports are tab separated, .csv files are comma separated
        separator = '\t' if ext.upper() == '.TXT' else ','
        df_cytof = pd.read_csv(filename, sep=separator)
        if roi is None:
            # the stem drops the extension regardless of its letter case
            roi = pathlib.Path(filename).stem
        # initialize an instance of CytofImage
        cytof_img = CytofImage(df_cytof, slide=slide, roi=roi, filename=filename)
        if "X" in kwargs and "Y" in kwargs:
            cytof_img.df.rename(columns={kwargs["X"]: "X", kwargs["Y"]: 'Y'}, inplace=True)
        cols = cytof_img.df.columns

    else:  # the case without a dataframe
        image = skio.imread(filename, plugin="tifffile")
        orig_img_shape = image.shape

        # Target layout: the smallest dimension goes last, the remaining ones
        # stay in ascending order (sort ascending, then roll left by one).
        # ref: https://numpy.org/doc/stable/reference/generated/numpy.roll.html
        # NOTE(review): presumably the smallest dimension is the channel axis - confirm.
        target_shape = np.roll(np.sort(orig_img_shape), -1)

        # Map every target dimension back to an axis of the loaded image.
        # Sizes can repeat (e.g. square images), so each matched axis is
        # overwritten with 0 (never a valid size) to avoid picking it twice.
        remaining = list(orig_img_shape)  # tuple is immutable
        axis_order = []
        for dim in target_shape:
            axis = remaining.index(dim)
            axis_order.append(axis)
            remaining[axis] = 0
        image = image.transpose(axis_order)

        # create TIFF class cytof image
        cytof_img = CytofImageTiff(image, slide=slide, roi=roi, filename=filename)
        cols = []

    return cytof_img, cols


def cytof_read_data_mcd(filename, verbose=False):
    """
    Read every acquisition (ROI) of every slide stored in a .mcd file

    Inputs:
        filename = full filename of the .mcd file
        verbose  = a flag indicating whether to print slide / panorama / ROI information (default=False)
    Returns:
        cytof_imgs = dictionary mapping "<slide id>_<roi id>" to a CytofImageTiff

    :param filename: str
    :param verbose: bool
    :return cytof_imgs: dict
    """
    # NOTE(review): MCDFile is expected to come from `readimc`, but that import is
    # commented out at the top of this file; restore it before calling this function.
    cytof_imgs = {}
    with MCDFile(filename) as f:
        if verbose:
            print("\n{}, \n\t{} slides, showing the 1st slide:".format(filename, len(f.slides)))

        ## slide
        for slide in f.slides:
            if verbose:
                print("\tslide ID: {}, description: {}, width: {} um, height: {}um".format(
                    slide.id,
                    slide.description,
                    slide.width_um,
                    slide.height_um)
                )
                # The slide / panorama images are only inspected for logging,
                # so they are read only in verbose mode.
                im_slide = f.read_slide(slide)  # numpy array or None
                print("\n\tslide image shape: {}".format(
                    None if im_slide is None else im_slide.shape))

                # (optional) read the first panorama image
                if slide.panoramas:
                    panorama = slide.panoramas[0]
                    print(
                        "\t{} panoramas, showing the 1st one. \n\tpanorama ID: {}, description: {}, width: {} um, height: {}um".format(
                            len(slide.panoramas),
                            panorama.id,
                            panorama.description,
                            panorama.width_um,
                            panorama.height_um)
                    )
                    im_pano = f.read_panorama(panorama)  # numpy array
                    print("\n\tpanorama image shape: {}".format(im_pano.shape))

            for roi in slide.acquisitions:  # for each acquisition (roi)
                im_roi = f.read_acquisition(roi)  # array, shape: (c, y, x), dtype: float32
                if verbose:
                    print("\troi {}, shape: {}".format(roi.id, im_roi.shape))

                # move the channel axis last: (c, y, x) -> (y, x, c)
                cytof_img = CytofImageTiff(image=im_roi.transpose((1, 2, 0)),
                                           slide=slide.id,
                                           roi=roi.id,
                                           filename=filename)
                cytof_img.set_channels(roi.channel_names, roi.channel_labels)
                cytof_imgs["{}_{}".format(slide.id, roi.id)] = cytof_img
    return cytof_imgs


def cytof_preprocess(df):
    """
    Preprocess cytof dataframe
    Every pair of X and Y values represent for a unique physical pixel locations in the original image
    The values for Xs and Ys should be continuous integers
    The missing pixels would be filled with 0

    Inputs:
        df = cytof dataframe

    Returns:
        df = preprocessed cytof dataframe with missing pixel values filled with 0
             (the input object itself is returned when no pixel is missing)

    :param df: pandas.core.frame.DataFrame
    :return df: pandas.core.frame.DataFrame
    """
    nrow, ncol = _grid_shape(df)
    n_missing = nrow * ncol - len(df)
    if n_missing > 0:
        # NOTE(review): padding rows are all-zero, including their X and Y, and are
        # appended at the end; this yields a correct image only when the missing
        # pixels are the trailing ones in row-major order - confirm with callers.
        padding = pd.DataFrame(np.zeros((n_missing, len(df.columns)), dtype=int),
                               columns=df.columns)
        df = pd.concat([df, padding])
    return df


def cytof_check_channels(df, marker_names=None, xlim=None, ylim=None):
    """A visualization function to show different markers of a cytof image

    Inputs:
        df           = preprocessed cytof dataframe
        marker_names = marker names to visualize, should match to column names in df (default=None,
                       in which case every column from the 7th one onwards is shown)
        xlim         = x-axis limit of output image (default=None)
        ylim         = y-axis limit of output image (default=None)

    :param df: pandas.core.frame.DataFrame
    :param marker_names: list
    :param xlim: tuple
    :param ylim: tuple
    """
    if marker_names is None:
        marker_names = list(df.columns[6:])
    nrow, ncol = _grid_shape(df)
    ax_ncol = 5
    ax_nrow = int(np.ceil(len(marker_names) / ax_ncol))
    # squeeze=False keeps `axes` two-dimensional even for a single row of panels
    fig, axes = plt.subplots(ax_nrow, ax_ncol, figsize=(3 * ax_ncol, 3 * ax_nrow), squeeze=False)
    for i, marker in enumerate(marker_names):
        panel_row, panel_col = divmod(i, ax_ncol)
        ax = axes[panel_row, panel_col]
        image = df[marker].values.reshape(nrow, ncol)
        # scale by the 99th percentile so that a few hot pixels do not dominate
        image = np.clip(image / np.quantile(image, 0.99), 0, 1)
        ax.set_title(marker)
        if xlim is not None:
            image = image[:, xlim[0]:xlim[1]]
        if ylim is not None:
            image = image[ylim[0]:ylim[1], :]
        im = ax.imshow(image, cmap="gray")
        fig.colorbar(im, ax=ax)
    plt.show()


def remove_special_channels(self, channels):
    """
    Remove the given channels from an image object in place

    Inputs:
        self     = object exposing `channels`, `markers`, `labels` (lists) and `df` (dataframe);
                   presumably a CytofImage - verify against callers
        channels = names of the channels to remove; each must be present in self.channels

    :param channels: list
    """
    for channel in channels:
        idx = self.channels.index(channel)
        # the three lists are index-aligned, drop the same position from each
        self.channels.pop(idx)
        self.markers.pop(idx)
        self.labels.pop(idx)
        self.df.drop(columns=channel, inplace=True)


def define_special_channels(self, channels_dict):
    """
    Create new channels as the sum of existing ones, in place

    Inputs:
        self          = object exposing `channels`, `markers`, `labels` (lists) and `df` (dataframe);
                        presumably a CytofImage - verify against callers
        channels_dict = {new channel name: [{"marker_name": str, "to_keep": bool}, ...]}
                        markers with "to_keep" False are removed after being summed

    The original dataframe is saved to self.df_orig before any change is made.

    :param channels_dict: dict
    """
    # create a copy of original dataframe
    self.df_orig = self.df.copy()
    for new_name, old_names in channels_dict.items():
        print(new_name)
        if len(old_names) == 0:
            continue

        # keep only the source markers that actually exist
        old_nms = []
        for old_name in old_names:
            if old_name['marker_name'] not in self.channels:
                warnings.warn('{} is not available!'.format(old_name['marker_name']))
                continue
            old_nms.append(old_name)

        if len(old_nms) > 0:
            for i, old_name in enumerate(old_nms):
                if i == 0:
                    self.df[new_name] = self.df[old_name['marker_name']]
                else:
                    self.df[new_name] += self.df[old_name['marker_name']]
                if not old_name['to_keep']:
                    idx = self.channels.index(old_name['marker_name'])
                    # Remove the unwanted channels
                    self.channels.pop(idx)
                    self.markers.pop(idx)
                    self.labels.pop(idx)
                    self.df.drop(columns=old_name['marker_name'], inplace=True)
            self.channels.append(new_name)


def cytof_txt2img(df, marker_names):
    """ Convert from cytof dataframe to d-dimensional image, where d=length of marker names
    Each channel of the output image correspond to the pixel intensity of the corresponding marker

    Inputs:
        df           = cytof dataframe
        marker_names = markers to take into consideration; names missing from df are skipped with a warning

    Returns:
        out_img = d-dimensional image

    :param df: pandas.core.frame.DataFrame
    :param marker_names: list
    :return out_img: numpy.ndarray
    """
    nc_in = len(marker_names)
    marker_names = [name for name in marker_names if name in df.columns.values]
    nc = len(marker_names)
    if nc != nc_in:
        warnings.warn("{} markers selected instead of {}".format(nc, nc_in))
    nrow, ncol = _grid_shape(df)
    print("Output image shape: [{}, {}, {}]".format(nrow, ncol, nc))
    out_image = np.zeros([nrow, ncol, nc], dtype=float)
    for channel, name in enumerate(marker_names):
        out_image[..., channel] = df[name].values.reshape(nrow, ncol)
    return out_image


def cytof_merge_channels(im_cytof: np.ndarray,
                         channel_names: List,
                         channel_ids: Optional[List] = None,
                         channels: Optional[List] = None,
                         quantiles: Optional[List] = None,
                         visualize: bool = False):
    """ Merge selected channels (given by "channel_ids") of raw cytof image and generate a RGB image

    Inputs:
        im_cytof      = raw cytof image
        channel_names = a list of names correspond to all channels of the im_cytof
        channel_ids   = the indices of channels to show, no more than 6 channels can be shown the same time (default=None)
        channels      = the names of channels to show, no more than 6 channels can be shown the same time (default=None)
                        Either "channel_ids" or "channels" should be provided
        quantiles     = the quantile values for each channel defined by channel_ids (default=None,
                        in which case the 99th percentile of each channel is used)
        visualize     = a flag indicating whether print the visualization on screen

    Returns:
        merged_im = channel merged image
        quantiles = the quantile values for each channel defined by channel_ids

    :param im_cytof: numpy.ndarray
    :param channel_names: list
    :param channel_ids: list
    :param channels: list
    :param quantiles: list
    :return merged_im: numpy.ndarray
    :return quantiles: list
    """
    assert len(channel_names) == im_cytof.shape[-1], 'The length of "channel_names" does not match the image size!'
    assert channel_ids or channels, 'At least one should be provided, either "channel_ids" or "channels"!'
    # an empty "channel_ids" counts as not provided, fall back to the names
    if not channel_ids:
        channel_ids = [channel_names.index(n) for n in channels]
    assert len(channel_ids) <= 6, "No more than 6 channels can be visualized simultaneously!"
    if len(channel_ids) > 3:
        warnings.warn(
            "Visualizing more than 3 channels the same time results in deteriorated visualization. "
            "It is not recommended!")

    full_colors = ['red', 'green', 'blue', 'cyan', 'magenta', 'yellow']

    info = [f"{marker} in {c}\n" for (marker, c) in
            zip([channel_names[i] for i in channel_ids], full_colors[:len(channel_ids)])]
    print(f"Visualizing... \n{''.join(info)}")
    merged_im = np.zeros((im_cytof.shape[0], im_cytof.shape[1], 3))
    if quantiles is None:
        quantiles = [np.quantile(im_cytof[..., ch], 0.99) for ch in channel_ids]

    # RGB planes receiving the 4th, 5th and 6th channel:
    # cyan = green + blue, magenta = red + blue, yellow = red + green
    secondary_planes = [[1, 2], [0, 2], [0, 1]]
    for k, ch in enumerate(channel_ids):
        scaled = np.clip(im_cytof[..., ch] / quantiles[k], 0, 1) * 255
        if k < 3:
            # the first three channels map directly to red, green and blue
            merged_im[..., k] = scaled
        else:
            for plane in secondary_planes[k - 3]:
                merged_im[..., plane] = np.clip(merged_im[..., plane] + scaled, 0, 255)

    merged_im = merged_im.astype(np.uint8)
    if visualize:
        plt.imshow(merged_im)
        plt.show()
    return merged_im, quantiles