| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """Library to run Jackhmmer from Python.""" |
|
|
| from concurrent import futures |
| import glob |
| import logging |
| import os |
| import subprocess |
| from typing import Any, Callable, Mapping, Optional, Sequence |
| from urllib import request |
|
|
| from openfold.data.tools import utils |
|
|
|
|
class Jackhmmer:
    """Python wrapper of the Jackhmmer binary.

    An instance stores the search settings. `query` then runs the binary
    either once against the whole database, or once per database chunk when
    `num_streamed_chunks` is set (chunks are fetched one ahead of the chunk
    currently being searched).
    """

    def __init__(
        self,
        *,
        binary_path: str,
        database_path: str,
        n_cpu: int = 8,
        n_iter: int = 1,
        e_value: float = 0.0001,
        z_value: Optional[int] = None,
        get_tblout: bool = False,
        filter_f1: float = 0.0005,
        filter_f2: float = 0.00005,
        filter_f3: float = 0.0000005,
        incdom_e: Optional[float] = None,
        dom_e: Optional[float] = None,
        num_streamed_chunks: Optional[int] = None,
        streaming_callback: Optional[Callable[[int], None]] = None,
        streaming_dir: str = "/tmp/ramdisk",
    ):
        """Initializes the Python Jackhmmer wrapper.

        Args:
            binary_path: The path to the jackhmmer executable.
            database_path: The path to the jackhmmer database (FASTA format).
                When streaming, chunk `i` is fetched from
                `f"{database_path}.{i}"` with `urllib.request.urlretrieve`.
            n_cpu: The number of CPUs to give Jackhmmer.
            n_iter: The number of Jackhmmer iterations.
            e_value: The E-value, see Jackhmmer docs for more details.
            z_value: The Z-value, see Jackhmmer docs for more details.
            get_tblout: Whether to save tblout string.
            filter_f1: MSV and biased composition pre-filter, set to >1.0 to
                turn off.
            filter_f2: Viterbi pre-filter, set to >1.0 to turn off.
            filter_f3: Forward pre-filter, set to >1.0 to turn off.
            incdom_e: Domain e-value criteria for inclusion of domains in
                MSA/next round.
            dom_e: Domain e-value criteria for inclusion in tblout.
            num_streamed_chunks: Number of database chunks to stream over.
            streaming_callback: Callback function run after each chunk
                iteration with the iteration number as argument.
            streaming_dir: Existing local directory that streamed chunks are
                downloaded into (and deleted from once searched).

        Raises:
            ValueError: If `database_path` does not exist and streaming is
                not enabled.
        """
        self.binary_path = binary_path
        self.database_path = database_path
        self.num_streamed_chunks = num_streamed_chunks
        self.streaming_dir = streaming_dir

        # In streaming mode the database path is only a prefix for the
        # per-chunk locations, so it is not expected to exist locally.
        if num_streamed_chunks is None and not os.path.exists(
            self.database_path
        ):
            logging.error("Could not find Jackhmmer database %s", database_path)
            raise ValueError(
                f"Could not find Jackhmmer database {database_path}"
            )

        self.n_cpu = n_cpu
        self.n_iter = n_iter
        self.e_value = e_value
        self.z_value = z_value
        self.filter_f1 = filter_f1
        self.filter_f2 = filter_f2
        self.filter_f3 = filter_f3
        self.incdom_e = incdom_e
        self.dom_e = dom_e
        self.get_tblout = get_tblout
        self.streaming_callback = streaming_callback

    def _query_chunk(
        self, input_fasta_path: str, database_path: str
    ) -> Mapping[str, Any]:
        """Queries the database chunk using Jackhmmer.

        Args:
            input_fasta_path: Path to the query FASTA file handed to the
                binary.
            database_path: Path to the database (or database chunk) to search.

        Returns:
            A mapping with the keys:
              "sto": contents of the alignment file written via `-A`.
              "tbl": contents of the `--tblout` file, or "" when
                `get_tblout` is False.
              "stderr": raw bytes captured from the process's stderr.
              "n_iter", "e_value": the settings used for this run.

        Raises:
            RuntimeError: If Jackhmmer exits with a non-zero return code.
        """
        # Outputs are written to, and read back from, a scratch directory
        # that only lives for the duration of this `with` block.
        with utils.tmpdir_manager(base_dir="/tmp") as query_tmp_dir:
            sto_path = os.path.join(query_tmp_dir, "output.sto")
            tblout_path = os.path.join(query_tmp_dir, "tblout.txt")

            cmd_flags = [
                # Discard the regular report; only the alignment file
                # (and optionally the tblout file) is consumed below.
                "-o",
                "/dev/null",
                "-A",
                sto_path,
                "--noali",
                # Pre-filter thresholds (see the constructor docstring).
                "--F1",
                str(self.filter_f1),
                "--F2",
                str(self.filter_f2),
                "--F3",
                str(self.filter_f3),
                # The same E-value is used for inclusion and for reporting.
                "--incE",
                str(self.e_value),
                "-E",
                str(self.e_value),
                "--cpu",
                str(self.n_cpu),
                "-N",
                str(self.n_iter),
            ]
            if self.get_tblout:
                cmd_flags.extend(["--tblout", tblout_path])

            # Truthiness is deliberate: both None and 0 mean "do not pass -Z".
            if self.z_value:
                cmd_flags.extend(["-Z", str(self.z_value)])

            if self.dom_e is not None:
                cmd_flags.extend(["--domE", str(self.dom_e)])

            if self.incdom_e is not None:
                cmd_flags.extend(["--incdomE", str(self.incdom_e)])

            cmd = [
                self.binary_path,
                *cmd_flags,
                input_fasta_path,
                database_path,
            ]

            logging.info('Launching subprocess "%s"', " ".join(cmd))
            with utils.timing(
                f"Jackhmmer ({os.path.basename(database_path)}) query"
            ):
                completed = subprocess.run(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    check=False,
                )
            stderr = completed.stderr

            if completed.returncode:
                # Decode leniently so that undecodable bytes in stderr
                # cannot mask the actual Jackhmmer failure.
                stderr_text = stderr.decode("utf-8", errors="replace")
                raise RuntimeError(
                    f"Jackhmmer failed\nstderr:\n{stderr_text}\n"
                )

            tbl = ""
            if self.get_tblout:
                with open(tblout_path) as f:
                    tbl = f.read()

            with open(sto_path) as f:
                sto = f.read()

        return dict(
            sto=sto,
            tbl=tbl,
            stderr=stderr,
            n_iter=self.n_iter,
            e_value=self.e_value,
        )

    def query(self, input_fasta_path: str) -> Sequence[Mapping[str, Any]]:
        """Queries the database using Jackhmmer.

        Without streaming, the whole database is searched in a single run.
        With streaming, chunks `1..num_streamed_chunks` are fetched into
        `streaming_dir` and searched one after another; chunk `i + 1` is
        downloaded while chunk `i` is being searched.

        Args:
            input_fasta_path: Path to the query FASTA file.

        Returns:
            A list with one `_query_chunk` result per search that was run
            (a single element when streaming is disabled).
        """
        if self.num_streamed_chunks is None:
            return [self._query_chunk(input_fasta_path, self.database_path)]

        db_basename = os.path.basename(self.database_path)

        def db_remote_chunk(db_idx):
            return f"{self.database_path}.{db_idx}"

        def db_local_chunk(db_idx):
            return os.path.join(self.streaming_dir, f"{db_basename}.{db_idx}")

        # Remove chunk files left behind by a previous (interrupted) run.
        for stale_path in glob.glob(db_local_chunk("[0-9]*")):
            try:
                os.remove(stale_path)
            except OSError:
                logging.warning("OSError while deleting %s", stale_path)

        chunked_output = []
        # Two workers: at most the current and the next chunk are in flight.
        with futures.ThreadPoolExecutor(max_workers=2) as executor:
            future = None
            for i in range(1, self.num_streamed_chunks + 1):
                # Only the very first chunk has not been prefetched yet.
                if i == 1:
                    future = executor.submit(
                        request.urlretrieve,
                        db_remote_chunk(i),
                        db_local_chunk(i),
                    )

                # Start fetching the next chunk (if any) before searching the
                # current one. Resetting to None on the last chunk avoids
                # referencing an unbound name when there is only one chunk.
                next_future = None
                if i < self.num_streamed_chunks:
                    next_future = executor.submit(
                        request.urlretrieve,
                        db_remote_chunk(i + 1),
                        db_local_chunk(i + 1),
                    )

                # Block until the current chunk is fully downloaded; this
                # also re-raises any download error.
                future.result()
                chunked_output.append(
                    self._query_chunk(input_fasta_path, db_local_chunk(i))
                )

                # The chunk is no longer needed once it has been searched.
                os.remove(db_local_chunk(i))
                future = next_future
                if self.streaming_callback:
                    self.streaming_callback(i)
        return chunked_output
|
|