Seq-TransfoRNA / kba_pipeline /src /annotate_from_mapping.py
Yak-hbdx's picture
uploaded TransfoRNA repo
0b11a42 verified
raw
history blame contribute delete
No virus
36.7 kB
######################################################################################################
# annotate sequences based on mapping results
######################################################################################################
#%%
import os
import logging
import numpy as np
import pandas as pd
from difflib import get_close_matches
from Levenshtein import distance
import json
from joblib import Parallel, delayed
import multiprocessing
from utils import (fasta2df, fasta2df_subheader,log_time, reverse_complement)
from precursor_bins import get_bin_with_max_overlap
log = logging.getLogger(__name__)
pd.options.mode.chained_assignment = None
######################################################################################################
# paths to reference and mapping files
######################################################################################################
# suffix inserted into the HBDxBase csv file name below
version = '_v4'
# reference files (relative paths, resolved against the current working directory)
HBDxBase_csv = f'../../references/HBDxBase/HBDxBase_all{version}.csv'
miRBase_mature_path = '../../references/HBDxBase/miRBase/mature.fa'
mat_miRNA_pos_path = '../../references/HBDxBase/miRBase/hsa_mature_position.txt'
# mapping result files (bare file names, i.e. expected in the current working directory)
# read by extract_general_info (via main)
mapped_file = 'seqsmapped2HBDxBase_combined.txt'
# read by aggregate_info_per_seq, only if the file exists
unmapped_file = 'tmp_seqs3mm2HBDxBase_pseudo__unmapped.fa'
# read by add_hico_annotation
TE_file = 'tmp_seqsmapped2genome_intersect_TE.txt'
# read by aggregate_info_per_seq
mapped_genome_file = 'seqsmapped2genome_combined.txt'
# read by aggregate_info_per_seq, only if the file exists
toomanyloci_genome_file = 'tmp_seqs0mm2genome__toomanyalign.fa'
# read by add_hico_annotation
unmapped_adapter_file = 'tmp_seqs3mm2adapters__unmapped.fa'
# read by aggregate_info_per_seq, only if the file exists
unmapped_genome_file = 'tmp_seqs0mm2genome__unmapped.fa'
# read by add_hico_annotation
unmapped_bacterial_file = 'tmp_seqs0mm2bacterial__unmapped.fa'
# read by add_hico_annotation
unmapped_viral_file = 'tmp_seqs0mm2viral__unmapped.fa'
# output files written by main
sRNA_anno_file = 'sRNA_anno_from_mapping.csv'
aggreg_sRNA_anno_file = 'sRNA_anno_aggregated_on_seq.csv'
#%%
######################################################################################################
# specific functions
######################################################################################################
@log_time(log)
def extract_general_info(mapping_file):
    """Load the mapping table and derive general per-alignment columns.

    Parameters
    ----------
    mapping_file : str
        Path to a tab-separated file without header whose six columns are
        interpreted as 'tmp_seq_id', 'reference', 'ref_start', 'sequence',
        'other_alignments' and 'mm_descriptors'.

    Returns
    -------
    pd.DataFrame
        The mapping table, left-merged on 'reference' with the
        'precursor_length', 'precursor_bins' and 'pseudo_class' columns of the
        HBDxBase csv, plus the derived columns 'mms', 'small_RNA_class_annotation',
        'subclass_type', 'precursor_name_full', 'precursor_name', 'seq_length',
        'ref_end' and 'mitochondrial'.
    """
    # load mapping file
    mapping_df = pd.read_csv(mapping_file, sep='\t', header=None)
    mapping_df.columns = ['tmp_seq_id', 'reference', 'ref_start', 'sequence', 'other_alignments', 'mm_descriptors']
    # A mismatch column without a single entry is parsed as float NaN, on which the
    # `.str` accessor raises. Object dtype keeps the NaN values but allows `.str`.
    mapping_df['mm_descriptors'] = mapping_df['mm_descriptors'].astype(object)

    # add precursor length + number of bins that will be used for names
    HBDxBase_df = pd.read_csv(HBDxBase_csv, index_col=0)
    HBDxBase_df = HBDxBase_df[['precursor_length', 'precursor_bins', 'pseudo_class']].reset_index()
    HBDxBase_df = HBDxBase_df.rename(columns={'index': "reference"})
    mapping_df = mapping_df.merge(HBDxBase_df, on='reference', how='left')

    # number of mismatches = number of '>' in the mismatch descriptor string
    mapping_df['mms'] = mapping_df.mm_descriptors.fillna('').str.count('>')
    mapping_df['mm_descriptors'] = mapping_df.mm_descriptors.str.replace(',', ';')

    # the reference name is '|'-separated: field 0 = class, field 1 = precursor, field 2 = subclass type
    reference_fields = mapping_df.reference.str.split('|')
    mapping_df['small_RNA_class_annotation'] = reference_fields.str[0]
    mapping_df['subclass_type'] = reference_fields.str[2]
    mapping_df['precursor_name_full'] = reference_fields.str[1]
    # the short precursor name is the part in front of the first '__'
    mapping_df['precursor_name'] = mapping_df.precursor_name_full.str.split('__').str[0]

    mapping_df['seq_length'] = mapping_df.sequence.apply(len)
    mapping_df['ref_end'] = mapping_df.ref_start + mapping_df.seq_length - 1
    # plain alternation (no capture groups) avoids the pandas "match groups" UserWarning
    mapping_df['mitochondrial'] = np.where(mapping_df.reference.str.contains(r'\|MT-|12S|16S'), 'mito', 'nuclear')
    return mapping_df
#%%
@log_time(log)
def tRNA_annotation(mapping_df):
    """Derive tRNA-specific subclass annotation from the mapping table.

    Leader and trailer fragments keep their subclass only if they lie within
    the accepted cutting-site window, otherwise they become 'misc-leader-tRF' /
    'misc-trailer-tRF'. Remaining tRNA alignments (no subclass yet) are assigned
    a subclass from their start position, distance to the precursor 3' end and
    length, and get a 'tRNA_ref_iso' flag ('reftRF' / 'isotRF').

    Returns the concatenation of the three groups with 'precursor_name'
    reduced to the pattern-matched name and 'subclass_name' set to
    '<subclass_type>__<precursor_name>'.
    """
    # --- leader fragments: end coordinate (start + length) must be within 45..55
    leader_df = mapping_df[mapping_df['subclass_type'] == 'leader_tRF']
    leader_end = leader_df.ref_start + leader_df.sequence.apply(len)
    leader_df.loc[:, 'subclass_type'] = np.where(
        leader_end.between(45, 55, inclusive='both'), 'leader_tRF', 'misc-leader-tRF'
    )

    # --- trailer fragments: start coordinate must be within 0..5
    trailer_df = mapping_df[mapping_df['subclass_type'] == 'trailer_tRF']
    trailer_df.loc[:, 'subclass_type'] = np.where(
        trailer_df.ref_start.between(0, 5, inclusive='both'), 'trailer_tRF', 'misc-trailer-tRF'
    )

    # --- tRNA alignments without a subclass so far
    is_unclassified_tRNA = (mapping_df['small_RNA_class_annotation'] == 'tRNA') & mapping_df['subclass_type'].isna()
    tRNAs_df = mapping_df[is_unclassified_tRNA]

    starts_near_5p = tRNAs_df.ref_start < 3
    is_long = tRNAs_df.seq_length >= 30
    is_short = tRNAs_df.seq_length < 30
    nt_to_3p_end = tRNAs_df.precursor_length - (tRNAs_df.ref_end + 1)

    # Rules are applied in this order; a later rule overwrites an earlier one.
    # NOTE(review): a distance of exactly 3 nt to the 3' end with seq_length < 30
    # matches neither of the two short 3' rules - confirm this is intended.
    subclass_rules = [
        (starts_near_5p & is_long, '5p-tR-half'),
        (starts_near_5p & is_short, '5p-tRF'),
        ((nt_to_3p_end < 6) & is_long, '3p-tR-half'),
        (nt_to_3p_end.between(3, 6, inclusive='neither') & is_short, '3p-tRF'),
        ((nt_to_3p_end < 3) & is_short, '3p-CCA-tRF'),
    ]
    for rule_mask, subclass_label in subclass_rules:
        tRNAs_df.loc[rule_mask, 'subclass_type'] = subclass_label
    tRNAs_df.loc[tRNAs_df.subclass_type.isna(), 'subclass_type'] = 'misc-tRF'

    # reference fragment: starts at position 0, or ends at the precursor end,
    # or ends 3 nt before the precursor end
    fragment_end = tRNAs_df.ref_end + 1
    is_reference_fragment = (
        (tRNAs_df.ref_start == 0)
        | (fragment_end == tRNAs_df.precursor_length)
        | (fragment_end == (tRNAs_df.precursor_length - 3))
    )
    tRNAs_df['tRNA_ref_iso'] = np.where(is_reference_fragment, 'reftRF', 'isotRF')

    # --- combine all three groups and build the names
    tRNAs_df = pd.concat([tRNAs_df, leader_df, trailer_df], axis=0)
    extracted_names = tRNAs_df.precursor_name.str.extract(
        r"((tRNA-...-...)|(MT-..)|(tRX-...-...)|(tRNA-i...-...))", expand=True
    )
    tRNAs_df['precursor_name'] = extracted_names[0]
    tRNAs_df['subclass_name'] = tRNAs_df.subclass_type + '__' + tRNAs_df.precursor_name
    return tRNAs_df
#%%
def faustrules_check(row):
    """Check if an isomiR follows the Faustrules (based on Tomasello et al. 2021).

    Parameters
    ----------
    row : mapping (e.g. pd.Series or dict)
        Must provide 'ref_start', 'mature_start', 'mm_descriptors' and 'sequence'.
        'mm_descriptors' is either missing (NaN/None) or a ';'-separated string of
        '<pos>:<original>><new>' entries.

    Returns
    -------
    bool
        False if the alignment start is more than 2 nt away from the mature
        start. Otherwise True if there are no mismatches, or if every mismatch
        is either an A>G / C>T change at position 0-8 or lies within the last
        3 nt with the sequence from that position on being a poly-A/poly-T
        stretch of up to 3 nt.
    """
    # seqs that are not in range +/- 2nt of mature start are rejected
    ref_start = row['ref_start']
    mature_start = row['mature_start']
    if ref_start < mature_start - 2 or ref_start > mature_start + 2:
        return False

    # without mismatches there is nothing more to check
    if pd.isna(row['mm_descriptors']):
        return True

    seed_region_positions = range(9)
    seed_region_changes = {('A', 'G'), ('C', 'T')}
    non_templated_ends = {'A', 'AA', 'AAA', 'T', 'TT', 'TTT'}
    sequence = row['sequence']
    three_prime_start = len(sequence) - 3

    # Judge every mismatch on its own. Counting seed and 3' end hits separately
    # and comparing the sum with the number of mismatches miscounts a mismatch
    # that satisfies both rules (possible for sequences of 11 nt or shorter).
    for descriptor in row['mm_descriptors'].split(';'):
        pos, change = descriptor.split(':')
        pos = int(pos)
        original, new = change.split('>')
        allowed_in_seed = pos in seed_region_positions and (original, new) in seed_region_changes
        allowed_at_3p_end = pos >= three_prime_start and sequence[pos:] in non_templated_ends
        if not (allowed_in_seed or allowed_at_3p_end):
            return False
    return True
@log_time(log)
def miRNA_annotation(mapping_df):
    """Derive miRNA-specific annotation from the mapping table (RaH Faustrules applied).

    Returns
    -------
    tuple
        (miRNAs_df, nr_missing_alignments_expected): the miRNA alignments with
        the columns of `mapping_df` plus 'subclass_name', 'miRNA_ref_iso',
        'miRNA_names' and 'ref_miR_seq', and the number of miRNA alignments
        that are duplicates regarding 'tmp_seq_id'/'reference' in the input
        (these are dropped here).
    """
    miRNAs_df = mapping_df[mapping_df.small_RNA_class_annotation == 'miRNA']
    is_duplicate_alignment = miRNAs_df.duplicated(['tmp_seq_id', 'reference'], keep='first')
    nr_missing_alignments_expected = int(is_duplicate_alignment.sum())

    # positions of mature miRNAs within their precursor
    mature_pos_df = pd.read_csv(mat_miRNA_pos_path, sep='\t').drop(columns=['precursor_length'])
    miRNAs_df = miRNAs_df.merge(mature_pos_df, left_on='precursor_name_full', right_on='name_precursor', how='left')

    # mature miRNA sequences from miRBase, restricted to human entries
    mature_seq_df = fasta2df_subheader(miRBase_mature_path, 0)
    mature_seq_df = mature_seq_df[mature_seq_df.index.str.contains('hsa-')]
    mature_seq_df.index = mature_seq_df.index.str.replace('hsa-', '')
    mature_seq_df = mature_seq_df.reset_index()
    mature_seq_df.columns = ['name_mature', 'ref_miR_seq']
    miRNAs_df = miRNAs_df.merge(mature_seq_df, on='name_mature', how='left')

    # per tmp_seq_id/reference combination keep the row whose sequence is
    # closest (Levenshtein distance) to the reference mature sequence
    miRNAs_df['lev_dist'] = miRNAs_df.apply(lambda x: distance(x['sequence'], x['ref_miR_seq']), axis=1)
    miRNAs_df = miRNAs_df.sort_values(by=['tmp_seq_id', 'lev_dist'], ascending=[True, True])
    miRNAs_df = miRNAs_df.drop_duplicates(['tmp_seq_id', 'reference'], keep='first')

    # reference miRNA: identical start and end as the mature miRNA and no mismatch
    is_exact_mature = (
        (miRNAs_df.ref_start == miRNAs_df.mature_start)
        & (miRNAs_df.ref_end == miRNAs_df.mature_end)
        & (miRNAs_df.mms == 0)
    )
    miRNAs_df['miRNA_ref_iso'] = np.where(is_exact_mature, 'refmiR', 'isomiR')

    # rows failing the Faustrules become 'misc-miR' and are named after the precursor
    miRNAs_df['faustrules_check'] = miRNAs_df.apply(faustrules_check, axis=1)
    passes_faustrules = miRNAs_df.faustrules_check
    miRNAs_df.loc[~passes_faustrules, 'miRNA_ref_iso'] = 'misc-miR'
    miRNAs_df['subclass_name'] = np.where(passes_faustrules, miRNAs_df.name_mature, miRNAs_df.precursor_name)
    # mature name kept for functional analysis; 'miR-' is written as 'mir-' for failing rows
    lowercase_names = miRNAs_df.name_mature.str.replace('miR-', 'mir-')
    miRNAs_df['miRNA_names'] = np.where(passes_faustrules, miRNAs_df.name_mature, lowercase_names)

    # arm: taken from the mature name suffix, otherwise decided by the position
    # of the mature start relative to the precursor midpoint
    has_5p_suffix = miRNAs_df.name_mature.str.endswith('5p')
    has_3p_suffix = miRNAs_df.name_mature.str.endswith('3p')
    miRNAs_df['subclass_type'] = np.where(has_5p_suffix, '5p', np.where(has_3p_suffix, '3p', 'tbd'))
    precursor_midpoint = miRNAs_df.precursor_length / 2
    miRNAs_df.loc[(miRNAs_df.subclass_type == 'tbd') & (miRNAs_df.mature_start < precursor_midpoint), 'subclass_type'] = '5p'
    miRNAs_df.loc[(miRNAs_df.subclass_type == 'tbd') & (miRNAs_df.mature_start >= precursor_midpoint), 'subclass_type'] = '3p'

    # subset to relevant columns
    kept_columns = list(mapping_df.columns) + ['subclass_name', 'miRNA_ref_iso', 'miRNA_names', 'ref_miR_seq']
    miRNAs_df = miRNAs_df[kept_columns]
    return miRNAs_df, nr_missing_alignments_expected
#%%
######################################################################################################
# annotation of other sRNA classes
######################################################################################################
def get_bin_with_max_overlap_parallel(df):
    """Apply `get_bin_with_max_overlap` to every row of one DataFrame chunk."""
    binned_chunk = df.apply(get_bin_with_max_overlap, axis=1)
    return binned_chunk
def applyParallel(df, func, n_chunks=30, n_jobs=None):
    """Split `df` row-wise into chunks, run `func` on each chunk in parallel and concatenate.

    Parameters
    ----------
    df : pd.DataFrame
        Table to process.
    func : callable
        Called once per chunk; each return value must be accepted by `pd.concat`.
    n_chunks : int, optional
        Number of chunks `df` is split into (default 30, the previously
        hard-coded value).
    n_jobs : int, optional
        Number of joblib workers; defaults to `multiprocessing.cpu_count()`.

    Returns
    -------
    The `pd.concat` of all chunk results, in chunk order.
    """
    if n_jobs is None:
        n_jobs = multiprocessing.cpu_count()
    chunks = np.array_split(df, n_chunks)
    retLst = Parallel(n_jobs=n_jobs)(delayed(func)(chunk) for chunk in chunks)
    return pd.concat(retLst)
@log_time(log)
def other_sRNA_annotation_new_binning(mapping_df):
    """Annotate all alignments that are neither miRNA nor tRNA by precursor binning.

    The selected rows get empty 'bin_start' / 'bin_end' columns and are then
    processed chunk-wise in parallel by `get_bin_with_max_overlap_parallel`,
    which applies `get_bin_with_max_overlap` to every row.
    """
    is_miRNA = mapping_df.small_RNA_class_annotation == 'miRNA'
    is_tRNA = mapping_df.small_RNA_class_annotation == 'tRNA'
    other_sRNAs_df = mapping_df[~is_miRNA & ~is_tRNA]
    # create empty columns; bin start and bin end
    for placeholder_column in ('bin_start', 'bin_end'):
        other_sRNAs_df[placeholder_column] = ''
    return applyParallel(other_sRNAs_df, get_bin_with_max_overlap_parallel)
#%%
@log_time(log)
def extract_sRNA_class_specific_info(mapping_df):
    """Run the tRNA, miRNA and other-sRNA annotation and re-combine the results.

    Parameters
    ----------
    mapping_df : pd.DataFrame
        Mapping table as returned by `extract_general_info`.

    Returns
    -------
    pd.DataFrame
        Row-wise concatenation of the miRNA, tRNA and other-sRNA annotation tables.

    Raises
    ------
    AssertionError
        If the number of rows after concatenation differs from the number of
        input rows minus the duplicated miRNA alignments that
        `miRNA_annotation` reports as dropped.
    """
    tRNAs_df = tRNA_annotation(mapping_df)
    miRNAs_df, nr_missing_alignments_expected = miRNA_annotation(mapping_df)
    other_sRNAs_df = other_sRNA_annotation_new_binning(mapping_df)

    # add the miRNA-only columns (all missing) to the non-miRNA tables
    miRNA_only_columns = ['miRNA_ref_iso', 'miRNA_names', 'ref_miR_seq']
    for class_df in (tRNAs_df, other_sRNAs_df):
        class_df[miRNA_only_columns] = pd.DataFrame(columns=miRNA_only_columns)

    # re-concat sRNA class dfs
    sRNA_anno_df = pd.concat([miRNAs_df, tRNAs_df, other_sRNAs_df], axis=0)

    # Consistency check; raised explicitly so that it is not stripped under `python -O`.
    nr_expected = len(mapping_df) - nr_missing_alignments_expected
    if nr_expected != len(sRNA_anno_df):
        raise AssertionError(
            f"alignments were lost or duplicated (expected {nr_expected}, got {len(sRNA_anno_df)})"
        )
    return sRNA_anno_df
#%%
def get_nth_nt(row):
    """Return the character of row['sequence'] at the 1-based position row['PTM_position_in_seq'].

    The position is converted with `int()` and reduced by one to index the
    string, so a position of 0 yields the last character.
    """
    seq = row['sequence']
    one_based_position = int(row['PTM_position_in_seq'])
    zero_based_index = one_based_position - 1
    return seq[zero_based_index]
#%%
@log_time(log)
def aggregate_info_per_seq(sRNA_anno_df):
    """Collapse the per-alignment annotation to one row per sequence.

    Steps:
      1. aggregate the annotation columns per 'sequence' (unique values joined by ';',
         'reference' becomes the number of alignments),
      2. re-add 'miRNA_ref_iso' / 'tRNA_ref_iso' (taken from the first row per sequence),
      3. append the sequences of `unmapped_file` (if present) as 'no_annotation',
      4. add 'num_genomic_loci_maps' from the genome mapping
         (101 for sequences in `toomanyloci_genome_file`, 0 for sequences in
         `unmapped_genome_file`),
      5. add 'miRNA_seed' (sequence[1:9]) for rows whose class contains 'miRNA'.

    Parameters
    ----------
    sRNA_anno_df : pd.DataFrame
        Per-alignment annotation; its 'subclass_name_bin_pos' column is filled
        in place with 'subclass_name' where missing.

    Returns
    -------
    pd.DataFrame
        One row per sequence, with 'sequence' as a regular column.

    Raises
    ------
    AssertionError
        If sequences were lost by the aggregation or if
        'num_genomic_loci_maps' contains missing values.
    """
    def _join_sorted_unique(values):
        return ';'.join(sorted(values.unique()))

    def _join_sorted_unique_as_str(values):
        return ';'.join(values.astype(str).sort_values(ascending=True).unique())

    def _join_unique_nan_as_empty(values):
        return ';'.join(values.fillna('').unique())

    def _count(values):
        return len(values)

    # fillna of 'subclass_name_bin_pos' with 'subclass_name'
    sRNA_anno_df['subclass_name_bin_pos'] = sRNA_anno_df['subclass_name_bin_pos'].fillna(sRNA_anno_df['subclass_name'])

    # get aggregated info per seq
    aggreg_per_seq_df = sRNA_anno_df.groupby(['sequence']).agg({
        'small_RNA_class_annotation': _join_sorted_unique,
        'pseudo_class': _join_sorted_unique_as_str,
        'subclass_type': _join_sorted_unique_as_str,
        'subclass_name': _join_sorted_unique,
        'subclass_name_bin_pos': _join_sorted_unique,
        'miRNA_names': _join_unique_nan_as_empty,
        'precursor_name_full': _join_sorted_unique,
        'mms': _join_sorted_unique_as_str,
        'reference': _count,
        'mitochondrial': _join_sorted_unique_as_str,
        'ref_miR_seq': _join_unique_nan_as_empty,
    })
    # drop the trailing ';' left behind by an empty (former NaN) last entry
    aggreg_per_seq_df['miRNA_names'] = aggreg_per_seq_df.miRNA_names.str.replace(r';$', '', regex=True)
    aggreg_per_seq_df['ref_miR_seq'] = aggreg_per_seq_df.ref_miR_seq.str.replace(r';$', '', regex=True)
    # NOTE(review): this cast fails if a sequence has alignments with different
    # mismatch counts (joined value like '0;1') - presumably the mapping
    # guarantees a single count per sequence; confirm.
    aggreg_per_seq_df['mms'] = aggreg_per_seq_df['mms'].astype(int)

    # re-add 'miRNA_ref_iso','tRNA_ref_iso' (first row per sequence)
    refmir_df = sRNA_anno_df[['sequence', 'miRNA_ref_iso', 'tRNA_ref_iso']]
    refmir_df = refmir_df.drop_duplicates('sequence').set_index('sequence')
    aggreg_per_seq_df = aggreg_per_seq_df.merge(refmir_df, left_index=True, right_index=True, how='left')

    # Consistency check; raised explicitly so that it is not stripped under `python -O`.
    if len(aggreg_per_seq_df) != len(sRNA_anno_df.sequence.unique()):
        raise AssertionError("sequences were lost by aggregation")

    # load unmapped seqs, if the file exists
    if os.path.exists(unmapped_file):
        unmapped_df = fasta2df(unmapped_file)
        unmapped_df = pd.DataFrame(data='no_annotation', index=unmapped_df.sequence, columns=aggreg_per_seq_df.columns)
        unmapped_df['mms'] = np.nan
        unmapped_df['reference'] = np.nan
        unmapped_df['pseudo_class'] = True  # set no annotation as pseudo_class
        # merge mapped and unmapped
        annotation_df = pd.concat([aggreg_per_seq_df, unmapped_df])
    else:
        annotation_df = aggreg_per_seq_df.copy()

    # load mapping to genome file
    mapping_genome_df = pd.read_csv(mapped_genome_file, index_col=0, sep='\t', header=None)
    mapping_genome_df.columns = ['strand', 'reference', 'ref_start', 'sequence', 'other_alignments', 'mm_descriptors']
    mapping_genome_df = mapping_genome_df[['strand', 'reference', 'ref_start', 'sequence', 'other_alignments']]
    # use reverse complement of 'sequence' for 'strand' == '-'
    mapping_genome_df.loc[:, 'sequence'] = np.where(
        mapping_genome_df.strand == '-',
        mapping_genome_df.sequence.apply(reverse_complement),
        mapping_genome_df.sequence,
    )

    # number of genomic loci = number of genome alignments per sequence
    genomic_loci_df = pd.DataFrame(mapping_genome_df.sequence.value_counts())
    genomic_loci_df.columns = ['num_genomic_loci_maps']

    # sequences with too many alignments get the fixed value 101
    if os.path.exists(toomanyloci_genome_file):
        toomanyloci_genome_df = fasta2df(toomanyloci_genome_file)
        toomanyloci_genome_df = pd.DataFrame(data=101, index=toomanyloci_genome_df.sequence, columns=genomic_loci_df.columns)
    else:
        toomanyloci_genome_df = pd.DataFrame(columns=genomic_loci_df.columns)

    # sequences that did not map to the genome get 0
    if os.path.exists(unmapped_genome_file):
        unmapped_genome_df = fasta2df(unmapped_genome_file)
        unmapped_genome_df = pd.DataFrame(data=0, index=unmapped_genome_df.sequence, columns=genomic_loci_df.columns)
    else:
        unmapped_genome_df = pd.DataFrame(columns=genomic_loci_df.columns)

    # concat toomanyloci, unmapped, and genomic_loci
    num_genomic_loci_maps_df = pd.concat([genomic_loci_df, toomanyloci_genome_df, unmapped_genome_df])

    # merge to annotation_df
    annotation_df = annotation_df.merge(num_genomic_loci_maps_df, left_index=True, right_index=True, how='left')
    annotation_df.reset_index(inplace=True)

    # add 'miRNA_seed'
    annotation_df.loc[:, "miRNA_seed"] = np.where(
        annotation_df.small_RNA_class_annotation.str.contains('miRNA', na=False),
        annotation_df.sequence.str[1:9],
        "",
    )

    # Consistency check; raised explicitly so that it is not stripped under `python -O`.
    if annotation_df.num_genomic_loci_maps.isna().any():
        raise AssertionError("nan values in 'num_genomic_loci_maps'")
    return annotation_df
#%%
@log_time(log)
def get_five_prime_adapter_info(annotation_df, five_prime_adapter):
    """Flag sequences that start with a 3'-terminal part of the 5' adapter.

    Every suffix `five_prime_adapter[offset:]` for
    `offset in range(0, len(five_prime_adapter) - min_length)` (min_length = 6)
    is tested as a prefix of each sequence, from the longest to the shortest
    suffix; a later (shorter) match overwrites an earlier one. The number of
    matches per suffix is printed.

    Parameters
    ----------
    annotation_df : pd.DataFrame
        Must contain a 'sequence' column.
    five_prime_adapter : str
        Sequence of the 5' adapter. If it is not longer than `min_length`, no
        suffix is tested and no sequence is flagged.

    Returns
    -------
    pd.DataFrame
        Columns 'sequence', 'five_prime_adapter_length' (length of the matched
        adapter suffix, 0 if none) and 'five_prime_adapter_filter'
        (True if no adapter suffix was found).
    """
    adapter_df = pd.DataFrame(index=annotation_df.sequence)
    min_length = 6
    # NOTE(review): the shortest suffix tested has min_length + 1 characters - confirm intended.

    # Initialise the results up front so that an adapter that is too short for
    # any suffix test yields "nothing prefixed" instead of failing below.
    adapter_df['five_prime_adapter_length'] = np.nan
    is_prefixed = np.zeros(len(adapter_df), dtype=bool)

    print("5' adapter affixes:")
    for offset in range(0, len(five_prime_adapter) - min_length):
        adapter_suffix = five_prime_adapter[offset:]
        is_prefixed_l = adapter_df.index.str.startswith(adapter_suffix)
        print(f"{adapter_suffix.ljust(30, ' ')}{is_prefixed_l.sum()}")
        adapter_df.loc[is_prefixed_l, "five_prime_adapter_length"] = len(adapter_suffix)
        is_prefixed = is_prefixed | is_prefixed_l
    print(f"There are {is_prefixed.sum()} prefixed features.")
    print("\n")

    adapter_df['five_prime_adapter_length'] = adapter_df['five_prime_adapter_length'].fillna(0)
    adapter_df['five_prime_adapter_length'] = adapter_df['five_prime_adapter_length'].astype('int')
    adapter_df['five_prime_adapter_filter'] = np.where(adapter_df['five_prime_adapter_length'] == 0, True, False)
    adapter_df = adapter_df.reset_index()
    return adapter_df
#%%
@log_time(log)
def reduce_ambiguity(annotation_df: pd.DataFrame) -> pd.DataFrame:
    """Reduce ambiguity by
    a) using subclass_name of precursor with shortest genomic context, if all other assigned precursors overlap with its genomic region
    b) using subclass_name whose bin is at the 5' or 3' end of the precursor

    If both approaches resolve a row, the result of a) is used.

    Parameters
    ----------
    annotation_df : pd.DataFrame
        A DataFrame containing the annotation of the sequences (var)

    Returns
    -------
    pd.DataFrame
        An improved version of the input DataFrame with reduced ambiguity.
        It always carries the boolean column 'ambiguity_resolved'
        (False for rows that were unambiguous or could not be resolved).
    """
    # extract ambiguous assignments for subclass name
    is_ambiguous = annotation_df.subclass_name.str.contains(';', na=False)
    ambigious_matches_df = annotation_df[is_ambiguous].copy()
    if len(ambigious_matches_df) == 0:
        print('No ambigious assignments for subclass name found.')
        # same output schema as the regular path below
        return annotation_df.assign(ambiguity_resolved=False)
    clear_matches_df = annotation_df[~is_ambiguous].copy()

    # extract required information from HBDxBase
    HBDxBase_all_df = pd.read_csv(HBDxBase_csv, index_col=0)
    bin_dict = HBDxBase_all_df[['precursor_name', 'precursor_bins']].set_index('precursor_name').to_dict()['precursor_bins']
    sRNA_class_dict = HBDxBase_all_df[['precursor_name', 'small_RNA_class_annotation']].set_index('precursor_name').to_dict()['small_RNA_class_annotation']
    pseudo_class_dict = HBDxBase_all_df[['precursor_name', 'pseudo_class']].set_index('precursor_name').to_dict()['pseudo_class']
    sc_type_dict = HBDxBase_all_df[['precursor_name', 'subclass_type']].set_index('precursor_name').to_dict()['subclass_type']
    genomic_context_bed = HBDxBase_all_df[['chr', 'start', 'end', 'precursor_name', 'score', 'strand']].copy()
    genomic_context_bed.columns = ['seq_id', 'start', 'end', 'name', 'score', 'strand']
    genomic_context_bed.reset_index(drop=True, inplace=True)
    genomic_context_bed['genomic_length'] = genomic_context_bed.end - genomic_context_bed.start

    def get_overlaps(genomic_context_bed: pd.DataFrame, name: str = None, complement: bool = False) -> list:
        """Get genomic overlap of a given precursor name

        Parameters
        ----------
        genomic_context_bed : pd.DataFrame
            A DataFrame containing genomic locations of precursors in bed format
            with column names: 'seq_id','start','end','name','score','strand'
        name : str
            The name of the precursor to get genomic context for
        complement : bool
            If True, return the precursors on the same seq_id and strand
            that do not overlap with the given precursor

        Returns
        -------
        list
            A list containing the precursors in the genomic (anti-)context of the given precursor
            (including the precursor itself if complement is False)
        """
        # coordinates of the first entry with the requested name
        series_OI = genomic_context_bed[genomic_context_bed['name'] == name]
        start = series_OI['start'].values[0]
        end = series_OI['end'].values[0]
        seq_id = series_OI['seq_id'].values[0]
        strand = series_OI['strand'].values[0]
        overlap_df = genomic_context_bed.copy()
        condition = (
            ((overlap_df.start > start) & (overlap_df.start < end))
            | ((overlap_df.end > start) & (overlap_df.end < end))
            | ((overlap_df.start < start) & (overlap_df.end > start))
            | ((overlap_df.start == start) & (overlap_df.end == end))
            | ((overlap_df.start == start) & (overlap_df.end > end))
            | ((overlap_df.start < start) & (overlap_df.end == end))
        )
        if not complement:
            overlap_df = overlap_df[condition]
        else:
            overlap_df = overlap_df[~condition]
        overlap_df = overlap_df[overlap_df.seq_id == seq_id]
        if strand is not None:
            overlap_df = overlap_df[overlap_df.strand == strand]
        overlap_list = overlap_df['name'].tolist()
        return overlap_list

    def check_genomic_ctx_of_smallest_prec(precursor_name: str) -> str:
        """Check for a given ambiguous precursor assignment (several names separated by ';')
        if all assigned precursors overlap with the genomic region
        of the precursor with the shortest genomic context

        Parameters
        ----------
        precursor_name: str
            A string containing several precursor names separated by ';'

        Returns
        -------
        str
            The precursor suggested to be used instead of the multi assignment,
            or None if the ambiguity could not be resolved
        """
        assigned_names = precursor_name.split(';')
        tmp_genomic_context = genomic_context_bed[genomic_context_bed.name.isin(assigned_names)]
        if len(tmp_genomic_context) == 0:
            return None
        # get name of smallest genomic region
        smallest_name = tmp_genomic_context.name[tmp_genomic_context.genomic_length.idxmin()]
        # check if all assigned names are in overlap of smallest genomic region
        if set(assigned_names).issubset(set(get_overlaps(genomic_context_bed, smallest_name))):
            return smallest_name
        return None

    def get_subclass_name(subclass_name: str, short_prec_match_new_name: str) -> str:
        """Get subclass name matching to a precursor name from an ambiguous assignment (several names separated by ';')

        Parameters
        ----------
        subclass_name: str
            A string containing several subclass names separated by ';'
        short_prec_match_new_name: str
            The name of the precursor to be used instead of the multi assignment

        Returns
        -------
        str
            The subclass name suggested to be used instead of the multi assignment,
            or None if the ambiguity could not be resolved
        """
        if short_prec_match_new_name is None:
            return None
        matches = get_close_matches(short_prec_match_new_name, subclass_name.split(';'), cutoff=0.2)
        if matches:
            return matches[0]
        print(f"Could not find match for {short_prec_match_new_name} in {subclass_name}")
        # Unresolved: returning the ambiguous name here would flag the row as
        # resolved and overrule a successful end-bin resolution.
        return None

    def check_end_bins(subclass_name: str) -> str:
        """Check for a given ambiguous subclass name assignment (several names separated by ';')
        if ambiguity can be resolved by selecting the subclass name whose bin matches the 3'/5' end of the precursor

        Parameters
        ----------
        subclass_name: str
            A string containing several subclass names separated by ';'

        Returns
        -------
        str
            The first subclass name whose bin number equals the number of bins
            of its precursor (last bin) or equals 1 (first bin),
            or None if the ambiguity could not be resolved
        """
        for name in subclass_name.split(';'):
            if '_bin-' in name:
                name_parts = name.split('_bin-')
                if name_parts[0] in bin_dict and bin_dict[name_parts[0]] == int(name_parts[1]):
                    return name
                elif int(name_parts[1]) == 1:
                    return name
        return None

    def adjust_4_resolved_cases(row: pd.Series) -> tuple:
        """For a resolved ambiguous subclass name return adjusted values of
        precursor_name_full, small_RNA_class_annotation, pseudo_class, and subclass_type

        Parameters
        ----------
        row: pd.Series
            A row of the var annotation containing the columns 'subclass_name', 'precursor_name_full',
            'small_RNA_class_annotation', 'pseudo_class', 'subclass_type', and 'ambiguity_resolved'

        Returns
        -------
        tuple
            A tuple containing the adjusted values of 'precursor_name_full', 'small_RNA_class_annotation',
            'pseudo_class', and 'subclass_type' for resolved ambiguous cases and the original values for unresolved cases
        """
        if row.ambiguity_resolved:
            matches_prec = get_close_matches(row.subclass_name, row.precursor_name_full.split(';'), cutoff=0.2)
            if matches_prec:
                return matches_prec[0], sRNA_class_dict[matches_prec[0]], pseudo_class_dict[matches_prec[0]], sc_type_dict[matches_prec[0]]
        return row.precursor_name_full, row.small_RNA_class_annotation, row.pseudo_class, row.subclass_type

    # resolve ambiguity by checking genomic context of smallest precursor
    ambigious_matches_df['short_prec_match_new_name'] = ambigious_matches_df.precursor_name_full.apply(check_genomic_ctx_of_smallest_prec)
    ambigious_matches_df['short_prec_match_new_name'] = ambigious_matches_df.apply(lambda x: get_subclass_name(x.subclass_name, x.short_prec_match_new_name), axis=1)
    ambigious_matches_df['short_prec_match'] = ambigious_matches_df['short_prec_match_new_name'].notnull()

    # resolve ambiguity by checking if bin matches 3'/5' end of precursor
    ambigious_matches_df['end_bin_match_new_name'] = ambigious_matches_df.subclass_name.apply(check_end_bins)
    ambigious_matches_df['end_bin_match'] = ambigious_matches_df['end_bin_match_new_name'].notnull()

    # report cases in which both approaches resolved the row but disagree
    test_df = ambigious_matches_df[((ambigious_matches_df.short_prec_match == True) & (ambigious_matches_df.end_bin_match == True))]
    if not (test_df.short_prec_match_new_name == test_df.end_bin_match_new_name).all():
        print('Number of cases where short_prec_match is not matching end_bin_match_new_name:', len(test_df[(test_df.short_prec_match_new_name != test_df.end_bin_match_new_name)]))

    # replace subclass_name with short_prec_match_new_name or end_bin_match_new_name
    # NOTE: if short_prec_match and end_bin_match are True, short_prec_match_new_name is used
    # (it is applied last and therefore overwrites the end-bin result)
    ambigious_matches_df['subclass_name'] = ambigious_matches_df.apply(lambda x: x.end_bin_match_new_name if x.end_bin_match == True else x.subclass_name, axis=1)
    ambigious_matches_df['subclass_name'] = ambigious_matches_df.apply(lambda x: x.short_prec_match_new_name if x.short_prec_match == True else x.subclass_name, axis=1)

    # generate column 'ambiguity_resolved' which is True if short_prec_match and/or end_bin_match is True
    ambigious_matches_df['ambiguity_resolved'] = ambigious_matches_df.short_prec_match | ambigious_matches_df.end_bin_match
    print("Ambiguity resolved?\n", ambigious_matches_df.ambiguity_resolved.value_counts(normalize=True))

    # for resolved ambiguous matches, adjust precursor_name_full, small_RNA_class_annotation, pseudo_class, subclass_type
    ambigious_matches_df[['precursor_name_full', 'small_RNA_class_annotation', 'pseudo_class', 'subclass_type']] = ambigious_matches_df.apply(adjust_4_resolved_cases, axis=1, result_type='expand')

    # drop temporary columns
    ambigious_matches_df.drop(columns=['short_prec_match_new_name', 'short_prec_match', 'end_bin_match_new_name', 'end_bin_match'], inplace=True)

    # concat with clear_matches_df and restore the original row order
    clear_matches_df['ambiguity_resolved'] = False
    improved_annotation_df = pd.concat([clear_matches_df, ambigious_matches_df], axis=0)
    improved_annotation_df = improved_annotation_df.reindex(annotation_df.index)
    return improved_annotation_df
#%%
######################################################################################################
# HICO (=high confidence) annotation
######################################################################################################
@log_time(log)
def add_hico_annotation(annotation_df, five_prime_adapter):
    """Add filter columns and the high-confidence flag 'hico' to the per-sequence annotation.

    A sequence is 'hico' if it has no mismatch, a single subclass name, no TE
    annotation, no bacterial or viral hit, passes the adapter mapping filter
    and carries no 5' adapter prefix. For miRNAs, 'hico' additionally requires
    miRNA_ref_iso == 'refmiR'.
    """
    # 'TE_annotation'
    TE_df = pd.read_csv(TE_file, sep='\t', header=None, names=['sequence', 'TE_annotation'])
    annotation_df = annotation_df.merge(TE_df, on='sequence', how='left')

    # 'bacterial' / 'viral': True if the sequence is NOT among the unmapped sequences
    bacterial_unmapped_df = fasta2df(unmapped_bacterial_file)
    annotation_df['bacterial'] = ~annotation_df.sequence.isin(bacterial_unmapped_df.sequence)
    viral_unmapped_df = fasta2df(unmapped_viral_file)
    annotation_df['viral'] = ~annotation_df.sequence.isin(viral_unmapped_df.sequence)

    # 'adapter_mapping_filter': True if the sequence is among the sequences unmapped to adapters
    adapter_unmapped_df = fasta2df(unmapped_adapter_file)
    annotation_df['adapter_mapping_filter'] = annotation_df.sequence.isin(adapter_unmapped_df.sequence)

    # 'five_prime_adapter_filter' and 'five_prime_adapter_length' (length of the prefixed 5' adapter sequence)
    adapter_df = get_five_prime_adapter_info(annotation_df, five_prime_adapter)
    annotation_df = annotation_df.merge(adapter_df, on='sequence', how='left')

    # apply ambiguity reduction
    annotation_df = reduce_ambiguity(annotation_df)

    # single class / single name: no ';' in the respective column (missing counts as not single)
    annotation_df['single_class_annotation'] = ~annotation_df.small_RNA_class_annotation.str.contains(';', na=True)
    annotation_df['single_name_annotation'] = ~annotation_df.subclass_name.str.contains(';', na=True)

    # 'hypermapper' for sequences where more than 50 potential mapping references are recorded
    is_hypermapper = annotation_df.reference > 50
    hypermapper_label = 'hypermapper_' + annotation_df.reference.fillna(0).astype(int).astype(str)
    for name_column in ('subclass_name', 'subclass_name_bin_pos', 'precursor_name_full'):
        annotation_df.loc[is_hypermapper, name_column] = hypermapper_label

    annotation_df.loc[:, 'mitochondrial'] = np.where(annotation_df.mitochondrial.str.contains('mito', na=False), True, False)

    # 'hico'
    passes_all_filters = (
        (annotation_df.mms == 0)
        & annotation_df.single_name_annotation
        & annotation_df.TE_annotation.isna()
        & (annotation_df.bacterial == False)
        & (annotation_df.viral == False)
        & (annotation_df.adapter_mapping_filter == True)
        & (annotation_df.five_prime_adapter_filter == True)
    )
    annotation_df['hico'] = np.where(passes_all_filters, True, False)

    ## NOTE: for miRNAs only use hico annotation if part of refmiR set
    is_miRNA = annotation_df.small_RNA_class_annotation == 'miRNA'
    is_refmiR = annotation_df.loc[is_miRNA, 'miRNA_ref_iso'] == 'refmiR'
    annotation_df.loc[is_miRNA, 'hico'] = annotation_df.loc[is_miRNA, 'hico'] & is_refmiR

    single_class_df = annotation_df[annotation_df.single_class_annotation == True]
    print(single_class_df.groupby('small_RNA_class_annotation').hico.value_counts())
    return annotation_df
#%%
######################################################################################################
# annotation pipeline
######################################################################################################
@log_time(log)
def main(five_prime_adapter):
    """Executes 'annotate_from_mapping'.

    Writes `sRNA_anno_file`, `aggreg_sRNA_anno_file` and
    'subclass_to_annotation.json' to the current working directory and finally
    deletes all files matching '*tmp_*' in the current working directory.

    Uses:
    - HBDxBase_csv
    - miRBase_mature_path
    - mat_miRNA_pos_path
    - mapped_file
    - unmapped_file
    - mapped_genome_file
    - toomanyloci_genome_file
    - unmapped_genome_file
    - TE_file
    - unmapped_adapter_file
    - unmapped_bacterial_file
    - unmapped_viral_file
    - five_prime_adapter
    """
    import glob

    print('-------- extract general information for sequences that mapped to the HBDxBase --------')
    mapped_info_df = extract_general_info(mapped_file)
    print("\n")

    print('-------- extract sRNA class specific information for sequences that mapped to the HBDxBase --------')
    mapped_sRNA_anno_df = extract_sRNA_class_specific_info(mapped_info_df)

    print('-------- save to file --------')
    mapped_sRNA_anno_df.to_csv(sRNA_anno_file)
    print("\n")

    print('-------- aggregate information for mapped and unmapped sequences (HBDxBase & human genome) --------')
    sRNA_anno_per_seq_df = aggregate_info_per_seq(mapped_sRNA_anno_df)
    print("\n")

    print('-------- add hico annotation (based on aggregated infos + mapping to viral/bacterial genomes + intersection with TEs) --------')
    sRNA_anno_per_seq_df = add_hico_annotation(sRNA_anno_per_seq_df, five_prime_adapter)
    print("\n")

    print('-------- save to file --------')
    # set sequence as index again
    sRNA_anno_per_seq_df.set_index('sequence', inplace=True)
    sRNA_anno_per_seq_df.to_csv(aggreg_sRNA_anno_file)
    print("\n")

    print('-------- generate subclass_to_annotation dict --------')
    result_df = sRNA_anno_per_seq_df[['subclass_name', 'small_RNA_class_annotation']].copy()
    result_df.reset_index(drop=True, inplace=True)
    result_df.drop_duplicates(inplace=True)
    # keep unambiguous names only; na=True also drops rows without a name,
    # which would otherwise make the boolean mask invalid
    result_df = result_df[~result_df["subclass_name"].str.contains(";", na=True)]
    subclass_to_annotation = dict(zip(result_df["subclass_name"], result_df["small_RNA_class_annotation"]))
    with open('subclass_to_annotation.json', 'w', encoding='utf-8') as fp:
        json.dump(subclass_to_annotation, fp)

    print('-------- delete tmp files --------')
    # same file pattern as the former `rm *tmp_*`, but without spawning a shell;
    # real directories are skipped (plain `rm` does not remove them either)
    for tmp_path in glob.glob('*tmp_*'):
        if os.path.isdir(tmp_path) and not os.path.islink(tmp_path):
            continue
        os.remove(tmp_path)
#%%