Spaces:
Running
Running
File size: 1,402 Bytes
630839b 5b43a4e 6026a97 630839b 80baa7f 630839b 37fdb97 6026a97 80baa7f 6026a97 5b43a4e a174711 5465cf2 5b43a4e 630839b 5b43a4e 630839b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
from airflow import DAG
from airflow.decorators import task
from common import check_db_connection, get_session, default_args
from datetime import datetime, timedelta
import logging
from sqlalchemy import text
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@task(task_id="refresh_views")
def _refresh_views():
    """Refresh every materialized view found in the ``public`` schema.

    The view names are read from the ``pg_matviews`` catalog. Each view is
    then refreshed with ``REFRESH MATERIALIZED VIEW CONCURRENTLY ... WITH
    DATA`` and the refresh is committed before the next view is processed.

    Any database error raised by a refresh propagates to the caller.
    """
    with get_session() as session:
        # Read all names up front so the catalog result is fully consumed
        # before further statements (and commits) run on the same session.
        catalog_rows = session.execute(
            text("SELECT matviewname FROM pg_matviews WHERE schemaname = 'public';")
        )
        view_names = [row[0] for row in catalog_rows]

        for view_name in view_names:
            logger.info("Refreshing materialized view: %s", view_name)
            # The catalog returns the name exactly as stored. Quote it (and
            # double any embedded quote) so mixed-case or otherwise unusual
            # names are not case-folded or misparsed by PostgreSQL.
            quoted_name = '"' + view_name.replace('"', '""') + '"'
            # NOTE(review): per the PostgreSQL docs, CONCURRENTLY requires a
            # unique index on the view - confirm every public view has one.
            session.execute(
                text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY public.{quoted_name} WITH DATA;")
            )
            # Commit per view so the lock taken by the refresh is released
            # before moving on to the next one.
            session.commit()
# Work on a copy so the shared default_args object is not modified.
dag_args = default_args.copy()
# Use a fixed start_date: a value derived from datetime.now() changes every
# time this file is parsed, which Airflow's documentation advises against.
dag_args['start_date'] = datetime(2024, 1, 1)
with DAG(dag_id="refresh_materialized_views",
         default_args=dag_args,
         schedule_interval="0 3 */1 * *",
         # The start_date is in the past; do not backfill the missed runs.
         catchup=False) as dag:
    # Attach the description to the DAG itself; a bare string literal placed
    # here would be evaluated and discarded.
    dag.doc_md = (
        "DAG to refresh the materialized views in the database.\n\n"
        "This DAG runs daily at 3 AM."
    )
    check_db = check_db_connection()
    refresh = _refresh_views()
    # Only refresh the views once the database connection check has passed.
    check_db >> refresh
|