import math
import warnings
from collections import Counter, deque
from xml.etree import ElementTree as ET

import numpy as np
import pandas as pd

BIOLOGICAL_PROCESS = 'GO:0008150'
MOLECULAR_FUNCTION = 'GO:0003674'
CELLULAR_COMPONENT = 'GO:0005575'
FUNC_DICT = {
'cc': CELLULAR_COMPONENT,
'mf': MOLECULAR_FUNCTION,
'bp': BIOLOGICAL_PROCESS}
NAMESPACES = {
'cc': 'cellular_component',
'mf': 'molecular_function',
'bp': 'biological_process'
}

# Experimental GO evidence codes
EXP_CODES = {
    'EXP', 'IDA', 'IPI', 'IMP', 'IGI', 'IEP', 'TAS', 'IC',
    'HTP', 'HDA', 'HMP', 'HGI', 'HEP'}

# CAFA4 target organisms (NCBI taxonomy ids)
CAFA_TARGETS = {
    '287', '3702', '4577', '6239', '7227', '7955', '9606', '9823', '10090',
    '10116', '44689', '83333', '99287', '226900', '243273', '284812', '559292'}

def is_cafa_target(org):
    return org in CAFA_TARGETS


def is_exp_code(code):
    return code in EXP_CODES

def get_goplus_defs(filename='data/definitions.txt'):
    """Parse GO-Plus style definition lines of the form
    ``GO_0000001: GO_0000002 and GO_0000003`` into a dict mapping a GO id
    to the set of GO ids appearing in its definition."""
    plus_defs = {}
    with open(filename) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # Split only on the first ': ' so values containing colons survive
            go_id, definition = line.split(': ', maxsplit=1)
            # Convert OBO-style ids (GO_XXXXXXX) to standard GO:XXXXXXX form
            go_id = go_id.replace('_', ':')
            definition = definition.replace('_', ':')
            plus_defs[go_id] = set(definition.split(' and '))
    return plus_defs
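
# A minimal usage sketch for get_goplus_defs. The sample line below is an
# illustrative assumption about the file format, not data from this repo:
# a line "GO_0000018: GO_0065007 and GO_0006310" would produce the entry
# {'GO:0000018': {'GO:0065007', 'GO:0006310'}}.
def _demo_goplus_defs(path='data/definitions.txt'):
    defs = get_goplus_defs(path)
    for go_id in list(defs)[:3]:
        print(go_id, '->', sorted(defs[go_id]))
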
class Ontology(object):
    """In-memory view of the Gene Ontology loaded from an OBO file."""

    def __init__(self, filename='data/go.obo', with_rels=False):
        self.ont = self.load(filename, with_rels)
        self.ic = None
        self.ic_norm = 0.0

    def has_term(self, term_id):
        return term_id in self.ont

    def get_term(self, term_id):
        if self.has_term(term_id):
            return self.ont[term_id]
        return None

    def calculate_ic(self, annots):
        """Compute information content ic(t) = log2(min_parent_count / count(t))
        for every annotated term. Expects `annots` to be an iterable of
        ancestor-propagated annotation sets, so every parent of an annotated
        term is itself annotated (otherwise a parent count of zero would
        break the logarithm)."""
        cnt = Counter()
        for x in annots:
            cnt.update(x)
        self.ic = {}
        for go_id, n in cnt.items():
            parents = self.get_parents(go_id)
            if len(parents) == 0:
                min_n = n  # root terms carry zero information content
            else:
                min_n = min(cnt[x] for x in parents)
            self.ic[go_id] = math.log(min_n / n, 2)
            self.ic_norm = max(self.ic_norm, self.ic[go_id])

    def get_ic(self, go_id):
        if self.ic is None:
            raise Exception('IC is not calculated yet; call calculate_ic first')
        if go_id not in self.ic:
            return 0.0
        return self.ic[go_id]

    def get_norm_ic(self, go_id):
        return self.get_ic(go_id) / self.ic_norm

    def load(self, filename, with_rels):
        """Parse an OBO file into a dict of term records keyed by GO id."""
        ont = dict()
        obj = None
        with open(filename, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                if line == '[Term]':
                    if obj is not None:
                        ont[obj['id']] = obj
                    obj = dict()
                    obj['is_a'] = list()
                    obj['part_of'] = list()
                    obj['regulates'] = list()
                    obj['alt_ids'] = list()
                    obj['is_obsolete'] = False
                    continue
                elif line == '[Typedef]':
                    if obj is not None:
                        ont[obj['id']] = obj
                    obj = None
                else:
                    if obj is None:
                        continue
                    # Each stanza line is "key: value"; split only on the
                    # first ': ' so values containing colons stay intact.
                    key, _, value = line.partition(': ')
                    if key == 'id':
                        obj['id'] = value
                    elif key == 'alt_id':
                        obj['alt_ids'].append(value)
                    elif key == 'namespace':
                        obj['namespace'] = value
                    elif key == 'is_a':
                        obj['is_a'].append(value.split(' ! ')[0])
                    elif with_rels and key == 'relationship':
                        it = value.split()
                        # Fold every relationship type (part_of, regulates,
                        # ...) into 'is_a' so traversals follow all edges.
                        obj['is_a'].append(it[1])
                    elif key == 'name':
                        obj['name'] = value
                    elif key == 'is_obsolete' and value == 'true':
                        obj['is_obsolete'] = True
        if obj is not None:
            ont[obj['id']] = obj
        # Register alternative ids as aliases of the main record, then drop
        # obsolete terms from the main key set.
        for term_id in list(ont.keys()):
            for t_id in ont[term_id]['alt_ids']:
                ont[t_id] = ont[term_id]
            if ont[term_id]['is_obsolete']:
                del ont[term_id]
        # Link every term to its children so the graph can be walked top-down.
        for term_id, val in ont.items():
            if 'children' not in val:
                val['children'] = set()
            for p_id in val['is_a']:
                if p_id in ont:
                    if 'children' not in ont[p_id]:
                        ont[p_id]['children'] = set()
                    ont[p_id]['children'].add(term_id)
        return ont

    def get_ancestors(self, term_id):
        """Return term_id together with all of its ancestors (BFS over is_a)."""
        if term_id not in self.ont:
            return set()
        term_set = set()
        q = deque()
        q.append(term_id)
        while q:
            t_id = q.popleft()
            if t_id not in term_set:
                term_set.add(t_id)
                for parent_id in self.ont[t_id]['is_a']:
                    if parent_id in self.ont:
                        q.append(parent_id)
        return term_set

    # Backward-compatible alias for the historical misspelling.
    get_anchestors = get_ancestors

    def get_prop_terms(self, terms):
        """Propagate a set of terms to their full ancestor closure."""
        prop_terms = set()
        for term_id in terms:
            prop_terms |= self.get_ancestors(term_id)
        return prop_terms

    def get_parents(self, term_id):
        if term_id not in self.ont:
            return set()
        term_set = set()
        for parent_id in self.ont[term_id]['is_a']:
            if parent_id in self.ont:
                term_set.add(parent_id)
        return term_set

    def get_namespace_terms(self, namespace):
        terms = set()
        for go_id, obj in self.ont.items():
            if obj['namespace'] == namespace:
                terms.add(go_id)
        return terms

    def get_namespace(self, term_id):
        return self.ont[term_id]['namespace']

    def get_term_set(self, term_id):
        """Return term_id together with all of its descendants (BFS over children)."""
        if term_id not in self.ont:
            return set()
        term_set = set()
        q = deque()
        q.append(term_id)
        while q:
            t_id = q.popleft()
            if t_id not in term_set:
                term_set.add(t_id)
                for ch_id in self.ont[t_id]['children']:
                    q.append(ch_id)
        return term_set
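
# A minimal usage sketch for Ontology, assuming a local copy of the Gene
# Ontology at 'data/go.obo' (the default path above). 'GO:0003824'
# (catalytic activity) is used as an example child of the molecular_function
# root; the toy annotation sets are illustrative, not real data:
def _demo_ontology():
    ont = Ontology('data/go.obo')
    annots = [ont.get_prop_terms({'GO:0003824'}),
              ont.get_prop_terms({MOLECULAR_FUNCTION})]
    # calculate_ic expects ancestor-closed sets, hence get_prop_terms above
    ont.calculate_ic(annots)
    print(ont.get_ic('GO:0003824'), ont.get_norm_ic('GO:0003824'))
    print(len(ont.get_ancestors('GO:0003824')))
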

def read_fasta(filename):
    """Read a FASTA file and return (info, seqs), where info[i] is the first
    whitespace-delimited token of the i-th header line."""
    seqs = list()
    info = list()
    seq = ''
    inf = ''
    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith('>'):
                if seq != '':
                    seqs.append(seq)
                    info.append(inf)
                    seq = ''
                inf = line[1:].split()[0]
            else:
                seq += line
    # Flush the last record; the guard avoids an empty entry for empty files
    if seq != '':
        seqs.append(seq)
        info.append(inf)
    return info, seqs
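
# A minimal usage sketch for read_fasta; 'data/sequences.fasta' is a
# hypothetical path used only for illustration:
def _demo_read_fasta(path='data/sequences.fasta'):
    proteins, sequences = read_fasta(path)
    # Header tokens and sequences stay aligned by index.
    for prot_id, seq in zip(proteins[:3], sequences[:3]):
        print(prot_id, len(seq))
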

class DataGenerator(object):
    """Endless mini-batch iterator (Keras fit_generator style): when the data
    is exhausted it wraps around and restarts from the first batch."""

    def __init__(self, batch_size, is_sparse=False):
        self.batch_size = batch_size
        self.is_sparse = is_sparse

    def fit(self, inputs, targets=None):
        self.start = 0
        self.inputs = inputs
        self.targets = targets
        if isinstance(self.inputs, (tuple, list)):
            self.size = self.inputs[0].shape[0]
        else:
            self.size = self.inputs.shape[0]
        self.has_targets = targets is not None

    def __iter__(self):
        # Support the full iterator protocol (for-loops, next()).
        return self

    def __next__(self):
        return self.next()

    def reset(self):
        self.start = 0
    def next(self):
        if self.start < self.size:
            batch_index = np.arange(
                self.start, min(self.size, self.start + self.batch_size))
            if isinstance(self.inputs, (tuple, list)):
                res_inputs = []
                for inp in self.inputs:
                    if self.is_sparse:
                        # Densify sparse matrix slices one batch at a time
                        res_inputs.append(
                            inp[batch_index, :].toarray())
                    else:
                        res_inputs.append(inp[batch_index, :])
            else:
                if self.is_sparse:
                    res_inputs = self.inputs[batch_index, :].toarray()
                else:
                    res_inputs = self.inputs[batch_index, :]
            self.start += self.batch_size
            if self.has_targets:
                if self.is_sparse:
                    labels = self.targets[batch_index, :].toarray()
                else:
                    labels = self.targets[batch_index, :]
                return (res_inputs, labels)
            return res_inputs
        else:
            # Wrap around and restart from the first batch (assumes size > 0)
            self.reset()
            return self.next()
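
# A minimal sketch of driving DataGenerator with dense toy arrays; sparse
# inputs would instead be scipy.sparse matrices passed with is_sparse=True:
if __name__ == '__main__':
    x = np.arange(20, dtype=np.float32).reshape(10, 2)
    y = (x.sum(axis=1, keepdims=True) > 10).astype(np.float32)
    gen = DataGenerator(batch_size=4)
    gen.fit(x, y)
    # The generator is endless; one full pass over 10 rows is 3 batches.
    for _ in range(3):
        batch_x, batch_y = next(gen)
        print(batch_x.shape, batch_y.shape)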