Spaces:
Sleeping
Sleeping
| import scanpy as sc | |
| import numpy as np | |
| from sklearn.model_selection import train_test_split | |
| import os | |
| import warnings | |
| warnings.filterwarnings("ignore", category=FutureWarning, module="anndata") | |
| warnings.filterwarnings("ignore", message="Moving element from .uns") | |
def _pick_root(adata):
    """Return the positional index of the first 'ductal' cell, or 0 as fallback."""
    try:
        # na=False: missing labels must count as "no match"; without it they
        # come back as NaN, which NumPy treats as truthy.
        is_ductal = adata.obs['celltype'].str.contains('ductal', case=False, na=False)
    except (KeyError, AttributeError):
        # No 'celltype' column, or the column is not string-typed.
        return 0
    hits = np.flatnonzero(is_ductal)
    return int(hits[0]) if hits.size else 0


def _forward_pairs(rows, cols, times, max_dt=0.1):
    """Return the (i, j) edges where j is later than i by less than max_dt."""
    rows = np.asarray(rows)
    cols = np.asarray(cols)
    times = np.asarray(times)
    t_i = times[rows]
    t_j = times[cols]
    # If pseudotime holds non-finite values, inf - inf gives NaN; NaN compares
    # False so such edges are dropped. Only the RuntimeWarning is silenced.
    with np.errstate(invalid="ignore"):
        keep = (t_j > t_i) & ((t_j - t_i) < max_dt)
    # Boolean masking preserves the edge order returned by graph.nonzero().
    return np.column_stack((rows[keep], cols[keep]))


def process():
    """Build (t, t+1) cell-index pairs from the raw panc8 dataset and save them.

    Steps performed, in order:
      1. Read ``celldreamer/data/raw/panc8_raw.h5ad`` and filter cells/genes.
      2. Compute PCA, a neighbour graph, a diffusion map and DPT pseudotime,
         rooted at the first cell whose ``celltype`` contains "ductal"
         (cell 0 if there is none or the column is unusable).
      3. Keep every neighbour-graph edge (i, j) whose pseudotime increases
         by less than 0.1 from i to j.
      4. Split the pairs 80/10/10 into train/val/test (random_state=42) and
         write the splits, the full pair set and the cleaned AnnData object
         to ``celldreamer/data/processed``.

    Returns:
        None. All results are written to disk.

    Raises:
        ValueError: if no edge satisfies the pseudotime criterion, so there
            is nothing to split.
    """
    out_dir = "celldreamer/data/processed"
    os.makedirs(out_dir, exist_ok=True)

    adata = sc.read("celldreamer/data/raw/panc8_raw.h5ad")
    sc.pp.filter_cells(adata, min_genes=200)
    sc.pp.filter_genes(adata, min_cells=3)
    print(f"cleaned Shape: {adata.shape}")

    # NOTE(review): PCA runs directly on the filtered matrix; this function
    # applies no normalisation or log transform - confirm that is intended.
    print("getting K-nearest nieghbors")
    sc.pp.pca(adata, n_comps=50)
    sc.pp.neighbors(adata, n_neighbors=30, n_pcs=20)
    sc.tl.diffmap(adata)

    # find step 0 stem cell
    adata.uns['iroot'] = _pick_root(adata)
    sc.tl.dpt(adata)

    # create t,t+1 pairs
    print("creating pairs")
    graph = adata.obsp['connectivities']
    times = adata.obs['dpt_pseudotime'].values
    rows, cols = graph.nonzero()
    # max time diff is 0.1 for ~similar time diffs
    pairs = _forward_pairs(rows, cols, times, max_dt=0.1)
    if len(pairs) == 0:
        # Fail early and clearly, before any output file is written.
        raise ValueError(
            "no neighbour pairs with a forward pseudotime step below 0.1 were found"
        )

    train, temp = train_test_split(pairs, test_size=0.2, random_state=42)
    val, test = train_test_split(temp, test_size=0.5, random_state=42)

    np.save(f"{out_dir}/train_pairs.npy", train)
    np.save(f"{out_dir}/val_pairs.npy", val)
    np.save(f"{out_dir}/test_pairs.npy", test)
    print(f"Train({len(train)}), Val({len(val)}), Test({len(test)})")

    adata.write(f"{out_dir}/cleaned.h5ad")
    np.save(f"{out_dir}/full_set.npy", pairs)