Spaces:
Running
Running
File size: 1,857 Bytes
630839b 8d57950 37fdb97 630839b 37fdb97 630839b 80baa7f 630839b 37fdb97 630839b 37fdb97 630839b 37fdb97 630839b 37fdb97 8d57950 80baa7f 630839b 80baa7f 630839b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
import logging
from airflow.decorators import task
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
from datetime import datetime, timedelta
from typing import Generator
from contextlib import contextmanager
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Load environment variables from .env file
load_dotenv()
# Lazily-created module singletons: both start as None and are filled in on
# first use by get_engine() and get_session_local() respectively.
_engine = None
_SessionLocal = None
# Default arguments dictionary (where it is consumed is not visible in this
# part of the file).
# NOTE(review): "start_date" is computed from datetime.now() when this module
# is imported, so its value changes on every import/parse. Airflow's docs
# advise against dynamic start dates -- confirm whether a fixed date is wanted.
# NOTE(review): "catchup" is normally a DAG-level argument rather than a
# per-task default; presumably it has no effect here -- verify against the
# DAG definition that uses this dict.
default_args = {
"owner": "airflow",
"start_date": datetime.now() - timedelta(minutes=5),
"catchup": False
}
def get_engine():
    """
    Return the shared SQLAlchemy engine, building it on the first call.

    The connection URL is read from the ``DATABASE_URL`` environment
    variable. The engine is cached in the module-level ``_engine`` so that
    subsequent calls return the same object.

    Raises:
        ValueError: if the engine has to be created and ``DATABASE_URL`` is
            unset or empty.
    """
    global _engine

    # Fast path: an earlier call already created the engine.
    if _engine is not None:
        return _engine

    url = os.environ.get("DATABASE_URL")
    if not url:
        raise ValueError("DATABASE_URL is not set in environment variables")

    # pool_pre_ping=True asks SQLAlchemy to test pooled connections before
    # handing them out.
    _engine = create_engine(url, pool_pre_ping=True)
    return _engine
def get_session_local() -> sessionmaker:
    """
    Return the shared session factory, creating it on the first call.

    The factory is bound to the engine returned by ``get_engine()`` and is
    configured with ``autocommit=False`` and ``autoflush=False``. It is
    cached in the module-level ``_SessionLocal`` for reuse.

    Raises:
        ValueError: propagated from ``get_engine()`` when the engine cannot
            be created because ``DATABASE_URL`` is not set.
    """
    global _SessionLocal

    # Reuse the factory if a previous call already built it.
    if _SessionLocal is not None:
        return _SessionLocal

    engine = get_engine()
    _SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
    return _SessionLocal
@contextmanager
def get_session() -> Generator[Session, None, None]:
    """
    Context manager that yields a new database session.

    A session is created from the factory returned by
    ``get_session_local()`` and is always closed when the ``with`` block
    exits, whether it finishes normally or raises.

    Yields:
        Session: a freshly created session.
    """
    session = get_session_local()()
    try:
        yield session
    finally:
        # Always close the session, even if the caller's block raised.
        session.close()
@task(task_id="check_db_connection")
def check_db_connection():
    """
    Check that the database can be reached.

    Opens a session through ``get_session()`` and executes ``SELECT 1``.
    Success is logged at INFO level. Any exception is logged at ERROR level
    and then re-raised to the caller.
    """
    try:
        with get_session() as db:
            probe = text("SELECT 1")
            db.execute(probe)
    except Exception as exc:
        logger.error(f"Database connection failed: {exc}")
        # Propagate the original exception after logging it.
        raise
    else:
        logger.info("Database connection is successful.")