Alerter_v4.0 / pages /Dataform.py
Ninad077's picture
Upload 10 files
d8535a4 verified
import os
import pandas as pd
import streamlit as st
from google.cloud import bigquery, dataform_v1beta1
from utils import load_gcp_credentials, upload_to_bigquery, authenticate_bigquery
from html_templates import (
logo,
BigQuery_upload_title,
table_id_placeholder,
uploader,
button_styles,
tooltip_message_bq,
recon_title
)
import json
from google.cloud.dataform_v1beta1.services.dataform import DataformClient
from google.cloud.dataform_v1beta1.types import SearchFilesRequest
from google.cloud import dataform_v1beta1
from streamlit_option_menu import option_menu
# --- Page Configuration ---
# Browser-tab title/icon and overall layout for this page.
st.set_page_config(
    page_title="Pipeline flow",
    page_icon="💰",
    layout="wide",
    initial_sidebar_state="expanded",
)
# Shared logo markup comes from html_templates; st.logo shows the image file.
st.markdown(logo, unsafe_allow_html=True)
st.logo("alerter_4.jpeg")
# --- Custom CSS for better aesthetics ---
# Page-wide stylesheet: button/selectbox theming, heading colour, the
# success/error banners and card classes used further down the page, and the
# maroon background behind the horizontal option menu.
page_stylesheet = """
<style>
.stButton>button {
background-color: #800000;
color: white;
border: none;
border-radius: 4px;
padding: 0.5rem 1rem;
font-weight: bold;
transition: all 0.3s;
}
.stButton>button:hover {
background-color: #a52a2a;
box-shadow: 0 4px 8px rgba(0,0,0,0.2);
transform: translateY(-2px);
}
.stSelectbox>div>div {
background-color: #ffffff;
border-radius: 4px;
border: 1px solid #e0e0e0;
}
h1, h2, h3 {
color: #800000 !important;
font-family: 'Arial', sans-serif;
}
.success-message {
background-color: #d4edda;
color: #155724;
padding: 10px;
border-radius: 5px;
border-left: 5px solid #28a745;
margin: 10px 0;
}
.error-message {
background-color: #f8d7da;
color: #721c24;
padding: 10px;
border-radius: 5px;
border-left: 5px solid #dc3545;
margin: 10px 0;
}
.card {
background-color: white;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
padding: 20px;
margin-bottom: 20px;
}
.header-container {
display: flex;
align-items: center;
padding: 1rem;
background-color: #ffffff;
border-radius: 10px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
margin-bottom: 2rem;
}
.logo-img {
max-height: 60px;
margin-right: 20px;
}
.header-text {
flex-grow: 1;
}
div.stStreamlitApp div.stHorizontalBlock {
padding: 0 !important;
background-color: #800000 !important;
}
/* Target the option menu's parent container */
div.row-widget.stHorizontalBlock {
background-color: #800000 !important;
padding: 0 !important;
border-radius: 8px;
}
/* Ensure no margins/paddings on surrounding elements */
.stRadio > div, .stSelectbox > div {
margin-bottom: 0 !important;
padding-bottom: 0 !important;
}
</style>
"""
st.markdown(page_stylesheet, unsafe_allow_html=True)
# Page heading and one-line description shown under the logo.
st.markdown("<h1 style='margin-top: 10px;'> ⚙️ Dataform</h1>", unsafe_allow_html=True)
st.markdown("<p style='color: #555555;'>Access and run financial reconciliation pipelines</p>", unsafe_allow_html=True)
# --- GCP Credential Setup ---
with st.container():
    gcp_credentials = load_gcp_credentials()
    if gcp_credentials:
        st.write("")
    else:
        st.warning("⚠️ Please upload your GCP credentials to continue.")
    st.markdown("</div>", unsafe_allow_html=True)

# Everything below needs `bigquery_creds` (display_pipeline_ui reads it), so
# end this script run here instead of failing later with a NameError.
if not gcp_credentials:
    st.stop()

bigquery_creds = authenticate_bigquery()
client = bigquery.Client(credentials=bigquery_creds)
# --- Tab Selection with styled option menu ---
# Style fragments are named separately so the option_menu call stays short.
menu_container_style = {
    "padding": "0px",
    "background-color": "#800000",
    "border-radius": "8px",
    "margin-bottom": "20px",
    "width": "100%",  # Ensure container uses full width
    # Remove any unwanted margin or padding that might create white space
    "margin": "0px",
}
menu_icon_style = {
    "color": "#ffffff",
    "font-size": "20px",
}
menu_link_style = {
    "font-size": "18px",
    "text-align": "center",
    "margin": "0px",
    "padding": "12px 15px",
    "color": "#ffffff",
    "background-color": "#800000",
    "border-radius": "0px",
    "width": "100%",  # Ensure links use full width
}
menu_selected_link_style = {
    "background-color": "#a52a2a",
    "color": "#ffffff",
    "font-weight": "bold",
    "border-bottom": "3px solid #ffffff",
}

# `selected` holds the label of the active tab and drives the page body below.
selected = option_menu(
    menu_title=None,
    options=["Recon Pipeline", "Partner Collection"],
    icons=["graph-up", "collection"],
    menu_icon="cast",
    default_index=0,
    orientation="horizontal",
    styles={
        "container": menu_container_style,
        "icon": menu_icon_style,
        "nav-link": menu_link_style,
        "nav-link-selected": menu_selected_link_style,
    },
)
# --- Common Functions ---
def compile_workspace(project_id, region, repo_id, workspace_id, credentials):
    """Compile a Dataform workspace and return the compilation result name.

    Args:
        project_id: GCP project that owns the Dataform repository.
        region: Location of the repository (e.g. "asia-south1").
        repo_id: Dataform repository ID.
        workspace_id: Workspace inside the repository to compile.
        credentials: Credentials object passed to the Dataform client.

    Returns:
        The resource name of the created compilation result.
    """
    df_client = dataform_v1beta1.DataformClient(credentials=credentials)
    parent = f"projects/{project_id}/locations/{region}/repositories/{repo_id}"
    workspace_path = f"{parent}/workspaces/{workspace_id}"
    # `workspace` and `git_commitish` are alternative compilation sources
    # (members of one protobuf oneof), so only the workspace is supplied;
    # passing both leaves it ambiguous which source is actually compiled.
    compilation_result = dataform_v1beta1.CompilationResult(
        workspace=workspace_path
    )
    response = df_client.create_compilation_result(
        parent=parent,
        compilation_result=compilation_result,
    )
    return response.name
def run_dataform_workflow(project_id, region, repo_id, compilation_result_name, credentials):
    """Start a workflow invocation for an existing compilation result.

    Args:
        project_id: GCP project that owns the Dataform repository.
        region: Location of the repository.
        repo_id: Dataform repository ID.
        compilation_result_name: Resource name of the compilation result to run.
        credentials: Credentials object passed to the Dataform client.

    Returns:
        The resource name of the created workflow invocation.
    """
    workflow_client = dataform_v1beta1.DataformClient(credentials=credentials)
    repository_path = f"projects/{project_id}/locations/{region}/repositories/{repo_id}"
    created_invocation = workflow_client.create_workflow_invocation(
        parent=repository_path,
        workflow_invocation=dataform_v1beta1.WorkflowInvocation(
            compilation_result=compilation_result_name
        ),
    )
    return created_invocation.name
def list_staging_queries(project_id, region, repo_id, workspace_id, credentials):
    """List the `.sqlx` files in the workspace's `definitions/staging` directory.

    Args:
        project_id: GCP project that owns the Dataform repository.
        region: Location of the repository.
        repo_id: Dataform repository ID.
        workspace_id: Workspace whose staging directory is listed.
        credentials: Credentials object passed to the Dataform client.

    Returns:
        A list of file base names with the `.sqlx` extension removed, in the
        order the API returns them.
    """
    sqlx_suffix = ".sqlx"
    df_client = dataform_v1beta1.DataformClient(credentials=credentials)
    workspace_path = f"projects/{project_id}/locations/{region}/repositories/{repo_id}/workspaces/{workspace_id}"
    request = dataform_v1beta1.QueryDirectoryContentsRequest(
        workspace=workspace_path,
        path="definitions/staging",
    )
    # Iterate the pager itself: reading `.directory_entries` only exposes the
    # page currently held, whereas iteration also fetches any further pages.
    pager = df_client.query_directory_contents(request=request)
    query_files = []
    for entry in pager:
        if entry.file and entry.file.endswith(sqlx_suffix):
            base_name = entry.file.split("/")[-1]
            # Strip only the trailing extension, not every ".sqlx" occurrence.
            query_files.append(base_name[: -len(sqlx_suffix)])
    return query_files
def _render_status_box(css_class, heading, body_html):
    """Render a banner styled by `css_class`; `body_html` is inserted verbatim."""
    st.markdown(f"""
<div class='{css_class}'>
<h4>{heading}</h4>
<p>{body_html}</p>
</div>
""", unsafe_allow_html=True)


def _render_success(message_html, resource_name):
    """Show the success banner with the last path segment of `resource_name`."""
    from html import escape

    reference_id = escape(resource_name.split('/')[-1])
    _render_status_box(
        "success-message",
        "✅ Success!",
        f"{message_html}<br>\n<span style='font-size: 12px;'>Reference ID: {reference_id}</span>",
    )


def _render_error(prefix, error):
    """Show the error banner; the exception text is HTML-escaped before display."""
    from html import escape

    _render_status_box("error-message", "❌ Error", f"{prefix}: {escape(str(error))}")


def _run_single_staging_query(project_id, region, repo_id, workspace_id, query_name, credentials):
    """Compile the workspace and invoke only the actions defined by one staging file.

    Returns the workflow invocation resource name. Raises ValueError when the
    compiled workspace contains no action for the selected file.
    """
    compilation_result_name = compile_workspace(
        project_id, region, repo_id, workspace_id, credentials
    )
    df_client = dataform_v1beta1.DataformClient(credentials=credentials)
    # Look the targets up in the compilation result so the invocation filter
    # carries the exact database/schema/name of each compiled action.
    file_path = f"definitions/staging/{query_name}.sqlx"
    actions = df_client.query_compilation_result_actions(
        request=dataform_v1beta1.QueryCompilationResultActionsRequest(
            name=compilation_result_name
        )
    )
    targets = [action.target for action in actions if action.file_path == file_path]
    if not targets:
        raise ValueError(f"No compiled action found for {file_path}")

    parent = f"projects/{project_id}/locations/{region}/repositories/{repo_id}"
    invocation = dataform_v1beta1.WorkflowInvocation(
        compilation_result=compilation_result_name,
        # Given as a mapping; the client library converts it to the
        # invocation-config message. Without it the whole workflow would run.
        invocation_config={"included_targets": targets},
    )
    run_response = df_client.create_workflow_invocation(
        parent=parent,
        workflow_invocation=invocation,
    )
    return run_response.name


def _render_full_pipeline_section(project_id, region, repo_id, workspace_id):
    """Left column: button that compiles the workspace and runs every action."""
    st.markdown("### 🔩 Run Full Pipeline")
    st.markdown("""
<p style='color: #555555; font-size: 14px;'>
Trigger the complete dataform workflow with all queries in proper sequence.
</p>
""", unsafe_allow_html=True)
    if not st.button(" ⚡ Run Complete Workflow", key=f"run_full_{workspace_id}"):
        return
    try:
        with st.spinner("Compiling workspace and triggering workflow..."):
            compilation_result_name = compile_workspace(
                project_id, region, repo_id, workspace_id, bigquery_creds
            )
            invocation_name = run_dataform_workflow(
                project_id, region, repo_id, compilation_result_name, bigquery_creds
            )
    except Exception as e:  # UI boundary: report any failure to the user
        _render_error("Failed to run Dataform workflow", e)
    else:
        _render_success("Dataform workflow triggered successfully.", invocation_name)


def _render_single_query_section(project_id, region, repo_id, workspace_id):
    """Right column: pick one staging query and run only that query."""
    from html import escape

    st.markdown("### 🔍 Run Individual Query")
    st.markdown("""
<p style='color: #555555; font-size: 14px;'>
Select and run a specific query from the staging directory without running the entire pipeline.
</p>
""", unsafe_allow_html=True)
    try:
        staging_queries = list_staging_queries(
            project_id, region, repo_id, workspace_id, bigquery_creds
        )
    except Exception as e:  # UI boundary: report any failure to the user
        _render_error("Error loading staging queries", e)
        return

    selected_query = st.selectbox("Select a query to run", staging_queries, key=f"select_{workspace_id}")
    if selected_query is None:
        # Nothing to run; avoid offering a "Run Query: None" button.
        st.info("No staging queries were found in this workspace.")
        return
    if not st.button(f"▶️ Run Query: {selected_query}", key=f"run_query_{workspace_id}"):
        return
    try:
        with st.spinner(f"Running `{selected_query}`..."):
            invocation_name = _run_single_staging_query(
                project_id, region, repo_id, workspace_id, selected_query, bigquery_creds
            )
    except Exception as e:  # UI boundary: report any failure to the user
        _render_error("Failed to run selected query", e)
    else:
        _render_success(
            f"Query `{escape(selected_query)}` triggered successfully.",
            invocation_name,
        )


def display_pipeline_ui(project_id, region, repo_id, workspace_id):
    """Render the two-column run panel for one Dataform workspace.

    The left column triggers the complete workflow; the right column lists the
    staging queries and triggers only the selected one. Uses the module-level
    `bigquery_creds` for all Dataform API calls.

    Args:
        project_id: GCP project that owns the Dataform repository.
        region: Location of the repository.
        repo_id: Dataform repository ID.
        workspace_id: Workspace to compile and run; also used to build
            unique widget keys.
    """
    st.markdown("<div class='card'>", unsafe_allow_html=True)
    col1, col2 = st.columns([1, 2])
    with col1:
        _render_full_pipeline_section(project_id, region, repo_id, workspace_id)
    with col2:
        _render_single_query_section(project_id, region, repo_id, workspace_id)
    st.markdown("</div>", unsafe_allow_html=True)
# --- Pipeline Configuration ---
# One entry per option-menu tab, keyed by the tab label. The info cards and the
# display_pipeline_ui call both read from the same entry.
_PIPELINES = {
    "Recon Pipeline": {
        "title": "Reconciliation Pipeline",
        "description": "Fynd Finance recon pipeline that contains queries related to payouts, disbursement & collection.",
        "project_id": "fynd-db",
        "region": "asia-south1",
        "repo_id": "finance_recon_pipeline_asia",
        "workspace_id": "recon_pipeline",
    },
    "Partner Collection": {
        "title": "Partner Collection Pipeline",
        "description": "PG partner pipeline that contains queries related to Delhivery Partner's awb wise details.",
        "project_id": "fynd-db",
        "region": "asia-south1",
        "repo_id": "partner_collection_pipeline",
        "workspace_id": "partner_collection",
    },
}


def _render_info_card(label, value):
    """Render one grey label/value card."""
    st.markdown(f"""
<div style='background-color: #e9ecef; padding: 15px; border-radius: 5px;'>
<h4 style='color: #800000; margin: 0;'>{label}</h4>
<p style='margin: 0; font-size: 16px;'>{value}</p>
</div>
""", unsafe_allow_html=True)


def _render_pipeline_page(config):
    """Render heading, project info cards and the run panel for one pipeline."""
    st.markdown(f"""
<h2>{config['title']}</h2>
<p style='color: #555555; margin-bottom: 20px;'>
{config['description']}
</p>
""", unsafe_allow_html=True)
    # Project info in a nice card
    cards = (
        ("Project", config["project_id"]),
        ("Repository", config["repo_id"]),
        ("Region", config["region"]),
    )
    for column, (label, value) in zip(st.columns(3), cards):
        with column:
            _render_info_card(label, value)
    # Display UI components
    display_pipeline_ui(
        project_id=config["project_id"],
        region=config["region"],
        repo_id=config["repo_id"],
        workspace_id=config["workspace_id"],
    )


# Render nothing when the selected label has no configured pipeline.
_active_pipeline = _PIPELINES.get(selected)
if _active_pipeline is not None:
    _render_pipeline_page(_active_pipeline)
# Display footer
st.markdown("""
<div style='margin-top: 50px; text-align: center; color: #6c757d; padding: 20px; border-top: 1px solid #e0e0e0;'>
<p>© 2025 Finance-pipeline-repository: fynd-db. All Rights Reserved.</p>
</div>
""", unsafe_allow_html=True)