import os
import json
import streamlit as st
import pandas as pd
from google.cloud import bigquery_datatransfer
from google.oauth2 import service_account
from datetime import datetime, timezone

from html_templates import button_styles, sq_title, sq_placeholder, tooltip_message_sq, logo

st.markdown(logo, unsafe_allow_html=True)
st.logo("alerter_4.jpeg")

# Function to load GCP credentials from the environment
def load_gcp_credentials():
    try:
        gcp_credentials_str = os.getenv('GCP_CREDENTIALS')
        if not gcp_credentials_str:
            raise ValueError("GCP_CREDENTIALS environment variable not defined")
        gcp_credentials = json.loads(gcp_credentials_str)
        credentials = service_account.Credentials.from_service_account_info(gcp_credentials)
        return credentials
    except Exception as e:
        st.error(f"❌ Error loading GCP credentials: {str(e)}")
        return None
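
# Example (assumption, not from the original app): GCP_CREDENTIALS is expected
# to hold the full service-account key JSON as a single string, e.g.
#
#     export GCP_CREDENTIALS='{"type": "service_account", "project_id": "...", ...}'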

# Function to get a BigQuery Data Transfer client
def get_transfer_client():
    credentials = load_gcp_credentials()
    if credentials:
        return bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)
    return None
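
# Optional sketch (assumption, not part of the original app): each helper below
# builds a fresh client on every Streamlit rerun. st.cache_resource could be
# used to reuse a single client across reruns instead:
#
#     @st.cache_resource
#     def get_cached_transfer_client():
#         return get_transfer_client()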

# Function to list all scheduled queries
def list_scheduled_queries():
    try:
        client = get_transfer_client()
        if not client:
            return []
        project_id = "fynd-db"  # Change if necessary
        parent = f"projects/{project_id}/locations/asia-south1"  # Mumbai region
        transfer_configs = client.list_transfer_configs(parent=parent)
        return list(transfer_configs)
    except Exception as e:
        st.error(f"❌ Error retrieving scheduled queries: {str(e)}")
        return []
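
# Note (assumption): list_transfer_configs returns every transfer config in the
# location, not only scheduled queries. If the project also contains other
# transfer types, they could be filtered by data_source_id, which is
# "scheduled_query" for scheduled queries:
#
#     scheduled_only = [c for c in list_scheduled_queries()
#                       if c.data_source_id == "scheduled_query"]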

# Function to get details of a selected scheduled query
def get_query_details(query_id):
    try:
        client = get_transfer_client()
        if not client:
            return None
        query = client.get_transfer_config(name=query_id)
        return {
            "Display Name": [query.display_name],
            "Source": [query.data_source_id],
            "Schedule (UTC)": [query.schedule],
            "Region": [query.name.split("/")[3]],  # Extract the region from the resource name
            "Destination Dataset": [query.destination_dataset_id],
            "Next Scheduled Run": [query.next_run_time.strftime("%Y-%m-%d %H:%M:%S") if query.next_run_time else "N/A"],
        }
    except Exception as e:
        st.error(f"❌ Error fetching query details: {str(e)}")
        return None
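
# For reference: transfer config resource names follow the documented format
#
#     projects/{project}/locations/{location}/transferConfigs/{config_id}
#
# so name.split("/")[3] above yields the location (e.g. "asia-south1").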

# Function to trigger a real-time backfill without changing data order
def trigger_backfill(query_id):
    try:
        client = get_transfer_client()
        if not client:
            return None
        run_request = bigquery_datatransfer.StartManualTransferRunsRequest(
            parent=query_id,
            requested_run_time=datetime.now(timezone.utc),  # Run immediately
        )
        response = client.start_manual_transfer_runs(request=run_request)
        # Return the response so the caller can confirm the run was created
        return response
    except Exception as e:
        st.error(f"❌ Error scheduling backfill: {str(e)}")
        return None
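
# Optional sketch (assumption, not part of the original app): the response from
# start_manual_transfer_runs lists the runs it created, and their state can be
# polled with get_transfer_run until the backfill completes:
#
#     def get_run_state(run_name):
#         client = get_transfer_client()
#         run = client.get_transfer_run(name=run_name)
#         return run.state  # e.g. TransferState.PENDING / RUNNING / SUCCEEDED
#
#     # run_name comes from response.runs[0].name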

# Streamlit UI
st.markdown(sq_title, unsafe_allow_html=True)
st.write("")
st.markdown(tooltip_message_sq, unsafe_allow_html=True)
st.write("")
st.markdown(sq_placeholder, unsafe_allow_html=True)

# List scheduled queries
st.write("")
queries = list_scheduled_queries()
query_options = {query.display_name: query.name for query in queries} if queries else {}
selected_query = st.selectbox("Scheduled query", [""] + list(query_options.keys()), label_visibility="collapsed")  # Collapsed label avoids Streamlit's empty-label warning

if selected_query:
    query_id = query_options[selected_query]
    details = get_query_details(query_id)
    if details:
        details_df = pd.DataFrame(details)  # Convert the details dict to a one-row DataFrame
        st.dataframe(details_df)  # Display the details as a table

    # Backfill button
    st.markdown(button_styles, unsafe_allow_html=True)
    if st.button("Schedule Backfill"):
        response = trigger_backfill(query_id)
        if response:
            st.success(f"✅ Backfill successfully scheduled for `{selected_query}`!")
        else:
            st.error("⚠️ Failed to schedule backfill!")