ramimu's picture
Upload 586 files
1c72248 verified
import copy
import json
from collections import OrderedDict
from toolkit.timer import Timer
class BaseProcess(object):
def __init__(
self,
process_id: int,
job: 'BaseJob',
config: OrderedDict
):
self.process_id = process_id
self.meta: OrderedDict
self.job = job
self.config = config
self.raw_process_config = config
self.name = self.get_conf('name', self.job.name)
self.meta = copy.deepcopy(self.job.meta)
self.timer: Timer = Timer(f'{self.name} Timer')
self.performance_log_every = self.get_conf('performance_log_every', 0)
print(json.dumps(self.config, indent=4))
def on_error(self, e: Exception):
pass
def get_conf(self, key, default=None, required=False, as_type=None):
# split key by '.' and recursively get the value
keys = key.split('.')
# see if it exists in the config
value = self.config
for subkey in keys:
if subkey in value:
value = value[subkey]
else:
value = None
break
if value is not None:
if as_type is not None:
value = as_type(value)
return value
elif required:
raise ValueError(f'config file error. Missing "config.process[{self.process_id}].{key}" key')
else:
if as_type is not None and default is not None:
return as_type(default)
return default
def run(self):
# implement in child class
# be sure to call super().run() first incase something is added here
pass
def add_meta(self, additional_meta: OrderedDict):
self.meta.update(additional_meta)
from jobs import BaseJob