File size: 1,835 Bytes
1c72248
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import copy
import json
from collections import OrderedDict

from toolkit.timer import Timer


class BaseProcess(object):

    def __init__(
            self,
            process_id: int,
            job: 'BaseJob',
            config: OrderedDict
    ):
        self.process_id = process_id
        self.meta: OrderedDict
        self.job = job
        self.config = config
        self.raw_process_config = config
        self.name = self.get_conf('name', self.job.name)
        self.meta = copy.deepcopy(self.job.meta)
        self.timer: Timer = Timer(f'{self.name} Timer')
        self.performance_log_every = self.get_conf('performance_log_every', 0)

        print(json.dumps(self.config, indent=4))
        
    def on_error(self, e: Exception):
        pass

    def get_conf(self, key, default=None, required=False, as_type=None):
        # split key by '.' and recursively get the value
        keys = key.split('.')

        # see if it exists in the config
        value = self.config
        for subkey in keys:
            if subkey in value:
                value = value[subkey]
            else:
                value = None
                break

        if value is not None:
            if as_type is not None:
                value = as_type(value)
            return value
        elif required:
            raise ValueError(f'config file error. Missing "config.process[{self.process_id}].{key}" key')
        else:
            if as_type is not None and default is not None:
                return as_type(default)
            return default

    def run(self):
        # implement in child class
        # be sure to call super().run() first incase something is added here
        pass

    def add_meta(self, additional_meta: OrderedDict):
        self.meta.update(additional_meta)


from jobs import BaseJob