GlandVergil's picture
Upload 597 files
6b89792 verified
# -*- coding: utf-8 -*-
# :noTabs=true:
import os, sys, subprocess, stat
import time as time_module
import signal as signal_module
class NT: # named tuple
def __init__(self, **entries): self.__dict__.update(entries)
def __repr__(self):
r = 'NT: |'
for i in dir(self):
if not i.startswith('__') and not isinstance(getattr(self, i), types.MethodType): r += '{} --> {}, '.format(i, getattr(self, i))
return r[:-2]+'|'
class HPC_Exception(Exception):
def __init__(self, value): self.value = value
def __str__(self): return self.value
def execute(message, command_line, return_='status', until_successes=False, terminate_on_failure=True, silent=False, silence_output=False, tracer=print):
if not silent: tracer(message); tracer(command_line); sys.stdout.flush();
while True:
p = subprocess.Popen(command_line, bufsize=0, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, errors = p.communicate()
output = output + errors
output = output.decode(encoding="utf-8", errors="replace")
exit_code = p.returncode
if exit_code and not (silent or silence_output): tracer(output); sys.stdout.flush();
if exit_code and until_successes: pass # Thats right - redability COUNT!
else: break
tracer( "Error while executing {}: {}\n".format(message, output) )
tracer("Sleeping 60s... then I will retry...")
sys.stdout.flush();
time.sleep(60)
if return_ == 'tuple': return(exit_code, output)
if exit_code and terminate_on_failure:
tracer("\nEncounter error while executing: " + command_line)
if return_==True: return True
else: print("\nEncounter error while executing: " + command_line + '\n' + output); sys.exit(1)
if return_ == 'output': return output
else: return False
def Sleep(time_, message, dict_={}):
''' Fancy sleep function '''
len_ = 0
for i in range(time_, 0, -1):
#print "Waiting for a new revision:%s... Sleeping...%d \r" % (sc.revision, i),
msg = message.format( **dict(dict_, time_left=i) )
print( msg, end='' )
len_ = max(len_, len(msg))
sys.stdout.flush()
time_module.sleep(1)
print( ' '*len_ + '\r', end='' ) # erazing sleep message
# Abstract class for HPC job submission
class HPC_Driver:
def __init__(self, working_dir, config, tracer=lambda x:None, set_daemon_message=lambda x:None):
self.working_dir = working_dir
self.config = config
self.cpu_usage = 0.0 # cummulative cpu usage in hours
self.tracer = tracer
self.set_daemon_message = set_daemon_message
self.cpu_count = self.config['cpu_count'] if type(config) == dict else self.config.getint('DEFAULT', 'cpu_count')
self.jobs = [] # list of all jobs currently running by this driver, Job class is driver depended, could be just int or something more complex
self.install_signal_handler()
def __del__(self):
self.remove_signal_handler()
def execute(self, executable, arguments, working_dir, log_dir=None, name='_no_name_', memory=256, time=24, shell_wrapper=False, block=True):
''' Execute given command line on HPC cluster, must accumulate cpu hours in self.cpu_usage '''
if log_dir==None: log_dir=self.working_dir
if shell_wrapper:
shell_wrapper_sh = os.path.abspath(self.working_dir + '/hpc.{}.shell_wrapper.sh'.format(name))
with file(shell_wrapper_sh, 'w') as f: f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments)); os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
executable, arguments = shell_wrapper_sh, ''
return self.submit_serial_hpc_job(name=name, executable=executable, arguments=arguments, working_dir=working_dir, log_dir=log_dir, jobs_to_queue=1, memory=memory, time=time, block=block, shell_wrapper=shell_wrapper)
@property
def number_of_cpu_per_node(self):
must_be_implemented_in_inherited_classes
@property
def maximum_number_of_mpi_cpu(self):
must_be_implemented_in_inherited_classes
def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
print('submit_hpc_job is DEPRECATED and will be removed in near future, please use submit_serial_hpc_job instead!')
must_be_implemented_in_inherited_classes
def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
must_be_implemented_in_inherited_classes
def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, memory=512, time=12, block=True, process_coefficient="1", requested_nodes=1, requested_processes_per_node=1):
''' submit jobs as MPI job
process_coefficient should be string representing fraction of process to launch on each node, for example '3 / 4' will start only 75% of MPI process's on each node
'''
must_be_implemented_in_inherited_classes
def cancel_all_jobs(self):
''' Cancel all HPC jobs known to this driver, use this as signal handler for script termination '''
for j in self.jobs: self.cancel_job(j)
def block_until(self, silent, fn, *args, **kwargs):
'''
**fn must have the driver as the first argument**
example:
def fn(driver):
jobs = list(driver.jobs)
jobs = [job for job in jobs if not driver.complete(job)]
if len(jobs) <= 8:
return False # stops sleeping
return True # continues sleeping
for x in range(100):
hpc_driver.submit_hpc_job(...)
hpc_driver.block_until(False, fn)
'''
while fn(self, *args, **kwargs):
sys.stdout.flush()
time_module.sleep(60)
if not silent:
Sleep(1, '"Waiting for HPC job(s) to finish, sleeping {time_left}s\r')
def wait_until_complete(self, jobs=None, callback=None, silent=False):
''' Helper function, wait until given jobs list is finished, if no argument is given waits until all jobs known by driver is finished '''
jobs = jobs if jobs else self.jobs
while jobs:
for j in jobs[:]:
if self.complete(j): jobs.remove(j)
if jobs:
#total_cpu_queued = sum( [j.jobs_queued for j in jobs] )
#total_cpu_running = sum( [j.cpu_running for j in jobs] )
#self.set_daemon_message("Waiting for HPC job(s) to finish... [{} process(es) in queue, {} process(es) running]".format(total_cpu_queued, total_cpu_running) )
#self.tracer("Waiting for HPC job(s) [{} process(es) in queue, {} process(es) running]... \r".format(total_cpu_queued, total_cpu_running), end='')
#print "Waiting for {} HPC jobs to finish... [{} jobs in queue, {} jobs running]... Sleeping 32s... \r".format(total_cpu_queued, cpu_queued+cpu_running, cpu_running),
self.set_daemon_message("Waiting for HPC {} job(s) to finish...".format( len(jobs) ) )
#self.tracer("Waiting for HPC {} job(s) to finish...".format( len(jobs) ) )
sys.stdout.flush()
if callback: callback()
if silent: time_module.sleep(64*1)
else: Sleep(64, '"Waiting for HPC {n_jobs} job(s) to finish, sleeping {time_left}s \r', dict(n_jobs=len(jobs)))
_signals_ = [signal_module.SIGINT, signal_module.SIGTERM, signal_module.SIGABRT]
def install_signal_handler(self):
def signal_handler(signal_, frame):
self.tracer('Recieved signal:{}... Canceling HPC jobs...'.format(signal_) )
self.cancel_all_jobs()
self.set_daemon_message( 'Remote daemon got terminated with signal:{}'.format(signal_) )
sys.exit(1)
for s in self._signals_: signal_module.signal(s, signal_handler)
def remove_signal_handler(self): # do we really need this???
try:
for s in self._signals_: signal_module.signal(s, signal_module.SIG_DFL)
#print('remove_signal_handler: done!')
except TypeError:
#print('remove_signal_handler: interpreted terminating, skipping remove_signal_handler...')
pass
def cancel_job(self, job_id):
must_be_implemented_in_inherited_classes
def complete(self, job_id):
''' Return job completion status. Return True if job complered and False otherwise
'''
must_be_implemented_in_inherited_classes