# DeepLoop / app.py
# dylan-plummer's picture
# dpnii ref
# bb6d255
import gradio as gr
import os
import scipy
from scipy.sparse import tril, triu
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from pathlib import Path
from tensorflow.keras.models import model_from_json
from huggingface_hub import hf_hub_download
# Fetch the default demo input from the "dylanplummer/hicorr" dataset repo.
# The access token is read from the DATASET_SECRET environment variable; if it
# is not set, the os.environ lookup raises KeyError while the module is loading.
# Alternative input in the same dataset repo: filename="arima_beta.chr22"
input_file = hf_hub_download(
    repo_id="dylanplummer/hicorr",
    filename="ORC2.chr22",
    repo_type="dataset",
    token=os.environ['DATASET_SECRET'],
)
# Module-level directory defaults. `sparse_data_dir` is the root under which
# load_chr_ratio_matrix_from_sparse stores/looks up pre-compiled .npz matrices.
data_dir = 'data/'
sparse_data_dir = 'data/sparse_data/'
def get_chromosome_from_filename(filename):
    """
    Extract the chromosome string from any of the file name formats we use

    Three layouts are recognised:

    * chromosome as prefix, e.g. ``chr22.bed`` or plain ``chr22``
    * chromosome as file ending, e.g. ``ORC2.chr22``
    * chromosome in the middle, e.g. ``anchor.chr22.txt``

    Args:
        filename (:obj:`str`) : name of anchor to anchor file

    Returns:
        Chromosome string of form chr<>, or an empty string when ``filename`` does not contain ``chr``
    """
    chr_index = filename.find('chr')  # index of chromosome name
    if chr_index < 0:  # no chromosome in the name at all
        return ''
    if chr_index == 0:  # chromosome name is the file prefix
        first_dot = filename.find('.')
        # Without a dot the whole name is the chromosome; slicing with -1 here
        # would silently drop the last character ('chr22' -> 'chr2').
        return filename if first_dot < 0 else filename[:first_dot]
    file_ending_index = filename.rfind('.')  # index of file ending
    if chr_index > file_ending_index:  # chromosome name is the file ending
        return filename[chr_index:]
    return filename[chr_index:file_ending_index]
def draw_heatmap(matrix, color_scale, ax=None, return_image=False):
    """
    Draw a white-to-red ratio heatmap whose colour boundaries start at 1.001

    The colour boundaries ("breaks") are chosen as follows:

    * ``color_scale != 0``: evenly spaced from 1.001 up to ``color_scale``, with the matrix maximum appended as the last boundary
    * ``color_scale == 0`` and matrix maximum below 2: evenly spaced from 1.001 up to the matrix maximum
    * otherwise: evenly spaced from 1.001 up to slightly above the 0.95 quantile (but at least 2), with the matrix maximum appended

    Args:
        matrix (:obj:`numpy.array`) : ratio matrix to be displayed
        color_scale (:obj:`int`) : max ratio value to be considered strongest by color mapping, 0 to derive it from the data
        ax (:obj:`matplotlib.axes.Axes`) : axes which will contain the heatmap. If None, new axes are created
        return_image (:obj:`bool`) : set to True to close the current figure and return the drawn image data

    Returns:
        ``numpy.array`` : the array held by the drawn image if ``return_image`` is True, otherwise ``None``
    """
    peak = np.max(matrix)
    if color_scale != 0:
        inner_breaks = np.arange(1.001, color_scale, (color_scale - 1.001) / 18)
        breaks = np.append(inner_breaks, peak)
    elif peak < 2:
        breaks = np.arange(1.001, peak, (peak - 1.001) / 19)
    else:
        q95 = np.quantile(matrix, q=0.95)
        step = (q95 - 1) / 18
        up = q95 + 0.011
        if up < 2:
            # never let the coloured range end below a ratio of 2
            up = 2
            step = 0.999 / 18
        breaks = np.append(np.arange(1.001, up, step), peak)

    # 19-step ramp from white to pure red
    colors = [
        "#FFFFFF", "#FFE4E4", "#FFD7D7", "#FFC9C9", "#FFBCBC", "#FFAEAE",
        "#FFA1A1", "#FF9494", "#FF8686", "#FF7979", "#FF6B6B", "#FF5E5E",
        "#FF5151", "#FF4343", "#FF3636", "#FF2828", "#FF1B1B", "#FF0D0D",
        "#FF0000",
    ]
    n_bin = 20  # number of discrete colours; fewer bins give a coarser colormap
    cmap_name = 'my_list'
    cm = matplotlib.colors.LinearSegmentedColormap.from_list(cmap_name, colors, N=n_bin)
    norm = matplotlib.colors.BoundaryNorm(breaks, 20)

    if ax is None:
        _, ax = plt.subplots()
    img = ax.imshow(matrix, cmap=cm, norm=norm, interpolation='nearest')
    if not return_image:
        return None
    plt.close()
    return img.get_array()
def anchor_list_to_dict(anchors):
    """
    Build a lookup table from anchor name to its position in ``anchors``

    If a name occurs more than once, the position of its last occurrence is kept.

    Args:
        anchors (:obj:`numpy.array`) : array of anchor name values

    Returns:
        `dict` : dictionary mapping each anchor to its index from the array
    """
    return {name: position for position, name in enumerate(anchors)}
def anchor_to_locus(anchor_dict):
    """
    Wrap an anchor lookup table in a single-argument function, e.g. for use with ``numpy.vectorize``

    Args:
        anchor_dict (:obj:`dict`) : dictionary mapping each anchor to its chromosomal index

    Returns:
        `function` : function which returns ``anchor_dict[anchor]`` for an anchor name (``KeyError`` if the name is unknown)
    """
    return lambda anchor: anchor_dict[anchor]
def _read_anchor_to_anchor_matrix(path, anchor_dict, matrix_size, dummy, use_raw):
    """Parse a tab-separated anchor to anchor file into a square CSR matrix."""
    to_index = np.vectorize(anchor_to_locus(anchor_dict))  # anchor name --> matrix index
    shape = (matrix_size, matrix_size)
    try:  # first try reading anchor to anchor file as <a1> <a2> <obs> <exp>
        pairs = pd.read_csv(
            path,
            delimiter='\t',
            names=['anchor1', 'anchor2', 'obs', 'exp'],
            usecols=['anchor1', 'anchor2', 'obs', 'exp'])
        rows = to_index(pairs['anchor1'].values)
        cols = to_index(pairs['anchor2'].values)
        if use_raw:
            values = pairs['obs']  # raw observed values, no ratio
        else:
            values = (pairs['obs'] + dummy) / (pairs['exp'] + dummy)
        return scipy.sparse.csr_matrix((values, (rows, cols)), shape=shape)
    except Exception:  # otherwise read anchor to anchor file as <a1> <a2> <ratio>
        # Any failure above triggers the 3-column retry (as before), but
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        pairs = pd.read_csv(
            path,
            delimiter='\t',
            names=['anchor1', 'anchor2', 'ratio'],
            usecols=['anchor1', 'anchor2', 'ratio'])
        rows = to_index(pairs['anchor1'].values)
        cols = to_index(pairs['anchor2'].values)
        # This layout has a single value column, so it is used regardless of use_raw.
        return scipy.sparse.csr_matrix((pairs['ratio'], (rows, cols)), shape=shape)


def load_chr_ratio_matrix_from_sparse(dir_name, file_name, anchor_dir, sparse_dir=None, anchor_list=None, chr_name=None, dummy=5, ignore_sparse=False, force_symmetry=True, use_raw=False):
    """
    Loads data as a sparse matrix by either reading a precompiled sparse matrix or an anchor to anchor file which is converted to sparse CSR format.
    Ratio values are computed using the observed (obs) and expected (exp) values:

    .. math::
        ratio = \\frac{obs + dummy}{exp + dummy}

    Args:
        dir_name (:obj:`str`) : directory containing the anchor to anchor or precompiled (.npz) sparse matrix file
        file_name (:obj:`str`) : name of anchor to anchor or precompiled (.npz) sparse matrix file
        anchor_dir (:obj:`str`) : directory containing the reference anchor ``.bed`` files
        sparse_dir (:obj:`str`) : root directory for precompiled matrices. When given it also replaces the module-level ``sparse_data_dir`` for later calls
        anchor_list (:obj:`pandas.DataFrame`) : already loaded anchor reference with an ``anchor`` column. If None, ``<anchor_dir>/<chr_name>.bed`` is read
        chr_name (:obj:`str`) : chromosome name. If None it is extracted from ``file_name``
        dummy (:obj:`int`) : dummy value to used when computing ratio values
        ignore_sparse (:obj:`bool`) : set to True to ignore precompiled sparse matrices even if they exist (and to not write one)
        force_symmetry (:obj:`bool`) : set to True to mirror the upper triangle of a freshly parsed matrix onto the lower triangle
        use_raw (:obj:`bool`) : set to True to store the raw ``obs`` values of a 4-column file instead of ratios

    Returns:
        ``scipy.sparse.csr_matrix``: sparse matrix of ratio values

    Raises:
        ValueError: if the matrix has to be parsed and neither ``anchor_list`` nor ``anchor_dir`` is supplied
    """
    global sparse_data_dir
    if chr_name is None:
        chr_name = get_chromosome_from_filename(file_name)
    sparse_rep_dir = dir_name[dir_name[: -1].rfind('/') + 1:]  # last component of dir_name, used as cache sub-directory
    if sparse_dir is not None:
        sparse_data_dir = sparse_dir
    cache_dir = os.path.join(sparse_data_dir, sparse_rep_dir)
    os.makedirs(cache_dir, exist_ok=True)

    if file_name.endswith('.npz'):  # loading pre-combined and pre-compiled sparse data
        # os.path.join so that dir_name works with or without a trailing slash
        return scipy.sparse.load_npz(os.path.join(dir_name, file_name))

    cache_path = os.path.join(cache_dir, file_name + '.npz')
    if not ignore_sparse and os.path.exists(cache_path):  # pre-compiled data already exists
        return scipy.sparse.load_npz(cache_path)

    # otherwise generate sparse matrix from anchor2anchor file and save pre-compiled data
    if anchor_list is None:
        if anchor_dir is None:
            raise ValueError('You must supply either an anchor reference list or the directory containing one')
        anchor_list = pd.read_csv(os.path.join(anchor_dir, '%s.bed' % chr_name), sep='\t',
                                  names=['chr', 'start', 'end', 'anchor'])  # read anchor list file
    matrix_size = len(anchor_list)  # matrix size is needed to construct sparse CSR matrix
    anchor_dict = anchor_list_to_dict(anchor_list['anchor'].values)  # convert to anchor --> index dictionary
    sparse_matrix = _read_anchor_to_anchor_matrix(
        os.path.join(dir_name, file_name), anchor_dict, matrix_size, dummy, use_raw)

    if force_symmetry:
        upper_sum = triu(sparse_matrix, k=1).sum()
        lower_sum = tril(sparse_matrix, k=-1).sum()
        if upper_sum == 0 or lower_sum == 0:
            # only one triangle is populated: fold it onto the other side first
            sparse_matrix = sparse_matrix + sparse_matrix.transpose()
        # NOTE(review): triu() keeps the main diagonal, so adding the transpose
        # doubles the diagonal values. Kept as in the original; confirm intent.
        sparse_triu = triu(sparse_matrix)
        sparse_matrix = sparse_triu + sparse_triu.transpose()
    if not ignore_sparse:
        # save_npz appends '.npz', producing exactly cache_path
        scipy.sparse.save_npz(os.path.join(cache_dir, file_name), sparse_matrix)  # save precompiled data
    return sparse_matrix
# Names of the available trained models; each entry matches a
# DeepLoop_models/CPGZ_trained/<name>.h5 / <name>.json pair and the UI slider
# selects one of them by index.
model_depths = [
    '1.5M', '2M', '2.4M', '4.88M', '5M', '6.29M', '8.5M',
    '12.5M', '16.5M', '25M', '32M', '50M', '100M', '150M',
]

# Load the 12.5M model once at import time.
# NOTE(review): predict() builds its own local model on every call, so this
# module-level `model` is not used by it.
model_weights = 'DeepLoop_models/CPGZ_trained/12.5M.h5'
model_architecture = 'DeepLoop_models/CPGZ_trained/12.5M.json'
with open(model_architecture, 'r') as f:
    model = model_from_json(f.read())
model.load_weights(model_weights)

# Reference anchor file for chr22 (DPNII); its directory is also handed to the
# matrix loader. Alternative reference: 'ref/hg19_Arima_anchor_bed/chr22.bed'
anchor_file = 'ref/hg19_DPNII_anchor_bed/chr22.bed'

# Side length, in anchors, of the square tile passed to the model
tile_size = 128

# Parse (or load the cached version of) the default input matrix
input_matrix = load_chr_ratio_matrix_from_sparse(
    os.path.dirname(input_file),
    os.path.basename(input_file),
    os.path.dirname(anchor_file),
    force_symmetry=True,
)

# Anchor table; predict() uses its length to locate the centre of the chromosome
anchor_list = pd.read_csv(anchor_file, sep='\t', names=['chr', 'start', 'end', 'anchor'])
def predict(depth_idx):
    """
    Denoise the central tile of the current input matrix with the selected model and render the result

    Args:
        depth_idx (:obj:`int`) : index into ``model_depths`` choosing which trained model to load

    Returns:
        ``numpy.array`` : RGB image (height x width x 3, ``uint8``) showing the input tile next to the denoised tile
    """
    # Coerce to int so that a float-valued slider position can still index the list
    selected_depth = model_depths[int(depth_idx)]
    weights_path = f'DeepLoop_models/CPGZ_trained/{selected_depth}.h5'
    architecture_path = f'DeepLoop_models/CPGZ_trained/{selected_depth}.json'
    with open(architecture_path, 'r') as f:
        model = model_from_json(f.read())
    model.load_weights(weights_path)

    # Take a tile_size x tile_size window around the middle anchor
    center_anchor = int(len(anchor_list) / 2)
    i = max(0, center_anchor - int(tile_size / 2))
    j = i + tile_size
    tile = input_matrix[i:j, i:j].toarray()

    # The model is fed a (1, tile_size, tile_size, 1) batch
    batch = np.expand_dims(np.expand_dims(tile, -1), 0)
    denoised_tile = model.predict(batch).reshape((tile_size, tile_size))
    denoised_tile[denoised_tile < 0] = 0  # clamp negative predictions
    denoised_tile = (denoised_tile + denoised_tile.T) / 2  # make the prediction symmetric

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 4))
    try:
        draw_heatmap(tile, 0, ax=ax1)
        draw_heatmap(denoised_tile, 0, ax=ax2)
        ax1.set_title('Input Tile')
        ax2.set_title(f'{selected_depth} model')
        fig.tight_layout()
        fig.canvas.draw()
        # tostring_rgb() is deprecated/removed in recent Matplotlib releases;
        # read the RGBA buffer and drop alpha instead. Copy so the array does
        # not reference the canvas buffer after the figure is closed.
        data = np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()
    finally:
        plt.close(fig)  # always release the figure, even if drawing failed
    return data
def upload_file(file):
    """
    Replace the current input matrix with one parsed from an uploaded file

    Args:
        file : the upload payload, either a filesystem path or an object exposing the path as ``.name``. ``None`` is ignored

    Side effects:
        Rebinds the module-level ``input_file`` and ``input_matrix``, but only after the new matrix was loaded successfully.
    """
    global input_file, input_matrix
    print(file)
    if file is None:  # nothing was uploaded
        return
    # Accept both a plain path and a temp-file wrapper carrying the path in `.name`
    if isinstance(file, (str, os.PathLike)):
        path = os.fspath(file)
    else:
        path = file.name
    matrix = load_chr_ratio_matrix_from_sparse(os.path.dirname(path), os.path.basename(path),
                                               os.path.dirname(anchor_file), force_symmetry=True)
    # Assign together so a failed load cannot leave the two globals out of sync
    input_file = path
    input_matrix = matrix
# UI layout: an upload button, then a model selector next to the rendered heatmap.
# NOTE(review): the original indentation was lost; placing `heatmap` inside the
# second row is an assumption - confirm against the intended layout.
with gr.Blocks() as demo:
    with gr.Row():
        upload = gr.UploadButton("Upload a file", file_count="single")
    with gr.Row():
        slider = gr.Slider(
            minimum=0,
            maximum=len(model_depths) - 1,
            step=1,
            label='Model Depth',
            interactive=True,
        )
        heatmap = gr.Image(label='Visualization')

    # An upload swaps the input matrix; moving the slider re-renders the heatmap
    upload.upload(fn=upload_file, inputs=upload)
    slider.change(fn=predict, inputs=[slider], outputs=heatmap)

if __name__ == "__main__":
    demo.queue().launch()