# Alerter_v4.0 / pages/Scheduler.py
import os
import json
import streamlit as st
import pandas as pd
from google.cloud import bigquery_datatransfer
from google.oauth2 import service_account
from datetime import datetime, timezone
from html_templates import button_styles, sq_title, sq_placeholder, tooltip_message_sq, logo
st.markdown(logo, unsafe_allow_html=True)
st.logo("alerter_4.jpeg")
# Function to load GCP credentials
def load_gcp_credentials():
    try:
        gcp_credentials_str = os.getenv('GCP_CREDENTIALS')
        if not gcp_credentials_str:
            raise ValueError("GCP_CREDENTIALS environment variable not defined")
        gcp_credentials = json.loads(gcp_credentials_str)
        credentials = service_account.Credentials.from_service_account_info(gcp_credentials)
        return credentials
    except Exception as e:
        st.error(f"❌ Error loading GCP credentials: {str(e)}")
        return None
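# A note on GCP_CREDENTIALS: it is assumed to hold the full service-account
# JSON key as a single string. A hypothetical example of exporting it (field
# values are placeholders, not real credentials):
#
#   export GCP_CREDENTIALS='{"type": "service_account", "project_id": "fynd-db",
#     "private_key": "...", "client_email": "alerter@fynd-db.iam.gserviceaccount.com"}'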
# Function to get BigQuery Data Transfer Client
def get_transfer_client():
    credentials = load_gcp_credentials()
    if credentials:
        return bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)
    return None
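# Each helper below constructs a fresh Data Transfer client on every call. If
# reruns of this page become slow, the client could be shared across reruns; a
# minimal sketch using Streamlit's resource cache (an optional alternative, not
# part of this page's behavior; `get_transfer_client_cached` is hypothetical):
#
#   @st.cache_resource
#   def get_transfer_client_cached():
#       credentials = load_gcp_credentials()
#       if credentials:
#           return bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)
#       return None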
# Function to list all scheduled queries
@st.cache_data(ttl=600, show_spinner=True)
def list_scheduled_queries():
    try:
        client = get_transfer_client()
        if not client:
            return []
        project_id = "fynd-db"  # Change if necessary
        parent = f"projects/{project_id}/locations/asia-south1"  # Mumbai region
        transfer_configs = client.list_transfer_configs(parent=parent)
        return list(transfer_configs)
    except Exception as e:
        st.error(f"❌ Error retrieving scheduled queries: {str(e)}")
        return []
# Function to get details of a selected scheduled query
@st.cache_data(ttl=600, show_spinner=True)
def get_query_details(query_id):
    try:
        client = get_transfer_client()
        if not client:
            return None
        query = client.get_transfer_config(name=query_id)
        return {
            "Display Name": [query.display_name],
            "Source": [query.data_source_id],
            "Schedule (UTC)": [query.schedule],
            "Region": [query.name.split("/")[3]],  # Region is the 4th segment of the resource name
            "Destination Dataset": [query.destination_dataset_id],
            "Next Scheduled Run": [query.next_run_time.strftime("%Y-%m-%d %H:%M:%S") if query.next_run_time else "N/A"],
        }
    except Exception as e:
        st.error(f"❌ Error fetching query details: {str(e)}")
        return None
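# Transfer config resource names follow the pattern
# projects/{project}/locations/{location}/transferConfigs/{config_id}, which is
# why index 3 of the "/"-split name is the region. For example (the config id
# here is hypothetical):
#
#   name = "projects/123456/locations/asia-south1/transferConfigs/abc123"
#   name.split("/")[3]  # -> "asia-south1"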
# Function to trigger an on-demand manual run (backfill) of a scheduled query
def trigger_backfill(query_id):
    try:
        client = get_transfer_client()
        if not client:
            return None
        run_request = bigquery_datatransfer.StartManualTransferRunsRequest(
            parent=query_id,
            requested_run_time=datetime.now(timezone.utc),  # Run immediately
        )
        response = client.start_manual_transfer_runs(request=run_request)
        return response
    except Exception as e:
        st.error(f"❌ Error scheduling backfill: {str(e)}")
        return None
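# start_manual_transfer_runs returns a StartManualTransferRunsResponse whose
# `runs` field lists the runs that were created. A minimal sketch of surfacing
# the run name in the UI (assumes at least one run was started):
#
#   response = trigger_backfill(query_id)
#   if response and response.runs:
#       st.write(f"Started run: {response.runs[0].name}")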
# Streamlit UI
st.markdown(sq_title, unsafe_allow_html=True)
st.write("")
st.markdown(tooltip_message_sq, unsafe_allow_html=True)
st.write("")
st.markdown(sq_placeholder, unsafe_allow_html=True)
# List scheduled queries
st.write("")
queries = list_scheduled_queries()
query_options = {query.display_name: query.name for query in queries} if queries else {}
selected_query = st.selectbox("Scheduled query", [""] + list(query_options.keys()), label_visibility="collapsed")
if selected_query:
    query_id = query_options[selected_query]
    details = get_query_details(query_id)
    if details:
        details_df = pd.DataFrame(details)  # Convert the details dict to a one-row DataFrame
        st.dataframe(details_df)
    # Backfill button
    st.markdown(button_styles, unsafe_allow_html=True)
    if st.button("Schedule Backfill"):
        response = trigger_backfill(query_id)
        if response:
            st.success(f"✅ Backfill successfully scheduled for `{selected_query}`!")
        else:
            st.error("⚠️ Failed to schedule backfill!")