File size: 6,949 Bytes
256a159
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import os
import os.path as osp
import re
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from threading import Lock
from typing import Any, Dict, List, Tuple

import mmengine
import numpy as np
from mmengine.config import ConfigDict
from tqdm import tqdm

from opencompass.registry import RUNNERS, TASKS
from opencompass.utils import get_logger

from .base import BaseRunner


def get_command_template(gpu_ids: List[int]) -> str:
    """Format command template given available gpu ids."""
    if sys.platform == 'win32':  # Always return win32 for Windows
        # use command in Windows format
        tmpl = 'set CUDA_VISIBLE_DEVICES=' + ','.join(str(i) for i in gpu_ids)
        tmpl += ' & {task_cmd}'
    else:
        tmpl = 'CUDA_VISIBLE_DEVICES=' + ','.join(str(i) for i in gpu_ids)
        tmpl += ' {task_cmd}'
    return tmpl


@RUNNERS.register_module()
class LocalRunner(BaseRunner):
    """Local runner. Start tasks by local python.

    Args:
        task (ConfigDict): Task type config.
        max_num_workers (int): Max number of workers to run in parallel.
            Defaults to 16.
        max_workers_per_gpu (int): Max number of workers to run for one GPU.
            Defaults to 1.
        debug (bool): Whether to run in debug mode.
        lark_bot_url (str): Lark bot url.
    """

    def __init__(self,
                 task: ConfigDict,
                 max_num_workers: int = 16,
                 debug: bool = False,
                 max_workers_per_gpu: int = 1,
                 lark_bot_url: str = None):
        super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url)
        self.max_num_workers = max_num_workers
        self.max_workers_per_gpu = max_workers_per_gpu

    def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
        """Launch multiple tasks.

        Args:
            tasks (list[dict]): A list of task configs, usually generated by
                Partitioner.

        Returns:
            list[tuple[str, int]]: A list of (task name, exit code).
        """

        status = []
        import torch
        if 'CUDA_VISIBLE_DEVICES' in os.environ:
            all_gpu_ids = [
                int(i) for i in re.findall(r'(?<!-)\d+',
                                           os.getenv('CUDA_VISIBLE_DEVICES'))
            ]
        else:
            all_gpu_ids = list(range(torch.cuda.device_count()))

        if self.debug:
            for task in tasks:
                task = TASKS.build(dict(cfg=task, type=self.task_cfg['type']))
                task_name = task.name
                num_gpus = task.num_gpus
                assert len(all_gpu_ids) >= num_gpus
                # get cmd
                mmengine.mkdir_or_exist('tmp/')
                param_file = f'tmp/{os.getpid()}_params.py'
                try:
                    task.cfg.dump(param_file)
                    # if use torchrun, restrict it behaves the same as non
                    # debug mode, otherwise, the torchrun will use all the
                    # available resources which might cause inconsistent
                    # behavior.
                    if len(all_gpu_ids) > num_gpus and num_gpus > 0:
                        get_logger().warning(f'Only use {num_gpus} GPUs for '
                                             f'total {len(all_gpu_ids)} '
                                             'available GPUs in debug mode.')
                    tmpl = get_command_template(all_gpu_ids[:num_gpus])
                    cmd = task.get_command(cfg_path=param_file, template=tmpl)
                    # run in subprocess if starts with torchrun etc.
                    if 'python3 ' in cmd or 'python ' in cmd:
                        task.run()
                    else:
                        subprocess.run(cmd, shell=True, text=True)
                finally:
                    os.remove(param_file)
                status.append((task_name, 0))
        else:
            if len(all_gpu_ids) > 0:
                gpus = np.zeros(max(all_gpu_ids) + 1, dtype=np.uint)
                gpus[all_gpu_ids] = self.max_workers_per_gpu
            else:
                gpus = np.array([], dtype=np.uint)

            pbar = tqdm(total=len(tasks))
            lock = Lock()

            def submit(task, index):
                task = TASKS.build(dict(cfg=task, type=self.task_cfg['type']))
                num_gpus = task.num_gpus
                assert len(gpus) >= num_gpus

                while True:
                    lock.acquire()
                    if sum(gpus > 0) >= num_gpus:
                        gpu_ids = np.where(gpus)[0][:num_gpus]
                        gpus[gpu_ids] -= 1
                        lock.release()
                        break
                    lock.release()
                    time.sleep(1)

                if num_gpus > 0:
                    tqdm.write(f'launch {task.name} on GPU ' +
                               ','.join(map(str, gpu_ids)))
                else:
                    tqdm.write(f'launch {task.name} on CPU ')

                res = self._launch(task, gpu_ids, index)
                pbar.update()

                with lock:
                    gpus[gpu_ids] += 1

                return res

            with ThreadPoolExecutor(
                    max_workers=self.max_num_workers) as executor:
                status = executor.map(submit, tasks, range(len(tasks)))

        return status

    def _launch(self, task, gpu_ids, index):
        """Launch a single task.

        Args:
            task (BaseTask): Task to launch.

        Returns:
            tuple[str, int]: Task name and exit code.
        """

        task_name = task.name

        # Dump task config to file
        mmengine.mkdir_or_exist('tmp/')
        param_file = f'tmp/{os.getpid()}_{index}_params.py'
        try:
            task.cfg.dump(param_file)
            tmpl = get_command_template(gpu_ids)
            get_cmd = partial(task.get_command,
                              cfg_path=param_file,
                              template=tmpl)
            cmd = get_cmd()

            logger = get_logger()
            logger.debug(f'Running command: {cmd}')

            # Run command
            out_path = task.get_log_path(file_extension='out')
            mmengine.mkdir_or_exist(osp.split(out_path)[0])
            stdout = open(out_path, 'w', encoding='utf-8')

            result = subprocess.run(cmd,
                                    shell=True,
                                    text=True,
                                    stdout=stdout,
                                    stderr=stdout)

            if result.returncode != 0:
                logger.warning(f'task {task_name} fail, see\n{out_path}')
        finally:
            # Clean up
            os.remove(param_file)
        return task_name, result.returncode