File size: 1,402 Bytes
630839b
 
 
5b43a4e
6026a97
 
630839b
80baa7f
 
630839b
 
 
 
 
 
 
37fdb97
6026a97
 
 
 
80baa7f
6026a97
 
 
5b43a4e
 
a174711
5465cf2
5b43a4e
630839b
5b43a4e
630839b
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from airflow import DAG
from airflow.decorators import task
from common import check_db_connection, get_session, default_args
from datetime import datetime, timedelta
import logging
from sqlalchemy import text

logger = logging.getLogger(__name__)

@task(task_id="refresh_views")
def _refresh_views():
    """
    Refreshes the materialized views in the database.
    This function is called by the DAG to refresh the views.
    It executes the SQL command to refresh the materialized view.
    """
    with get_session() as session:
        list_views = session.execute(text("SELECT matviewname FROM pg_matviews WHERE schemaname = 'public';"))
        
        for view in list_views:
            view_name = view[0]
            logger.info(f"Refreshing materialized view: {view_name}")
            session.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY public.{view_name} WITH DATA;"))
        
            session.commit()

# Copy default_args from the original code
dag_args = default_args.copy()
dag_args['start_date'] = datetime.now() - timedelta(days=1)

with DAG(dag_id="refresh_materialized_views",
         default_args=dag_args,
         schedule_interval="0 3 */1 * *") as dag:
    """
    DAG to refresh the materialized views in the database.
    This DAG runs daily at 3 AM.
    """
    check_db = check_db_connection()
    refresh = _refresh_views()
    
    check_db >> refresh