# Source: multiTAP / cytof / hyperion_preprocess.py
# Uploaded by ivangzf -- commit b78c3b8 ("add multitap files")
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pathlib
import skimage.io as skio
import warnings
from typing import Union, Optional, Type, Tuple, List
# from readimc import MCDFile
# from cytof.classes import CytofImage, CytofImageTiff
import sys
import platform
from pathlib import Path
# Locate this module on disk and make its directory importable, so the sibling
# ``classes`` module imported below resolves regardless of the launch directory.
FILE = Path(__file__).resolve()
ROOT = FILE.parent  # cytof root directory
if str(ROOT) not in sys.path:
    # add ROOT to PATH
    sys.path.append(str(ROOT))
if platform.system() != 'Windows':
    # keep ROOT relative to the current working directory
    ROOT = Path(os.path.relpath(ROOT, Path.cwd()))
from classes import CytofImage, CytofImageTiff
# ####################### Read data ########################
def _channel_last_axes(shape):
    """Return the axis order that moves the smallest dimension of ``shape`` last."""
    # Sizes in ascending order, rotated left by one, so the smallest size
    # (taken to be the channel axis) ends up in the last position.
    # ref: https://numpy.org/doc/stable/reference/generated/numpy.roll.html
    target_sizes = np.roll(np.sort(shape), -1)
    remaining = list(shape)  # tuple is immutable
    axes = []
    for size in target_sizes:
        axis = remaining.index(size)
        axes.append(axis)
        # Mark the axis as used (a real size can't be 0) so that equal sizes,
        # e.g. square images, map to distinct axes.
        remaining[axis] = 0
    return axes


def cytof_read_data_roi(filename, slide="", roi=None, iltype="hwd", **kwargs) -> "Tuple[CytofImage, list]":
    """ Read one ROI of cytof data from a tabular (.txt / .csv) or image (.tif / .tiff / .qptiff) file
    Inputs:
        filename = full filename of the cytof data (path-name-ext); the extension is matched case-insensitively
        slide    = slide identifier passed on to the created image object (default="")
        roi      = ROI identifier; for tabular files it defaults to the file name without its extension,
                   for image files it is passed on unchanged (default=None)
        iltype   = not used by this function (default="hwd")
        kwargs   = optional "X" and "Y": names of the columns of a tabular file to be renamed to "X" and "Y"
    Returns:
        cytof_img = CytofImage (tabular input) or CytofImageTiff (image input)
        cols      = column names of the dataframe, an empty list returned if not reading data from a dataframe
    Raises:
        AssertionError if the file name has no extension or the extension is not supported

    :param filename: str
    :param slide: str
    :param roi: str
    :param iltype: str
    :return cytof_img: CytofImage or CytofImageTiff
    :return cols: pandas.Index or list
    """
    path = pathlib.Path(filename)
    ext = path.suffix.upper()
    assert len(ext) > 0, "Please provide a full file name with extension!"
    assert ext in ['.TXT', '.TIFF', '.TIF', '.CSV', '.QPTIFF'], \
        "filetypes other than '.txt', '.csv', '.tif', '.tiff' or '.qptiff' are not (yet) supported."

    if ext in ['.TXT', '.CSV']:  # the case with a dataframe
        if ext == '.TXT':
            df_cytof = pd.read_csv(filename, sep='\t')  # tab-separated table
        else:
            df_cytof = pd.read_csv(filename)
        if roi is None:
            # Path.stem drops the extension whatever its letter case; the extension
            # check above is case-insensitive, so the default name must be as well.
            roi = path.stem

        # initialize an instance of CytofImage
        cytof_img = CytofImage(df_cytof, slide=slide, roi=roi, filename=filename)
        # NOTE(review): SOURCE lost its indentation, so whether this rename was limited
        # to .csv input cannot be told; it is applied to both tabular formats here.
        if "X" in kwargs and "Y" in kwargs:
            cytof_img.df.rename(columns={kwargs["X"]: "X", kwargs["Y"]: 'Y'}, inplace=True)
        cols = cytof_img.df.columns

    else:  # the case without a dataframe
        image = skio.imread(filename, plugin="tifffile")
        # sometimes tiff could be square, the helper ensures images are correctly transposed
        image = image.transpose(_channel_last_axes(image.shape))

        # create TIFF class cytof image
        cytof_img = CytofImageTiff(image, slide=slide, roi=roi, filename=filename)
        cols = []
    return cytof_img, cols
def cytof_read_data_mcd(filename, verbose=False):
    """ Read every acquisition (ROI) of an .mcd file as a CytofImageTiff
    Inputs:
        filename = full filename of the .mcd file
        verbose  = a flag indicating whether to print slide / panorama / ROI information (default=False)
    Returns:
        cytof_imgs = dictionary mapping "<slide id>_<roi id>" to the CytofImageTiff of that acquisition

    NOTE: relies on the name ``MCDFile`` being available at module level; the
    ``from readimc import MCDFile`` line at the top of this file is commented out
    and has to be enabled before this function can run.

    :param filename: str
    :param verbose: bool
    :return cytof_imgs: dict
    """
    cytof_imgs = {}
    with MCDFile(filename) as f:
        if verbose:
            print("\n{}, \n\t{} slides, showing the 1st slide:".format(filename, len(f.slides)))

        ## slide
        for slide in f.slides:
            if verbose:
                print("\tslide ID: {}, description: {}, width: {} um, height: {}um".format(
                    slide.id,
                    slide.description,
                    slide.width_um,
                    slide.height_um)
                )
                # The slide and panorama images are only needed for the report below,
                # so they are read only when verbose output is requested.
                im_slide = f.read_slide(slide)  # numpy array or None
                if im_slide is not None:
                    print("\n\tslide image shape: {}".format(im_slide.shape))

                # (optional) read the first panorama image, if the slide has any
                if len(slide.panoramas) > 0:
                    panorama = slide.panoramas[0]
                    print(
                        "\t{} panoramas, showing the 1st one. \n\tpanorama ID: {}, description: {}, width: {} um, height: {}um".format(
                            len(slide.panoramas),
                            panorama.id,
                            panorama.description,
                            panorama.width_um,
                            panorama.height_um)
                    )
                    im_pano = f.read_panorama(panorama)  # numpy array
                    print("\n\tpanorama image shape: {}".format(im_pano.shape))

            for roi in slide.acquisitions:  # for each acquisition (roi)
                im_roi = f.read_acquisition(roi)  # array, shape: (c, y, x), dtype: float32
                if verbose:
                    print("\troi {}, shape: {}".format(roi.id, im_roi.shape))

                # move the channel axis last: (c, y, x) -> (y, x, c)
                cytof_img = CytofImageTiff(image=im_roi.transpose((1, 2, 0)),
                                           slide=slide.id,
                                           roi=roi.id,
                                           filename=filename)
                cytof_img.set_channels(roi.channel_names, roi.channel_labels)
                cytof_imgs["{}_{}".format(slide.id, roi.id)] = cytof_img
    return cytof_imgs
def cytof_preprocess(df):
    """ Preprocess cytof dataframe
    Every pair of X and Y values represent for a unique physical pixel locations in the original image
    The values for Xs and Ys should be continuous integers
    The missing pixels would be filled with 0

    One all-zero row is added for every (X, Y) location that is absent from the
    dataframe, carrying the X and Y of that location. The result is ordered
    row by row (Y first, then X) with a fresh 0..n-1 index, so that reshaping a
    marker column to (max(Y) + 1, max(X) + 1) puts every pixel at its own location.
    A dataframe that already has at least (max(Y) + 1) * (max(X) + 1) rows is
    returned unchanged.

    Inputs:
        df = cytof dataframe, with integer columns "X" and "Y"
    Returns:
        df = preprocessed cytof dataframe with missing pixel values filled with 0

    :param df: pandas.core.frame.DataFrame
    :return df: pandas.core.frame.DataFrame
    """
    nrow = int(df['Y'].max()) + 1
    ncol = int(df['X'].max()) + 1
    if nrow * ncol <= len(df):
        return df

    # Row-major position of every pixel that is present; whatever is absent
    # from the full grid needs a padding row.
    present = df['Y'].to_numpy().astype(np.int64) * ncol + df['X'].to_numpy().astype(np.int64)
    missing = np.setdiff1d(np.arange(nrow * ncol, dtype=np.int64), present)

    padding = pd.DataFrame(0, index=range(len(missing)), columns=df.columns)
    padding['Y'] = missing // ncol
    padding['X'] = missing % ncol

    # ignore_index avoids duplicated index labels; sorting puts the padded
    # pixels at their own location instead of at the end of the table.
    filled = pd.concat([df, padding], ignore_index=True)
    filled = filled.sort_values(['Y', 'X'], kind='stable').reset_index(drop=True)
    return filled
def cytof_check_channels(df, marker_names=None, xlim=None, ylim=None):
    """A visualization function to show different markers of a cytof image

    Every marker is drawn in its own panel (5 panels per row) as a gray-scale
    image scaled by its 99th percentile and clipped to [0, 1]. When that
    percentile is not positive (e.g. a marker that is zero almost everywhere),
    the maximum of the marker is used instead, so the image is never divided by 0.
    Cropping with xlim / ylim is applied after the scaling.

    Inputs:
        df = preprocessed cytof dataframe
        marker_names = marker names to visualize, should match to column names in df;
                       all columns from the 7th one on are shown if not given (default=None)
        xlim = x-axis limit of output image, as (start, stop) column indices (default=None)
        ylim = y-axis limit of output image, as (start, stop) row indices (default=None)

    :param df: pandas.core.frame.DataFrame
    :param marker_names: list
    :param xlim: tuple
    :param ylim: tuple
    """
    if marker_names is None:
        # presumably the first 6 columns hold non-marker information -- TODO confirm
        marker_names = list(df.columns[6:])
    if len(marker_names) == 0:
        warnings.warn("No marker to visualize!")
        return

    nrow = int(df['Y'].max()) + 1
    ncol = int(df['X'].max()) + 1
    ax_ncol = 5
    ax_nrow = int(np.ceil(len(marker_names) / ax_ncol))
    # squeeze=False keeps `axes` two-dimensional even for a single row of panels
    fig, axes = plt.subplots(ax_nrow, ax_ncol, figsize=(3 * ax_ncol, 3 * ax_nrow), squeeze=False)

    for i, marker in enumerate(marker_names):
        ax = axes[i // ax_ncol, i % ax_ncol]
        image = df[marker].values.reshape(nrow, ncol)

        scale = np.quantile(image, 0.99)
        if not scale > 0:
            scale = image.max()
        if not scale > 0:
            scale = 1.0
        image = np.clip(image / scale, 0, 1)

        ax.set_title(marker)
        if xlim is not None:
            image = image[:, xlim[0]:xlim[1]]
        if ylim is not None:
            image = image[ylim[0]:ylim[1], :]
        im = ax.imshow(image, cmap="gray")
        fig.colorbar(im, ax=ax)

    # hide the panels of the last row that have no marker to show
    for ax in axes.ravel()[len(marker_names):]:
        ax.axis("off")
    plt.show()
def remove_special_channels(self, channels):
    """ Remove the given channels from the object
    For every channel name, the entry at its position is removed from
    self.channels, self.markers and self.labels, and the column of that name
    is dropped from self.df (in place).

    Inputs:
        channels = names of the channels to remove, each must be in self.channels

    :param channels: list
    """
    for name in channels:
        position = self.channels.index(name)
        # the three lists are index-aligned, so the same position is removed from each
        for aligned in (self.channels, self.markers, self.labels):
            aligned.pop(position)
        self.df.drop(columns=name, inplace=True)
def define_special_channels(self, channels_dict):
    """ Create new channels as the sum of existing ones
    self.df is first copied to self.df_orig. Then, for every entry of
    channels_dict, the columns of the listed source channels that exist in
    self.channels are summed into a new column of self.df, and the new name is
    appended to self.channels (self.markers and self.labels are not extended).
    A source channel whose "to_keep" is false is removed from self.channels,
    self.markers, self.labels and self.df. A warning is issued for every source
    channel that is not available.

    Inputs:
        channels_dict = dictionary mapping the new channel name to a list of
                        dictionaries with the keys "marker_name" and "to_keep"

    :param channels_dict: dict
    """
    # create a copy of original dataframe
    self.df_orig = self.df.copy()

    for new_name, old_names in channels_dict.items():
        print(new_name)
        if len(old_names) == 0:
            continue

        # keep only the source channels that are actually present
        available = []
        for spec in old_names:
            if spec['marker_name'] in self.channels:
                available.append(spec)
            else:
                warnings.warn('{} is not available!'.format(spec['marker_name']))
        if len(available) == 0:
            continue

        for position, spec in enumerate(available):
            source = spec['marker_name']
            if position == 0:
                self.df[new_name] = self.df[source]
            else:
                self.df[new_name] += self.df[source]

            if spec['to_keep']:
                continue
            # Remove the unwanted channels
            idx = self.channels.index(source)
            self.channels.pop(idx)
            self.markers.pop(idx)
            self.labels.pop(idx)
            self.df.drop(columns=source, inplace=True)
        self.channels.append(new_name)
def cytof_txt2img(df, marker_names):
    """ Build a multi-channel image from a cytof dataframe
    The image has one channel per requested marker that is a column of df;
    markers that are not columns of df are left out and a warning reports the
    reduced number. The image height and width are max(Y) + 1 and max(X) + 1,
    and each marker column is reshaped to (height, width) in its given row order.

    Inputs:
        df = cytof dataframe
        marker_names = markers to take into consideration
    Returns:
        out_img = image of shape (height, width, number of markers kept), dtype float

    :param df: pandas.core.frame.DataFrame
    :param marker_names: list
    :return out_img: numpy.ndarray
    """
    n_requested = len(marker_names)
    known_columns = df.columns.values
    kept = []
    for name in marker_names:
        if name in known_columns:
            kept.append(name)
    n_kept = len(kept)
    if n_kept != n_requested:
        warnings.warn("{} markers selected instead of {}".format(n_kept, n_requested))

    height = max(df['Y'].values) + 1
    width = max(df['X'].values) + 1
    print("Output image shape: [{}, {}, {}]".format(height, width, n_kept))

    stacked = np.zeros([height, width, n_kept], dtype=float)
    for channel, name in enumerate(kept):
        stacked[..., channel] = df[name].values.reshape(height, width)
    return stacked
def cytof_merge_channels(im_cytof: np.ndarray,
                         channel_names: List,
                         channel_ids: List = None,
                         channels: List = None,
                         quantiles: List = None,
                         visualize: bool = False):
    """ Merge selected channels (given by "channel_ids") of raw cytof image and generate a RGB image

    The selected channels are shown, in order, in red, green, blue, cyan, magenta
    and yellow. Each channel is divided by its quantile value, clipped to [0, 1]
    and scaled to 0-255; cyan, magenta and yellow are added to the two RGB planes
    they are composed of, and the sum is clipped to 255. When the quantile value
    of a channel is not positive (e.g. a channel that is zero almost everywhere),
    the maximum of the channel is used for the scaling instead, so the image is
    never divided by 0; the returned quantiles are left as computed / given.

    Inputs:
        im_cytof = raw cytof image, channels on the last axis
        channel_names = a list of names correspond to all channels of the im_cytof
        channel_ids = the indices of channels to show, no more than 6 channels can be shown the same time (default=None)
        channels = the names of channels to show, no more than 6 channels can be shown the same time (default=None)
            Either "channel_ids" or "channels" should be provided; "channels" is only used if "channel_ids" is None
        quantiles = the quantile values for each channel defined by channel_ids,
            the 0.99 quantile of each channel is used if not given (default=None)
        visualize = a flag indicating whether print the visualization on screen
    Returns:
        merged_im = channel merged image, shape (height, width, 3), dtype uint8
        quantiles = the quantile values for each channel defined by channel_ids
    Raises:
        AssertionError if the number of names does not match the image, no channel is selected
            or more than 6 channels are selected

    :param im_cytof: numpy.ndarray
    :param channel_names: list
    :param channel_ids: list
    :param channels: list
    :param quantiles: list
    :return merged_im: numpy.ndarray
    :return quantiles: list
    """
    assert len(channel_names) == im_cytof.shape[-1], 'The length of "channel_names" does not match the image size!'
    assert channel_ids or channels, 'At least one should be provided, either "channel_ids" or "channels"!'
    if channel_ids is None:
        channel_ids = [channel_names.index(n) for n in channels]
    assert len(channel_ids) <= 6, "No more than 6 channels can be visualized simultaneously!"
    if len(channel_ids) > 3:
        warnings.warn(
            "Visualizing more than 3 channels the same time results in deteriorated visualization. "
            "It is not recommended!")

    full_colors = ['red', 'green', 'blue', 'cyan', 'magenta', 'yellow']
    # RGB planes lit by each of the colors above, in the same order
    color_planes = [[0], [1], [2], [1, 2], [0, 2], [0, 1]]

    info = [f"{marker} in {c}\n" for (marker, c) in
            zip([channel_names[i] for i in channel_ids], full_colors[:len(channel_ids)])]
    print(f"Visualizing... \n{''.join(info)}")

    if quantiles is None:
        quantiles = [np.quantile(im_cytof[..., i], 0.99) for i in channel_ids]

    merged_im = np.zeros((im_cytof.shape[0], im_cytof.shape[1], 3))
    for slot, channel_id in enumerate(channel_ids):
        channel = im_cytof[..., channel_id]

        scale = quantiles[slot]
        if not scale > 0:
            scale = channel.max()
        if not scale > 0:
            scale = 1.0

        layer = np.clip(channel / scale, 0, 1) * 255
        for plane in color_planes[slot]:
            merged_im[..., plane] = np.clip(merged_im[..., plane] + layer, 0, 255)

    merged_im = merged_im.astype(np.uint8)
    if visualize:
        plt.imshow(merged_im)
        plt.show()
    return merged_im, quantiles