"""Streamlit app for browsing BigQuery scheduled queries (Data Transfer
configs) in the asia-south1 region and triggering real-time manual backfills.

Reads service-account credentials from the GCP_CREDENTIALS environment
variable (a JSON string). All errors are surfaced in the Streamlit UI
rather than raised.
"""

import json
import os
from datetime import datetime, timezone

import pandas as pd
import streamlit as st
from google.cloud import bigquery  # noqa: F401 -- kept from original import list
from google.cloud import bigquery_datatransfer
from google.oauth2 import service_account

from html_templates import (
    button_styles,
    logo,
    sq_placeholder,
    sq_title,
    tooltip_message_sq,
)

st.markdown(logo, unsafe_allow_html=True)
st.logo("alerter_4.jpeg")


# Function to load GCP credentials
def load_gcp_credentials():
    """Build service-account credentials from the GCP_CREDENTIALS env var.

    Returns:
        google.oauth2.service_account.Credentials on success, or None on
        failure (the error is shown via st.error instead of raising).
    """
    try:
        gcp_credentials_str = os.getenv("GCP_CREDENTIALS")
        if not gcp_credentials_str:
            raise ValueError("GCP_CREDENTIALS environment variable not defined")
        gcp_credentials = json.loads(gcp_credentials_str)
        credentials = service_account.Credentials.from_service_account_info(gcp_credentials)
        return credentials
    except Exception as e:
        st.error(f"❌ Error loading GCP credentials: {str(e)}")
        return None


# Function to get BigQuery Data Transfer Client
def get_transfer_client():
    """Return a DataTransferServiceClient, or None if credentials failed to load."""
    credentials = load_gcp_credentials()
    if credentials:
        return bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)
    return None


# Function to list all scheduled queries
@st.cache_data(ttl=600, show_spinner=True)
def list_scheduled_queries():
    """List all transfer configs (scheduled queries) for the fynd-db project.

    Returns:
        list of TransferConfig objects; empty list on any error.
    """
    try:
        client = get_transfer_client()
        if not client:
            return []
        project_id = "fynd-db"  # Change if necessary
        parent = f"projects/{project_id}/locations/asia-south1"  # Mumbai Region
        transfer_configs = client.list_transfer_configs(parent=parent)
        return list(transfer_configs)
    except Exception as e:
        st.error(f"❌ Error retrieving scheduled queries: {str(e)}")
        return []


# Function to get details of a selected scheduled query
@st.cache_data(ttl=600, show_spinner=True)
def get_query_details(query_id):
    """Fetch one transfer config and shape it for display.

    Args:
        query_id: full transfer-config resource name
            (projects/<p>/locations/<loc>/transferConfigs/<id>).

    Returns:
        dict mapping column label -> single-item list (ready for
        pd.DataFrame), or None on error.
    """
    try:
        client = get_transfer_client()
        if not client:
            return None
        query = client.get_transfer_config(name=query_id)
        return {
            "Display Name": [query.display_name],
            "Source": [query.data_source_id],
            "Schedule (UTC)": [query.schedule],
            # Resource name segments: projects/<p>/locations/<loc>/...
            # so index 3 is the location (region).
            "Region": [query.name.split("/")[3]],
            "Destination Dataset": [query.destination_dataset_id],
            "Next Scheduled Run": [
                query.next_run_time.strftime("%Y-%m-%d %H:%M:%S")
                if query.next_run_time
                else "N/A"
            ],
        }
    except Exception as e:
        st.error(f"❌ Error fetching query details: {str(e)}")
        return None


# Function to trigger a real-time backfill WITHOUT CHANGING DATA ORDER
def trigger_backfill(query_id):
    """Start a manual transfer run for `query_id` at the current UTC time.

    Returns:
        StartManualTransferRuns response on success, False when no client
        could be built, or None on API error.
    """
    try:
        client = get_transfer_client()
        if not client:
            return False
        run_request = bigquery_datatransfer.StartManualTransferRunsRequest(
            parent=query_id,
            # Timezone-aware "now" (datetime.utcnow() is deprecated);
            # requesting the current time triggers a real-time execution.
            requested_run_time=datetime.now(timezone.utc),
        )
        response = client.start_manual_transfer_runs(request=run_request)
        # Explicitly log run details without modifying data order
        return response
    except Exception as e:
        st.error(f"❌ Error scheduling backfill: {str(e)}")
        return None


# Streamlit UI
st.markdown(sq_title, unsafe_allow_html=True)
st.write("")
st.markdown(tooltip_message_sq, unsafe_allow_html=True)
st.write("")
st.markdown(sq_placeholder, unsafe_allow_html=True)

# List Scheduled Queries
st.write("")
queries = list_scheduled_queries()
query_options = {query.display_name: query.name for query in queries} if queries else {}
selected_query = st.selectbox("", [""] + list(query_options.keys()))

if selected_query:
    query_id = query_options[selected_query]
    details = get_query_details(query_id)
    if details:
        details_df = pd.DataFrame(details)  # Convert dictionary to DataFrame
        st.dataframe(details_df)  # Display DataFrame properly

    # Backfill Button
    st.markdown(button_styles, unsafe_allow_html=True)
    if st.button("Schedule Backfill"):
        response = trigger_backfill(query_id)
        if response:
            st.success(f"✅ Backfill successfully scheduled for `{selected_query}`!")
        else:
            st.error("⚠️ Failed to schedule backfill!")