from typing import Optional, Dict

from phi.app.db_app import DbApp
from phi.k8s.app.base import (
    K8sApp,
    AppVolumeType,  # noqa: F401
    ContainerContext,
    ServiceType,  # noqa: F401
    RestartPolicy,  # noqa: F401
    ImagePullPolicy,  # noqa: F401
)
from phi.utils.common import str_to_int
from phi.utils.log import logger


class AirflowBase(K8sApp):
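    """Base K8sApp for running Airflow on Kubernetes.

    Builds the container environment for the Airflow container from the fields
    below: database and Celery connection URLs, the DAGs folder, AIRFLOW_HOME,
    and any extra Airflow connections. Connection details can come from the
    fields directly, a connected DbApp, or the secrets_file.
    """
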
    # -*- App Name
    name: str = "airflow"

    # -*- Image Configuration
    image_name: str = "phidata/airflow"
    image_tag: str = "2.7.1"

    # -*- App Ports
    # Open a container port if open_port=True
    open_port: bool = False
    port_number: int = 8080

    # -*- Workspace Configuration
    # Path to the parent directory of the workspace inside the container
    # When using git-sync, the git repo is cloned inside this directory
    # i.e. this is the parent directory of the workspace
    workspace_parent_dir_container_path: str = "/usr/local/workspace"

    # -*- Airflow Configuration
    # airflow_env sets the AIRFLOW_ENV env var and can be used by
    # DAGs to separate dev/stg/prd code
    airflow_env: Optional[str] = None
    # Set the AIRFLOW_HOME env variable
    # Defaults to: /usr/local/airflow
    airflow_home: Optional[str] = None
    # Set the AIRFLOW__CORE__DAGS_FOLDER env variable to the workspace_root/{airflow_dags_dir}
    # By default, airflow_dags_dir is set to the "dags" folder in the workspace
    airflow_dags_dir: str = "dags"
    # Creates an airflow admin with username: admin, pass: admin
    create_airflow_admin_user: bool = False
    # Airflow Executor
    executor: str = "SequentialExecutor"
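    # Common values are "SequentialExecutor", "LocalExecutor" and
    # "CeleryExecutor"; the Celery result backend and broker URLs built in
    # get_container_env() are only set when executor == "CeleryExecutor".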

    # -*- Airflow Database Configuration
    # Set as True to wait for db before starting airflow
    wait_for_db: bool = False
    # Set as True to delay start by 60 seconds to wait for db migrations
    wait_for_db_migrate: bool = False
    # Connect to the database using a DbApp
    db_app: Optional[DbApp] = None
    # Provide database connection details manually
    # db_user can be provided here or as the
    # DB_USER env var in the secrets_file
    db_user: Optional[str] = None
    # db_password can be provided here or as the
    # DB_PASSWORD env var in the secrets_file
    db_password: Optional[str] = None
    # db_database can be provided here or as the
    # DB_DATABASE env var in the secrets_file
    db_database: Optional[str] = None
    # db_host can be provided here or as the
    # DB_HOST env var in the secrets_file
    db_host: Optional[str] = None
    # db_port can be provided here or as the
    # DB_PORT env var in the secrets_file
    db_port: Optional[int] = None
    # db_driver can be provided here or as the
    # DB_DRIVER env var in the secrets_file
    db_driver: str = "postgresql+psycopg2"
    db_result_backend_driver: str = "db+postgresql"
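    # Celery's database result backend expects a "db+" prefixed SQLAlchemy URL,
    # e.g. db+postgresql://user:pass@host:5432/db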
    # Airflow db connections in the format { conn_id: conn_url }
    # converted to env var: AIRFLOW_CONN_{CONN_ID} = conn_url
    db_connections: Optional[Dict] = None
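    # Example (illustrative values):
    #   db_connections={"my_pg": "postgresql+psycopg2://user:pass@pg-host:5432/db"}
    # is exported as:
    #   AIRFLOW_CONN_MY_PG="postgresql+psycopg2://user:pass@pg-host:5432/db"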
    # Set as True to migrate (initialize/upgrade) the airflow_db
    db_migrate: bool = False

    # -*- Airflow Redis Configuration
    # Set as True to wait for redis before starting airflow
    wait_for_redis: bool = False
    # Connect to redis using a DbApp
    redis_app: Optional[DbApp] = None
    # Provide redis connection details manually
    # redis_password can be provided here or as the
    # REDIS_PASSWORD env var in the secrets_file
    redis_password: Optional[str] = None
    # redis_schema can be provided here or as the
    # REDIS_SCHEMA env var in the secrets_file
    redis_schema: Optional[str] = None
    # redis_host can be provided here or as the
    # REDIS_HOST env var in the secrets_file
    redis_host: Optional[str] = None
    # redis_port can be provided here or as the
    # REDIS_PORT env var in the secrets_file
    redis_port: Optional[int] = None
    # redis_driver can be provided here or as the
    # REDIS_DRIVER env var in the secrets_file
    redis_driver: str = "redis"

    # -*- Other args
    load_examples: bool = False
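
    # The getters below read each value from the field if set, otherwise from
    # the secrets_file. Example secrets file (illustrative keys and values,
    # assuming a YAML secrets file in the workspace):
    #   DB_USER: airflow
    #   DB_PASSWORD: airflow
    #   DB_DATABASE: airflow
    #   REDIS_PASSWORD: mypass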
    def get_db_user(self) -> Optional[str]:
        return self.db_user or self.get_secret_from_file("DATABASE_USER") or self.get_secret_from_file("DB_USER")

    def get_db_password(self) -> Optional[str]:
        return (
            self.db_password
            or self.get_secret_from_file("DATABASE_PASSWORD")
            or self.get_secret_from_file("DB_PASSWORD")
        )

    def get_db_database(self) -> Optional[str]:
        return (
            self.db_database or self.get_secret_from_file("DATABASE_DB") or self.get_secret_from_file("DB_DATABASE")
        )

    def get_db_driver(self) -> Optional[str]:
        return (
            self.db_driver or self.get_secret_from_file("DATABASE_DRIVER") or self.get_secret_from_file("DB_DRIVER")
        )

    def get_db_host(self) -> Optional[str]:
        return self.db_host or self.get_secret_from_file("DATABASE_HOST") or self.get_secret_from_file("DB_HOST")

    def get_db_port(self) -> Optional[int]:
        return (
            self.db_port
            or str_to_int(self.get_secret_from_file("DATABASE_PORT"))
            or str_to_int(self.get_secret_from_file("DB_PORT"))
        )

    def get_redis_password(self) -> Optional[str]:
        return self.redis_password or self.get_secret_from_file("REDIS_PASSWORD")

    def get_redis_schema(self) -> Optional[str]:
        return self.redis_schema or self.get_secret_from_file("REDIS_SCHEMA")

    def get_redis_host(self) -> Optional[str]:
        return self.redis_host or self.get_secret_from_file("REDIS_HOST")

    def get_redis_port(self) -> Optional[int]:
        return self.redis_port or str_to_int(self.get_secret_from_file("REDIS_PORT"))

    def get_redis_driver(self) -> Optional[str]:
        return self.redis_driver or self.get_secret_from_file("REDIS_DRIVER")

    def get_airflow_home(self) -> str:
        return self.airflow_home or "/usr/local/airflow"

    def get_container_env(self, container_context: ContainerContext) -> Dict[str, str]:
        from phi.constants import (
            PHI_RUNTIME_ENV_VAR,
            PYTHONPATH_ENV_VAR,
            REQUIREMENTS_FILE_PATH_ENV_VAR,
            SCRIPTS_DIR_ENV_VAR,
            STORAGE_DIR_ENV_VAR,
            WORKFLOWS_DIR_ENV_VAR,
            WORKSPACE_DIR_ENV_VAR,
            WORKSPACE_HASH_ENV_VAR,
            WORKSPACE_ID_ENV_VAR,
            WORKSPACE_ROOT_ENV_VAR,
            INIT_AIRFLOW_ENV_VAR,
            AIRFLOW_ENV_ENV_VAR,
            AIRFLOW_HOME_ENV_VAR,
            AIRFLOW_DAGS_FOLDER_ENV_VAR,
            AIRFLOW_EXECUTOR_ENV_VAR,
            AIRFLOW_DB_CONN_URL_ENV_VAR,
        )

        # Container Environment
        container_env: Dict[str, str] = self.container_env or {}
        container_env.update(
            {
                "INSTALL_REQUIREMENTS": str(self.install_requirements),
                "MOUNT_WORKSPACE": str(self.mount_workspace),
                "PRINT_ENV_ON_LOAD": str(self.print_env_on_load),
                PHI_RUNTIME_ENV_VAR: "kubernetes",
                REQUIREMENTS_FILE_PATH_ENV_VAR: container_context.requirements_file or "",
                SCRIPTS_DIR_ENV_VAR: container_context.scripts_dir or "",
                STORAGE_DIR_ENV_VAR: container_context.storage_dir or "",
                WORKFLOWS_DIR_ENV_VAR: container_context.workflows_dir or "",
                WORKSPACE_DIR_ENV_VAR: container_context.workspace_dir or "",
                WORKSPACE_ROOT_ENV_VAR: container_context.workspace_root or "",
                # Env variables used by Airflow
                # INIT_AIRFLOW env var is required for phidata to generate DAGs from workflows
                INIT_AIRFLOW_ENV_VAR: str(True),
                "DB_MIGRATE": str(self.db_migrate),
                "WAIT_FOR_DB": str(self.wait_for_db),
                "WAIT_FOR_DB_MIGRATE": str(self.wait_for_db_migrate),
                "WAIT_FOR_REDIS": str(self.wait_for_redis),
                "CREATE_AIRFLOW_ADMIN_USER": str(self.create_airflow_admin_user),
                AIRFLOW_EXECUTOR_ENV_VAR: str(self.executor),
                "AIRFLOW__CORE__LOAD_EXAMPLES": str(self.load_examples),
                # Airflow Navbar color
                "AIRFLOW__WEBSERVER__NAVBAR_COLOR": "#d1fae5",
            }
        )

        try:
            if container_context.workspace_schema is not None:
                if container_context.workspace_schema.id_workspace is not None:
                    container_env[WORKSPACE_ID_ENV_VAR] = str(container_context.workspace_schema.id_workspace)
                if container_context.workspace_schema.ws_hash is not None:
                    container_env[WORKSPACE_HASH_ENV_VAR] = container_context.workspace_schema.ws_hash
        except Exception:
            pass

        if self.set_python_path:
            python_path = self.python_path
            if python_path is None:
                python_path = f"{container_context.workspace_root}:{self.get_airflow_home()}"
                if self.add_python_paths is not None:
                    python_path = "{}:{}".format(python_path, ":".join(self.add_python_paths))
            if python_path is not None:
                container_env[PYTHONPATH_ENV_VAR] = python_path

        # Set aws region and profile
        self.set_aws_env_vars(env_dict=container_env)

        # Set the AIRFLOW__CORE__DAGS_FOLDER
        container_env[AIRFLOW_DAGS_FOLDER_ENV_VAR] = f"{container_context.workspace_root}/{self.airflow_dags_dir}"

        # Set the AIRFLOW_ENV
        if self.airflow_env is not None:
            container_env[AIRFLOW_ENV_ENV_VAR] = self.airflow_env

        # Set the AIRFLOW_HOME
        if self.airflow_home is not None:
            container_env[AIRFLOW_HOME_ENV_VAR] = self.get_airflow_home()

        # Set the AIRFLOW_CONN_ variables
        if self.db_connections is not None:
            for conn_id, conn_url in self.db_connections.items():
                try:
                    af_conn_id = "AIRFLOW_CONN_{}".format(conn_id).upper()
                    container_env[af_conn_id] = conn_url
                except Exception as e:
                    logger.exception(e)
                    continue

        # Airflow db connection
        db_user = self.get_db_user()
        db_password = self.get_db_password()
        db_database = self.get_db_database()
        db_host = self.get_db_host()
        db_port = self.get_db_port()
        db_driver = self.get_db_driver()
        if self.db_app is not None and isinstance(self.db_app, DbApp):
            logger.debug(f"Reading db connection details from: {self.db_app.name}")
            if db_user is None:
                db_user = self.db_app.get_db_user()
            if db_password is None:
                db_password = self.db_app.get_db_password()
            if db_database is None:
                db_database = self.db_app.get_db_database()
            if db_host is None:
                db_host = self.db_app.get_db_host()
            if db_port is None:
                db_port = self.db_app.get_db_port()
            if db_driver is None:
                db_driver = self.db_app.get_db_driver()
        db_connection_url = f"{db_driver}://{db_user}:{db_password}@{db_host}:{db_port}/{db_database}"
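        # e.g. postgresql+psycopg2://airflow:airflow@airflow-pg:5432/airflow
        # (illustrative values); if any component is unresolved, the URL
        # contains the string "None" and is not exported below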
        # Set the AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
        if "None" not in db_connection_url:
            logger.debug(f"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: {db_connection_url}")
            container_env[AIRFLOW_DB_CONN_URL_ENV_VAR] = db_connection_url

        # Set the database connection details in the container env
        if db_host is not None:
            container_env["DATABASE_HOST"] = db_host
        if db_port is not None:
            container_env["DATABASE_PORT"] = str(db_port)

        # Airflow redis connection
        if self.executor == "CeleryExecutor":
            # Airflow celery result backend
            celery_result_backend_driver = self.db_result_backend_driver or db_driver
            celery_result_backend_url = (
                f"{celery_result_backend_driver}://{db_user}:{db_password}@{db_host}:{db_port}/{db_database}"
            )
            # Set the AIRFLOW__CELERY__RESULT_BACKEND
            if "None" not in celery_result_backend_url:
                container_env["AIRFLOW__CELERY__RESULT_BACKEND"] = celery_result_backend_url

            # Airflow celery broker url
            # Redis URLs carry the password as ":password@" (empty username)
            _redis_pass = self.get_redis_password()
            redis_password = f":{_redis_pass}@" if _redis_pass else ""
            redis_schema = self.get_redis_schema()
            redis_host = self.get_redis_host()
            redis_port = self.get_redis_port()
            redis_driver = self.get_redis_driver()
            if self.redis_app is not None and isinstance(self.redis_app, DbApp):
                logger.debug(f"Reading redis connection details from: {self.redis_app.name}")
                # redis_password is "" (not None) when unset, so check for falsy
                # and apply the same ":password@" formatting as above
                if not redis_password:
                    _redis_pass = self.redis_app.get_db_password()
                    redis_password = f":{_redis_pass}@" if _redis_pass else ""
                if redis_schema is None:
                    redis_schema = self.redis_app.get_db_database() or "0"
                if redis_host is None:
                    redis_host = self.redis_app.get_db_host()
                if redis_port is None:
                    redis_port = self.redis_app.get_db_port()
                if redis_driver is None:
                    redis_driver = self.redis_app.get_db_driver()

            # Set the AIRFLOW__CELERY__BROKER_URL
            celery_broker_url = f"{redis_driver}://{redis_password}{redis_host}:{redis_port}/{redis_schema}"
            if "None" not in celery_broker_url:
                logger.debug(f"AIRFLOW__CELERY__BROKER_URL: {celery_broker_url}")
                container_env["AIRFLOW__CELERY__BROKER_URL"] = celery_broker_url

            # Set the redis connection details in the container env
            if redis_host is not None:
                container_env["REDIS_HOST"] = redis_host
            if redis_port is not None:
                container_env["REDIS_PORT"] = str(redis_port)

        # Update the container env using env_file
        env_data_from_file = self.get_env_file_data()
        if env_data_from_file is not None:
            container_env.update({k: str(v) for k, v in env_data_from_file.items() if v is not None})

        # Update the container env with user provided env_vars
        # this overwrites any existing variables with the same key
        if self.env_vars is not None and isinstance(self.env_vars, dict):
            container_env.update({k: str(v) for k, v in self.env_vars.items() if v is not None})

        # logger.debug("Container Environment: {}".format(container_env))
        return container_env
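

if __name__ == "__main__":
    # Illustrative sketch only (not part of the library): shows the shape of
    # the SQLAlchemy connection URL that get_container_env() assembles from
    # the db_* fields. All values below are hypothetical.
    app = AirflowBase(
        executor="CeleryExecutor",
        db_user="airflow",
        db_password="airflow",
        db_database="airflow",
        db_host="airflow-pg",
        db_port=5432,
    )
    db_url = (
        f"{app.get_db_driver()}://{app.get_db_user()}:{app.get_db_password()}"
        f"@{app.get_db_host()}:{app.get_db_port()}/{app.get_db_database()}"
    )
    print(db_url)  # postgresql+psycopg2://airflow:airflow@airflow-pg:5432/airflow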