| | |
| | |
| | import os |
| | import gzip |
| | import shutil |
| | import argparse |
| |
|
| | import numpy as np |
| |
|
| | from utils.logger import print_log |
| | from utils.file_utils import get_filename, cnt_num_files |
| | from data.format import VOCAB |
| | from data.converter.pdb_to_list_blocks import pdb_to_list_blocks |
| | from data.converter.blocks_to_data import blocks_to_data |
| | from data.mmap_dataset import create_mmap |
| |
|
| |
|
def parse():
    """Parse command-line arguments for converting a PDB database into monomers.

    Returns:
        argparse.Namespace with `pdb_dir` (input PDB database directory)
        and `out_dir` (output directory), both required.
    """
    arg_parser = argparse.ArgumentParser(description='Process PDB to monomers')
    arg_parser.add_argument('--pdb_dir', type=str, required=True,
                            help='Directory of pdb database')
    arg_parser.add_argument('--out_dir', type=str, required=True,
                            help='Output directory')
    return arg_parser.parse_args()
| | |
| |
|
def process_iterator(data_dir):
    """Iterate gzipped PDB files under `data_dir` and yield one item per valid chain.

    Expects `data_dir` to contain one sub-directory per category, each holding
    gzipped PDB files. Each file is decompressed to a temporary location,
    parsed into per-chain residue blocks, filtered to residues with a complete
    backbone (N, CA, C), and chains with any broken peptide bond are skipped.

    Args:
        data_dir (str): root directory of the gzipped PDB database.

    Yields:
        tuple: (item_id, data, [num_blocks, num_units, chain, seq], file_cnt)
            item_id (str): '<chain_id>_<pdb_file_name>'
            data (list): serialized blocks via block.to_tuple()
            file_cnt (int): number of files consumed so far (progress counter)
    """
    tmp_dir = './tmp'
    # idempotent create; avoids the racy exists()-then-makedirs pattern
    os.makedirs(tmp_dir, exist_ok=True)

    file_cnt = 0
    for category in os.listdir(data_dir):
        category_dir = os.path.join(data_dir, category)
        for pdb_file in os.listdir(category_dir):
            file_cnt += 1
            path = os.path.join(category_dir, pdb_file)
            tmp_file = os.path.join(tmp_dir, f'{pdb_file}.decompressed')

            try:
                # decompress the gzipped PDB into a temporary plain file
                with gzip.open(path, 'rb') as fin:
                    with open(tmp_file, 'wb') as fout:
                        shutil.copyfileobj(fin, fout)
                list_blocks, chains = pdb_to_list_blocks(tmp_file, return_chain_ids=True)
            except Exception as e:
                print_log(f'Parsing {pdb_file} failed: {e}', level='WARN')
                continue
            finally:
                # always remove the decompressed copy — the previous version
                # leaked it when parsing failed (the cleanup was skipped by
                # the `continue` above)
                if os.path.exists(tmp_file):
                    os.remove(tmp_file)

            for blocks, chain in zip(list_blocks, chains):

                # keep only residues whose full backbone (N, CA, C) is present
                filter_blocks, NC_coords = [], []
                for block in blocks:
                    N_coord, C_coord, CA_coord = None, None, None
                    for atom in block:
                        if atom.name == 'N':
                            N_coord = atom.coordinate
                        elif atom.name == 'C':
                            C_coord = atom.coordinate
                        elif atom.name == 'CA':
                            CA_coord = atom.coordinate
                    # explicit None checks: plain truthiness would raise on
                    # numpy-array coordinates and mis-handle falsy values
                    if N_coord is not None and C_coord is not None and CA_coord is not None:
                        filter_blocks.append(block)
                        NC_coords.append(N_coord)
                        NC_coords.append(C_coord)

                if len(filter_blocks) == 0:
                    continue

                # NC_coords is [N_0, C_0, N_1, C_1, ...]; peptide bond i is
                # the C_i -> N_{i+1} distance
                NC_coords = np.array(NC_coords)
                pep_bond_len = np.linalg.norm(NC_coords[1::2][:-1] - NC_coords[2::2], axis=-1)

                # skip chains with any broken peptide bond (> 1.5 Å); note that
                # residues dropped by the backbone filter also surface as
                # breaks here, so chains with incomplete residues are rejected
                if np.any(pep_bond_len > 1.5):
                    continue

                blocks = filter_blocks
                item_id = chain + '_' + pdb_file

                num_blocks = len(blocks)
                num_units = sum(len(block.units) for block in blocks)
                data = [block.to_tuple() for block in blocks]

                # one-letter sequence from residue abbreviations
                seq = ''.join(VOCAB.abrv_to_symbol(block.abrv) for block in blocks)

                yield item_id, data, [num_blocks, num_units, chain, seq], file_cnt

    shutil.rmtree(tmp_dir)
| |
|
def main(args):
    """Build the memory-mapped dataset from the PDB database given in `args`.

    Args:
        args: namespace with `pdb_dir` (input database root) and
            `out_dir` (destination for the mmap dataset).
    """
    # total number of files (recursive) drives the progress/entry count
    num_entries = cnt_num_files(args.pdb_dir, recursive=True)

    print_log(f'Processing data from directory: {args.pdb_dir}.')
    print_log(f'Number of entries: {num_entries}')

    create_mmap(process_iterator(args.pdb_dir), args.out_dir, num_entries)

    print_log('Finished!')
| |
|
| |
|
if __name__ == '__main__':
    cli_args = parse()
    main(cli_args)