|
|
|
|
|
import numpy as np |
|
import matplotlib.image as mpimg |
|
import matplotlib.pyplot as plt |
|
from sklearn import preprocessing |
|
import math |
|
import scipy.misc as misc |
|
import cv2 |
|
import skimage |
|
from skimage import measure |
|
from skimage import img_as_bool, io, color, morphology, segmentation |
|
from skimage.morphology import binary_closing, binary_opening, disk, closing, opening |
|
from PIL import Image |
|
|
|
import time |
|
import re |
|
import sys |
|
import os |
|
|
|
|
|
import matplotlib.pyplot as plt |
|
|
|
import pandas as pd |
|
import xml.etree.ElementTree as ET |
|
from skimage.draw import polygon |
|
import random |
|
|
|
|
|
|
|
|
|
|
|
def normalize(mat, quantile_low=0, quantile_high=1):
    """Do min-max (quantile) normalization for an input matrix of any dimension.

    Args:
        mat: Array-like numeric input.
        quantile_low: Quantile mapped to 0. Default 0 (the minimum).
        quantile_high: Quantile mapped to 1. Default 1 (the maximum).

    Returns:
        ``(mat - low) / (high - low)``. With non-default quantiles, values
        outside the ``[low, high]`` range fall outside ``[0, 1]``; they are
        not clipped. If ``high == low`` (e.g. a constant matrix) an all-zero
        float array of the same shape is returned instead of NaN/inf.
    """
    mat = np.asarray(mat)
    low = np.quantile(mat, quantile_low)
    high = np.quantile(mat, quantile_high)
    span = high - low
    if span == 0:
        # A flat input has no dynamic range; avoid the 0/0 division.
        return np.zeros(mat.shape, dtype=float)
    return (mat - low) / span
|
|
|
|
|
def convert_to_optical_densities(img_RGB, r0=255, g0=255, b0=255):
    """Convert an RGB image to optical densities with the same shape as the input.

    Each of the first three channels is divided by its reference intensity
    (r0, g0, b0) and the negative natural log is taken. A small constant
    (0.00001) is added before the log so that zero-valued pixels stay finite.
    The input array is not modified.
    """
    densities = img_RGB.astype(float)
    for channel_idx, reference in enumerate((r0, g0, b0)):
        densities[:, :, channel_idx] /= reference
    return -np.log(densities + 0.00001)
|
|
|
|
|
def channel_deconvolution(img_RGB, staining_type, plot_image=False, to_normalize=True):
    """Deconvolute an RGB image into different staining channels.

    Ref: https://blog.bham.ac.uk/intellimic/g-landini-software/colour-deconvolution/

    Args:
        img_RGB: A uint8 numpy array with RGB channels (the code reshapes the
            optical densities to (-1, 3), so exactly 3 channels are expected).
        staining_type: Dyes used to stain the image; choose one from
            ("HDB", "HRB", "HDR", "HEB").
        plot_image: Set True to display the original image and the three
            channels. Default is False.
        to_normalize: If True (default) each returned channel is min-max
            normalized separately; otherwise the raw channels are returned.

    Returns:
        A tuple ``(img_deconvoluted, channel1, channel2, channel3)``: the
        unnormalized h*w*3 deconvoluted matrix followed by its three channels
        (normalized to [0, 1] separately when ``to_normalize`` is True).

    Raises:
        ValueError: If staining_type is not defined. (ValueError is a subclass
            of Exception, so callers catching Exception keep working.)
    """
    hematoxylin = [0.650, 0.704, 0.286]
    dab = [0.268, 0.570, 0.776]
    red = [0.214, 0.851, 0.478]
    background = [0.754, 0.077, 0.652]
    # staining key -> (channel names, optical-density vector of each stain)
    stain_table = {
        "HDB": (("Hematoxylin", "DAB", "Background"), [hematoxylin, dab, background]),
        "HRB": (("Hematoxylin", "Red", "Background"), [hematoxylin, red, background]),
        "HDR": (("Hematoxylin", "DAB", "Red"), [hematoxylin, dab, red]),
        "HEB": (("Hematoxylin", "Eosin", "Background"),
                [[0.644211, 0.716556, 0.266844], [0.092789, 0.964111, 0.283111], background]),
    }
    try:
        channels, stain_vectors = stain_table[staining_type]
    except (KeyError, TypeError):
        raise ValueError("Staining type not defined. Choose one from the following: HDB, HRB, HDR, HEB.")
    stain_OD = np.asarray(stain_vectors)

    # Scale every stain vector to unit length, then invert the stain matrix.
    normalized_stain_OD = np.asarray([row / np.linalg.norm(row) for row in stain_OD])
    stain_OD_inverse = np.linalg.inv(normalized_stain_OD)

    OD = convert_to_optical_densities(img_RGB, 255, 255, 255)

    # Project every pixel's optical density onto the stain basis.
    img_deconvoluted = np.reshape(np.dot(np.reshape(OD, (-1, 3)), stain_OD_inverse), OD.shape)

    raw_channels = [img_deconvoluted[:, :, idx] for idx in range(3)]
    if to_normalize:
        channel1, channel2, channel3 = [normalize(raw) for raw in raw_channels]
    else:
        channel1, channel2, channel3 = raw_channels

    if plot_image:
        # 'box-forced' is no longer accepted by Matplotlib; 'box' is the
        # supported value and works with shared axes.
        fig, axes = plt.subplots(2, 2, figsize=(15, 15), sharex=True, sharey=True,
                                 subplot_kw={'adjustable': 'box'})
        ax = axes.ravel()
        ax[0].imshow(img_RGB)
        ax[0].set_title("Original image")
        for panel, (channel_img, channel_name) in enumerate(
                zip((channel1, channel2, channel3), channels), start=1):
            ax[panel].imshow(channel_img, cmap="gray")
            ax[panel].set_title(channel_name)
        plt.show()

    return img_deconvoluted, channel1, channel2, channel3
|
|
|
|
|
|
|
|
|
|
|
def make_8UC(mat, normalized=True):
    """Convert the matrix to the equivalent unsigned 8-bit integer matrix.

    Args:
        mat: Input numpy array.
        normalized: True (default) if ``mat`` is already scaled to [0, 1];
            False to min-max normalize it first.

    Returns:
        A new uint8 array holding ``scaled * 255`` cast to uint8 (values are
        cast, not rounded or clipped).
    """
    scaled = mat.copy() if normalized else normalize(mat)
    return np.array(scaled * 255, dtype=np.uint8)
|
|
|
|
|
def make_8UC3(mat, normalized=True):
    """Convert the matrix to an unsigned 8-bit matrix with 3 identical channels.

    The single-channel uint8 result of ``make_8UC`` is repeated three times
    along a new last axis.
    """
    gray = make_8UC(mat, normalized)
    return np.stack([gray, gray, gray], axis=-1)
|
|
|
|
|
def check_channel(channel):
    """Check whether there is any signal in a channel (yes: 1; no: 0).

    The channel is min-max normalized, converted to uint8, and considered
    empty when the variance of the uint8 values is below 0.02.
    """
    as_bytes = make_8UC(normalize(channel))
    return 0 if np.var(as_bytes) < 0.02 else 1
|
|
|
|
|
def fill_holes(img_bw):
    """Fill holes in a 0/255 matrix; equivalent of MATLAB's imfill(BW, 'holes').

    The image is embedded in a one-pixel zero border, the outside is
    flood-filled from the top-left corner, and whatever the fill did not
    reach is OR-ed back into the input. A new array is returned; the input
    is left untouched.
    """
    rows, cols = img_bw.shape

    # floodFill's mask is allocated 2 pixels larger than the image it is
    # given in each dimension; the image here is the padded one.
    flood_mask = np.zeros((rows + 4, cols + 4), np.uint8)

    # One-pixel zero border so the fill can travel around objects that touch
    # the image edge.
    padded = np.zeros((rows + 2, cols + 2), np.uint8)
    padded[1:-1, 1:-1] = img_bw
    cv2.floodFill(padded, flood_mask, (0, 0), 255)

    # Pixels still 0 after the fill are enclosed holes -> 255 after inversion.
    holes = 255 - padded[1:-1, 1:-1]
    return img_bw | holes
|
|
|
|
|
def otsu_thresholding(img, thresh=None, plot_image=False, fill_hole=False):
    """Do image thresholding.

    Args:
        img: A uint8 matrix for thresholding.
        thresh: If provided, do binary thresholding with this threshold.
            If not, do default Otsu thresholding.
        plot_image: Set True to display the input and the mask. Default is False.
        fill_hole: Set True to fill holes in the generated mask. Default is False.

    Returns:
        A uint8 0/255 mask matrix, same size as img; object: 255; background: 0.
    """
    if thresh is None:
        _, mask = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    else:
        _, mask = cv2.threshold(img, thresh, 255, cv2.THRESH_BINARY)

    # remove_small_objects only works per connected component on a boolean
    # array; given the raw 0/255 integer mask it would treat all 255 pixels
    # as one labelled object and never remove isolated specks.
    kept = skimage.morphology.remove_small_objects(mask > 0, 2)
    mask = kept.astype(np.uint8) * 255

    if fill_hole:
        mask = fill_holes(mask)

    if plot_image:
        plt.figure()
        plt.imshow(img, cmap="gray")
        plt.title("Original")
        plt.figure()
        plt.imshow(mask)
        plt.title("After Thresholding")
        plt.colorbar()
        plt.show()

    return mask
|
|
|
|
|
def watershed(mask, img, plot_image=False, kernel_size=2):
    """Do watershed segmentation for input mask and image.

    Args:
        mask: A 0/255 matrix with 255 indicating objects.
        img: An 8UC3 matrix for watershed segmentation.
        plot_image: Set True to display the intermediate markers and the
            result. Default is False.
        kernel_size: Kernel size for inner marker erosion. Default is 2.

    Returns:
        The marker matrix returned by cv2.watershed, same height/width as the
        input image, with -1 indicating boundary, 1 indicating background,
        and numbers>1 indicating objects.
        NOTE(review): the previous docstring called this a uint8 mask, which
        cannot hold -1; presumably it is a signed integer array — confirm.
    """
    # Work on copies so the caller's image and mask are not modified.
    img_copy = img.copy()
    mask_copy = np.array(mask.copy(), dtype=np.uint8)

    # Inner markers: close the mask twice, erode it, then close again.
    mask_closed = closing(np.array(mask_copy, dtype=np.uint8))
    mask_closed = closing(np.array(mask_closed, dtype=np.uint8))
    kernel = np.ones((kernel_size, kernel_size), np.uint8)
    sure_fg = cv2.erode(mask_closed, kernel, iterations=2)
    sure_fg = skimage.morphology.closing(np.array(sure_fg, dtype=np.uint8))

    # Outer markers: invert the foreground, skeletonize that inverse, and
    # invert again, so sure_bg is 1 everywhere except on the skeleton lines.
    sure_fg_bool = 1 - img_as_bool(sure_fg)
    sure_bg = np.uint8(1 - morphology.skeletonize(sure_fg_bool))

    # Pixels that are 1 in sure_bg and 0 in sure_fg are left for the
    # watershed to decide (presumably cv2.subtract saturates at 0 — confirm).
    sure_fg = np.uint8(sure_fg)
    unknown = cv2.subtract(sure_bg, sure_fg)

    # Label each inner marker; shift labels by one so that 0 is free to mean
    # "unknown" and the remaining non-object pixels carry label 1.
    _, markers = cv2.connectedComponents(sure_fg)
    markers = markers + 1
    markers[unknown == 1] = 0

    # A copy of the markers is passed so `markers` itself stays available
    # for plotting below.
    marker = cv2.watershed(img_copy, markers.copy())

    if plot_image:
        plt.figure()
        plt.imshow(sure_fg)
        plt.title("Inner Marker")
        plt.figure()
        plt.imshow(sure_bg)
        plt.title("Outer Marker")
        plt.figure()
        plt.imshow(unknown)
        plt.title("Unknown")
        plt.figure()
        plt.imshow(markers, cmap='jet')
        plt.title("Markers")
        plt.figure()
        plt.imshow(marker, cmap='jet')
        plt.title("Mask")
        plt.figure()
        plt.imshow(img)
        plt.title("Original Image")
        plt.figure()
        # Paint the watershed boundaries (-1) green on the image copy.
        img_copy[marker == -1] = [0, 255, 0]
        plt.imshow(img_copy)
        plt.title("Marked Image")
        plt.show()

    return marker
|
|
|
|
|
def generate_mask(channel, original_img=None, overlap_color=(0, 1, 0),
                  plot_process=False, plot_result=False, title="",
                  fill_hole=False, thresh=None,
                  use_watershed=True, watershed_kernel_size=2,
                  save_img=False, save_path=None):
    """Generate mask for a gray-value image.

    Args:
        channel: Channel returned by function 'channel_deconvolution'. A
            gray-value image is also accepted.
        original_img: An image used for plotting the overlapped segmentation
            result, optional.
        overlap_color: A 3-value tuple setting the color used to mark
            segmentation boundaries on the original image. Default is green
            (0, 1, 0).
        plot_process: Set True to display the whole mask generation process.
            Default is False.
        plot_result: Set True to display the final result. Default is False.
        title: The title used for plot_result, optional.
        fill_hole: Set True to fill mask holes. Default is False.
        thresh: Provide this value to do binary thresholding instead of the
            default Otsu thresholding.
        use_watershed: Set False to skip the watershed segmentation step.
            Default is True.
        watershed_kernel_size: Kernel size of inner marker erosion. Default is 2.
        save_img: Set True to save the mask image. Default is False.
        save_path: The path to save the mask image, optional. Prefer *.png or *.pdf.

    Returns:
        With ``use_watershed=True``: a float mask with 1 indicating an object
        and 0 indicating background. With ``use_watershed=False``: the mask
        returned by ``otsu_thresholding`` unchanged (documented there as
        0/255). If no signal is detected: an all-zero array shaped like
        ``channel``.

    Raises:
        IOError: An error occurred writing the image to save_path.
    """
    if not check_channel(channel):
        print("No signals detected for this channel")
        return np.zeros(channel.shape)

    channel = normalize(channel)
    mask_threshold = otsu_thresholding(make_8UC(channel),
                                       plot_image=plot_process, fill_hole=fill_hole, thresh=thresh)
    if use_watershed:
        marker = watershed(mask_threshold, make_8UC3(channel),
                           plot_image=plot_process, kernel_size=watershed_kernel_size)
        # Everything that is not watershed background (label 1) is an object;
        # boundary pixels (-1) therefore count as object as well.
        mask = (marker != 1).astype(float)
        # The image border is taken from the thresholded mask instead of the
        # watershed result.
        mask[0, :] = mask_threshold[0, :] == 255
        mask[-1, :] = mask_threshold[-1, :] == 255
        mask[:, 0] = mask_threshold[:, 0] == 255
        mask[:, -1] = mask_threshold[:, -1] == 255
    else:
        mask = mask_threshold

    if plot_result or save_img:
        if original_img is None:
            plt.figure()
            plt.imshow(mask, cmap="gray")
        else:
            overlapped_img = segmentation.mark_boundaries(original_img, skimage.measure.label(mask),
                                                          overlap_color, mode="thick")
            # 'box-forced' is no longer accepted by Matplotlib; 'box' is the
            # supported value and works with shared axes.
            fig, axes = plt.subplots(1, 2, figsize=(15, 15), sharex=True, sharey=True,
                                     subplot_kw={'adjustable': 'box'})
            ax = axes.ravel()
            ax[0].imshow(mask, cmap="gray")
            ax[0].set_title(str(title) + " Mask")
            ax[1].imshow(overlapped_img)
            ax[1].set_title("Overlapped with Original Image")
        if save_img:
            try:
                plt.savefig(save_path)
            except Exception:
                # Not a bare except: KeyboardInterrupt/SystemExit must not be
                # reported as an I/O failure.
                raise IOError("Error saving image to {}".format(save_path))
        if plot_result:
            plt.show()
        plt.close()
    return mask
|
|
|
|
|
def get_mask_for_slide_image(filePath, display_progress=False):
    """Generate a coarse foreground mask from the last level of a slide.

    The last entry of ``slide.level_dimensions`` is read in full, its third
    channel is Otsu-thresholded, the result is inverted, cleaned with
    dilate/erode/dilate (3x3 kernel) and hole-filled.

    NOTE(review): ``open_slide`` is not imported in the visible part of this
    file — presumably ``from openslide import open_slide``; confirm.

    Args:
        filePath: Path passed to ``open_slide``.
        display_progress: Set True to print the selected size and show the
            intermediate images. Default is False.

    Returns:
        ``(mask, slide_image)``: the 0/255 mask and the numpy array of the
        region that was read.
    """
    slide = open_slide(filePath)

    # Work on the last pyramid level.
    all_dims = slide.level_dimensions
    last_level = len(all_dims) - 1
    last_dims = all_dims[-1]

    if display_progress:
        print('Selected image of size (' + str(last_dims[0]) + ', ' + str(last_dims[1]) + ')')
    slide_image = np.array(slide.read_region((0, 0), last_level, last_dims))
    if display_progress:
        plt.figure()
        plt.imshow(slide_image)

    # Otsu threshold on channel index 2; pixels at or below the threshold
    # become 255 after the inversion.
    _, bright = cv2.threshold(slide_image[:, :, 2], 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    mask = 255 - bright
    if display_progress:
        plt.figure()
        plt.imshow(mask)

    # dilate -> erode -> dilate, one iteration each
    kernel = np.ones((3, 3), np.uint8)
    for morph_op in (cv2.dilate, cv2.erode, cv2.dilate):
        mask = morph_op(mask, kernel, iterations=1)

    mask = fill_holes(mask)
    if display_progress:
        plt.figure()
        plt.imshow(mask)
        plt.show()

    return mask, slide_image
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_patch_by_location(filepath, location, patch_size=(500, 500),
                              plot_image=False, level_to_analyze=0, save=False, savepath='.'):
    """Read one patch from a slide at a given location; optionally show/save it.

    NOTE(review): ``open_slide`` is not imported in the visible part of this
    file — presumably ``from openslide import open_slide``; confirm.

    Args:
        filepath: Path to the slide file.
        location: Location passed to ``slide.read_region``; its first two
            items are also used in the saved file name.
        patch_size: Size passed to ``slide.read_region``. Default (500, 500).
        plot_image: Set True to display the patch. Default is False.
        level_to_analyze: Slide level passed to ``read_region``. Default 0.
        save: Set True to write the patch as
            ``<savepath>/<slide name>_<location[0]>_<location[1]>.png``.
        savepath: Directory for the saved patch. Default is '.'.

    Returns:
        The object returned by ``slide.read_region``.

    Raises:
        IOError: If ``filepath`` is not an existing file.
    """
    if not os.path.isfile(filepath):
        raise IOError("Image not found!")

    slide = open_slide(filepath)
    slide_image = slide.read_region(location, level_to_analyze, patch_size)
    if plot_image:
        plt.figure()
        plt.imshow(slide_image)
        plt.show()

    if save:
        # Base name without extension; unlike the former regex this also
        # handles paths without a '/' or with a non-".svs" extension.
        filename = os.path.splitext(os.path.basename(filepath))[0]
        savename = os.path.join(savepath, str(filename) + '_' + str(location[0]) + '_' + str(location[1]) + '.png')
        # scipy.misc.imsave no longer exists; save through the image's own
        # save(), the same call getPatches makes on read_region results.
        slide_image.save(savename)
        print("Writed to " + savename)
    return slide_image
|
|
|
|
|
def extract_patch_by_tissue_area(filePath, nPatch=0, patchSize=500, maxPatch=10,
                                 filename=None, savePath=None, displayProgress=False, desiredLevel=0, random=False):
    """Save image patches that fall on the foreground mask of a slide.

    NOTE(review): ``open_slide`` is not imported in the visible part of this
    file — presumably ``from openslide import open_slide``; confirm.

    Args:
        filePath: Path to the slide file.
        nPatch: Number of patches already counted; counting continues from
            here. Default 0.
        patchSize: Side length of the square patches. Default 500.
        maxPatch: Stop once the patch count reaches this value. Default 10.
        filename: Prefix for saved files. Default: the ``<digits>.svs`` part
            of ``filePath`` if present, otherwise its base name.
        savePath: Output directory. Default is the hard-coded
            '/home/swan15/python/brainTumor/sample_patches/'.
        displayProgress: Forwarded to ``get_mask_for_slide_image``.
        desiredLevel: Slide level passed to ``read_region``. Default 0.
        random: False (default) walks a patch grid along each mask contour;
            True samples patch centres uniformly from mask pixels. (The name
            shadows the ``random`` module inside this function.)

    Returns:
        None. Patches are written as PNG files; existing files are skipped.

    Raises:
        ValueError: If the mask height matches no slide level, or (random
            mode) the mask has no foreground pixel.
    """
    if filename is None:
        # Raw string: "\." is an invalid escape in a normal string literal.
        found = re.search(r"(?<=/)[0-9]+\.svs", filePath)
        # Previously a non-matching path crashed with AttributeError on None.
        filename = found.group(0) if found else os.path.basename(filePath)
    if savePath is None:
        savePath = '/home/swan15/python/brainTumor/sample_patches/'
    bwMask, _ = get_mask_for_slide_image(filePath, display_progress=displayProgress)
    slide = open_slide(filePath)
    levelDims = slide.level_dimensions

    # Scale factor between level 0 and the level the mask was computed on.
    magnitude = None
    for dims in levelDims:
        if bwMask.shape[0] == dims[1]:
            magnitude = levelDims[0][1] / dims[1]
            break
    if magnitude is None:
        raise ValueError("Mask height does not match any level of the slide.")

    if not random:
        if nPatch >= maxPatch:
            return
        # Row bands run along the image height. The comparison above shows
        # index 1 is the height; the band count used to be taken from
        # index 0 (the width).
        nRow = int(math.ceil(levelDims[0][1] / patchSize))

        # findContours returns (image, contours, hierarchy) in OpenCV 3 but
        # (contours, hierarchy) in OpenCV 4; contours is second-to-last in both.
        contours = cv2.findContours(bwMask, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)[-2]
        for nContours, contour in enumerate(contours):
            print(nContours)
            contour_rows = contour[:, 0, 1]
            for i in range(0, nRow):
                minRow = i * patchSize / magnitude
                maxRow = (i + 1) * patchSize / magnitude
                band_cols = contour[(contour_rows > minRow) & (contour_rows < maxRow), 0, 0]
                if band_cols.size == 0:
                    # This contour does not cross the current row band.
                    continue
                print([min(band_cols), max(band_cols)])

                # Column extent of the contour in this band, in level-0 pixels.
                minCol = min(band_cols) * magnitude
                maxCol = max(band_cols) * magnitude
                minColInt = int(math.floor(minCol / patchSize))
                maxColInt = int(math.ceil(maxCol / patchSize))

                for j in range(minColInt, maxColInt):
                    startCol = j * patchSize
                    startRow = i * patchSize
                    patch = slide.read_region((startCol, startRow), desiredLevel, (patchSize, patchSize))
                    patchCV = np.array(patch)[:, :, 0:3]

                    fname = os.path.join(savePath, filename + '_' + str(i) + '_' + str(j) + '.png')

                    if not os.path.isfile(fname):
                        # scipy.misc.imsave no longer exists; write via PIL.
                        Image.fromarray(patchCV).save(fname)
                        nPatch = nPatch + 1
                        print(nPatch)

                    if nPatch >= maxPatch:
                        return
    else:
        # Candidate centres are loop-invariant, so compute them once.
        coords = np.transpose(np.nonzero(bwMask >= 1))
        for i in range(nPatch, maxPatch):
            # randint's upper bound is exclusive: use len(coords) so the last
            # candidate can be drawn and a single candidate does not crash.
            y, x = coords[np.random.randint(0, len(coords))]
            x = int(x * magnitude) - int(patchSize / 2)
            y = int(y * magnitude) - int(patchSize / 2)

            image = np.array(slide.read_region((x, y), desiredLevel, (patchSize, patchSize)))[..., 0:3]

            fname = os.path.join(savePath, filename + '_' + str(i) + '.png')

            if not os.path.isfile(fname):
                Image.fromarray(image).save(fname)
                print(i)
|
|
|
|
|
def parseXML(xmlFile, pattern):
    """Parse an XML annotation file and collect the vertices of matching regions.

    Every ``Region`` element whose ``Text`` attribute equals ``pattern``
    contributes one dict holding the X and Y attributes of its ``Vertex``
    elements as floats.

    Returns:
        A dict with the single key ``pattern`` mapping to a list of dicts,
        each with an 'X' and a 'Y' key, e.g.
            {pattern: [{'X': [1.0, 2.0, 3.0],
                        'Y': [1.0, 2.0, 3.0]}]}
        The list is empty when no region matches.
    """
    root = ET.parse(xmlFile).getroot()

    matched_regions = []
    for region in root.iter('Region'):
        if region.get('Text') != pattern:
            continue
        xs, ys = [], []
        for vertex in region.iter('Vertex'):
            xs.append(float(vertex.get('X')))
            ys.append(float(vertex.get('Y')))
        matched_regions.append({'X': xs, 'Y': ys})

    return {pattern: matched_regions}
|
|
|
|
|
def calculateRatio(levelDims):
    """Return the per-axis ratio between the first and the last level dimensions.

    ``levelDims[0]`` (highest resolution) is divided element-wise by
    ``levelDims[-1]`` (lowest resolution); the result is returned as the
    tuple (Xratio, Yratio).
    """
    full_res, coarsest = levelDims[0], levelDims[-1]
    ratios = np.asarray(full_res) / np.asarray(coarsest)
    Xratio, Yratio = ratios
    return (Xratio, Yratio)
|
|
|
|
|
def createMask(levelDims, vertices, pattern):
    """Rasterize the annotated regions of ``pattern`` at the last slide level.

    Input:  levelDims (nested list): dimensions of each layer of the slide.
            vertices: dict as returned by parseXML.
            pattern: key of ``vertices`` whose regions are drawn.
    Output: uint8 numpy array of 0/1 whose shape is ``levelDims[-1]``, where
            1 indicates inside a region and 0 outside. The first mask axis is
            indexed by the scaled X coordinates and the second by the scaled
            Y coordinates.
    """
    Xratio, Yratio = calculateRatio(levelDims)

    nRows, nCols = levelDims[-1]
    mask_shape = (nRows, nCols)
    mask = np.zeros(mask_shape, dtype=np.uint8)

    for region in vertices[pattern]:
        # Scale the annotation coordinates down to the last level.
        scaled_x = np.array(region['X']) / Xratio
        scaled_y = np.array(region['Y']) / Yratio
        rr, cc = polygon(scaled_x, scaled_y, mask_shape)
        mask[rr, cc] = 1

    return mask
|
|
|
|
|
def getMask(xmlFile, svsFile, pattern):
    """Parse an XML annotation file and build the mask matrix for one pattern.

    In the mask, 1 indicates the pixel is inside an annotated region and 0
    indicates outside.

    NOTE(review): ``open_slide`` is not imported in the visible part of this
    file — presumably ``from openslide import open_slide``; confirm.

    @param {string} xmlFile: name of the xml file that contains the
        annotation vertices outlining the mask.
    @param {string} svsFile: name of the svs file that contains the slide image.
    @param {string} pattern: name of the xml labeling.
    Returns: (slide, mask) — the object returned by open_slide and the mask
        matrix of the pattern; (0, 0) when the XML has no region labelled
        ``pattern`` (the slide is not opened in that case).
    """
    vertices = parseXML(xmlFile, pattern)

    if len(vertices[pattern]) == 0:
        return 0, 0

    slide = open_slide(svsFile)
    mask = createMask(slide.level_dimensions, vertices, pattern)
    return slide, mask
|
|
|
|
|
def plotMask(mask):
    """Show ``mask`` in a single 6x10 matplotlib figure."""
    _, axis = plt.subplots(nrows=1, figsize=(6, 10))
    axis.imshow(mask)
    plt.show()
|
|
|
|
|
def chooseRandPixel(mask):
    """Return the index of one randomly chosen non-zero pixel as a numpy array.

    NOTE: the returned pair is the index into ``mask`` along its first and
    second axis, i.e. [row, col] of the mask.

    @param {numpy matrix} mask from which to choose a random pixel.

    Usage example:
        self.level_dims = self.slide.level_dimensions
        self.zoom = self.level_dims[0][0] / self.level_dims[-1][0]
        self.slide, mask = getMask(xml_file, slide_file, pattern)
        self.mask = cv2.erode(mask, np.ones((50, 50)))

        def get_patch(self):
            x, y = chooseRandPixel(self.mask)  # x is the columns of original image
            x = int(x * self.zoom)
            y = int(y * self.zoom)
            patch = self.slide.read_region((x, y), 0, (self.PATCH_SIZE, self.PATCH_SIZE))
            patch = np.array(patch)[..., 0:3]
            return patch, x, y

        self.get_patch()
    """
    candidates = np.transpose(np.nonzero(mask))
    last = len(candidates) - 1
    return candidates[random.randint(0, last)]
|
|
|
|
|
def plotImage(image):
    """Draw ``image`` with ``plt.imshow`` and call ``plt.show()``."""
    plt.imshow(image)
    plt.show()
|
|
|
|
|
def checkWhiteSlide(image):
    """Tell whether an image is (almost) white.

    The image is converted to RGB mode and the mean over all pixel values of
    all channels is compared against 230.

    Returns:
        True-like when the mean is at least 230, False-like otherwise.
    """
    rgb_values = np.array(image.convert(mode='RGB'))
    return np.mean(np.ravel(rgb_values)) >= 230
|
|
|
|
|
|
|
def getPatches(slide, mask, numPatches=0, dims=(0, 0), dirPath='', slideNum='', plot=False, plotMask=False):
    """Sample random patches inside ``mask`` from ``slide`` and save them as PNG.

    Patches keep being drawn until ``numPatches`` non-white ones (see
    checkWhiteSlide) have been produced.
    NOTE(review): white patches are still written to disk although they are
    not counted — confirm this is intended.

    @param {Openslide Slide obj} slide: image object
    @param {numpy matrix} mask: where 0 is outside the region of interest and 1 indicates within
    @param {int} numPatches: number of non-white patches to produce
    @param {tuple} dims: (w,h) dimensions of patches
    @param {string} dirPath: directory in which to save patches
    @param {string} slideNum: slide number
    @param {bool} plot: show every patch after saving it
    @param {bool} plotMask: zero the footprint of every sampled patch in ``mask``
        (modifying the caller's array) and show the mask at the end
    Saves patches in the directory specified by dirPath as [slideNum]_[Xpixel]x[Ypixel].png
    """
    w, h = dims
    Xratio, Yratio = calculateRatio(slide.level_dimensions)

    accepted = 0
    while accepted < numPatches:
        x, y = chooseRandPixel(mask)

        # Footprint of the patch expressed in mask coordinates.
        xVertices = np.array([x, x + (w / Xratio), x + (w / Xratio), x, x])
        yVertices = np.array([y, y, y - (h / Yratio), y - (h / Yratio), y])
        rr, cc = polygon(xVertices, yVertices)

        image = slide.read_region((int(x * Xratio), int(y * Yratio)), 0, (w, h))

        # Only non-white patches count towards numPatches.
        if not checkWhiteSlide(image):
            accepted += 1

        coordinate_tag = 'x'.join([str(x * Xratio), str(y * Yratio)])
        slideName = '_'.join([slideNum, coordinate_tag])
        image.save(os.path.join(dirPath, slideName + ".png"))

        if plot:
            plotImage(image)
        if plotMask:
            mask[rr, cc] = 0

    if plotMask:
        plotImage(mask)
|
|
|
|
|
'''Example codes for getting patches from labeled svs files: |
|
#define the patterns |
|
patterns = ['small_acinar', |
|
'large_acinar', |
|
'tubular', |
|
'trabecular', |
|
'aveolar', |
|
'solid', |
|
'pseudopapillary', |
|
'rhabdoid', |
|
'sarcomatoid', |
|
'necrosis', |
|
'normal', |
|
'other'] |
|
#create folders |
|
for pattern in patterns: |
|
if not os.path.exists(pattern): |
|
os.makedirs(pattern) |
|
#define parameters |
|
patchSize = 500 |
|
numPatches = 50 |
|
dirName = '/home/swan15/kidney/ccRCC/slides' |
|
annotatedSlides = 'slide_region_of_interests.txt' |
|
|
|
f = open(annotatedSlides, 'r+') |
|
slides = [re.search('.*(?=\.svs)', line).group(0) for line in f |
|
if re.search('.*(?=\.svs)', line) is not None] |
|
print slides |
|
f.close() |
|
for slideID in slides: |
|
print('Start '+slideID) |
|
try: |
|
xmlFile = slideID+'.xml' |
|
svsFile = slideID+'.svs' |
|
|
|
xmlFile = os.path.join(dirName, xmlFile) |
|
svsFile = os.path.join(dirName, svsFile) |
|
|
|
if not os.path.isfile(xmlFile): |
|
print xmlFile+' not exist' |
|
continue |
|
|
|
for pattern in patterns: |
|
|
|
numPatchesGenerated = len([files for files in os.listdir(pattern) |
|
if re.search(slideID+'_.+\.png', files) is not None]) |
|
if numPatchesGenerated >= numPatches: |
|
print(pattern+' existed') |
|
continue |
|
else: |
|
numPatchesTemp = numPatches - numPatchesGenerated |
|
|
|
slide, mask = getMask(xmlFile, svsFile, pattern) |
|
|
|
if not slide: |
|
#print(pattern+' not detected.') |
|
continue |
|
|
|
getPatches(slide, mask, numPatches = numPatchesTemp, dims = (patchSize, patchSize), |
|
dirPath = pattern+'/', slideNum = slideID, plotMask = False) # Get Patches |
|
print(pattern+' done.') |
|
|
|
print('Done with ' + slideID) |
|
print('----------------------') |
|
|
|
except: |
|
print('Error with ' + slideID) |
|
''' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_RGBA(RGBA_img):
    """Flatten an RGBA image to RGB.

    Fully transparent pixels (alpha == 0) become white, fully opaque pixels
    (alpha == 255) keep their colour, and pixels with any other alpha value
    are left at zero. The result is a new float array of shape (h, w, 3).
    If the input does not have 4 channels a message is printed and the input
    is returned unchanged.
    """
    if np.shape(RGBA_img)[2] != 4:
        print("Not an RGBA image")
        return RGBA_img

    alpha = RGBA_img[:, :, 3]
    opaque = alpha == 255
    height, width = np.shape(RGBA_img)[0], np.shape(RGBA_img)[1]
    RGB_img = np.zeros((height, width, 3))
    RGB_img[alpha == 0] = [255, 255, 255]
    RGB_img[opaque] = RGBA_img[opaque, 0:3]
    return RGB_img
|
|
|
|
|
|
|
def RGB_to_index(RGB_img, RGB_markers=None, RGB_labels=None):
    """Change RGB to 2D index matrix; each RGB color corresponds to one index.

    Args:
        RGB_img: An h*w*3 array.
        RGB_markers: Colors to look for, starting from background (marked as
            0); a list or numpy array. Example format:
                [[255, 255, 255],
                 [160, 255, 0]]
            Default is [[255, 255, 255]].
        RGB_labels: A numeric vector with the label for each entry of
            RGB_markers; length should be the same as RGB_markers. Default is
            0..len(RGB_markers)-1.

    Returns:
        An h*w float matrix holding the label of the matching marker for each
        pixel; pixels matching no marker stay 0. If RGB_img does not have 3
        channels, a message is printed and RGB_img is returned unchanged.
    """
    if np.shape(RGB_img)[2] != 3:
        print("Not an RGB image")
        return RGB_img

    # `is None` instead of `== None`: comparing a numpy array argument with
    # `==` yields an array whose truth value is ambiguous and raises.
    if RGB_markers is None:
        RGB_markers = [[255, 255, 255]]
    if RGB_labels is None:
        RGB_labels = range(np.shape(RGB_markers)[0])

    mask_index = np.zeros((np.shape(RGB_img)[0], np.shape(RGB_img)[1]))
    for i, RGB_label in enumerate(RGB_labels):
        mask_index[np.all(RGB_img == RGB_markers[i], axis=2)] = RGB_label
    return mask_index
|
|
|
|
|
def index_to_RGB(mask_index, RGB_markers=None):
    """Change an index matrix to an RGB image; each index corresponds to one color.

    Args:
        mask_index: Index matrix; singleton dimensions are squeezed away and
            the result must be 2-D. The input is not modified.
        RGB_markers: List or numpy array of colors; index ``i`` is painted
            with ``RGB_markers[i]``. If omitted, a message is printed and
            [[255, 255, 255]] is used.

    Returns:
        A uint8 h*w*3 image. Pixels whose index has no marker get
        ``RGB_markers[0]``.
    """
    index_map = np.squeeze(mask_index)
    # `is None` instead of `== None`: comparing a numpy array argument with
    # `==` yields an array whose truth value is ambiguous and raises.
    if RGB_markers is None:
        print("RGB_markers not provided!")
        RGB_markers = [[255, 255, 255]]

    RGB_img = np.zeros((np.shape(index_map)[0], np.shape(index_map)[1], 3), dtype=np.uint8)
    RGB_img[:, :] = RGB_markers[0]
    for i in range(np.shape(RGB_markers)[0]):
        RGB_img[index_map == i] = RGB_markers[i]
    return RGB_img
|
|
|
|
|
def shift_HSV(img, amount=(0.9, 0.9, 0.9)):
    """Scale the Hue, Saturation and Value of image ``img``.

    The image is converted to HSV, every channel is multiplied by the
    matching entry of ``amount`` and clipped to [0, 255], and the result is
    converted back and returned as an RGB numpy array.
    """
    hsv = np.array(Image.fromarray(img, 'RGB').convert('HSV'))
    for channel_idx in range(3):
        scaled = hsv[..., channel_idx] * amount[channel_idx]
        hsv[..., channel_idx] = np.clip(scaled, a_max=255, a_min=0)
    shifted = Image.fromarray(hsv, 'HSV')
    return np.array(shifted.convert('RGB'))
|
|
|
|