File size: 14,678 Bytes
105b369
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
from typing import Optional, Dict

from phi.app.db_app import DbApp
from phi.k8s.app.base import (
    K8sApp,
    AppVolumeType,  # noqa: F401
    ContainerContext,
    ServiceType,  # noqa: F401
    RestartPolicy,  # noqa: F401
    ImagePullPolicy,  # noqa: F401
)
from phi.utils.common import str_to_int
from phi.utils.log import logger


class AirflowBase(K8sApp):
    """Base settings for running Airflow as a K8sApp.

    Declares the image, port, workspace, Airflow, database and redis settings
    that get_container_env() turns into container environment variables.
    """

    # -*- App Name
    name: str = "airflow"

    # -*- Image Configuration
    image_name: str = "phidata/airflow"
    image_tag: str = "2.7.1"

    # -*- App Ports
    # Open a container port if open_port=True
    open_port: bool = False
    port_number: int = 8080

    # -*- Workspace Configuration
    # Path to the parent directory of the workspace inside the container
    # When using git-sync, the git repo is cloned inside this directory
    #   i.e. this is the parent directory of the workspace
    workspace_parent_dir_container_path: str = "/usr/local/workspace"

    # -*- Airflow Configuration
    # airflow_env sets the AIRFLOW_ENV env var and can be used by
    # DAGs to separate dev/stg/prd code
    airflow_env: Optional[str] = None
    # Set the AIRFLOW_HOME env variable
    # The env variable is only set when airflow_home is provided;
    # get_airflow_home() falls back to: /usr/local/airflow
    airflow_home: Optional[str] = None
    # Set the AIRFLOW__CORE__DAGS_FOLDER env variable to the workspace_root/{airflow_dags_dir}
    # By default, airflow_dags_dir is set to the "dags" folder in the workspace
    airflow_dags_dir: str = "dags"
    # Creates an airflow admin with username: admin, pass: admin
    create_airflow_admin_user: bool = False
    # Airflow Executor
    # The celery result backend and broker url are only configured
    # when executor is "CeleryExecutor"
    executor: str = "SequentialExecutor"

    # -*- Airflow Database Configuration
    # Set as True to wait for db before starting airflow
    wait_for_db: bool = False
    # Set as True to delay start by 60 seconds to wait for db migrations
    wait_for_db_migrate: bool = False
    # Connect to the database using a DbApp
    db_app: Optional[DbApp] = None
    # Provide database connection details manually
    # db_user can be provided here or as the
    # DATABASE_USER or DB_USER env var in the secrets_file
    db_user: Optional[str] = None
    # db_password can be provided here or as the
    # DATABASE_PASSWORD or DB_PASSWORD env var in the secrets_file
    db_password: Optional[str] = None
    # db_database can be provided here or as the
    # DATABASE_DB or DB_DATABASE env var in the secrets_file
    db_database: Optional[str] = None
    # db_host can be provided here or as the
    # DATABASE_HOST or DB_HOST env var in the secrets_file
    db_host: Optional[str] = None
    # db_port can be provided here or as the
    # DATABASE_PORT or DB_PORT env var in the secrets_file
    db_port: Optional[int] = None
    # db_driver can be provided here or as the
    # DATABASE_DRIVER or DB_DRIVER env var in the secrets_file
    # NOTE: the secrets_file is only consulted when db_driver is set to an empty value
    db_driver: str = "postgresql+psycopg2"
    # Driver used for AIRFLOW__CELERY__RESULT_BACKEND (CeleryExecutor only)
    # db_driver is used instead when this is empty
    db_result_backend_driver: str = "db+postgresql"
    # Airflow db connections in the format { conn_id: conn_url }
    # converted to env var: AIRFLOW_CONN_{CONN_ID} = conn_url
    # (the env var name is upper-cased)
    db_connections: Optional[Dict] = None
    # Set as True to migrate (initialize/upgrade) the airflow_db
    db_migrate: bool = False

    # -*- Airflow Redis Configuration
    # Set as True to wait for redis before starting airflow
    wait_for_redis: bool = False
    # Connect to redis using a DbApp
    redis_app: Optional[DbApp] = None
    # Provide redis connection details manually
    # redis_password can be provided here or as the
    # REDIS_PASSWORD env var in the secrets_file
    redis_password: Optional[str] = None
    # redis_schema can be provided here or as the
    # REDIS_SCHEMA env var in the secrets_file
    redis_schema: Optional[str] = None
    # redis_host can be provided here or as the
    # REDIS_HOST env var in the secrets_file
    redis_host: Optional[str] = None
    # redis_port can be provided here or as the
    # REDIS_PORT env var in the secrets_file
    redis_port: Optional[int] = None
    # redis_driver can be provided here or as the
    # REDIS_DRIVER env var in the secrets_file
    # NOTE: the secrets_file is only consulted when redis_driver is set to an empty value
    redis_driver: str = "redis"

    #  -*- Other args
    # Sets the AIRFLOW__CORE__LOAD_EXAMPLES env var
    load_examples: bool = False

    def get_db_user(self) -> Optional[str]:
        """Return the database user.

        Uses ``self.db_user`` when it is truthy, otherwise the ``DATABASE_USER``
        secret, otherwise the ``DB_USER`` secret.
        """
        if self.db_user:
            return self.db_user
        user_from_secrets = self.get_secret_from_file("DATABASE_USER")
        if user_from_secrets:
            return user_from_secrets
        return self.get_secret_from_file("DB_USER")

    def get_db_password(self) -> Optional[str]:
        """Return the database password.

        Uses ``self.db_password`` when it is truthy, otherwise the
        ``DATABASE_PASSWORD`` secret, otherwise the ``DB_PASSWORD`` secret.
        """
        password = self.db_password
        if not password:
            password = self.get_secret_from_file("DATABASE_PASSWORD")
        if not password:
            password = self.get_secret_from_file("DB_PASSWORD")
        return password

    def get_db_database(self) -> Optional[str]:
        """Return the database name.

        Uses ``self.db_database`` when it is truthy, otherwise the
        ``DATABASE_DB`` secret, otherwise the ``DB_DATABASE`` secret.
        """
        database = self.db_database
        # Only consult the secrets while no truthy value has been found
        for secret_key in ("DATABASE_DB", "DB_DATABASE"):
            if database:
                break
            database = self.get_secret_from_file(secret_key)
        return database

    def get_db_driver(self) -> Optional[str]:
        """Return the database driver.

        Uses ``self.db_driver`` when it is truthy, otherwise the
        ``DATABASE_DRIVER`` secret, otherwise the ``DB_DRIVER`` secret.
        """
        if self.db_driver:
            return self.db_driver
        driver_from_secrets = self.get_secret_from_file("DATABASE_DRIVER")
        if driver_from_secrets:
            return driver_from_secrets
        return self.get_secret_from_file("DB_DRIVER")

    def get_db_host(self) -> Optional[str]:
        """Return the database host.

        Uses ``self.db_host`` when it is truthy, otherwise the ``DATABASE_HOST``
        secret, otherwise the ``DB_HOST`` secret.
        """
        host = self.db_host
        # Only consult the secrets while no truthy value has been found
        for secret_key in ("DATABASE_HOST", "DB_HOST"):
            if host:
                break
            host = self.get_secret_from_file(secret_key)
        return host

    def get_db_port(self) -> Optional[int]:
        """Return the database port.

        Uses ``self.db_port`` when it is truthy, otherwise the ``DATABASE_PORT``
        secret, otherwise the ``DB_PORT`` secret; secrets are converted with
        ``str_to_int``. A falsy port (e.g. ``0``) counts as "not set".
        """
        if self.db_port:
            return self.db_port
        port_from_secrets = str_to_int(self.get_secret_from_file("DATABASE_PORT"))
        if port_from_secrets:
            return port_from_secrets
        return str_to_int(self.get_secret_from_file("DB_PORT"))

    def get_redis_password(self) -> Optional[str]:
        """Return the redis password.

        Uses ``self.redis_password`` when it is truthy, otherwise the
        ``REDIS_PASSWORD`` secret.
        """
        if self.redis_password:
            return self.redis_password
        return self.get_secret_from_file("REDIS_PASSWORD")

    def get_redis_schema(self) -> Optional[str]:
        """Return the redis schema.

        Uses ``self.redis_schema`` when it is truthy, otherwise the
        ``REDIS_SCHEMA`` secret.
        """
        schema = self.redis_schema
        if not schema:
            schema = self.get_secret_from_file("REDIS_SCHEMA")
        return schema

    def get_redis_host(self) -> Optional[str]:
        """Return the redis host.

        Uses ``self.redis_host`` when it is truthy, otherwise the
        ``REDIS_HOST`` secret.
        """
        if self.redis_host:
            return self.redis_host
        return self.get_secret_from_file("REDIS_HOST")

    def get_redis_port(self) -> Optional[int]:
        """Return the redis port.

        Uses ``self.redis_port`` when it is truthy, otherwise the ``REDIS_PORT``
        secret converted with ``str_to_int``.
        """
        port = self.redis_port
        if not port:
            port = str_to_int(self.get_secret_from_file("REDIS_PORT"))
        return port

    def get_redis_driver(self) -> Optional[str]:
        """Return the redis driver.

        Uses ``self.redis_driver`` when it is truthy, otherwise the
        ``REDIS_DRIVER`` secret.
        """
        if self.redis_driver:
            return self.redis_driver
        return self.get_secret_from_file("REDIS_DRIVER")

    def get_airflow_home(self) -> str:
        """Return ``self.airflow_home``, or ``/usr/local/airflow`` when it is unset or empty."""
        if self.airflow_home:
            return self.airflow_home
        return "/usr/local/airflow"

    def get_container_env(self, container_context: ContainerContext) -> Dict[str, str]:
        """Build the environment variables for the Airflow container.

        Layers, each overriding the previous one on key collisions:
          1. a copy of ``self.container_env``
          2. phidata and Airflow settings derived from this app and ``container_context``
          3. values returned by ``self.get_env_file_data()``
          4. ``self.env_vars``

        Connection URLs (database, celery result backend, celery broker) are only
        set when every component they are built from is available.

        Args:
            container_context: Container paths (workspace root, scripts dir, ...)
                and, optionally, the workspace schema.

        Returns:
            The env var name -> value mapping for the container. A new dict is
            built on every call; ``self.container_env`` is not modified.
        """
        from phi.constants import (
            PHI_RUNTIME_ENV_VAR,
            PYTHONPATH_ENV_VAR,
            REQUIREMENTS_FILE_PATH_ENV_VAR,
            SCRIPTS_DIR_ENV_VAR,
            STORAGE_DIR_ENV_VAR,
            WORKFLOWS_DIR_ENV_VAR,
            WORKSPACE_DIR_ENV_VAR,
            WORKSPACE_HASH_ENV_VAR,
            WORKSPACE_ID_ENV_VAR,
            WORKSPACE_ROOT_ENV_VAR,
            INIT_AIRFLOW_ENV_VAR,
            AIRFLOW_ENV_ENV_VAR,
            AIRFLOW_HOME_ENV_VAR,
            AIRFLOW_DAGS_FOLDER_ENV_VAR,
            AIRFLOW_EXECUTOR_ENV_VAR,
            AIRFLOW_DB_CONN_URL_ENV_VAR,
        )

        # Container Environment
        # Work on a copy so the user-provided container_env dict is never mutated
        container_env: Dict[str, str] = dict(self.container_env or {})
        container_env.update(
            {
                "INSTALL_REQUIREMENTS": str(self.install_requirements),
                "MOUNT_WORKSPACE": str(self.mount_workspace),
                "PRINT_ENV_ON_LOAD": str(self.print_env_on_load),
                PHI_RUNTIME_ENV_VAR: "kubernetes",
                REQUIREMENTS_FILE_PATH_ENV_VAR: container_context.requirements_file or "",
                SCRIPTS_DIR_ENV_VAR: container_context.scripts_dir or "",
                STORAGE_DIR_ENV_VAR: container_context.storage_dir or "",
                WORKFLOWS_DIR_ENV_VAR: container_context.workflows_dir or "",
                WORKSPACE_DIR_ENV_VAR: container_context.workspace_dir or "",
                WORKSPACE_ROOT_ENV_VAR: container_context.workspace_root or "",
                # Env variables used by Airflow
                # INIT_AIRFLOW env var is required for phidata to generate DAGs from workflows
                INIT_AIRFLOW_ENV_VAR: str(True),
                "DB_MIGRATE": str(self.db_migrate),
                "WAIT_FOR_DB": str(self.wait_for_db),
                "WAIT_FOR_DB_MIGRATE": str(self.wait_for_db_migrate),
                "WAIT_FOR_REDIS": str(self.wait_for_redis),
                "CREATE_AIRFLOW_ADMIN_USER": str(self.create_airflow_admin_user),
                AIRFLOW_EXECUTOR_ENV_VAR: str(self.executor),
                "AIRFLOW__CORE__LOAD_EXAMPLES": str(self.load_examples),
                # Airflow Navbar color
                "AIRFLOW__WEBSERVER__NAVBAR_COLOR": "#d1fae5",
            }
        )

        # Set the workspace id and hash (best effort)
        try:
            workspace_schema = container_context.workspace_schema
            if workspace_schema is not None:
                if workspace_schema.id_workspace is not None:
                    container_env[WORKSPACE_ID_ENV_VAR] = str(workspace_schema.id_workspace)
                if workspace_schema.ws_hash is not None:
                    container_env[WORKSPACE_HASH_ENV_VAR] = workspace_schema.ws_hash
        except Exception as e:
            # The workspace id/hash are optional: record the problem instead of failing
            logger.debug(f"Could not read workspace id/hash: {e}")

        if self.set_python_path:
            python_path = self.python_path
            if python_path is None:
                python_path = f"{container_context.workspace_root}:{self.get_airflow_home()}"
                if self.add_python_paths is not None:
                    python_path = "{}:{}".format(python_path, ":".join(self.add_python_paths))
            container_env[PYTHONPATH_ENV_VAR] = python_path

        # Set aws region and profile
        self.set_aws_env_vars(env_dict=container_env)

        # Set the AIRFLOW__CORE__DAGS_FOLDER
        container_env[AIRFLOW_DAGS_FOLDER_ENV_VAR] = f"{container_context.workspace_root}/{self.airflow_dags_dir}"

        # Set the AIRFLOW_ENV
        if self.airflow_env is not None:
            container_env[AIRFLOW_ENV_ENV_VAR] = self.airflow_env

        # Set the AIRFLOW_HOME
        # Only exported when explicitly configured
        if self.airflow_home is not None:
            container_env[AIRFLOW_HOME_ENV_VAR] = self.get_airflow_home()

        # Set the AIRFLOW_CONN_ variables
        if self.db_connections is not None:
            for conn_id, conn_url in self.db_connections.items():
                try:
                    af_conn_id = f"AIRFLOW_CONN_{conn_id}".upper()
                    container_env[af_conn_id] = conn_url
                except Exception as e:
                    logger.exception(e)
                    continue

        # Airflow db connection
        # Explicit settings and secrets take precedence, db_app fills in the gaps
        db_user = self.get_db_user()
        db_password = self.get_db_password()
        db_database = self.get_db_database()
        db_host = self.get_db_host()
        db_port = self.get_db_port()
        db_driver = self.get_db_driver()
        if isinstance(self.db_app, DbApp):
            logger.debug(f"Reading db connection details from: {self.db_app.name}")
            if db_user is None:
                db_user = self.db_app.get_db_user()
            if db_password is None:
                db_password = self.db_app.get_db_password()
            if db_database is None:
                db_database = self.db_app.get_db_database()
            if db_host is None:
                db_host = self.db_app.get_db_host()
            if db_port is None:
                db_port = self.db_app.get_db_port()
            if db_driver is None:
                db_driver = self.db_app.get_db_driver()

        # Set the AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
        # Check the components themselves: searching the rendered url for "None"
        # would also reject valid values that merely contain that text
        db_url_parts = (db_driver, db_user, db_password, db_host, db_port, db_database)
        if all(part is not None for part in db_url_parts):
            db_connection_url = f"{db_driver}://{db_user}:{db_password}@{db_host}:{db_port}/{db_database}"
            # NOTE(review): this debug log includes the database password
            logger.debug(f"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: {db_connection_url}")
            container_env[AIRFLOW_DB_CONN_URL_ENV_VAR] = db_connection_url

        # Set the database connection details in the container env
        if db_host is not None:
            container_env["DATABASE_HOST"] = db_host
        if db_port is not None:
            container_env["DATABASE_PORT"] = str(db_port)

        # Airflow redis connection
        if self.executor == "CeleryExecutor":
            # Airflow celery result backend
            celery_result_backend_driver = self.db_result_backend_driver or db_driver
            result_backend_parts = (
                celery_result_backend_driver,
                db_user,
                db_password,
                db_host,
                db_port,
                db_database,
            )
            # Set the AIRFLOW__CELERY__RESULT_BACKEND
            if all(part is not None for part in result_backend_parts):
                container_env["AIRFLOW__CELERY__RESULT_BACKEND"] = (
                    f"{celery_result_backend_driver}://{db_user}:{db_password}@{db_host}:{db_port}/{db_database}"
                )

            # Airflow celery broker url
            redis_password = self.get_redis_password()
            redis_schema = self.get_redis_schema()
            redis_host = self.get_redis_host()
            redis_port = self.get_redis_port()
            redis_driver = self.get_redis_driver()
            if isinstance(self.redis_app, DbApp):
                logger.debug(f"Reading redis connection details from: {self.redis_app.name}")
                if redis_password is None:
                    redis_password = self.redis_app.get_db_password()
                if redis_schema is None:
                    redis_schema = self.redis_app.get_db_database() or "0"
                if redis_host is None:
                    redis_host = self.redis_app.get_db_host()
                if redis_port is None:
                    redis_port = self.redis_app.get_db_port()
                if redis_driver is None:
                    redis_driver = self.redis_app.get_db_driver()
            # Format the credentials only after the redis_app fallback has run,
            # otherwise the password provided by redis_app could never be used
            redis_auth = f"{redis_password}@" if redis_password else ""

            # Set the AIRFLOW__CELERY__BROKER_URL
            broker_url_parts = (redis_driver, redis_host, redis_port, redis_schema)
            if all(part is not None for part in broker_url_parts):
                celery_broker_url = f"{redis_driver}://{redis_auth}{redis_host}:{redis_port}/{redis_schema}"
                # NOTE(review): this debug log includes the redis password
                logger.debug(f"AIRFLOW__CELERY__BROKER_URL: {celery_broker_url}")
                container_env["AIRFLOW__CELERY__BROKER_URL"] = celery_broker_url

            # Set the redis connection details in the container env
            if redis_host is not None:
                container_env["REDIS_HOST"] = redis_host
            if redis_port is not None:
                container_env["REDIS_PORT"] = str(redis_port)

        # Update the container env using env_file
        env_data_from_file = self.get_env_file_data()
        if env_data_from_file is not None:
            container_env.update({k: str(v) for k, v in env_data_from_file.items() if v is not None})

        # Update the container env with user provided env_vars
        # this overwrites any existing variables with the same key
        if self.env_vars is not None and isinstance(self.env_vars, dict):
            container_env.update({k: str(v) for k, v in self.env_vars.items() if v is not None})

        # logger.debug("Container Environment: {}".format(container_env))
        return container_env