from collections import deque, Counter
import warnings
import pandas as pd
import numpy as np
from xml.etree import ElementTree as ET
import math

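# Root terms of the three Gene Ontology sub-ontologies.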
BIOLOGICAL_PROCESS = 'GO:0008150'
MOLECULAR_FUNCTION = 'GO:0003674'
CELLULAR_COMPONENT = 'GO:0005575'
FUNC_DICT = {
    'cc': CELLULAR_COMPONENT,
    'mf': MOLECULAR_FUNCTION,
    'bp': BIOLOGICAL_PROCESS}

NAMESPACES = {
    'cc': 'cellular_component',
    'mf': 'molecular_function',
    'bp': 'biological_process'
}

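# GO evidence codes treated as experimental (including high-throughput codes).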
EXP_CODES = set([
    'EXP', 'IDA', 'IPI', 'IMP', 'IGI', 'IEP', 'TAS', 'IC',
    'HTP', 'HDA', 'HMP', 'HGI', 'HEP'])

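# NCBI taxonomy identifiers of the CAFA target organisms.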
CAFA_TARGETS = set([
    '287', '3702', '4577', '6239', '7227', '7955', '9606', '9823', '10090',
    '10116', '44689', '83333', '99287', '226900', '243273', '284812', '559292'])


def is_cafa_target(org):
    return org in CAFA_TARGETS


def is_exp_code(code):
    return code in EXP_CODES


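# Parse GO-Plus definition axioms. Each line is expected to look like
# 'GO_0000001: GO_0000002 and GO_0000003'; underscores are turned back into
# colons and the result maps a GO id to the set of GO ids in its definition.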
def get_goplus_defs(filename='data/definitions.txt'):
    plus_defs = {}
    with open(filename) as f:
        for line in f:
            line = line.strip()
            go_id, definition = line.split(': ')
            go_id = go_id.replace('_', ':')
            definition = definition.replace('_', ':')
            plus_defs[go_id] = set(definition.split(' and '))
    return plus_defs


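# In-memory view of the Gene Ontology loaded from an OBO file. Terms are kept
# in a dict keyed by GO id; alt_ids alias the same entry, obsolete terms are
# dropped, and 'children' links are derived from the is_a parents.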
class Ontology(object):

    def __init__(self, filename='data/go.obo', with_rels=False):
        self.ont = self.load(filename, with_rels)
        self.ic = None
        self.ic_norm = 0.0

    def has_term(self, term_id):
        return term_id in self.ont

    def get_term(self, term_id):
        if self.has_term(term_id):
            return self.ont[term_id]
        return None

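    # Annotation-based information content: IC(t) = log2(min_parent_count / n_t),
    # where n_t is the annotation count of term t and min_parent_count is the
    # smallest count among its direct parents (or n_t itself for root terms).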
    def calculate_ic(self, annots):
        cnt = Counter()
        for x in annots:
            cnt.update(x)
        self.ic = {}
        for go_id, n in cnt.items():
            parents = self.get_parents(go_id)
            if len(parents) == 0:
                min_n = n
            else:
                min_n = min([cnt[x] for x in parents])

            self.ic[go_id] = math.log(min_n / n, 2)
            self.ic_norm = max(self.ic_norm, self.ic[go_id])

    def get_ic(self, go_id):
        if self.ic is None:
            raise Exception('Not yet calculated')
        if go_id not in self.ic:
            return 0.0
        return self.ic[go_id]

    def get_norm_ic(self, go_id):
        return self.get_ic(go_id) / self.ic_norm

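    # Parse the OBO flat file: every [Term] stanza becomes a dict recording its
    # id, name, namespace, alt_ids, is_a parents (plus other relationship
    # targets when with_rels is True) and obsolete flag.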
    def load(self, filename, with_rels):
        ont = dict()
        obj = None
        with open(filename, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                if line == '[Term]':
                    if obj is not None:
                        ont[obj['id']] = obj
                    obj = dict()
                    obj['is_a'] = list()
                    obj['part_of'] = list()
                    obj['regulates'] = list()
                    obj['alt_ids'] = list()
                    obj['is_obsolete'] = False
                    continue
                elif line == '[Typedef]':
                    if obj is not None:
                        ont[obj['id']] = obj
                    obj = None
                else:
                    if obj is None:
                        continue
                    l = line.split(": ")
                    if l[0] == 'id':
                        obj['id'] = l[1]
                    elif l[0] == 'alt_id':
                        obj['alt_ids'].append(l[1])
                    elif l[0] == 'namespace':
                        obj['namespace'] = l[1]
                    elif l[0] == 'is_a':
                        obj['is_a'].append(l[1].split(' ! ')[0])
                    elif with_rels and l[0] == 'relationship':
                        it = l[1].split()
                        # treat all relationship types as parent links
                        obj['is_a'].append(it[1])
                    elif l[0] == 'name':
                        obj['name'] = l[1]
                    elif l[0] == 'is_obsolete' and l[1] == 'true':
                        obj['is_obsolete'] = True
        if obj is not None:
            ont[obj['id']] = obj
        # register alternative ids and drop obsolete terms
        for term_id in list(ont.keys()):
            for t_id in ont[term_id]['alt_ids']:
                ont[t_id] = ont[term_id]
            if ont[term_id]['is_obsolete']:
                del ont[term_id]
        # build the reverse (children) links
        for term_id, val in ont.items():
            if 'children' not in val:
                val['children'] = set()
            for p_id in val['is_a']:
                if p_id in ont:
                    if 'children' not in ont[p_id]:
                        ont[p_id]['children'] = set()
                    ont[p_id]['children'].add(term_id)

        return ont

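    # Breadth-first walk up the is_a links; returns the term itself together
    # with all of its ancestors.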
    def get_anchestors(self, term_id):
        if term_id not in self.ont:
            return set()
        term_set = set()
        q = deque()
        q.append(term_id)
        while len(q) > 0:
            t_id = q.popleft()
            if t_id not in term_set:
                term_set.add(t_id)
                for parent_id in self.ont[t_id]['is_a']:
                    if parent_id in self.ont:
                        q.append(parent_id)
        return term_set

    def get_prop_terms(self, terms):
        prop_terms = set()
        for term_id in terms:
            prop_terms |= self.get_anchestors(term_id)
        return prop_terms

    def get_parents(self, term_id):
        if term_id not in self.ont:
            return set()
        term_set = set()
        for parent_id in self.ont[term_id]['is_a']:
            if parent_id in self.ont:
                term_set.add(parent_id)
        return term_set

    def get_namespace_terms(self, namespace):
        terms = set()
        for go_id, obj in self.ont.items():
            if obj['namespace'] == namespace:
                terms.add(go_id)
        return terms

    def get_namespace(self, term_id):
        return self.ont[term_id]['namespace']

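    # Breadth-first walk down the children links; returns the term itself
    # together with all of its descendants.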
    def get_term_set(self, term_id):
        if term_id not in self.ont:
            return set()
        term_set = set()
        q = deque()
        q.append(term_id)
        while len(q) > 0:
            t_id = q.popleft()
            if t_id not in term_set:
                term_set.add(t_id)
                for ch_id in self.ont[t_id]['children']:
                    q.append(ch_id)
        return term_set


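# Read a FASTA file and return (ids, sequences); the id of each record is the
# first whitespace-separated token after '>' on its header line.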
def read_fasta(filename):
    seqs = list()
    info = list()
    seq = ''
    inf = ''
    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith('>'):
                if seq != '':
                    seqs.append(seq)
                    info.append(inf)
                    seq = ''
                inf = line[1:].split()[0]
            else:
                seq += line
        seqs.append(seq)
        info.append(inf)
    return info, seqs


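# Mini-batch iterator over dense or sparse feature/label matrices. Batches are
# produced in order, and the generator wraps around once the data is
# exhausted, so it can be iterated indefinitely.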
class DataGenerator(object):

    def __init__(self, batch_size, is_sparse=False):
        self.batch_size = batch_size
        self.is_sparse = is_sparse

    def fit(self, inputs, targets=None):
        self.start = 0
        self.inputs = inputs
        self.targets = targets
        if isinstance(self.inputs, (tuple, list)):
            self.size = self.inputs[0].shape[0]
        else:
            self.size = self.inputs.shape[0]
        self.has_targets = targets is not None

    def __next__(self):
        return self.next()

    def reset(self):
        self.start = 0

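    # Slice out the next batch (densifying sparse matrices with .toarray())
    # and advance the cursor; wraps around via reset() at the end of the data.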
    def next(self):
        if self.start < self.size:
            batch_index = np.arange(
                self.start, min(self.size, self.start + self.batch_size))
            if isinstance(self.inputs, (tuple, list)):
                res_inputs = []
                for inp in self.inputs:
                    if self.is_sparse:
                        res_inputs.append(
                            inp[batch_index, :].toarray())
                    else:
                        res_inputs.append(inp[batch_index, :])
            else:
                if self.is_sparse:
                    res_inputs = self.inputs[batch_index, :].toarray()
                else:
                    res_inputs = self.inputs[batch_index, :]
            self.start += self.batch_size
            if self.has_targets:
                if self.is_sparse:
                    labels = self.targets[batch_index, :].toarray()
                else:
                    labels = self.targets[batch_index, :]
                return (res_inputs, labels)
            return res_inputs
        else:
            self.reset()
            return self.next()
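

# Minimal usage sketch (assumes 'data/go.obo' and 'data/seqs.fasta' exist;
# 'features' and 'labels' below are hypothetical numpy/scipy matrices):
#
#     go = Ontology('data/go.obo', with_rels=True)
#     ancestors = go.get_anchestors('GO:0008150')
#     ids, seqs = read_fasta('data/seqs.fasta')
#     gen = DataGenerator(batch_size=32, is_sparse=False)
#     gen.fit(features, labels)
#     batch_x, batch_y = next(gen)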