File size: 7,235 Bytes
6b89792
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# -*- coding: utf-8 -*-
# :noTabs=true:

import os, sys, time, collections, math
import stat as stat_module


try:
    from .base import *

except ImportError: # workaround for B2 back-end's
    import imp
    imp.load_source(__name__, '/'.join(__file__.split('/')[:-1]) + '/base.py')  # A bit of Python magic here, what we trying to say is this: from base import *, but path to base is calculated from our source location  # from base import HPC_Driver, execute, NT


_T_slurm_array_job_template_ = '''\
#!/bin/bash
#
#SBATCH --job-name={name}
#SBATCH --output={log_dir}/.hpc.%x.%a.output
#
#SBATCH --time={time}:00
#SBATCH --mem-per-cpu={memory}M
#SBATCH --chdir={working_dir}
#
#SBATCH --array=1-{jobs_to_queue}

srun {executable} {arguments}
'''

_T_slurm_mpi_job_template_ = '''\
#!/bin/bash
#
#SBATCH --job-name={name}
#SBATCH --output={log_dir}/.hpc.%x.output
#
#SBATCH --time={time}:00
#SBATCH --mem-per-cpu={memory}M
#SBATCH --chdir={working_dir}
#
#SBATCH --ntasks={ntasks}

mpirun {executable} {arguments}
'''

class Slurm_HPC_Driver(HPC_Driver):
    def head_node_execute(self, message, command_line, *args, **kwargs):
        head_node = self.config['slurm'].get('head_node')

        command_line, host = (f"ssh {head_node} cd `pwd` '&& {command_line}'", head_node) if head_node else (command_line, 'localhost')
        return execute(f'Executiong on {host}: {message}' if message else '', command_line, *args, **kwargs)


    # NodeGroup = collections.namedtuple('NodeGroup', 'nodes cores')

    # @property
    # def mpi_topology(self):
    #     ''' return list of NodeGroup's
    #     '''
    #     pass


    # @property
    # def number_of_cpu_per_node(self): return int( self.config['condor']['mpi_cpu_per_node'] )

    # @property
    # def maximum_number_of_mpi_cpu(self):
    #     return self.number_of_cpu_per_node * int( self.config['condor']['mpi_maximum_number_of_nodes'] )


    # def complete(self, condor_job_id):
    #     ''' Return job completion status. Note that single hpc_job may contatin inner list of individual HPC jobs, True should be return if they all run in to completion.
    #     '''

    #     execute('Releasing condor jobs...', 'condor_release $USER', return_='tuple')

    #     s = execute('', 'condor_q $USER | grep $USER | grep {}'.format(condor_job_id), return_='output', terminate_on_failure=False).replace(' ', '').replace('\n', '')
    #     if s: return False

    #         # #setDaemonStatusAndPing('[Job #%s] Running... %s condor job(s) in queue...' % (self.id, len(s.split('\n') ) ) )
    #         # n_jobs = len(s.split('\n'))
    #         # s, o = execute('', 'condor_userprio -all | grep $USER@', return_='tuple')
    #         # if s == 0:
    #         #     jobs_running = o.split()
    #         #     jobs_running = 'XX' if len(jobs_running) < 4 else jobs_running[4]
    #         #     self.set_daemon_message("Waiting for condor to finish HPC jobs... [{} jobs in HPC-Queue, {} CPU's used]".format(n_jobs, jobs_running) )
    #         #     print "{} condor jobs in queue... Sleeping 32s...    \r".format(n_jobs),
    #         # sys.stdout.flush()
    #         # time.sleep(32)
    #     else:

    #         #self.tracer('Waiting for condor to finish the jobs... DONE')
    #         self.jobs.remove(condor_job_id)
    #         self.cpu_usage += self.get_condor_accumulated_usage()
    #         return True  # jobs already finished, we return empty list to prevent double counting of cpu_usage


    def complete(self, slurm_job_id):
        ''' Return True if job with given id is complete
        '''

        s = self.head_node_execute('', f'squeue -j {slurm_job_id} --noheader', return_='output', terminate_on_failure=False, silent=True)
        if s: return False
        else:
            #self.tracer('Waiting for condor to finish the jobs... DONE')
            self.jobs.remove(slurm_job_id)
            return True  # jobs already finished, we return empty list to prevent double counting of cpu_usage


    def cancel_job(self, slurm_job_id):
        self.head_node_execute(f'Slurm_HPC_Driver.canceling job {slurm_job_id}...', f'scancel {slurm_job_id}', terminate_on_failure=False)


    # def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
    #     print('submit_hpc_job is DEPRECATED and will be removed in near future, please use submit_serial_hpc_job  instead!')
    #     return self.submit_serial_hpc_job(name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory, time, block, shell_wrapper)


    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):

        arguments = arguments.format(process='%a') # %a is SLURM array index
        time = int( math.ceil(time*60) )

        if shell_wrapper:
            shell_wrapper_sh = os.path.abspath(self.working_dir + f'/hpc.{name}.shell_wrapper.sh')
            with open(shell_wrapper_sh, 'w') as f: f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments));  os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
            executable, arguments = shell_wrapper_sh, ''

        slurm_file = working_dir + f'/.hpc.{name}.slurm'

        with open(slurm_file, 'w') as f: f.write( _T_slurm_array_job_template_.format( **vars() ) )


        slurm_job_id = self.head_node_execute('Submitting SLURM array job...', f'cd {self.working_dir} && sbatch {slurm_file}',
                                              tracer=self.tracer, return_='output'
                                              ).split()[-1] # expecting something like `Submitted batch job 6122` in output


        self.jobs.append(slurm_job_id)

        if block:
            self.wait_until_complete( [slurm_job_id] )
            return None

        else: return slurm_job_id





    def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, ntasks, memory=512, time=12, block=True, shell_wrapper=False):
        ''' submit jobs as MPI job
        '''
        arguments = arguments.format(process='0')
        time = int( math.ceil(time*60) )

        if shell_wrapper:
            shell_wrapper_sh = os.path.abspath(self.working_dir + f'/hpc.{name}.shell_wrapper.sh')
            with open(shell_wrapper_sh, 'w') as f: f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments));  os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
            executable, arguments = shell_wrapper_sh, ''

        slurm_file = working_dir + f'/.hpc.{name}.slurm'

        with open(slurm_file, 'w') as f: f.write( _T_slurm_mpi_job_template_.format( **vars() ) )

        slurm_job_id = self.head_node_execute('Submitting SLURM mpi job...', f'cd {self.working_dir} && sbatch {slurm_file}',
                                              tracer=self.tracer, return_='output'
                                              ).split()[-1] # expecting something like `Submitted batch job 6122` in output

        self.jobs.append(slurm_job_id)

        if block:
            self.wait_until_complete( [slurm_job_id] )
            return None

        else: return slurm_job_id