File size: 4,357 Bytes
d8535a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
import json
import streamlit as st
import pandas as pd
from google.cloud import bigquery_datatransfer
from google.cloud import bigquery
from google.oauth2 import service_account
from datetime import datetime, timezone
from html_templates import button_styles, sq_title, sq_placeholder, tooltip_message_sq, logo


# Render the branded header (raw HTML from html_templates) and the app logo image.
st.markdown(logo, unsafe_allow_html=True)
st.logo("alerter_4.jpeg")

# Function to load GCP credentials
def load_gcp_credentials():
    """Build service-account credentials from the GCP_CREDENTIALS env var.

    The variable is expected to hold the service-account key as a JSON
    string. Returns a Credentials object on success; on any failure
    (missing variable, bad JSON, invalid key) shows a Streamlit error
    and returns None.
    """
    try:
        raw_credentials = os.getenv('GCP_CREDENTIALS')
        if not raw_credentials:
            raise ValueError("GCP_CREDENTIALS environment variable not defined")

        credentials_info = json.loads(raw_credentials)
        return service_account.Credentials.from_service_account_info(credentials_info)
    except Exception as exc:
        st.error(f"❌ Error loading GCP credentials: {str(exc)}")
        return None

# Function to get BigQuery Data Transfer Client
def get_transfer_client():
    """Return a BigQuery DataTransferServiceClient, or None if credentials failed."""
    creds = load_gcp_credentials()
    if creds is None:
        # load_gcp_credentials already surfaced the error in the UI.
        return None
    return bigquery_datatransfer.DataTransferServiceClient(credentials=creds)

# Function to list all scheduled queries
@st.cache_data(ttl=600, show_spinner=True)
def list_scheduled_queries(project_id="fynd-db", location="asia-south1"):
    """List all scheduled-query transfer configs for a project/region.

    Args:
        project_id: GCP project to query (default preserves the original
            hard-coded "fynd-db").
        location: BigQuery Data Transfer region (default "asia-south1",
            i.e. Mumbai, as in the original).

    Returns:
        A list of TransferConfig objects; an empty list when the client
        could not be created or the API call fails (error shown in UI).
        Results are cached by st.cache_data for 10 minutes per
        (project_id, location) pair.
    """
    try:
        client = get_transfer_client()
        if not client:
            return []

        parent = f"projects/{project_id}/locations/{location}"
        return list(client.list_transfer_configs(parent=parent))
    except Exception as e:
        st.error(f"❌ Error retrieving scheduled queries: {str(e)}")
        return []

# Function to get details of a selected scheduled query
@st.cache_data(ttl=600, show_spinner=True)
def get_query_details(query_id):
    """Fetch one transfer config and shape it for a one-row DataFrame.

    Args:
        query_id: Full transfer-config resource name
            (projects/<p>/locations/<region>/transferConfigs/<id>).

    Returns:
        A dict of column-name -> single-element list (ready for
        pd.DataFrame), or None on failure (error shown in UI).
        Cached for 10 minutes per query_id.
    """
    try:
        client = get_transfer_client()
        if not client:
            return None

        cfg = client.get_transfer_config(name=query_id)

        # next_run_time may be unset for paused/finished configs.
        if cfg.next_run_time:
            next_run = cfg.next_run_time.strftime("%Y-%m-%d %H:%M:%S")
        else:
            next_run = "N/A"

        return {
            "Display Name": [cfg.display_name],
            "Source": [cfg.data_source_id],
            "Schedule (UTC)": [cfg.schedule],
            # Resource name segment 3 is the region: projects/<p>/locations/<region>/...
            "Region": [cfg.name.split("/")[3]],
            "Destination Dataset": [cfg.destination_dataset_id],
            "Next Scheduled Run": [next_run],
        }
    except Exception as e:
        st.error(f"❌ Error fetching query details: {str(e)}")
        return None

# Function to trigger a real-time backfill WITHOUT CHANGING DATA ORDER
def trigger_backfill(query_id):
    """Start a manual transfer run ("backfill") for a scheduled query, now.

    Args:
        query_id: Full transfer-config resource name, used as the request
            parent.

    Returns:
        The StartManualTransferRuns response on success, or None when the
        client could not be built or the API call fails (error shown in
        UI). Callers only test truthiness, so the previous mixed
        False/None failure returns are unified to None.
    """
    try:
        client = get_transfer_client()
        if not client:
            return None

        run_request = bigquery_datatransfer.StartManualTransferRunsRequest(
            parent=query_id,
            # datetime.now(timezone.utc) replaces the deprecated
            # datetime.utcnow().replace(tzinfo=...) pattern; same aware UTC value.
            requested_run_time=datetime.now(timezone.utc),  # Real-time execution
        )

        return client.start_manual_transfer_runs(request=run_request)
    except Exception as e:
        st.error(f"❌ Error scheduling backfill: {str(e)}")
        return None

# --- Streamlit UI -----------------------------------------------------------
# Page header, tooltip, and selectbox placeholder (HTML templates).
st.markdown(sq_title, unsafe_allow_html=True)
st.write("")
st.markdown(tooltip_message_sq, unsafe_allow_html=True)
st.write("")
st.markdown(sq_placeholder, unsafe_allow_html=True)

# Build the picker: display name -> full transfer-config resource name.
st.write("")
configs = list_scheduled_queries()
name_to_id = {cfg.display_name: cfg.name for cfg in configs} if configs else {}

chosen_name = st.selectbox("", [""] + list(name_to_id.keys()))


if chosen_name:
    config_id = name_to_id[chosen_name]
    detail_columns = get_query_details(config_id)

    if detail_columns:
        # One-row summary table of the selected scheduled query.
        st.dataframe(pd.DataFrame(detail_columns))

        # Backfill Button
        st.markdown(button_styles, unsafe_allow_html=True)
        if st.button("Schedule Backfill"):
            run_response = trigger_backfill(config_id)
            if run_response:
                st.success(f"βœ… Backfill successfully scheduled for `{chosen_name}`!")
            else:
                st.error("⚠️ Failed to schedule backfill!")