# -*- coding: utf-8 -*-
# :noTabs=true:

import time as time_module
import codecs
import signal
import os, sys, stat

try:
    from .base import *
except ImportError:  # workaround for the B2 back-end
    import imp
    # A bit of Python magic: the effect is `from base import *` (HPC_Driver, execute, NT, ...),
    # but the path to base.py is calculated from the location of this source file.
    imp.load_source(__name__, '/'.join(__file__.split('/')[:-1]) + '/base.py')
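
# MultiCore_HPC_Driver emulates an HPC queue on a single machine: each queued task is run
# in a forked child process, and the parent tracks its children through JobID objects that
# hold the child pids and reap them with non-blocking waitpid() calls.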
class MultiCore_HPC_Driver(HPC_Driver):

    class JobID:
        def __init__(self, pids=None):
            self.pids = pids if pids else []

        def __bool__(self): return bool(self.pids)

        def __len__(self): return len(self.pids)

        def add_pid(self, pid): self.pids.append(pid)

        def remove_completed_pids(self):
            for pid in self.pids[:]:  # iterate over a copy so we can remove pids while iterating
                try:
                    r = os.waitpid(pid, os.WNOHANG)
                    if r == (pid, 0): self.pids.remove(pid)  # process has ended without an error
                    elif r[0] == pid:  # process has ended with an error; report it and drop the pid
                        #self.cancel_job()
                        #sys.exit(1)
                        self.pids.remove(pid)
                        print('ERROR: Some of the HPC jobs terminated abnormally! Please see HPC logs for details.')
                except ChildProcessError: self.pids.remove(pid)

        def cancel(self):
            for pid in self.pids:
                try:
                    os.killpg(os.getpgid(pid), signal.SIGKILL)
                except (ProcessLookupError, ChildProcessError): pass  # process group is already gone
            self.pids = []

    def __init__(self, *args, **kwds):
        HPC_Driver.__init__(self, *args, **kwds)
        #print(f'MultiCore_HPC_Driver: cpu_count: {self.cpu_count}')

    def remove_completed_jobs(self):
        for job in self.jobs[:]:  # iterate over a copy so we don't modify the list we're iterating over
            job.remove_completed_pids()
            if not job: self.jobs.remove(job)

    @property
    def process_count(self):
        ''' Return the number of processes currently run by this driver instance.
        '''
        return sum( map(len, self.jobs) )

    def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        print('submit_hpc_job is DEPRECATED and will be removed in the near future, please use submit_serial_hpc_job instead!')
        return self.submit_serial_hpc_job(name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory, time, block, shell_wrapper)

    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        cpu_usage = -time_module.time()/60./60.  # start of wall-clock accounting, in hours

        if shell_wrapper:
            shell_wrapper_sh = os.path.abspath(self.working_dir + f'/hpc.{name}.shell_wrapper.sh')
            with open(shell_wrapper_sh, 'w') as f:
                f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments))
                os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
            executable, arguments = shell_wrapper_sh, ''

        def mfork():
            ''' Wait until the number of child processes drops below cpu_count, then fork a new process and return its pid.
            '''
            while self.process_count >= self.cpu_count:
                self.remove_completed_jobs()
                if self.process_count >= self.cpu_count: time_module.sleep(.5)

            sys.stdout.flush()
            pid = os.fork()
            # if pid: the caller (the parent) appends the pid to the current job
            return pid

        current_job = self.JobID()

        process = 0
        for i in range(jobs_to_queue):
            pid = mfork()
            if not pid:  # we are the child process
                command_line = 'cd {} && {} {}'.format(working_dir, executable, arguments.format(process=process) )
                exit_code, log = execute('Running job {}.{}...'.format(name, i), command_line, tracer=self.tracer, return_='tuple')

                with codecs.open(log_dir+'/.hpc.{name}.{i:02d}.log'.format(**vars()), 'w', encoding='utf-8', errors='replace') as f:
                    f.write(command_line+'\n'+log)
                    if exit_code:
                        error_report = f'\n\n{command_line}\nERROR: PROCESS {name}.{i:02d} TERMINATED WITH NON-ZERO EXIT CODE {exit_code}!\n'
                        f.write(error_report)
                        print(log, error_report)

                sys.exit(0)

            else:  # we are the parent
                current_job.add_pid(pid)
                # Re-add the job if necessary: remove_completed_jobs() might have removed it from the list.
                if current_job not in self.jobs: self.jobs.append(current_job)

            process += 1

        if block:
            #for p in all_queued_jobs: os.waitpid(p, 0)  # waiting for all child processes to terminate...
            self.wait_until_complete(current_job)
            self.remove_completed_jobs()

            cpu_usage += time_module.time()/60./60.
            self.cpu_usage += cpu_usage * jobs_to_queue  # approximation...

            current_job = self.JobID()

        return current_job

    @property
    def number_of_cpu_per_node(self): return self.cpu_count

    @property
    def maximum_number_of_mpi_cpu(self): return self.cpu_count

    def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, memory=512, time=12, block=True, process_coefficient="1", requested_nodes=1, requested_processes_per_node=1):
        if requested_nodes > 1:
            print( "WARNING: " + str(requested_nodes) + " nodes were requested, but we're running locally, so only 1 node will be used." )
        if requested_processes_per_node > self.cpu_count:
            print( "WARNING: " + str(requested_processes_per_node) + " processes were requested, but only " + str(self.cpu_count) + " CPUs are available. Will launch " + str(self.cpu_count) + " processes." )
        actual_processes = min( requested_processes_per_node, self.cpu_count )

        cpu_usage = -time_module.time()/60./60.

        arguments = arguments.format(process=0)
        command_line = f'cd {working_dir} && mpirun -np {actual_processes} {executable} {arguments}'

        log = execute(f'Running job {name}...', command_line, tracer=self.tracer, return_='output')

        with codecs.open(log_dir+'/.hpc.{name}.log'.format(**vars()), 'w', encoding='utf-8', errors='replace') as f:
            f.write(command_line+'\n'+log)

        cpu_usage += time_module.time()/60./60.
        self.cpu_usage += cpu_usage * actual_processes  # approximation...

        # Intentionally no return statement: the implicit None is treated as the job-id of an already finished job.

    def complete(self, job_id):
        ''' Return job completion status: True if the job has completed, False otherwise.
        '''
        self.remove_completed_jobs()
        return job_id not in self.jobs

    def cancel_job(self, job):
        job.cancel()
        if job in self.jobs:
            self.jobs.remove(job)

    def __repr__(self):
        return 'MultiCore_HPC_Driver<>'
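

# A minimal usage sketch, assuming HPC_Driver can be constructed without extra required
# arguments (its real signature lives in base.py and may differ); the executable, directories
# and job name below are illustrative only.
if __name__ == '__main__':
    driver = MultiCore_HPC_Driver()  # assumption: the base HPC_Driver needs no mandatory arguments here

    # Queue two serial jobs; with block=True the call waits for both forked children to finish.
    job = driver.submit_serial_hpc_job(
        name='demo', executable='/bin/echo', arguments='hello from process {process}',
        working_dir='.', jobs_to_queue=2, log_dir='.', block=True)

    # With block=True the returned JobID is already empty, so complete() reports True.
    print('demo job complete:', driver.complete(job))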