Spaces:
Sleeping
Sleeping
from shiny import render | |
from shiny.express import input, output, ui | |
from datasets import load_dataset | |
import pandas as pd | |
from pathlib import Path | |
import matplotlib | |
import numpy as np | |
import gradio as gr | |
import matplotlib.pyplot as plt | |
import matplotlib.style as mplstyle | |
from scipy.interpolate import interp1d | |
from typing import Dict, Optional | |
from collections import namedtuple | |
# Step vector taken in the 2D plane for each nucleotide symbol.
# 'N' is a zero step; _dna_to_coordinates also uses it as the fallback
# for symbols that are not present in the mapping.
mapping_easy = {
    base: np.array(step)
    for base, step in (
        ('A', [0.5, -0.8660254037844386]),
        ('T', [0.5, 0.8660254037844386]),
        ('G', [0.8660254037844386, -0.5]),
        ('C', [0.8660254037844386, 0.5]),
        ('N', [0, 0]),
    )
}

# A point (x, y) in the plane.
Coord = namedtuple("Coord", "x y")

# State of a CGR encoding: a counter N together with a position (x, y).
CGRCoords = namedtuple("CGRCoords", "N x y")

# Corner of the square assigned to each nucleotide.
DEFAULT_COORDS = {
    "A": Coord(x=1, y=1),
    "C": Coord(x=-1, y=1),
    "G": Coord(x=-1, y=-1),
    "T": Coord(x=1, y=-1),
}
# Function to convert a DNA sequence to a list of coordinates | |
def _dna_to_coordinates(dna_sequence, mapping): | |
dna_sequence = dna_sequence.upper() | |
coordinates = np.array([mapping.get(nucleotide, mapping['N']) for nucleotide in dna_sequence]) | |
return coordinates | |
# Function to create the cumulative sum of a list of coordinates | |
def _get_cumulative_coords(mapped_coords): | |
cumulative_coords = np.cumsum(mapped_coords, axis=0) | |
return cumulative_coords | |
def plot_2d_sequences(dna_sequences, mapping=mapping_easy, single_sequence=False):
    """Draw the cumulative 2D walk of one or more DNA sequences on one figure.

    Parameters
    ----------
    dna_sequences
        An iterable of sequences, or a single sequence when
        ``single_sequence`` is true.
    mapping
        Symbol -> step-vector lookup passed to ``_dna_to_coordinates``.
    single_sequence
        When true, ``dna_sequences`` is wrapped in a one-element list.

    Returns
    -------
    The matplotlib figure holding one line per sequence.
    """
    fig, ax = plt.subplots()
    sequences = [dna_sequences] if single_sequence else dna_sequences
    for sequence in sequences:
        walk = _get_cumulative_coords(_dna_to_coordinates(sequence, mapping))
        # walk.T unpacks into the coordinate columns expected by ax.plot.
        ax.plot(*walk.T)
    return fig
def plot_2d_comparison(dna_sequences_grouped, labels, mapping=mapping_easy):
    """Overlay the 2D walks of several groups of sequences, one colour per group.

    Parameters
    ----------
    dna_sequences_grouped
        A sequence of groups; each group is an iterable of DNA sequences.
    labels
        ``labels[i]`` is the legend label used for every line of group ``i``.
    mapping
        Symbol -> step-vector lookup passed to ``_dna_to_coordinates``.

    Returns
    -------
    The matplotlib figure.
    """
    fig, ax = plt.subplots()
    palette = plt.cm.rainbow(np.linspace(0, 1, len(dna_sequences_grouped)))
    for group_idx, (group, colour) in enumerate(zip(dna_sequences_grouped, palette)):
        for sequence in group:
            walk = _get_cumulative_coords(_dna_to_coordinates(sequence, mapping))
            ax.plot(*walk.T, color=colour, label=labels[group_idx])
    # Every line of a group carries the same label; keying by label collapses
    # them so that each label shows up once in the legend.
    legend_handles, legend_labels = ax.get_legend_handles_labels()
    unique_entries = dict(zip(legend_labels, legend_handles))
    ax.legend(unique_entries.values(), unique_entries.keys())
    return fig
def plot_distrobutions(dna_sequences_grouped, labels, basepair, mapping=mapping_easy):
    """Plot, per group, a histogram of the walk's second coordinate at one position.

    For every sequence the cumulative 2D walk is computed and the value of
    its second column at index ``basepair`` is collected. Each group's
    values are binned with ``np.histogram`` and drawn with ``ax.stairs``.

    Parameters
    ----------
    dna_sequences_grouped
        A sequence of groups; each group is an iterable of DNA sequences.
    labels
        ``labels[i]`` is the legend label of group ``i``.
    basepair
        Index into the cumulative walk at which the value is sampled.
    mapping
        Symbol -> step-vector lookup passed to ``_dna_to_coordinates``.

    Returns
    -------
    The matplotlib figure.
    """
    fig, ax = plt.subplots()
    palette = plt.cm.rainbow(np.linspace(0, 1, len(dna_sequences_grouped)))
    for group_idx, (group, colour) in enumerate(zip(dna_sequences_grouped, palette)):
        sampled_y = []
        for sequence in group:
            walk = _get_cumulative_coords(_dna_to_coordinates(sequence, mapping))
            sampled_y.append(walk[:, 1][basepair])
        counts, edges = np.histogram(sampled_y)
        ax.stairs(counts, edges, color=colour, label=labels[group_idx])
    # Collapse repeated labels so that each appears once in the legend.
    legend_handles, legend_labels = ax.get_legend_handles_labels()
    unique_entries = dict(zip(legend_labels, legend_handles))
    ax.legend(unique_entries.values(), unique_entries.keys())
    return fig
############################################################# Virus Dataset ######################################################## | |
#ds = load_dataset('Hack90/virus_tiny')
# The dataset is read from a local parquet file when the module is imported.
df = pd.read_parquet('virus_ds.parquet')
# Identity mapping {organism name: organism name} over the distinct values
# of the 'Organism_Name' column.
virus = {name: name for name in df['Organism_Name'].unique()}
############################################################# Filter and Select ######################################################## | |
def filter_and_select(group):
    """Return the first three rows of ``group``, or ``None`` if it has fewer than three."""
    if len(group) < 3:
        return None
    return group.head(3)
############################################################# Wen's Method ########################################################
import numpy as np | |
# Weight assigned to each 4-character neighbour pattern produced in
# _calculate_weighting_full, expressed in sixths.
WEIGHTS = {
    pattern: sixths / 6
    for pattern, sixths in (
        ('0100', 1),
        ('0101', 2),
        ('1100', 3),
        ('0110', 3),
        ('1101', 4),
        ('1110', 5),
        ('0111', 5),
        ('1111', 6),
    )
}
# Default value of the ``L`` parameter of the weighting/coordinate helpers.
LOWEST_LENGTH = 5000
def _get_subsequences(sequence): | |
return {nuc: [i+1 for i, x in enumerate(sequence) if x == nuc] for nuc in 'ACTG'} | |
def _calculate_coordinates_fixed(subsequence, L=LOWEST_LENGTH):
    """Convert 1-based positions into ``(theta, rho)`` pairs.

    For each position ``K`` the angle is ``theta = 2*pi/(L-1) * (K-1)`` and
    the radius is ``rho = sqrt(theta)``.

    Parameters
    ----------
    subsequence
        Iterable of 1-based positions.
    L
        Length used to scale the angle.

    Returns
    -------
    A list of ``(theta, rho)`` tuples, one per position.
    """
    pairs = []
    for K in subsequence:
        theta = (2 * np.pi / (L - 1)) * (K-1)
        pairs.append((theta, np.sqrt(theta)))
    return pairs
def _calculate_weighting_full(sequence, WEIGHTS, L=LOWEST_LENGTH, E=0.0375):
    """Compute one weight per position from the local neighbour pattern.

    Index 0, the second-to-last index visited by the loop and the final
    appended entry are always 0. For every other index ``i`` the window
    ``sequence[i-1:i+3]`` is reduced to a 4-character pattern: a '1' or '0'
    depending on whether the previous symbol equals the current one, a
    fixed '1', then a '1'/'0' for each of the two following symbols
    compared with the current one. The pattern is looked up in ``WEIGHTS``
    (0 when missing) and multiplied by ``E`` when ``i > L``.

    Returns
    -------
    A list of weights.
    """
    n = len(sequence)
    weightings = [0]
    for i in range(1, n - 1):
        if i >= n - 2:
            # The window would run past the end of the sequence.
            weightings.append(0)
            continue
        previous, current, following, second_following = sequence[i-1:i+3]
        pattern = "".join([
            '1' if previous == current else '0',
            '1',
            '1' if following == current else '0',
            '1' if second_following == current else '0',
        ])
        weight = WEIGHTS.get(pattern, 0)
        if i > L:
            weight = weight * E
        weightings.append(weight)
    weightings.append(0)
    return weightings
def _centre_of_mass(polar_coordinates, weightings):
    """Return the weighted sum of squared offsets for one set of polar points.

    The points are converted to Cartesian ``x``/``y`` lists; for the i-th
    point with weight ``w = weightings[i]`` the term
    ``w * ((x - x*w)**2 + (y - y*w)**2)`` is accumulated.

    NOTE(review): ``weightings`` is indexed by the point's rank within
    ``polar_coordinates`` and not by the point's position in the original
    sequence - confirm this is the intended pairing.
    """
    xs, ys = _calculate_standard_coordinates(polar_coordinates)
    total = 0
    for i, (x_i, y_i) in enumerate(zip(xs, ys)):
        w = weightings[i]
        total += w * ((x_i - (x_i*w))**2 + (y_i - y_i*w)**2)
    return total
def _normalised_moment_of_inertia(polar_coordinates, weightings):
    """Return ``sqrt(_centre_of_mass(...) / sum(weightings))``."""
    weighted_total = _centre_of_mass(polar_coordinates, weightings)
    weight_sum = sum(weightings)
    return np.sqrt(weighted_total / weight_sum)
def _calculate_standard_coordinates(polar_coordinates): | |
return [rho * np.cos(theta) for theta, rho in polar_coordinates], [rho * np.sin(theta) for theta, rho in polar_coordinates] | |
def _moments_of_inertia(polar_coordinates, weightings):
    """Return the normalised moment of inertia of every entry of ``polar_coordinates``.

    ``polar_coordinates`` is a mapping whose values are lists of
    ``(theta, rho)`` pairs; the result follows the mapping's iteration order.
    """
    return [
        _normalised_moment_of_inertia(points, weightings)
        for points in polar_coordinates.values()
    ]
def moment_of_inertia(sequence, WEIGHTS, L=5000, E=0.0375):
    """Return the per-nucleotide normalised moments of inertia of ``sequence``.

    The positions of each of the four nucleotides are converted to polar
    coordinates scaled by ``len(sequence)``; ``L`` and ``E`` are forwarded
    only to the weighting computation.

    Returns
    -------
    A list with one value per nucleotide, in the key order produced by
    ``_get_subsequences``.
    """
    seq_len = len(sequence)
    polar_coordinates = {}
    for nucleotide, positions in _get_subsequences(sequence).items():
        polar_coordinates[nucleotide] = _calculate_coordinates_fixed(positions, seq_len)
    weightings = _calculate_weighting_full(sequence, WEIGHTS, L=L, E=E)
    return _moments_of_inertia(polar_coordinates, weightings)
def similarity_wen(sequence1, sequence2, WEIGHTS, L=5000, E=0.0375):
    """Return the Euclidean distance between the two sequences' inertia vectors.

    NOTE: the ``L`` argument is ignored - it is always replaced by the
    length of the shorter sequence before the moments are computed.
    """
    L = min(len(sequence1), len(sequence2))
    first_moments = moment_of_inertia(sequence1, WEIGHTS, L=L, E=E)
    second_moments = moment_of_inertia(sequence2, WEIGHTS, L=L, E=E)
    squared_distance = sum((a - b)**2 for a, b in zip(first_moments, second_moments))
    return np.sqrt(squared_distance)
def heatmap(data, row_labels, col_labels, ax=None,
            cbar_kw=None, cbarlabel="", **kwargs):
    """
    Draw a labelled heatmap with a colorbar.

    Parameters
    ----------
    data
        A 2D numpy array of shape (M, N).
    row_labels
        A list or array of length M with the labels for the rows.
    col_labels
        A list or array of length N with the labels for the columns.
    ax
        The `matplotlib.axes.Axes` to draw on. When omitted, ``plt.gca()``
        is used. Optional.
    cbar_kw
        Keyword arguments forwarded to the figure's ``colorbar`` call.
        Optional.
    cbarlabel
        The label for the colorbar. Optional.
    **kwargs
        Forwarded to `imshow`.

    Returns
    -------
    The image returned by `imshow` and the colorbar.
    """
    ax = plt.gca() if ax is None else ax
    cbar_kw = {} if cbar_kw is None else cbar_kw

    im = ax.imshow(data, **kwargs)

    cbar = ax.figure.colorbar(im, ax=ax, **cbar_kw)
    cbar.ax.set_ylabel(cbarlabel, rotation=-90, va="bottom")

    n_rows, n_cols = data.shape[0], data.shape[1]

    # One major tick per cell, labelled with the supplied names.
    ax.set_xticks(np.arange(n_cols), labels=col_labels)
    ax.set_yticks(np.arange(n_rows), labels=row_labels)

    # Column labels go on top of the plot, rotated and right-anchored.
    ax.tick_params(top=True, bottom=False,
                   labeltop=True, labelbottom=False)
    plt.setp(ax.get_xticklabels(), rotation=-30, ha="right",
             rotation_mode="anchor")

    # Hide the spines and use minor ticks on the cell borders to draw a
    # white grid between the cells.
    ax.spines[:].set_visible(False)
    ax.set_xticks(np.arange(n_cols+1)-.5, minor=True)
    ax.set_yticks(np.arange(n_rows+1)-.5, minor=True)
    ax.grid(which="minor", color="w", linestyle='-', linewidth=3)
    ax.tick_params(which="minor", bottom=False, left=False)

    return im, cbar
def annotate_heatmap(im, data=None, valfmt="{x:.2f}",
                     textcolors=("black", "white"),
                     threshold=None, **textkw):
    """
    Write the value of every heatmap cell on top of the image.

    Parameters
    ----------
    im
        The AxesImage to be labeled.
    data
        Data used to annotate, as a 2D list or numpy array. For any other
        value (including None) the image's own array is used. Optional.
    valfmt
        The format of the annotations inside the heatmap. Either a format
        string using ``x``, e.g. "$ {x:.2f}", or a callable taking
        ``(value, pos)`` such as a `matplotlib.ticker.Formatter`. Optional.
    textcolors
        A pair of colors. The first is used for values at or below the
        threshold, the second for those above. Optional.
    threshold
        Value in data units according to which the colors from textcolors
        are applied. If None (the default), half of the normalised maximum
        of the data is used. Optional.
    **kwargs
        All other arguments are forwarded to each call to `text` used to
        create the text labels.

    Returns
    -------
    The list of created text objects, in row-major order.
    """
    if isinstance(data, (list, np.ndarray)):
        # A plain list has neither .max() nor .shape, so normalise it to an
        # array here; an ndarray is returned unchanged by asarray.
        data = np.asarray(data)
    else:
        data = im.get_array()

    # Normalize the threshold to the image's color range.
    if threshold is not None:
        threshold = im.norm(threshold)
    else:
        threshold = im.norm(data.max())/2.

    # Center the text by default, but let textkw override the alignment.
    kw = dict(horizontalalignment="center",
              verticalalignment="center")
    kw.update(textkw)

    # Turn a format string into a formatter callable.
    if isinstance(valfmt, str):
        valfmt = matplotlib.ticker.StrMethodFormatter(valfmt)

    # One text per cell; its color depends on which side of the threshold
    # the normalised value falls.
    texts = []
    for i in range(data.shape[0]):
        for j in range(data.shape[1]):
            kw.update(color=textcolors[int(im.norm(data[i, j]) > threshold)])
            texts.append(im.axes.text(j, i, valfmt(data[i, j], None), **kw))
    return texts
def wens_method_heatmap(df, virus_species):
    """Plot a heatmap of pairwise ``similarity_wen`` values between species.

    For two different species the first sequence of each is compared. On
    the diagonal the first sequence of the species is compared with its
    second sequence; when the species has only one sequence it is compared
    with itself instead of raising an IndexError.

    Parameters
    ----------
    df
        DataFrame with an 'Organism_Name' and a 'Sequence' column.
    virus_species
        The organism names to compare; used as both row and column labels.

    Returns
    -------
    The matplotlib figure containing the heatmap and its colorbar.
    """
    # Look up each species' sequences once and reuse them across the
    # quadratic comparison loop.
    sequences_by_species = {
        species: df[df['Organism_Name'] == species]['Sequence'].values
        for species in virus_species
    }

    similarity_df = pd.DataFrame(index=virus_species, columns=virus_species)
    for virus1 in virus_species:
        sequence1 = sequences_by_species[virus1][0]
        for virus2 in virus_species:
            candidates = sequences_by_species[virus2]
            if virus1 == virus2 and len(candidates) > 1:
                sequence2 = candidates[1]
            else:
                sequence2 = candidates[0]
            similarity_df.loc[virus1, virus2] = similarity_wen(sequence1, sequence2, WEIGHTS)

    # The frame was created with object dtype; imshow needs numbers.
    similarity_df = similarity_df.apply(pd.to_numeric)

    fig, ax = plt.subplots()
    im = ax.imshow(similarity_df, cmap="YlGn")
    ax.set_xticks(np.arange(len(virus_species)), labels=virus_species)
    ax.set_yticks(np.arange(len(virus_species)), labels=virus_species)
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
    cbar = ax.figure.colorbar(im, ax=ax)
    cbar.ax.set_ylabel("Similarity", rotation=-90, va="bottom")
    return fig
############################################################# Sub-Species ########################################################
import numpy as np | |
from scipy.interpolate import interp1d, CubicSpline | |
import pandas as pd | |
from tqdm import tqdm | |
# Constants for the sub-species grouping
MIN_DISTANCE = 2581
# Step vector for each nucleotide; symbols that are not keys here are
# dropped by the representation helpers below.
VECTORS = dict(
    A=[0.5, -0.8660254],
    T=[0.5, 0.8660254],
    G=[0.8660254, -0.5],
    C=[0.8660254, 0.5],
)
def create_dna_representation_ew_subs(seq):
    """Return the cumulative 2D walk of ``seq`` resampled to 2048 points.

    Symbols that are not keys of ``VECTORS`` are dropped. The cumulative
    sum of the remaining step vectors is interpolated, separately per
    coordinate, with a cubic spline over the step index and evaluated at
    2048 evenly spaced positions.

    Returns
    -------
    A list of 2048 ``[x, y]`` pairs.
    """
    steps = np.array([VECTORS[symbol] for symbol in seq if symbol in VECTORS], dtype=float)
    walk = steps.cumsum(axis=0)

    step_index = np.arange(len(walk))
    spline_x = CubicSpline(step_index, walk[:, 0])
    spline_y = CubicSpline(step_index, walk[:, 1])

    sample_at = np.linspace(0, len(walk) - 1, 2048)
    return np.column_stack([spline_x(sample_at), spline_y(sample_at)]).tolist()
def create_dna_representation_for_subs(row):
    """Return a 1D profile of a sequence using cubic interpolation.

    ``row['seq']`` is stripped of symbols that are not keys of ``VECTORS``
    and truncated to ``int(row['min_distance'])`` symbols. The second
    coordinate of the cumulative walk is then interpolated as a function of
    the first (``interp1d`` with ``kind='cubic'`` and extrapolation) and
    evaluated at the integers ``0 .. n-1`` where
    ``n = int(int(row['min_distance']) * 0.66)``.

    Returns
    -------
    A 1D array of ``n`` interpolated values.
    """
    cutoff = int(row['min_distance'])
    kept_steps = [VECTORS[symbol] for symbol in row['seq'] if symbol in VECTORS][:cutoff]
    n_samples = int(cutoff * 0.66)

    walk = np.array(kept_steps, dtype=float).cumsum(axis=0)

    # Cubic interpolation of the walk's second coordinate over its first.
    profile = interp1d(walk[:, 0], walk[:, 1], kind='cubic', fill_value='extrapolate')
    sample_at = np.linspace(0, n_samples - 1, n_samples)
    return profile(sample_at)
def create_groups_subs(closest_matches):
    """Group indices that are linked through their closest matches.

    ``closest_matches[i]`` is an iterable of the indices that index ``i``
    links to. Starting from each not-yet-visited index in ascending order,
    every index reachable by following links is collected into one group.
    Groups with a single member are discarded.

    The traversal uses an explicit stack, not recursion, so that long
    chains of links cannot exceed Python's recursion limit.

    Returns
    -------
    A dict mapping ``"group_1"``, ``"group_2"``, ... (numbered in order of
    discovery) to the sorted list of member indices.
    """
    groups = {}
    visited = set()

    for start in range(len(closest_matches)):
        if start in visited:
            continue
        group = set()
        pending = [start]
        while pending:
            node = pending.pop()
            if node in visited:
                continue
            visited.add(node)
            group.add(node)
            pending.extend(
                neighbor for neighbor in closest_matches[node]
                if neighbor not in visited
            )
        if len(group) > 1:  # Ignore elements with no closest match
            groups[f"group_{len(groups) + 1}"] = sorted(group)

    return groups
def process_data_sub_specie(df, species, varience):
    """Group the sequences of one species by similarity of their 1D profiles.

    Parameters
    ----------
    df
        DataFrame with 'organism_name', 'seq_len' and 'seq' columns.
    species
        Value of 'organism_name' to select.
    varience
        Multiplier applied to the length cutoff to obtain the distance
        threshold below which two sequences are linked.

    Returns
    -------
    The filtered DataFrame with a 'min_distance' column, a 'group' column
    ('No Group' or 'group_<n>') and a 'two_d' column holding the resampled
    2D walk of each sequence.
    """
    selected = df[df['organism_name'] == species].reset_index(drop=True).copy()

    # Keep only sequences longer than 80% of the median length.
    length_cutoff = selected['seq_len'].median() * 0.8
    selected['min_distance'] = length_cutoff
    selected = selected[selected['seq_len'] > length_cutoff].reset_index(drop=True)

    # 1D profile per sequence, stacked into one array.
    selected['two_d'] = selected.apply(create_dna_representation_for_subs, axis=1)
    profiles = np.array(selected['two_d'].tolist())

    # Sum of absolute differences between every pair of profiles.
    distance_rows = [
        np.abs(profiles[i:i+1, :] - profiles).sum(axis=1)
        for i in tqdm(range(profiles.shape[0]))
    ]
    distances = np.array(distance_rows)
    print(distances)
    # Overwrite each sequence's distance to itself with a fixed large value.
    np.fill_diagonal(distances, 10000)

    threshold = length_cutoff * varience
    neighbours = [np.where(row < threshold)[0] for row in distances]

    groups = create_groups_subs(neighbours)

    selected['group'] = 'No Group'
    for group_name, member_indices in groups.items():
        selected.loc[member_indices, 'group'] = group_name

    # Replace the 1D profiles with the resampled 2D walks.
    selected['two_d'] = selected['seq'].apply(create_dna_representation_ew_subs)
    return selected
############################################################# ColorSquare ######################################################## | |
import math | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from matplotlib.colors import ListedColormap | |
import pandas as pd | |
def _fill_spiral(matrix, seq_colors, k): | |
left, top, right, bottom = 0, 0, k-1, k-1 | |
index = 0 | |
while left <= right and top <= bottom: | |
for i in range(left, right + 1): # Top row | |
if index < len(seq_colors): | |
matrix[top][i] = seq_colors[index] | |
index += 1 | |
top += 1 | |
for i in range(top, bottom + 1): # Right column | |
if index < len(seq_colors): | |
matrix[i][right] = seq_colors[index] | |
index += 1 | |
right -= 1 | |
for i in range(right, left - 1, -1): # Bottom row | |
if index < len(seq_colors): | |
matrix[bottom][i] = seq_colors[index] | |
index += 1 | |
bottom -= 1 | |
for i in range(bottom, top - 1, -1): # Left column | |
if index < len(seq_colors): | |
matrix[i][left] = seq_colors[index] | |
index += 1 | |
left += 1 | |
def _generate_color_square(sequence,virus, save=False, count=0, label=None):
    """Render one sequence as a spiral-filled colour square on a new figure.

    Each symbol of the lower-cased sequence is mapped to a colour index
    (a, t, c, g, n); a symbol outside that set raises ``KeyError``. The
    indices are laid out clockwise in the smallest square that holds the
    sequence; unused cells keep the index of 'n' (white).

    Parameters
    ----------
    sequence
        The DNA sequence to draw.
    virus, count
        Used only to build the file name when ``save`` is true.
    save
        When true, the figure is written to
        ``color_square_<virus>_<count>.png``.
    label
        Optional figure title.
    """
    colors = {'a': 0, 't': 1, 'c': 2, 'g': 3, 'n': 4}
    seq_colors = list(map(colors.__getitem__, sequence.lower()))

    # Side of the smallest square that can hold every symbol.
    side = math.ceil(math.sqrt(len(sequence)))
    matrix = np.full((side, side), colors['n'], dtype=int)
    _fill_spiral(matrix, seq_colors, side)

    # Colour for each index, in the order a, t, c, g, n.
    palette = ListedColormap(['red', 'green', 'yellow', 'blue', 'white'])

    plt.figure(figsize=(5, 5))
    plt.imshow(matrix, cmap=palette, interpolation='nearest')
    if label:
        plt.title(label)
    plt.axis('off')
    if save:
        plt.savefig(f'color_square_{virus}_{count}.png', dpi=300, bbox_inches='tight')
def plot_color_square(df, virus_species):
    """Draw a grid of colour squares, one row of three panels per species.

    Panel ``(row, col)`` shows sequence ``df[row * 3 + col]`` and is titled
    with ``virus_species[row]``. Symbols other than a, t, c, g, n
    (case-insensitive) are removed before drawing.

    Returns
    -------
    The matplotlib figure holding the grid.
    """
    ncols = 3
    nrows = len(virus_species)
    fig, axeses = plt.subplots(nrows=nrows, ncols=ncols, squeeze=False)

    colors = {'a': 0, 't': 1, 'c': 2, 'g': 3, 'n': 4}
    # Colour for each index, in the order a, t, c, g, n.
    palette = ListedColormap(['red', 'green', 'yellow', 'blue', 'white'])

    for row in range(nrows):
        for col in range(ncols):
            panel = axeses[row, col]
            raw = df[row * ncols + col]
            virus = virus_species[row]

            kept = [symbol for symbol in raw.lower() if symbol in 'atcgn']
            seq_colors = [colors[symbol] for symbol in kept]

            # Smallest square that holds the sequence; spare cells stay white.
            side = math.ceil(math.sqrt(len(kept)))
            matrix = np.full((side, side), colors['n'], dtype=int)
            _fill_spiral(matrix, seq_colors, side)

            panel.imshow(matrix, cmap=palette, interpolation='nearest')
            panel.set_title(virus)
    return fig
def generate_color_square(sequence,virus, multi=False, save=False, label=None):
    """Render one colour square, or one per sequence when ``multi`` is true.

    Parameters
    ----------
    sequence
        A single sequence, or an iterable of sequences when ``multi`` is
        true.
    virus
        Name forwarded to ``_generate_color_square`` (used in the saved
        file name).
    multi
        Whether ``sequence`` holds several sequences.
    save
        Forwarded to ``_generate_color_square``.
    label
        A title, or - when ``multi`` is true - a sequence of titles indexed
        like ``sequence``. Optional.
    """
    if multi:
        for i, seq in enumerate(sequence):
            _generate_color_square(seq, virus, save, i, label[i] if label else None)
    else:
        # ``virus`` must be passed explicitly: without it ``save`` would be
        # bound to the ``virus`` parameter and saving would never happen.
        _generate_color_square(sequence, virus, save, label=label)
############################################################# FCGR ######################################################## | |
from typing import Dict, Optional | |
from collections import namedtuple | |
# A point (x, y) in the plane.
Coord = namedtuple("Coord", ["x","y"])
# State of a CGR encoding: number of encoded nucleotides N and position (x, y).
CGRCoords = namedtuple("CGRCoords", ["N","x","y"])
# Corner of the square assigned to each nucleotide.
DEFAULT_COORDS = dict(A=Coord(1,1),C=Coord(-1,1),G=Coord(-1,-1),T=Coord(1,-1))


class CGR:
    """Chaos Game Representation for DNA.

    The encoder keeps a running state ``cgr_coords`` (``N``, ``x``, ``y``).
    Encoding a nucleotide moves the point halfway towards that nucleotide's
    corner; decoding reverses the steps one at a time.
    """

    def __init__(self, coords: Optional[Dict[str, Coord]] = None):
        """Use ``coords`` as the nucleotide -> corner mapping (default ``DEFAULT_COORDS``)."""
        self.nucleotide_coords = DEFAULT_COORDS if coords is None else coords
        self.cgr_coords = CGRCoords(0,0,0)

    def nucleotide_by_coords(self, x, y):
        """Return the nucleotide whose corner is ``(x, y)``.

        Raises ``IndexError`` when no nucleotide has that corner.
        """
        target = Coord(x, y)
        matches = [
            nucleotide
            for nucleotide, corner in self.nucleotide_coords.items()
            if corner == target
        ]
        return matches[0]

    def forward(self, nucleotide: str):
        """Advance the state by one nucleotide.

        The new position is the midpoint between the current position and
        the nucleotide's corner. A nucleotide missing from the mapping
        raises ``AttributeError`` (the lookup yields ``None``).
        """
        corner = self.nucleotide_coords.get(nucleotide)
        x = (self.cgr_coords.x + corner.x)/2
        y = (self.cgr_coords.y + corner.y)/2
        self.cgr_coords = CGRCoords(self.cgr_coords.N+1,x,y)

    def backward(self,):
        """Undo the last step and return the nucleotide that produced it.

        The nucleotide is inferred from the signs of the current position.
        """
        n_x, n_y = self.coords_current_nucleotide()
        nucleotide = self.nucleotide_by_coords(n_x, n_y)
        # Invert the midpoint step: previous = 2 * current - corner.
        x = 2*self.cgr_coords.x - n_x
        y = 2*self.cgr_coords.y - n_y
        self.cgr_coords = CGRCoords(self.cgr_coords.N-1,x,y)
        return nucleotide

    def coords_current_nucleotide(self,):
        """Return the corner ``(+-1, +-1)`` matching the signs of the current position."""
        # NOTE: this assumes corners at +-1, as in DEFAULT_COORDS.
        x = 1 if self.cgr_coords.x>0 else -1
        y = 1 if self.cgr_coords.y>0 else -1
        return x,y

    def encode(self, sequence: str):
        """Encode ``sequence`` from a fresh state and return the final ``CGRCoords``."""
        self.reset_coords()
        for nucleotide in sequence:
            self.forward(nucleotide)
        return self.cgr_coords

    def reset_coords(self,):
        """Reset the state to ``CGRCoords(0, 0, 0)``."""
        self.cgr_coords = CGRCoords(0,0,0)

    def decode(self, N: int, x: float, y: float) -> str:
        """Recover the ``N``-nucleotide sequence whose encoding ended at ``(x, y)``."""
        self.cgr_coords = CGRCoords(N,x,y)
        recovered = []
        # Steps are undone last-to-first, so the list is reversed at the end.
        while self.cgr_coords.N>0:
            recovered.append(self.backward())
        return "".join(recovered[::-1])
from itertools import product | |
from collections import defaultdict | |
import numpy as np | |
class FCGR(CGR):
    """Frequency matrix CGR.

    A (2**k x 2**k) matrix is built for a sequence of any length:
    - k is the k-mer length,
    - 2**k x 2**k = 4**k is the number of distinct k-mers,
    - each cell holds the number of occurrences of one k-mer.
    """

    def __init__(self, k: int,):
        super().__init__()
        self.k = k  # k-mer length
        self.kmers = ["".join(letters) for letters in product("ACGT", repeat=self.k)]
        self.kmer2pixel = self.kmer2pixel_position()

    def __call__(self, sequence: str):
        """Count the k-mers of ``sequence`` and return them as a frequency matrix."""
        self.count_kmers(sequence)

        side = int(2**self.k)
        freq_matrix = np.zeros((side, side))

        # Pixel positions are 1-based, hence the -1 when indexing.
        for kmer, freq in self.freq_kmer.items():
            pos_x, pos_y = self.kmer2pixel[kmer]
            freq_matrix[int(pos_x)-1, int(pos_y)-1] = freq
        return freq_matrix

    def count_kmer(self, kmer):
        """Increment the count of ``kmer`` unless it contains an 'N'."""
        if "N" in kmer:
            return
        self.freq_kmer[kmer] += 1

    def count_kmers(self, sequence: str):
        """Reset ``freq_kmer`` and count every length-k window of ``sequence``."""
        self.freq_kmer = defaultdict(int)
        n_windows = len(sequence) - self.k + 1
        for start in range(n_windows):
            self.count_kmer(sequence[start:(start+self.k)])

    def kmer_probabilities(self, sequence: str):
        """Store in ``probabilities`` each counted k-mer's count divided by the number of windows.

        Uses the counts currently held in ``freq_kmer``; ``sequence`` only
        supplies the length.
        """
        self.probabilities = defaultdict(float)
        n_windows = len(sequence) - self.k + 1
        for kmer, count in self.freq_kmer.items():
            self.probabilities[kmer] = float(count) / n_windows

    def pixel_position(self, kmer: str):
        """Return the 1-based (row, column) pixel of ``kmer`` in the FCGR matrix."""
        coords = self.encode(kmer)
        side = 2**self.k

        # Move the CGR position from [-1,1]^2 to [0,1]^2, rescale it to
        # [0,2**k]^2 and round up to the enclosing integer cell.
        scaled = np.array([(coords.x + 1)/2, (coords.y + 1)/2])
        scaled *= side
        cx, cy = np.ceil(scaled)

        # Coordinates (cx, cy) -> pixel (px, py): px = 2**k - cy + 1, py = cx
        return side-int(cy)+1, int(cx)

    def kmer2pixel_position(self,):
        """Return a dict mapping every k-mer in ``kmers`` to its pixel position."""
        return {kmer: self.pixel_position(kmer) for kmer in self.kmers}
from tqdm import tqdm | |
from pathlib import Path | |
import numpy as np | |
class GenerateFCGR:
    """Convenience wrapper that cleans a sequence and converts it to an FCGR matrix."""

    def __init__(self, kmer: int = 5, ):
        self.kmer = kmer
        self.fcgr = FCGR(kmer)
        self.counter = 0  # number of sequences converted so far

    def __call__(self, list_fasta,):
        """Process every entry of ``list_fasta`` with ``from_fasta``."""
        # NOTE(review): no ``from_fasta`` method is defined on this class in
        # the visible file, so this call fails with AttributeError unless it
        # is provided elsewhere - confirm.
        for fasta in tqdm(list_fasta, desc="Generating FCGR"):
            self.from_fasta(fasta)

    def from_seq(self, seq: str):
        """Return the FCGR matrix of ``seq`` and increment ``counter``."""
        seq = self.preprocessing(seq)
        chaos = self.fcgr(seq)
        self.counter +=1
        return chaos

    def reset_counter(self,):
        """Set ``counter`` back to 0."""
        self.counter=0

    @staticmethod
    def preprocessing(seq):
        """Upper-case ``seq`` and replace every symbol other than A, T, C, G with 'N'.

        Declared as a static method so that it works both as
        ``self.preprocessing(seq)`` and as ``GenerateFCGR.preprocessing(seq)``.
        """
        return "".join(symbol if symbol in "ATCG" else "N" for symbol in seq.upper())
def plot_fcgr(df, virus_species):
    """Draw a grid of FCGR images, one row of three panels per species.

    Panel ``(row, col)`` shows the FCGR matrix of sequence
    ``df[row * 3 + col]`` (upper-cased) and is titled with
    ``virus_species[row]``.

    Returns
    -------
    The matplotlib figure holding the grid.
    """
    ncols = 3
    nrows = len(virus_species)
    fig, axeses = plt.subplots(
        nrows=nrows,
        ncols=ncols,
        squeeze=False,
    )
    # Build the generator once: constructing it computes the pixel position
    # of every k-mer, which does not depend on the sequence being drawn.
    generator = GenerateFCGR()
    for i in range(0, ncols * nrows):
        row = i // ncols
        col = i % ncols
        axes = axeses[row, col]
        data = df[i].upper()
        chaos = generator.from_seq(seq=data)
        virus = virus_species[row]
        axes.imshow(chaos)
        axes.set_title(virus)
    return fig
############################################################# Persistent Homology ########################################################
import numpy as np | |
import persim | |
import ripser | |
import matplotlib.pyplot as plt | |
# One-hot corner in 4D for each (lower-case) nucleotide.
NUCLEOTIDE_MAPPING = {
    'a': np.array([1, 0, 0, 0]),
    'c': np.array([0, 1, 0, 0]),
    'g': np.array([0, 0, 1, 0]),
    't': np.array([0, 0, 0, 1])
}


def encode_nucleotide_to_vector(nucleotide):
    """Return the 4D vector of ``nucleotide``, or ``None`` when it is not in ``NUCLEOTIDE_MAPPING``."""
    return NUCLEOTIDE_MAPPING.get(nucleotide)


def chaos_4d_representation(dna_sequence):
    """Return the 4D chaos-game point cloud of ``dna_sequence``.

    The first recognised nucleotide contributes its own vector; every later
    recognised nucleotide contributes the midpoint between the previous
    point and its vector. Symbols without a vector are skipped wherever
    they occur, including at the start of the sequence.

    Returns
    -------
    An array with one row per recognised nucleotide and four columns; an
    empty ``(0, 4)`` array when the sequence has no recognised nucleotide.
    """
    points = []
    for nucleotide in dna_sequence:
        vector = encode_nucleotide_to_vector(nucleotide)
        if vector is None:
            continue
        if points:
            points.append(0.5 * (points[-1] + vector))
        else:
            points.append(vector)
    if not points:
        return np.empty((0, 4))
    return np.array(points)
def persistence_homology(dna_sequence, multi=False, plot=False, sample_rate=7):
    """Compute persistence diagrams (up to dimension 1) of 4D chaos-game point clouds.

    Parameters
    ----------
    dna_sequence
        A single sequence, or an iterable of sequences when ``multi`` is
        true.
    multi
        Whether ``dna_sequence`` holds several sequences.
    plot
        When true, the dimension-1 diagram(s) are drawn with
        ``persim.plot_diagrams``.
    sample_rate
        Every ``sample_rate``-th point of each cloud is passed to ripser.

    Returns
    -------
    The ``'dgms'`` entry of the ripser result, or a list of them (one per
    sequence) when ``multi`` is true.
    """
    if multi:
        # Kept as a plain list: the clouds have one row per nucleotide, so
        # sequences of different lengths cannot be stacked into one array.
        c4dr_points = [chaos_4d_representation(sequence) for sequence in dna_sequence]
        dgm_dna = [ripser.ripser(points[::sample_rate], maxdim=1)['dgms'] for points in c4dr_points]
        if plot:
            persim.plot_diagrams([dgm[1] for dgm in dgm_dna], labels=[f'sequence {i}' for i in range(len(dna_sequence))])
    else:
        c4dr_points = chaos_4d_representation(dna_sequence)
        dgm_dna = ripser.ripser(c4dr_points[::sample_rate], maxdim=1)['dgms']
        if plot:
            persim.plot_diagrams(dgm_dna[1])
    return dgm_dna
def plot_diagrams(
    diagrams,
    plot_only=None,
    title=None,
    xy_range=None,
    labels=None,
    colormap="default",
    size=20,
    ax_color=np.array([0.0, 0.0, 0.0]),
    diagonal=True,
    lifetime=False,
    legend=True,
    show=False,
    ax=None
):
    """Plot one or more persistence diagrams on a single axes.

    Parameters
    ----------
    diagrams: ndarray (n_pairs, 2) or list of diagrams
        A diagram or list of diagrams. A list is drawn on the same plot,
        one scatter series per diagram.
    plot_only: list of numeric
        If specified, indices of the diagrams that should be plotted.
    title: string, default is None
        If given, used as the title of the plot.
    xy_range: list of numeric [xmin, xmax, ymin, ymax]
        User provided range of axes. Useful for comparing several
        persistence diagrams.
    labels: string or list of strings
        Legend labels for each diagram.
        If none are specified, H_0, H_1, H_2,... are used.
    colormap: string, default is 'default'
        Name of a matplotlib style, applied with ``plt.style.use``.
        Available styles are listed in ``matplotlib.style.available``.
    size: numeric, default is 20
        Marker size passed to ``scatter``.
    ax_color: any valid matplotlib color type.
        Color of the diagonal / horizon line.
    diagonal: bool, default is True
        Plot the diagonal x=y line.
    lifetime: bool, default is False. If True, diagonal is turned to False.
        Plot the lifetime of each point instead of birth and death,
        i.e. (x, y-x).
    legend: bool, default is True
        If true, show the legend.
    show: bool, default is False
        Call plt.show() after plotting.
    ax: matplotlib Axes, default is None
        Axes to draw on. When omitted a new figure and axes are created.

    Returns
    -------
    The figure and the axes that were drawn on.
    """
    if ax is None:
        fig, ax = plt.subplots()
    else:
        # Draw on the caller's axes and report the figure it belongs to.
        fig = ax.figure
    plt.style.use(colormap)

    xlabel, ylabel = "Birth", "Death"

    if not isinstance(diagrams, list):
        # Must have diagrams as a list for processing downstream
        diagrams = [diagrams]

    if labels is None:
        # Default labels H_0, H_1, ...
        labels = ["$H_{{{}}}$".format(i) for i, _ in enumerate(diagrams)]

    if plot_only:
        diagrams = [diagrams[i] for i in plot_only]
        labels = [labels[i] for i in plot_only]

    if not isinstance(labels, list):
        labels = [labels] * len(diagrams)

    # Work on float32 copies so the caller's arrays are never modified.
    diagrams = [dgm.astype(np.float32, copy=True) for dgm in diagrams]

    # Gather all values to find the extent of the visible diagrams.
    concat_dgms = np.concatenate(diagrams).flatten()
    has_inf = np.any(np.isinf(concat_dgms))
    finite_dgms = concat_dgms[np.isfinite(concat_dgms)]

    if not xy_range:
        if finite_dgms.size:
            ax_min, ax_max = np.min(finite_dgms), np.max(finite_dgms)
        else:
            # No finite point at all (e.g. every diagram is empty): fall
            # back to the unit range so that the axes can still be drawn.
            ax_min, ax_max = 0.0, 1.0
        x_r = ax_max - ax_min

        # Give the plot a buffer on all sides; the extent is zero when all
        # values coincide, in which case a fixed buffer is used.
        buffer = 1 if x_r == 0 else x_r / 5

        x_down = ax_min - buffer / 2
        x_up = ax_max + buffer

        y_down, y_up = x_down, x_up
    else:
        x_down, x_up, y_down, y_up = xy_range

    yr = y_up - y_down

    if lifetime:
        # Don't plot landscape and diagonal at the same time.
        diagonal = False

        # reset y axis so it doesn't go much below zero
        y_down = -yr * 0.05
        y_up = y_down + yr

        ylabel = "Lifetime"

        # set diagrams to be (x, y-x)
        for dgm in diagrams:
            dgm[:, 1] -= dgm[:, 0]

        # plot horizon line
        ax.plot([x_down, x_up], [0, 0], c=ax_color)

    if diagonal:
        ax.plot([x_down, x_up], [x_down, x_up], "--", c=ax_color)

    if has_inf:
        # Draw infinite values on a dashed line slightly below the top.
        b_inf = y_down + yr * 0.95
        ax.plot([x_down, x_up], [b_inf, b_inf], "--", c="k", label=r"$\infty$")

        for dgm in diagrams:
            dgm[np.isinf(dgm)] = b_inf

    for dgm, label in zip(diagrams, labels):
        ax.scatter(dgm[:, 0], dgm[:, 1], size, label=label, edgecolor="none")

    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_xlim([x_down, x_up])
    ax.set_ylim([y_down, y_up])
    ax.set_aspect('equal', 'box')

    if title is not None:
        ax.set_title(title)

    if legend is True:
        ax.legend(loc="lower right")

    if show is True:
        plt.show()
    return fig, ax
def plot_persistence_homology(df, virus_species):
    """Plot the dimension-1 persistence diagrams of the sequences in ``df``.

    Each sequence is lower-cased, converted to its 4D chaos-game point
    cloud and subsampled (every 15th point) before ripser is run.

    NOTE(review): one label is built per entry of ``virus_species`` while
    one diagram is built per entry of ``df``; ``plot_diagrams`` pairs them
    with ``zip``, so the two are presumably expected to have equal length -
    confirm against the caller.

    Returns
    -------
    The matplotlib figure produced by ``plot_diagrams``.
    """
    point_clouds = [chaos_4d_representation(sequence.lower()) for sequence in df]
    all_diagrams = [
        ripser.ripser(cloud[::15], maxdim=1)['dgms'] for cloud in point_clouds
    ]
    labels = [f'{virus_specie}_{i}' for i, virus_specie in enumerate(virus_species)]
    # Index 1 of each ripser result is the dimension-1 diagram.
    fig, _ = plot_diagrams([dgms[1] for dgms in all_diagrams], labels=labels)
    return fig
def compare_persistence_homology(dna_sequence1, dna_sequence2):
    """Return the sliced Wasserstein distance between the two sequences' dimension-1 diagrams."""
    first_h1 = persistence_homology(dna_sequence1)[1]
    second_h1 = persistence_homology(dna_sequence2)[1]
    return persim.sliced_wasserstein(first_h1, second_h1)