| import os |
| import sys |
|
|
# Canonical queue-mode identifiers. Config normalises the QUEUE_MODE
# environment variable (and its accepted aliases) to one of these values.
MODE_CONSUME = "consume"
MODE_CONSUME_AND_PUBLISH = "consume-and-publish"
MODE_PUBLISH = "publish"
|
|
|
|
class Config:
    """Settings read from environment variables.

    Environment variables consulted by the constructor:

    * ``MQ_HOST`` (default ``mq-service``)
    * ``MQ_USER`` (required)
    * ``MQ_PASSWORD`` (required)
    * ``QUEUE_MODE`` (default: the publish mode; aliases are accepted)
    * ``QUEUE_DEST_TEST_TRIGGER`` (required when publishing)
    * ``QUEUE_DEST_PIPELINE_TRIGGER`` (required when publishing)
    * ``DATA_DIRECTORY`` (default ``/storage/data/``)
    * ``DATA_INDEX`` (default ``/storage/index``)

    A missing required variable prints a message and terminates the process
    with exit status 1.
    """

    # Attributes that display() must never echo.
    _SECRET_FIELDS = ('mq_password',)

    @staticmethod
    def __assert_variable_is_defined(value: object, message: str) -> None:
        """Print *message* and exit with status 1 when *value* is ``None``.

        Args:
            value: The setting to check; only ``None`` counts as undefined.
            message: Text printed before exiting.

        Raises:
            SystemExit: With code 1 when *value* is ``None``.
        """
        if value is None:
            print(message)
            sys.exit(1)

    def __init__(self) -> None:
        """Load every setting from the environment and validate the required ones.

        Raises:
            SystemExit: With code 1 when a required variable is not set.
        """
        self.mq_host = os.environ.get('MQ_HOST', 'mq-service')
        self.mq_user = os.environ.get('MQ_USER')
        self.mq_password = os.environ.get('MQ_PASSWORD')

        # The alias table is built here rather than at class level so the
        # module constants are only looked up when a Config is constructed.
        mode_aliases = {
            MODE_CONSUME: MODE_CONSUME,
            'pull': MODE_CONSUME,
            'store': MODE_CONSUME,
            MODE_PUBLISH: MODE_PUBLISH,
            'push': MODE_PUBLISH,
            'load': MODE_PUBLISH,
            MODE_CONSUME_AND_PUBLISH: MODE_CONSUME_AND_PUBLISH,
            'pull-push': MODE_CONSUME_AND_PUBLISH,
            'store-n-forward': MODE_CONSUME_AND_PUBLISH,
        }
        requested_mode = os.environ.get('QUEUE_MODE', MODE_PUBLISH).strip().lower()
        # Echo the requested mode before alias normalisation.
        print(requested_mode)
        if requested_mode not in mode_aliases:
            # Keep the value (as before) but make the misconfiguration visible:
            # with an unknown mode neither is_start_consume() nor
            # is_start_publish() returns True.
            print(
                f"Unrecognised QUEUE_MODE '{requested_mode}'; "
                f"expected one of: {', '.join(sorted(mode_aliases))}"
            )
        self.queue_mode = mode_aliases.get(requested_mode, requested_mode)

        self.queue_dest_test_trigger = os.environ.get('QUEUE_DEST_TEST_TRIGGER')
        self.queue_dest_pipeline_trigger = os.environ.get('QUEUE_DEST_PIPELINE_TRIGGER')
        self.data_directory = os.environ.get('DATA_DIRECTORY', '/storage/data/')
        self.data_index = os.environ.get('DATA_INDEX', '/storage/index')

        self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
        self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")
        # Destination queues are only mandatory when this instance publishes.
        if self.is_start_publish():
            self.__assert_variable_is_defined(
                self.queue_dest_test_trigger,
                "Destination queue is not specified in QUEUE_DEST_TEST_TRIGGER",
            )
            self.__assert_variable_is_defined(
                self.queue_dest_pipeline_trigger,
                "Destination queue is not specified in QUEUE_DEST_PIPELINE_TRIGGER",
            )

    def display(self) -> None:
        """Print every public setting as ``NAME=value``, sorted by name.

        Methods and other callables are skipped, as are underscore-prefixed
        names and the fields listed in ``_SECRET_FIELDS`` (the password).
        A blank line is printed after the listing.
        """
        for name in sorted(dir(self)):
            if name.startswith('_') or name in self._SECRET_FIELDS:
                continue
            value = getattr(self, name)
            # Skipping callables keeps methods out of the listing without a
            # hand-maintained list of method names.
            if callable(value):
                continue
            print(f"{name.upper()}={value}")
        print()

    def is_start_consume(self) -> bool:
        """Return True when the configured mode includes consuming."""
        return self.queue_mode in (MODE_CONSUME, MODE_CONSUME_AND_PUBLISH)

    def is_start_publish(self) -> bool:
        """Return True when the configured mode includes publishing."""
        return self.queue_mode in (MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH)
|
|