import pandas as pd import pulp import pdb import os import json from rdkit import Chem # pulp_solver = pulp.solvers.CPLEX_CMD(path=None, keepFiles=0, mip=1, msg=1, # options=['mip tolerances mipgap 0', 'mip tolerances absmipgap 0', # 'mip tolerances integrality 0', 'simplex tolerances optimality 1E-9', # 'simplex tolerances feasibility 1E-9',], timelimit=1200) def count_substructures(radius,molecule): """Helper function for get the information of molecular signature of a metabolite. The relaxed signature requires the number of each substructure to construct a matrix for each molecule. Parameters ---------- radius : int the radius is bond-distance that defines how many neighbor atoms should be considered in a reaction center. molecule : Molecule a molecule object create by RDkit (e.g. Chem.MolFromInchi(inchi_code) or Chem.MolToSmiles(smiles_code)) Returns ------- dict dictionary of molecular signature for a molecule, {smiles: molecular_signature} """ m = molecule smi_count = dict() atomList = [atom for atom in m.GetAtoms()] for i in range(len(atomList)): env = Chem.FindAtomEnvironmentOfRadiusN(m,radius,i) atoms=set() for bidx in env: atoms.add(m.GetBondWithIdx(bidx).GetBeginAtomIdx()) atoms.add(m.GetBondWithIdx(bidx).GetEndAtomIdx()) # only one atom is in this environment, such as O in H2O if len(atoms) == 0: atoms = {i} smi = Chem.MolFragmentToSmiles(m,atomsToUse=list(atoms), bondsToUse=env,canonical=True) if smi in smi_count: smi_count[smi] = smi_count[smi] + 1 else: smi_count[smi] = 1 return smi_count def novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction): """apply reaction rules generated from a more relaxed manner to search for reaction rules that are able to fill the gap between the source and sink metabolites. - rePrime procedure is more similar to a morgan fingerprints - the relaxed rule is generated from substructures without considering the bond that connect the atoms at the edge of the substructure to the rest of the molecules Parameters ---------- exchange_mets : dict overall stoichiometry of source and sink metabolites, {met: stoic,...} This is a important input for novoStoic to run correctly because the method requires that overall moieties are balanced. novel_mets : list list of novel metabolites that are not in the database (novoStoic/data/ metanetx_universal_model_kegg_metacyc_rhea_seed_reactome.json) filtered_rules : list list of rules that are filtered by the user (based on expert knowldedge) to reduce the running time of the novoStoic search process project : string a path to store the tmp information of result from running novoStoic iterations : int the number of iterations of searching for alternative solutions data_dir : type Description of parameter `data_dir`. Returns ------- None all the outputs are saved in the project folder. """ if not os.path.exists(project): os.makedirs(project) # the maximum flux of a reaction M = 2 data_dir = './data' # read csv files with molecular signatures and reaction rules molecular_signature = json.load(open( os.path.join(data_dir, 'decompose_vector_ac.json'))) molsigs = pd.DataFrame.from_dict(molecular_signature).fillna(0) rules = pd.read_csv( os.path.join(data_dir, "relaxed_rule_noduplic.csv"), index_col=0 ) ###### sets ############ moiety_index = rules.index.tolist() # moiety sets rules_index = rules.columns.values.tolist() print("Number of rules used in this search:",len(rules_index)) exchange_index = exchange_mets.keys() ###### parameters ###### # T(m,r) contains atom stoichiometry for each rule T = rules.to_dict(orient="index") # C(m,i) contains moiety cardinality for each metabolite C = molsigs.to_dict(orient="index") for m in moiety_index: C[m]["C00080"] = 0 C[m]["C00282"] = 0 # add metabolites that are not present in current database for met in novel_mets: # molsigs_product = pd.read_csv( # project + "/relaxed_molsig_" + met + "_1.csv", index_col=0 # ) # molsigs_product_dict = molsigs_product.to_dict(orient="index") smiles = novel_mets[met] mol = Chem.MolFromSmiles(smiles) mol = Chem.RemoveHs(mol) molsigs_product_dict = count_substructures(1,mol) for m in moiety_index: if m in molsigs_product_dict.keys(): C[m][met] = molsigs_product_dict[m] else: C[m][met] = 0 ###### variables ###### v_rule = pulp.LpVariable.dicts( "v_rule", rules_index, lowBound=-M, upBound=M, cat="Integer" ) v_rule_obj = pulp.LpVariable.dicts( "v_rule_obj", rules_index, lowBound=0, upBound=M, cat="Continuous" ) v_EX = pulp.LpVariable.dicts( "v_EX", exchange_index, lowBound=-M, upBound=M, cat="Continuous" ) y_rule = pulp.LpVariable.dicts( "y", rules_index, lowBound=0, upBound=1, cat="Binary" ) # create MILP problem lp_prob = pulp.LpProblem("novoStoic", pulp.LpMinimize) ####### objective function #### lp_prob += pulp.lpSum([v_rule_obj[j] for j in rules_index]) ####### constraints #### # constraint 1: moiety change balance for m in moiety_index: lp_prob += ( pulp.lpSum([T[m][r] * v_rule[r] for r in rules_index if T[m][r] !=0]) == pulp.lpSum([C[m][i] * v_EX[i] for i in exchange_index if C[m][i] != 0]), "moiety_balance_" + str(moiety_index.index(m)), ) # constraint 2: constraint for exchange reactions for i, stoic in exchange_mets.items(): lp_prob += v_EX[i] == stoic, "exchange" + i # constraint 3: control the number of rules direction_df = pd.read_csv( os.path.join(data_dir, "direction.csv"), index_col=0 ) direction_df.index = direction_df['reaction'] # direction: 0-reversible, 1-backward, 2-forward direction = direction_df['direction'].to_dict() if use_direction: soln_file = os.path.join(project, "solution_use_direction.txt") for j in rules_index: if direction[j] == 0: lp_prob += v_rule[j] >= y_rule[j] * -M, "cons1_%s" % j lp_prob += v_rule[j] <= y_rule[j] * M, "cons2_%s" % j if direction[j] == 1: lp_prob += v_rule[j] >= y_rule[j] * -M, "cons1_%s" % j lp_prob += v_rule[j] <= 0, "cons2_%s" % j if direction[j] == 2: lp_prob += v_rule[j] >= 0, "cons1_%s" % j lp_prob += v_rule[j] <= y_rule[j] * M, "cons2_%s" % j else: soln_file = os.path.join(project, "solution_no_direction.txt") for j in rules_index: lp_prob += v_rule[j] >= y_rule[j] * -M, "cons1_%s" % j lp_prob += v_rule[j] <= y_rule[j] * M, "cons2_%s" % j for j in rules_index: lp_prob += v_rule_obj[j] >= v_rule[j] lp_prob += v_rule_obj[j] >= -v_rule[j] # constraint 5: customized constraints # the number of steps of the pathway lp_prob += pulp.lpSum([v_rule_obj[j] for j in rules_index]) == 2 ### solve integer_cuts(lp_prob,pulp_solver,iterations,rules_index,y_rule,v_rule,soln_file,direction) def integer_cuts(lp_prob,pulp_solver,iterations,rules_index,y_rule,v_rule,soln_file,direction): """add integer cut constraints to a mixed-integer linear programming problem (MILP). The aim of such constraints is to find alternative solutions by adding constraints to exclude the already explored solutions. Reference: Optimization Methods in Metabolic Networks By Costas D. Maranas, Ali R. Zomorrodi, Chapter 4.2.2 Finding alternative optimal integer solutions Returns ------- type Description of returned object. """ for sol_num in range(1, iterations + 1): integer_cut_rules = [] # optinal output: lp file for debug lp_prob.writeLP('./test.lp') # if pulp_solver = "SCIP": # status, values = pulp_solver.solve(lp_prob) lp_prob.solve(pulp_solver) # pulp_solver.solve(lp_prob) print("Status:", pulp.LpStatus[lp_prob.status]) if pulp.LpStatus[lp_prob.status] != 'Optimal': break print('-----------rules--------------') with open(soln_file,'a') as f: f.write('iteration,' + str(sol_num)) f.write('\n') for r in rules_index: if (v_rule[r].varValue >= 0.1 or v_rule[r].varValue <=-0.1): dG_info = '' if (v_rule[r].varValue > 0 and direction[r] == 1) or (v_rule[r].varValue < 0 and direction[r] == 2): # print("##### Found ####: " + str(r)) # with open(soln_file,'a') as f: # f.write('##### Found ####: ' + str(r)) # f.write('\n') dG_info = ' * Thermodynamically infeasible' print("##### Found ####: " + str(r) + dG_info) integer_cut_rules.append(r) print(r,v_rule[r].varValue) with open(soln_file,'a') as f: f.write(r + ',' + str(v_rule[r].varValue) + dG_info) f.write('\n') length = len(integer_cut_rules) - 1 lp_prob += ( pulp.lpSum([y_rule[r] for r in integer_cut_rules]) <= length, "integer_cut_" + str(sol_num), ) def test_bdo(): exchange_mets = { 'C00091': -1, # Succinyl-CoA 'C00004': -4, # NADH 'C00003': 4, # NAD+ 'C00010': 1, # coa 'C00001':1, # h2O '14bdo': 1, } novel_mets = { '14bdo': 'OCCCCO' } iterations = 50 project = './novoStoic_result' # path_to_cplex = '/Users/linuswang/Applications/IBM/ILOG/CPLEX_Studio1261/cplex/bin/x86-64_osx/cplex' # pulp_solver = pulp.CPLEX_CMD(path=path_to_cplex,keepFiles=0, mip=1, msg=1) pulp_solver = pulp.CPLEX_CMD(path=None,keepFiles=0, mip=1, msg=1) # pulp_solver = pulp.solvers.GUROBI_CMD() # pulp_solver = pulp.solvers.GLPK_CMD() use_direction=True novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction) use_direction=False novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction) def test_isovalarate(): exchange_mets = { 'C00141': -1, # 2-keto isovalarate 'C00004': -1, # NADH 'C00003': 1, # NAD+ "C14710": 1, # isobutanol C4H10O 'C00011': 1, # co2 } novel_mets = {} iterations = 50 project = './novoStoic_isovalarate' # path_to_cplex = '/Users/linuswang/Applications/IBM/ILOG/CPLEX_Studio1261/cplex/bin/x86-64_osx/cplex' # pulp_solver = pulp.CPLEX_CMD(path=path_to_cplex,keepFiles=0, mip=1, msg=1) pulp_solver = pulp.CPLEX_CMD(path=None,keepFiles=0, mip=1, msg=1) # pulp_solver = pulp.solvers.GUROBI_CMD() # pulp_solver = pulp.GLPK_CMD() # use_direction=True # novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction) use_direction=False novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction) if __name__ == '__main__': test_isovalarate()