Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import json | |
| from typing import Optional, Tuple, List | |
| from dataclasses import dataclass | |
| class RelaxTask: | |
| in_path: str | |
| current_path: str | |
| info: dict | |
| status: str | |
| flexible_residue_first: Optional[Tuple] = None | |
| flexible_residue_last: Optional[Tuple] = None | |
| def get_in_path_with_tag(self, tag): | |
| name, ext = os.path.splitext(self.in_path) | |
| new_path = f'{name}_{tag}{ext}' | |
| return new_path | |
| def set_current_path_tag(self, tag): | |
| new_path = self.get_in_path_with_tag(tag) | |
| self.current_path = new_path | |
| return new_path | |
| def check_current_path_exists(self): | |
| ok = os.path.exists(self.current_path) | |
| if not ok: | |
| self.mark_failure() | |
| if os.path.getsize(self.current_path) == 0: | |
| ok = False | |
| self.mark_failure() | |
| os.unlink(self.current_path) | |
| return ok | |
| def update_if_finished(self, tag): | |
| out_path = self.get_in_path_with_tag(tag) | |
| if os.path.exists(out_path) and os.path.getsize(out_path) > 0: | |
| # print('Already finished', out_path) | |
| self.set_current_path_tag(tag) | |
| self.mark_success() | |
| return True | |
| return False | |
| def can_proceed(self): | |
| self.check_current_path_exists() | |
| return self.status != 'failed' | |
| def mark_success(self): | |
| self.status = 'success' | |
| def mark_failure(self): | |
| self.status = 'failed' | |
| class TaskScanner: | |
| def __init__(self, root, final_postfix=None): | |
| super().__init__() | |
| self.root = root | |
| self.visited = set() | |
| self.final_postfix = final_postfix | |
| def _get_metadata(self, fpath): | |
| json_path = os.path.join( | |
| os.path.dirname(os.path.dirname(fpath)), | |
| 'metadata.json' | |
| ) | |
| tag_name = os.path.basename(os.path.dirname(fpath)) | |
| try: | |
| with open(json_path, 'r') as f: | |
| metadata = json.load(f) | |
| for item in metadata['items']: | |
| if item['tag'] == tag_name: | |
| return item | |
| except (json.JSONDecodeError, FileNotFoundError) as e: | |
| return None | |
| return None | |
| def scan(self) -> List[RelaxTask]: | |
| tasks = [] | |
| input_fname_pattern = '(^\d+\.pdb$|^REF\d\.pdb$)' | |
| for parent, _, files in os.walk(self.root): | |
| for fname in files: | |
| fpath = os.path.join(parent, fname) | |
| if not re.match(input_fname_pattern, fname): | |
| continue | |
| if os.path.getsize(fpath) == 0: | |
| continue | |
| if fpath in self.visited: | |
| continue | |
| # If finished | |
| if self.final_postfix is not None: | |
| fpath_name, fpath_ext = os.path.splitext(fpath) | |
| fpath_final = f"{fpath_name}_{self.final_postfix}{fpath_ext}" | |
| if os.path.exists(fpath_final): | |
| continue | |
| # Get metadata | |
| info = self._get_metadata(fpath) | |
| if info is None: | |
| continue | |
| tasks.append(RelaxTask( | |
| in_path = fpath, | |
| current_path = fpath, | |
| info = info, | |
| status = 'created', | |
| flexible_residue_first = info.get('residue_first', None), | |
| flexible_residue_last = info.get('residue_last', None), | |
| )) | |
| self.visited.add(fpath) | |
| return tasks | |