| | import episcanpy.api as epi |
| | import numpy as np |
| | import pandas as pd |
| | import scanpy as sc |
| | import scipy.sparse |
| | import sklearn |
| | from scipy import sparse |
| | from statsmodels.distributions.empirical_distribution import ECDF |
| |
|
| |
|
def quality_control(
    adata_atac,
    min_features=1000,
    max_features=60000,
    min_percent=0.05,
    min_cells=None,
    cell_type_col='cell type',
):
    """Filter cells by number of detected features, then filter peaks.

    Cells are filtered with two ``epi.pp.filter_cells`` calls (lower and upper
    bound on detected features). Peaks are then filtered in one of two ways:

    * ``min_percent`` is not ``None``: for every group in
      ``adata_atac.obs[cell_type_col]`` the fraction of cells with a non-zero
      entry is computed per peak; a peak is kept when its largest per-group
      fraction is strictly greater than ``min_percent``.
    * otherwise, if ``min_cells`` is not ``None``: peaks are filtered with
      ``epi.pp.filter_features(adata_atac, min_cells=min_cells)``.

    Parameters
    ----------
    adata_atac
        Annotated cell-by-peak matrix. If ``X`` is dense and the
        ``min_percent`` branch runs, ``X`` is replaced by a CSR matrix.
    min_features, max_features
        Bounds forwarded to ``epi.pp.filter_cells``.
    min_percent
        Threshold on the maximum per-group fraction of open cells, or ``None``
        to skip this criterion.
    min_cells
        Forwarded to ``epi.pp.filter_features``; only consulted when
        ``min_percent`` is ``None``.
    cell_type_col
        Name of the ``obs`` column holding the grouping labels.

    Returns
    -------
    The column-subset of ``adata_atac`` when the ``min_percent`` branch runs,
    otherwise ``adata_atac`` itself.
    """
    epi.pp.filter_cells(adata_atac, min_features=min_features)
    epi.pp.filter_cells(adata_atac, max_features=max_features)

    if min_percent is not None:
        labels = adata_atac.obs[cell_type_col]
        # The dtype (not the Series) has to be tested for being categorical.
        # Unused categories are dropped so that every group owns >= 1 cell,
        # which keeps the per-group division below well defined.
        if isinstance(labels.dtype, pd.CategoricalDtype):
            groups = pd.Index(
                labels.cat.remove_unused_categories().cat.categories
            )
        else:
            groups = pd.Index(np.unique(labels))

        n_cells = adata_atac.shape[0]
        group_codes = groups.get_indexer(labels)
        # (n_groups x n_cells) indicator matrix; the explicit shape keeps the
        # dimensions independent of which indices happen to occur.
        membership = sparse.coo_matrix(
            (np.ones(n_cells), (group_codes, np.arange(n_cells))),
            shape=(len(groups), n_cells),
        ).tocsr()

        if not sparse.issparse(adata_atac.X):
            adata_atac.X = sparse.csr_matrix(adata_atac.X)

        # Number of cells per group in which each peak is non-zero.
        open_counts = (membership @ (adata_atac.X != 0)).toarray()
        group_sizes = np.bincount(group_codes, minlength=len(groups))
        open_fraction = open_counts / group_sizes[:, np.newaxis]

        # Fractions are non-negative, so ``initial=0.0`` never changes the
        # maximum; it only covers the case of having no groups at all.
        keep_peak = open_fraction.max(axis=0, initial=0.0) > min_percent
        adata_atac = adata_atac[:, keep_peak]
    elif min_cells is not None:
        epi.pp.filter_features(adata_atac, min_cells=min_cells)
    return adata_atac
| |
|
| |
|
def tfidf(x):
    """Apply TF-IDF weighting to a 2-D matrix.

    Each entry is divided by its row sum (term frequency) and multiplied by
    ``n_rows / column_sum`` (inverse document frequency). A small constant is
    added to both sums to avoid division by zero.

    Parameters
    ----------
    x
        Dense array or scipy sparse matrix.

    Returns
    -------
    The weighted matrix: sparse if ``x`` is sparse, dense otherwise.
    """
    eps = 1e-6
    inverse_doc_freq = x.shape[0] / (x.sum(axis=0) + eps)

    if not sparse.issparse(x):
        row_totals = x.sum(axis=1, keepdims=True)
        term_freq = x / (row_totals + eps)
        return term_freq * inverse_doc_freq

    row_scale = 1 / (x.sum(axis=1) + eps)
    return x.multiply(row_scale).multiply(inverse_doc_freq)
| |
|
| |
|
def lsi(
    adata,
    n_components=20,
    use_top_features=False,
    min_cutoff=0.05,
    **kwargs
):
    """Compute an LSI embedding and store it in ``adata.obsm["X_lsi"]``.

    Pipeline: TF-IDF of ``adata.X`` -> ``log1p(. * 1e4)`` -> optional feature
    selection -> truncated SVD -> per-cell standardisation (each row is
    centred and divided by its sample standard deviation across components).

    Parameters
    ----------
    adata
        Annotated data whose ``X`` (dense or scipy sparse) is decomposed.
        Only ``adata.obsm["X_lsi"]`` is written.
    n_components
        Number of SVD components.
    use_top_features
        If true, only features whose empirical-CDF value of total counts is
        strictly greater than ``min_cutoff`` enter the SVD. TF-IDF is still
        computed on all features before the selection.
    min_cutoff
        ECDF threshold used when ``use_top_features`` is true.
    **kwargs
        Extra keyword arguments forwarded to
        ``sklearn.decomposition.TruncatedSVD``. ``algorithm`` defaults to
        ``'arpack'`` and ``random_state`` to ``0`` unless given here.

    Returns
    -------
    None
    """
    # A bare ``import sklearn`` does not guarantee the ``decomposition``
    # submodule is loaded, so import the estimator explicitly.
    from sklearn.decomposition import TruncatedSVD

    # Defaults first, caller-supplied values override them. Previously the
    # keyword arguments (including the default seed) never reached the SVD.
    svd_kwargs = {"algorithm": "arpack", "random_state": 0}
    svd_kwargs.update(kwargs)

    x = adata.X
    x_norm = np.log1p(tfidf(x) * 1e4)
    # The SVD below is run on a dense array; dense input is used as is.
    if sparse.issparse(x_norm):
        x_norm = x_norm.toarray()
    else:
        x_norm = np.asarray(x_norm)

    if use_top_features:
        # ``ravel`` gives a 1-D vector for both sparse (np.matrix result) and
        # dense (already 1-D) column sums.
        feature_counts = np.asarray(x.sum(axis=0)).ravel()
        percentile = ECDF(feature_counts)(feature_counts)
        x_norm = x_norm[:, percentile > min_cutoff]

    svd = TruncatedSVD(n_components=n_components, **svd_kwargs)
    X_lsi = svd.fit_transform(x_norm)
    X_lsi -= X_lsi.mean(axis=1, keepdims=True)
    X_lsi /= X_lsi.std(axis=1, ddof=1, keepdims=True)
    adata.obsm["X_lsi"] = X_lsi
| |
|
| |
|
def deepen_atac_data(adata, num_pc=50, num_cell_merge=10):
    """Replace each cell's profile by the sum over itself and its neighbours.

    A working copy of ``adata`` is embedded with ``lsi`` (the first LSI
    component is dropped), a cosine neighbour graph is built with
    ``sc.pp.neighbors``, and for every cell the non-zero entries of its
    column in ``obsp['connectivities']`` are taken as neighbours. If a cell
    has more than ``num_cell_merge`` neighbours, ``num_cell_merge`` of them
    are drawn without replacement using the global ``np.random`` state.
    ``adata.X`` is then overwritten with the row-wise sums.

    Parameters
    ----------
    adata
        Annotated data; its ``X`` is replaced in place.
    num_pc
        Number of LSI components to compute (``num_pc - 1`` are used).
    num_cell_merge
        Maximum number of neighbours merged into each cell; also used as
        ``n_neighbors`` for the graph. Converted with ``int``.

    Returns
    -------
    The same ``adata`` object with the aggregated matrix in ``X``.
    """
    # Cast once so the graph size and the subsample size always agree.
    num_cell_merge = int(num_cell_merge)

    adata_work = adata.copy()
    lsi(adata_work, n_components=num_pc)
    adata_work.obsm["X_lsi"] = adata_work.obsm["X_lsi"][:, 1:]
    sc.pp.neighbors(
        adata_work,
        use_rep="X_lsi",
        metric="cosine",
        n_neighbors=num_cell_merge,
        n_pcs=num_pc - 1,
    )

    # Column-compressed form gives every cell's neighbours as a slice.
    # Explicit zeros are removed and indices sorted so each slice lists the
    # non-zero rows of that column in ascending order.
    connectivities = sparse.csc_matrix(adata_work.obsp['connectivities'])
    connectivities.eliminate_zeros()
    connectivities.sort_indices()
    indptr = connectivities.indptr
    indices = connectivities.indices

    n_cells = adata_work.shape[0]
    row_chunks = []
    col_chunks = []
    # Iterate by position instead of looking each name up in ``obs.index``.
    for cell_idx in range(n_cells):
        neighbors = indices[indptr[cell_idx]:indptr[cell_idx + 1]]
        if len(neighbors) > num_cell_merge:
            neighbors = np.random.choice(
                neighbors, num_cell_merge, replace=False
            )
        members = np.concatenate(([cell_idx], neighbors))
        row_chunks.append(np.full(len(members), cell_idx))
        col_chunks.append(members)

    rows = np.concatenate(row_chunks)
    cols = np.concatenate(col_chunks)
    # Row i of ``agg_sum`` marks cell i and its selected neighbours.
    agg_sum = sparse.coo_matrix(
        (np.ones(len(rows)), (rows, cols)),
        shape=(n_cells, n_cells),
    ).tocsr()
    array_atac = agg_sum @ adata.X

    # NOTE(review): X is cleared before being reassigned, as in the original
    # code; presumably to force a fresh assignment -- confirm before removing.
    adata.X = None
    adata.X = array_atac
    return adata
| |
|
| |
|
def chr_map_int(x):
    """Map a chromosome label to an integer.

    ``"X"``/``"x"`` map to 23 and ``"Y"``/``"y"`` to 24; every other value is
    converted with ``int``.
    """
    sex_chromosomes = (
        ("X", "x", 23),
        ("Y", "y", 24),
    )
    for upper, lower, number in sex_chromosomes:
        if x == upper or x == lower:
            return number
    return int(x)
| |
|