GlandVergil's picture
Upload 693 files
b14983e verified
raw
history blame
7.24 kB
# -*- coding: utf-8 -*-
# :noTabs=true:
import os, sys, time, collections, math
import stat as stat_module
try:
from .base import *
except ImportError: # workaround for B2 back-end's
import imp
imp.load_source(__name__, '/'.join(__file__.split('/')[:-1]) + '/base.py') # A bit of Python magic here, what we trying to say is this: from base import *, but path to base is calculated from our source location # from base import HPC_Driver, execute, NT
_T_slurm_array_job_template_ = '''\
#!/bin/bash
#
#SBATCH --job-name={name}
#SBATCH --output={log_dir}/.hpc.%x.%a.output
#
#SBATCH --time={time}:00
#SBATCH --mem-per-cpu={memory}M
#SBATCH --chdir={working_dir}
#
#SBATCH --array=1-{jobs_to_queue}
srun {executable} {arguments}
'''
_T_slurm_mpi_job_template_ = '''\
#!/bin/bash
#
#SBATCH --job-name={name}
#SBATCH --output={log_dir}/.hpc.%x.output
#
#SBATCH --time={time}:00
#SBATCH --mem-per-cpu={memory}M
#SBATCH --chdir={working_dir}
#
#SBATCH --ntasks={ntasks}
mpirun {executable} {arguments}
'''
class Slurm_HPC_Driver(HPC_Driver):
def head_node_execute(self, message, command_line, *args, **kwargs):
head_node = self.config['slurm'].get('head_node')
command_line, host = (f"ssh {head_node} cd `pwd` '&& {command_line}'", head_node) if head_node else (command_line, 'localhost')
return execute(f'Executiong on {host}: {message}' if message else '', command_line, *args, **kwargs)
# NodeGroup = collections.namedtuple('NodeGroup', 'nodes cores')
# @property
# def mpi_topology(self):
# ''' return list of NodeGroup's
# '''
# pass
# @property
# def number_of_cpu_per_node(self): return int( self.config['condor']['mpi_cpu_per_node'] )
# @property
# def maximum_number_of_mpi_cpu(self):
# return self.number_of_cpu_per_node * int( self.config['condor']['mpi_maximum_number_of_nodes'] )
# def complete(self, condor_job_id):
# ''' Return job completion status. Note that single hpc_job may contatin inner list of individual HPC jobs, True should be return if they all run in to completion.
# '''
# execute('Releasing condor jobs...', 'condor_release $USER', return_='tuple')
# s = execute('', 'condor_q $USER | grep $USER | grep {}'.format(condor_job_id), return_='output', terminate_on_failure=False).replace(' ', '').replace('\n', '')
# if s: return False
# # #setDaemonStatusAndPing('[Job #%s] Running... %s condor job(s) in queue...' % (self.id, len(s.split('\n') ) ) )
# # n_jobs = len(s.split('\n'))
# # s, o = execute('', 'condor_userprio -all | grep $USER@', return_='tuple')
# # if s == 0:
# # jobs_running = o.split()
# # jobs_running = 'XX' if len(jobs_running) < 4 else jobs_running[4]
# # self.set_daemon_message("Waiting for condor to finish HPC jobs... [{} jobs in HPC-Queue, {} CPU's used]".format(n_jobs, jobs_running) )
# # print "{} condor jobs in queue... Sleeping 32s... \r".format(n_jobs),
# # sys.stdout.flush()
# # time.sleep(32)
# else:
# #self.tracer('Waiting for condor to finish the jobs... DONE')
# self.jobs.remove(condor_job_id)
# self.cpu_usage += self.get_condor_accumulated_usage()
# return True # jobs already finished, we return empty list to prevent double counting of cpu_usage
def complete(self, slurm_job_id):
''' Return True if job with given id is complete
'''
s = self.head_node_execute('', f'squeue -j {slurm_job_id} --noheader', return_='output', terminate_on_failure=False, silent=True)
if s: return False
else:
#self.tracer('Waiting for condor to finish the jobs... DONE')
self.jobs.remove(slurm_job_id)
return True # jobs already finished, we return empty list to prevent double counting of cpu_usage
def cancel_job(self, slurm_job_id):
self.head_node_execute(f'Slurm_HPC_Driver.canceling job {slurm_job_id}...', f'scancel {slurm_job_id}', terminate_on_failure=False)
# def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
# print('submit_hpc_job is DEPRECATED and will be removed in near future, please use submit_serial_hpc_job instead!')
# return self.submit_serial_hpc_job(name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory, time, block, shell_wrapper)
def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
arguments = arguments.format(process='%a') # %a is SLURM array index
time = int( math.ceil(time*60) )
if shell_wrapper:
shell_wrapper_sh = os.path.abspath(self.working_dir + f'/hpc.{name}.shell_wrapper.sh')
with open(shell_wrapper_sh, 'w') as f: f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments)); os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
executable, arguments = shell_wrapper_sh, ''
slurm_file = working_dir + f'/.hpc.{name}.slurm'
with open(slurm_file, 'w') as f: f.write( _T_slurm_array_job_template_.format( **vars() ) )
slurm_job_id = self.head_node_execute('Submitting SLURM array job...', f'cd {self.working_dir} && sbatch {slurm_file}',
tracer=self.tracer, return_='output'
).split()[-1] # expecting something like `Submitted batch job 6122` in output
self.jobs.append(slurm_job_id)
if block:
self.wait_until_complete( [slurm_job_id] )
return None
else: return slurm_job_id
def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, ntasks, memory=512, time=12, block=True, shell_wrapper=False):
''' submit jobs as MPI job
'''
arguments = arguments.format(process='0')
time = int( math.ceil(time*60) )
if shell_wrapper:
shell_wrapper_sh = os.path.abspath(self.working_dir + f'/hpc.{name}.shell_wrapper.sh')
with open(shell_wrapper_sh, 'w') as f: f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments)); os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
executable, arguments = shell_wrapper_sh, ''
slurm_file = working_dir + f'/.hpc.{name}.slurm'
with open(slurm_file, 'w') as f: f.write( _T_slurm_mpi_job_template_.format( **vars() ) )
slurm_job_id = self.head_node_execute('Submitting SLURM mpi job...', f'cd {self.working_dir} && sbatch {slurm_file}',
tracer=self.tracer, return_='output'
).split()[-1] # expecting something like `Submitted batch job 6122` in output
self.jobs.append(slurm_job_id)
if block:
self.wait_until_complete( [slurm_job_id] )
return None
else: return slurm_job_id