# -*- coding: utf-8 -*-
# :noTabs=true:

import os, sys, time, collections, math
import stat as stat_module

try:
    from .base import *
except ImportError:  # workaround for the B2 back-end
    import imp
    # A bit of Python magic here: what we are trying to say is `from base import *`,
    # but the path to base is calculated from our source location.
    imp.load_source(__name__, '/'.join(__file__.split('/')[:-1]) + '/base.py')
    # from base import HPC_Driver, execute, NT


_T_slurm_array_job_template_ = '''\
#!/bin/bash
#
#SBATCH --job-name={name}
#SBATCH --output={log_dir}/.hpc.%x.%a.output
#
#SBATCH --time={time}:00
#SBATCH --mem-per-cpu={memory}M
#SBATCH --chdir={working_dir}
#
#SBATCH --array=1-{jobs_to_queue}

srun {executable} {arguments}
'''


_T_slurm_mpi_job_template_ = '''\
#!/bin/bash
#
#SBATCH --job-name={name}
#SBATCH --output={log_dir}/.hpc.%x.output
#
#SBATCH --time={time}:00
#SBATCH --mem-per-cpu={memory}M
#SBATCH --chdir={working_dir}
#
#SBATCH --ntasks={ntasks}

mpirun {executable} {arguments}
'''


class Slurm_HPC_Driver(HPC_Driver):

    def head_node_execute(self, message, command_line, *args, **kwargs):
        head_node = self.config['slurm'].get('head_node')

        # If a head node is configured, wrap the command in ssh; the single quotes keep `&&`
        # from being interpreted by the local shell so the whole `cd ... && ...` runs remotely.
        command_line, host = (f"ssh {head_node} cd `pwd` '&& {command_line}'", head_node) if head_node else (command_line, 'localhost')

        return execute(f'Executing on {host}: {message}' if message else '', command_line, *args, **kwargs)


    # NodeGroup = collections.namedtuple('NodeGroup', 'nodes cores')

    # @property
    # def mpi_topology(self):
    #     ''' return list of NodeGroup's
    #     '''
    #     pass

    # @property
    # def number_of_cpu_per_node(self): return int( self.config['condor']['mpi_cpu_per_node'] )

    # @property
    # def maximum_number_of_mpi_cpu(self):
    #     return self.number_of_cpu_per_node * int( self.config['condor']['mpi_maximum_number_of_nodes'] )


    # def complete(self, condor_job_id):
    #     ''' Return job completion status. Note that a single hpc_job may contain an inner list of individual HPC jobs; True should be returned only if they all ran to completion.
    #     '''
    #     execute('Releasing condor jobs...', 'condor_release $USER', return_='tuple')
    #     s = execute('', 'condor_q $USER | grep $USER | grep {}'.format(condor_job_id), return_='output', terminate_on_failure=False).replace(' ', '').replace('\n', '')
    #     if s: return False
    #         # setDaemonStatusAndPing('[Job #%s] Running... %s condor job(s) in queue...' % (self.id, len(s.split('\n') ) ) )
    #         # n_jobs = len(s.split('\n'))
    #         # s, o = execute('', 'condor_userprio -all | grep $USER@', return_='tuple')
    #         # if s == 0:
    #         #     jobs_running = o.split()
    #         #     jobs_running = 'XX' if len(jobs_running) < 4 else jobs_running[4]
    #         #     self.set_daemon_message("Waiting for condor to finish HPC jobs... [{} jobs in HPC-Queue, {} CPU's used]".format(n_jobs, jobs_running) )
    #         #     print "{} condor jobs in queue... Sleeping 32s... \r".format(n_jobs),
    #         #     sys.stdout.flush()
    #         #     time.sleep(32)
    #     else:
    #         # self.tracer('Waiting for condor to finish the jobs... DONE')
    #         self.jobs.remove(condor_job_id)
    #         self.cpu_usage += self.get_condor_accumulated_usage()
    #         return True  # jobs already finished, we return empty list to prevent double counting of cpu_usage


    def complete(self, slurm_job_id):
        ''' Return True if job with given id is complete
        '''
        s = self.head_node_execute('', f'squeue -j {slurm_job_id} --noheader', return_='output', terminate_on_failure=False, silent=True)

        if s:
            return False
        else:
            # self.tracer('Waiting for slurm to finish the jobs... DONE')
            self.jobs.remove(slurm_job_id)
            return True  # job already finished; removing it from self.jobs prevents double counting of cpu_usage


    def cancel_job(self, slurm_job_id):
        self.head_node_execute(f'Slurm_HPC_Driver: canceling job {slurm_job_id}...', f'scancel {slurm_job_id}', terminate_on_failure=False)


    # def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
    #     print('submit_hpc_job is DEPRECATED and will be removed in the near future, please use submit_serial_hpc_job instead!')
    #     return self.submit_serial_hpc_job(name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory, time, block, shell_wrapper)


    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        arguments = arguments.format(process='%a')  # %a is the SLURM array task index

        time = int( math.ceil(time*60) )  # convert hours to minutes for the --time directive

        if shell_wrapper:
            shell_wrapper_sh = os.path.abspath(self.working_dir + f'/hpc.{name}.shell_wrapper.sh')

            with open(shell_wrapper_sh, 'w') as f:
                f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments))
                os.fchmod(f.fileno(), stat_module.S_IEXEC | stat_module.S_IREAD | stat_module.S_IWRITE)  # stat is imported as stat_module above

            executable, arguments = shell_wrapper_sh, ''

        slurm_file = working_dir + f'/.hpc.{name}.slurm'
        with open(slurm_file, 'w') as f: f.write( _T_slurm_array_job_template_.format( **vars() ) )

        slurm_job_id = self.head_node_execute(
            'Submitting SLURM array job...', f'cd {self.working_dir} && sbatch {slurm_file}',
            tracer=self.tracer, return_='output'
        ).split()[-1]  # expecting something like `Submitted batch job 6122` in output

        self.jobs.append(slurm_job_id)

        if block:
            self.wait_until_complete( [slurm_job_id] )
            return None
        else:
            return slurm_job_id


    def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, ntasks, memory=512, time=12, block=True, shell_wrapper=False):
        ''' submit job as an MPI job
        '''
        arguments = arguments.format(process='0')

        time = int( math.ceil(time*60) )  # convert hours to minutes for the --time directive

        if shell_wrapper:
            shell_wrapper_sh = os.path.abspath(self.working_dir + f'/hpc.{name}.shell_wrapper.sh')

            with open(shell_wrapper_sh, 'w') as f:
                f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments))
                os.fchmod(f.fileno(), stat_module.S_IEXEC | stat_module.S_IREAD | stat_module.S_IWRITE)

            executable, arguments = shell_wrapper_sh, ''

        slurm_file = working_dir + f'/.hpc.{name}.slurm'
        with open(slurm_file, 'w') as f: f.write( _T_slurm_mpi_job_template_.format( **vars() ) )

        slurm_job_id = self.head_node_execute(
            'Submitting SLURM MPI job...', f'cd {self.working_dir} && sbatch {slurm_file}',
            tracer=self.tracer, return_='output'
        ).split()[-1]  # expecting something like `Submitted batch job 6122` in output

        self.jobs.append(slurm_job_id)

        if block:
            self.wait_until_complete( [slurm_job_id] )
            return None
        else:
            return slurm_job_id
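

# ----------------------------------------------------------------------------------------------
# Usage sketch (illustrative only, kept as comments so nothing runs on import). It shows how a
# caller might drive submit_serial_hpc_job. The HPC_Driver constructor lives in base.py and is
# not shown here, so the keyword arguments below (working_dir, config, tracer) and the config
# layout are assumptions; adjust them to match the actual base-class interface.
#
#   config = dict(slurm=dict(head_node='login01'))  # omit 'head_node' to run sbatch/squeue locally
#   driver = Slurm_HPC_Driver(working_dir='/scratch/run', config=config, tracer=print)
#
#   job_id = driver.submit_serial_hpc_job(
#       name='docking', executable='/path/to/app', arguments='-in:file:s input.{process}.pdb',
#       working_dir='/scratch/run', jobs_to_queue=10, log_dir='/scratch/run/logs',
#       memory=2048, time=2, block=False)  # time is in hours; converted to minutes for --time
#
#   # {process} is substituted with the SLURM array task index (%a) at submission time
#   while not driver.complete(job_id): time.sleep(60)
# ----------------------------------------------------------------------------------------------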