Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import streamlit as st | |
| import pandas as pd | |
| from google.cloud import bigquery_datatransfer | |
| from google.cloud import bigquery | |
| from google.oauth2 import service_account | |
| from datetime import datetime, timezone | |
| from html_templates import button_styles, sq_title, sq_placeholder, tooltip_message_sq, logo | |
# Render the branded HTML header and the sidebar logo image.
st.markdown(logo, unsafe_allow_html=True)
st.logo("alerter_4.jpeg")
def load_gcp_credentials():
    """Build service-account credentials from the GCP_CREDENTIALS env var.

    The variable must hold the service-account key as a JSON string.

    Returns:
        A google.oauth2 service-account Credentials object, or None after
        surfacing the failure in the Streamlit UI.
    """
    try:
        raw = os.getenv('GCP_CREDENTIALS')
        if not raw:
            raise ValueError("GCP_CREDENTIALS environment variable not defined")
        info = json.loads(raw)
        return service_account.Credentials.from_service_account_info(info)
    except Exception as e:
        st.error(f"β Error loading GCP credentials: {str(e)}")
        return None
def get_transfer_client():
    """Return a BigQuery DataTransferServiceClient, or None if credentials failed to load."""
    creds = load_gcp_credentials()
    if creds is None:
        return None
    return bigquery_datatransfer.DataTransferServiceClient(credentials=creds)
def list_scheduled_queries(project_id="fynd-db", location="asia-south1"):
    """List every scheduled-query transfer config in one project/region.

    The project and region were previously hard-coded; they are now
    parameters with the original values as defaults, so existing callers
    are unaffected.

    Args:
        project_id: GCP project whose transfer configs are listed.
        location: BigQuery Data Transfer region (default: Mumbai).

    Returns:
        A list of TransferConfig objects; an empty list when the client is
        unavailable or the API call fails (the error is shown via st.error).
    """
    try:
        client = get_transfer_client()
        if not client:
            return []
        parent = f"projects/{project_id}/locations/{location}"
        return list(client.list_transfer_configs(parent=parent))
    except Exception as e:
        st.error(f"β Error retrieving scheduled queries: {str(e)}")
        return []
def get_query_details(query_id):
    """Fetch one transfer config and shape it for a one-row DataFrame.

    Every value is wrapped in a single-element list so the returned dict
    converts directly via pd.DataFrame. Returns None when the lookup fails.
    """
    try:
        client = get_transfer_client()
        if not client:
            return None
        cfg = client.get_transfer_config(name=query_id)
        next_run = (
            cfg.next_run_time.strftime("%Y-%m-%d %H:%M:%S")
            if cfg.next_run_time
            else "N/A"
        )
        return {
            "Display Name": [cfg.display_name],
            "Source": [cfg.data_source_id],
            "Schedule (UTC)": [cfg.schedule],
            # Resource name is projects/<p>/locations/<region>/transferConfigs/<id>,
            # so index 3 of the split is the region segment.
            "Region": [cfg.name.split("/")[3]],
            "Destination Dataset": [cfg.destination_dataset_id],
            "Next Scheduled Run": [next_run],
        }
    except Exception as e:
        st.error(f"β Error fetching query details: {str(e)}")
        return None
def trigger_backfill(query_id):
    """Start an immediate manual run ("backfill") of a scheduled query.

    Args:
        query_id: Full transfer-config resource name
            (projects/.../locations/.../transferConfigs/...).

    Returns:
        The StartManualTransferRuns response on success; a falsy value
        (False when no client could be built, None on API failure) otherwise.
        Callers should test truthiness only.
    """
    try:
        client = get_transfer_client()
        if not client:
            return False
        run_request = bigquery_datatransfer.StartManualTransferRunsRequest(
            parent=query_id,
            # datetime.now(timezone.utc) replaces the deprecated
            # datetime.utcnow().replace(tzinfo=...) — same UTC-aware "run now"
            # timestamp, without the deprecation warning on Python 3.12+.
            requested_run_time=datetime.now(timezone.utc),
        )
        return client.start_manual_transfer_runs(request=run_request)
    except Exception as e:
        st.error(f"β Error scheduling backfill: {str(e)}")
        return None
# ---------------- Streamlit UI ----------------
st.markdown(sq_title, unsafe_allow_html=True)
st.write("")
st.markdown(tooltip_message_sq, unsafe_allow_html=True)
st.write("")
st.markdown(sq_placeholder, unsafe_allow_html=True)

# Scheduled-query picker; the leading "" entry means "nothing selected yet".
st.write("")
queries = list_scheduled_queries()
query_options = {q.display_name: q.name for q in queries} if queries else {}
selected_query = st.selectbox("", [""] + list(query_options.keys()))

if selected_query:
    query_id = query_options[selected_query]
    details = get_query_details(query_id)
    if details:
        # One-row summary table of the selected query's configuration.
        st.dataframe(pd.DataFrame(details))

    # Backfill trigger button (styled via injected CSS).
    st.markdown(button_styles, unsafe_allow_html=True)
    if st.button("Schedule Backfill"):
        response = trigger_backfill(query_id)
        if response:
            st.success(f"β Backfill successfully scheduled for `{selected_query}`!")
        else:
            st.error("β οΈ Failed to schedule backfill!")