import json, os, logging, csv, gzip, numpy, pdb from compound import Compound base_path = os.path.split(os.path.realpath(__file__))[0] ### Input Files: # original version of the KEGG compound file OLD_COMPOUND_JSON_FNAME = os.path.join(base_path, './data_cc/equilibrator_compounds.json.gz') # a CSV file with additional names and InChIs (mostly compounds missing from KEGG # and added manually) KEGG_ADDITIONS_TSV_FNAME = os.path.join(base_path, './data_cc/kegg_additions.tsv') ### Files created by this module: # names and InChIs only KEGG_COMPOUND_JSON_FNAME = os.path.join(base_path, './data_cc/kegg_compounds.json.gz') # names, InChIs and pKa data DEFAULT_CACHE_FNAME = os.path.join(base_path, './data_cc/compounds.json.gz') class CompoundEncoder(json.JSONEncoder): def default(self, obj): if (isinstance(obj, Compound)): return obj.to_json_dict() return json.JSONEncoder.default(self, obj) class Singleton(type): def __init__(cls,name,bases,dic): super(Singleton,cls).__init__(name,bases,dic) cls.instance=None def __call__(cls,*args,**kw): if cls.instance is None: cls.instance=super(Singleton,cls).__call__(*args,**kw) return cls.instance class CompoundCacher(object, metaclass=Singleton): """ CompoundCacher is a singleton that handles caching of Compound objects for the component-contribution package. The Compounds are retrieved by their ID (which is the KEGG ID in most cases). The first time a Compound is requested, it is obtained from the relevant database and a Compound object is created (this takes a while because it usually involves internet communication and then invoking the ChemAxon plugin for calculating the pKa values for that structure). Any further request for the same Compound ID will draw the object from the cache. When the method dump() is called, all cached data is written to a file that will be loaded in future python sessions. """ def __init__(self, cache_fname=None): self.cache_fname = cache_fname if self.cache_fname is None: self.cache_fname = DEFAULT_CACHE_FNAME compounds = json.load(gzip.open(KEGG_COMPOUND_JSON_FNAME, 'r')) self.compound_id2inchi = { d['compound_id']: d['inchi'] for d in compounds } self.need_to_update_cache_file = False self.load() def get_all_compound_ids(self): return sorted(self.compound_id2inchi.keys()) def load(self): # parse the JSON cache file and store in a dictionary 'compound_dict' self.compound_dict = {} self.compound_ids = [] if os.path.exists(self.cache_fname): for d in json.load(gzip.open(self.cache_fname, 'r')): self.compound_ids.append(d['compound_id']) self.compound_dict[d['compound_id']] = Compound.from_json_dict(d) def dump(self): if self.need_to_update_cache_file: fp = gzip.open(self.cache_fname, 'w') data = sorted(list(self.compound_dict.values()), key=lambda d:d.compound_id) dict_data = [x.to_json_dict() for x in data] json.dump(dict_data, fp, cls=CompoundEncoder, sort_keys=True, indent=4, separators=(',', ': ')) fp.close() self.need_to_update_cache_file = False def get_compound(self, compound_id, kegg_additions_cids=None): if compound_id not in self.compound_dict: logging.debug('Cache miss: %s' % str(compound_id)) inchi = self.compound_id2inchi[compound_id] comp = Compound.from_inchi('KEGG', compound_id, inchi) self.add(comp) #if a compound id is in the kegg_additions.tsv #remove the one in cache, and replace it with new one else: if kegg_additions_cids is not None: if compound_id in kegg_additions_cids: self.remove(compound_id) logging.debug('Cache update: %s' % str(compound_id)) inchi = self.compound_id2inchi[compound_id] comp = Compound.from_inchi('KEGG', compound_id, inchi) self.add(comp) logging.debug('Cache hit: %s' % str(compound_id)) return self.compound_dict[compound_id] def remove(self, compound_id): if compound_id in self.compound_dict: del self.compound_dict[compound_id] else: logging.debug('%s is not cached, cannot remove it' % str(compound_id)) def add(self, comp): self.compound_dict[comp.compound_id] = comp self.need_to_update_cache_file = True def get_element_matrix(self, compound_ids): if type(compound_ids) == str: compound_ids = [compound_ids] # gather the "atom bags" of all compounds in a list 'atom_bag_list' elements = set() atom_bag_list = [] for compound_id in compound_ids: comp = self.get_compound(compound_id) atom_bag = comp.atom_bag if atom_bag is not None: elements = elements.union(list(atom_bag.keys())) atom_bag_list.append(atom_bag) elements.discard('H') # don't balance H (it's enough to balance e-) elements = sorted(elements) # create the elemental matrix, where each row is a compound and each # column is an element (or e-) Ematrix = numpy.matrix(numpy.zeros((len(atom_bag_list), len(elements)))) for i, atom_bag in enumerate(atom_bag_list): if atom_bag is None: Ematrix[i, :] = numpy.nan else: for j, elem in enumerate(elements): Ematrix[i, j] = atom_bag.get(elem, 0) return elements, Ematrix ############################################################################### @staticmethod def RebuildCompoundJSON(): kegg_dict = {} for d in json.load(gzip.open(OLD_COMPOUND_JSON_FNAME, 'r')): cid = d['CID'] kegg_dict[cid] = {'compound_id': cid, 'name': d['name'], 'names': d['names'], 'inchi': d['InChI']} # override some of the compounds or add new ones with 'fake' IDs, # i.e. C80000 or higher. kegg_additions_cids = [] for d in csv.DictReader(open(KEGG_ADDITIONS_TSV_FNAME, 'r'), delimiter='\t'): cid = 'C%05d' % int(d['cid']) kegg_additions_cids.append(cid) kegg_dict[cid] = {'compound_id': cid, 'name': d['name'], 'names': [d['name']], 'inchi': d['inchi']} compound_json = [kegg_dict[compound_id] for compound_id in sorted(kegg_dict.keys())] new_json = gzip.open(KEGG_COMPOUND_JSON_FNAME, 'w') json.dump(compound_json, new_json, sort_keys=True, indent=4) new_json.close() return kegg_additions_cids ############################################################################### @staticmethod def BuildCache(start_from_scratch=False, kegg_additions_cids=None): if start_from_scratch and os.path.exists(DEFAULT_CACHE_FNAME): os.remove(DEFAULT_CACHE_FNAME) ccache = CompoundCacher(cache_fname=DEFAULT_CACHE_FNAME) i = 0 for compound_id in ccache.get_all_compound_ids(): logging.debug('Caching %s' % compound_id) comp = ccache.get_compound(compound_id, kegg_additions_cids=kegg_additions_cids) logging.debug(str(comp)) i += 1 if i % 100 == 0: logging.debug('Dumping Cache ...') ccache.dump() ccache.dump() ############################################################################### if __name__ == '__main__': logger = logging.getLogger('') #logger.setLevel(logging.WARNING) logger.setLevel(logging.DEBUG) kegg_additions_cids = CompoundCacher.RebuildCompoundJSON() CompoundCacher.BuildCache(start_from_scratch=False, kegg_additions_cids=kegg_additions_cids)