"""Streamlit page for triggering Fynd Finance Dataform pipelines on GCP.

Lets a user upload GCP credentials, pick one of two Dataform repositories
(recon / partner collection), and either run the full workflow or a single
staging query.

NOTE(review): the original file lost its line breaks and the HTML wrappers
inside the ``st.markdown`` calls were stripped during extraction; the markup
below preserves the visible text only.  TODO: restore the original HTML
templates from version control.
"""

import json
import os

import pandas as pd
import streamlit as st
from google.cloud import bigquery, dataform_v1beta1
from google.cloud.dataform_v1beta1.services.dataform import DataformClient
from google.cloud.dataform_v1beta1.types import SearchFilesRequest
from streamlit_option_menu import option_menu

from html_templates import (
    BigQuery_upload_title,
    button_styles,
    logo,
    recon_title,
    table_id_placeholder,
    tooltip_message_bq,
    uploader,
)
from utils import authenticate_bigquery, load_gcp_credentials, upload_to_bigquery

# --- Page Configuration ---
st.set_page_config(
    page_title="Pipeline flow",
    page_icon="💰",
    layout="wide",
    initial_sidebar_state="expanded",
)
st.markdown(logo, unsafe_allow_html=True)
st.logo("alerter_4.jpeg")

# --- Page header (original HTML markup lost in extraction; text preserved) ---
st.markdown("⚙️ Dataform", unsafe_allow_html=True)
st.markdown("Access and run financial reconciliation pipelines", unsafe_allow_html=True)

# --- GCP Credential Setup ---
with st.container():
    gcp_credentials = load_gcp_credentials()
    if not gcp_credentials:
        st.warning("⚠️ Please upload your GCP credentials to continue.")
    else:
        st.write("")
        st.markdown("", unsafe_allow_html=True)

# Fix: halt rendering when credentials are absent.  The original script fell
# through to the tab dispatch below with `selected` and `bigquery_creds`
# never bound, raising NameError instead of showing the warning cleanly.
if not gcp_credentials:
    st.stop()

bigquery_creds = authenticate_bigquery()
client = bigquery.Client(credentials=bigquery_creds)

# --- Tab Selection with styled option menu ---
selected = option_menu(
    menu_title=None,
    options=["Recon Pipeline", "Partner Collection"],
    icons=["graph-up", "collection"],
    menu_icon="cast",
    default_index=0,
    orientation="horizontal",
    styles={
        "container": {
            "padding": "0px",
            "background-color": "#800000",
            "border-radius": "8px",
            "margin-bottom": "20px",
            "width": "100%",  # Ensure container uses full width
            # Remove any unwanted margin or padding that might create white space
            "margin": "0px",
        },
        "icon": {
            "color": "#ffffff",
            "font-size": "20px",
        },
        "nav-link": {
            "font-size": "18px",
            "text-align": "center",
            "margin": "0px",
            "padding": "12px 15px",
            "color": "#ffffff",
            "background-color": "#800000",
            "border-radius": "0px",
            "width": "100%",  # Ensure links use full width
        },
        "nav-link-selected": {
            "background-color": "#a52a2a",
            "color": "#ffffff",
            "font-weight": "bold",
            "border-bottom": "3px solid #ffffff",
        },
    },
)


# --- Common Functions ---
def compile_workspace(project_id, region, repo_id, workspace_id, credentials):
    """Compile a Dataform workspace at HEAD.

    Args:
        project_id: GCP project that owns the Dataform repository.
        region: GCP region of the repository (e.g. ``asia-south1``).
        repo_id: Dataform repository id.
        workspace_id: Workspace within the repository to compile.
        credentials: google-auth credentials used for the API client.

    Returns:
        The fully-qualified resource name of the new compilation result.
    """
    df_client = dataform_v1beta1.DataformClient(credentials=credentials)
    parent = f"projects/{project_id}/locations/{region}/repositories/{repo_id}"
    workspace_path = f"{parent}/workspaces/{workspace_id}"
    compilation_result = dataform_v1beta1.CompilationResult(
        git_commitish="HEAD",
        workspace=workspace_path,
    )
    response = df_client.create_compilation_result(
        parent=parent,
        compilation_result=compilation_result,
    )
    return response.name


def run_dataform_workflow(project_id, region, repo_id, compilation_result_name, credentials):
    """Invoke the full Dataform workflow for a compilation result.

    Args:
        project_id: GCP project that owns the repository.
        region: GCP region of the repository.
        repo_id: Dataform repository id.
        compilation_result_name: Resource name returned by
            :func:`compile_workspace`.
        credentials: google-auth credentials used for the API client.

    Returns:
        The resource name of the created workflow invocation.
    """
    df_client = dataform_v1beta1.DataformClient(credentials=credentials)
    parent = f"projects/{project_id}/locations/{region}/repositories/{repo_id}"
    invocation = dataform_v1beta1.WorkflowInvocation(
        compilation_result=compilation_result_name,
    )
    run_response = df_client.create_workflow_invocation(
        parent=parent,
        workflow_invocation=invocation,
    )
    return run_response.name


def list_staging_queries(project_id, region, repo_id, workspace_id, credentials):
    """List the ``.sqlx`` files under ``definitions/staging`` in a workspace.

    Returns:
        File stems (no directory, no ``.sqlx`` extension) for every SQLX
        file in the staging directory.
    """
    df_client = dataform_v1beta1.DataformClient(credentials=credentials)
    workspace_path = (
        f"projects/{project_id}/locations/{region}"
        f"/repositories/{repo_id}/workspaces/{workspace_id}"
    )
    request = dataform_v1beta1.QueryDirectoryContentsRequest(
        workspace=workspace_path,
        path="definitions/staging",
    )
    response = df_client.query_directory_contents(request=request)
    return [
        entry.file.split("/")[-1].replace(".sqlx", "")
        for entry in response.directory_entries
        if entry.file and entry.file.endswith(".sqlx")
    ]


def display_pipeline_ui(project_id, region, repo_id, workspace_id, credentials=None):
    """Render the run-full-pipeline / run-single-query controls for one repo.

    Args:
        project_id: GCP project that owns the repository.
        region: GCP region of the repository.
        repo_id: Dataform repository id.
        workspace_id: Workspace to compile; also used to namespace widget keys
            so both pipeline tabs can render this UI without key collisions.
        credentials: Optional google-auth credentials; falls back to the
            module-level ``bigquery_creds`` (the original implementation read
            the global directly).
    """
    creds = bigquery_creds if credentials is None else credentials
    col1, col2 = st.columns([1, 2])

    with col1:
        st.markdown("### 🔩 Run Full Pipeline")
        st.markdown(
            "Trigger the complete dataform workflow with all queries in proper sequence.",
            unsafe_allow_html=True,
        )
        if st.button(" ⚡ Run Complete Workflow", key=f"run_full_{workspace_id}"):
            try:
                with st.spinner("Compiling workspace and triggering workflow..."):
                    compilation_result_name = compile_workspace(
                        project_id, region, repo_id, workspace_id, creds
                    )
                    invocation_name = run_dataform_workflow(
                        project_id, region, repo_id, compilation_result_name, creds
                    )
                st.markdown(
                    f"✅ Success! Dataform workflow triggered successfully. "
                    f"Reference ID: {invocation_name.split('/')[-1]}",
                    unsafe_allow_html=True,
                )
            except Exception as e:
                st.markdown(
                    f"❌ Error — Failed to run Dataform workflow: {str(e)}",
                    unsafe_allow_html=True,
                )

    with col2:
        st.markdown("### 🔍 Run Individual Query")
        st.markdown(
            "Select and run a specific query from the staging directory "
            "without running the entire pipeline.",
            unsafe_allow_html=True,
        )
        try:
            staging_queries = list_staging_queries(
                project_id, region, repo_id, workspace_id, creds
            )
            selected_query = st.selectbox(
                "Select a query to run",
                staging_queries,
                key=f"select_{workspace_id}",
            )
            if st.button(
                f"▶️ Run Query: {selected_query}",
                key=f"run_query_{workspace_id}",
            ):
                try:
                    with st.spinner(f"Running `{selected_query}`..."):
                        compilation_result_name = compile_workspace(
                            project_id, region, repo_id, workspace_id, creds
                        )
                        df_client = dataform_v1beta1.DataformClient(credentials=creds)
                        parent = (
                            f"projects/{project_id}/locations/{region}"
                            f"/repositories/{repo_id}"
                        )
                        # NOTE(review): the target filter was commented out in
                        # the original, so this invocation runs the WHOLE
                        # workflow, not just `selected_query`.  Restoring the
                        # filter likely needs WorkflowInvocation's invocation
                        # config with included_targets — confirm against the
                        # Dataform v1beta1 API before enabling.
                        # included_targets=[dataform_v1beta1.Target(name=selected_query)]
                        invocation = dataform_v1beta1.WorkflowInvocation(
                            compilation_result=compilation_result_name,
                        )
                        run_response = df_client.create_workflow_invocation(
                            parent=parent,
                            workflow_invocation=invocation,
                        )
                    st.markdown(
                        f"✅ Success! Query `{selected_query}` triggered successfully. "
                        f"Reference ID: {run_response.name.split('/')[-1]}",
                        unsafe_allow_html=True,
                    )
                except Exception as e:
                    st.markdown(
                        f"❌ Error — Failed to run selected query: {str(e)}",
                        unsafe_allow_html=True,
                    )
        except Exception as e:
            st.markdown(
                f"❌ Error — Error loading staging queries: {str(e)}",
                unsafe_allow_html=True,
            )


def _render_pipeline_header(title, description, project_id, repo_id, region):
    """Render the pipeline title/description banner and the three info cards."""
    st.markdown(f"{title} — {description}", unsafe_allow_html=True)
    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown(f"Project: {project_id}", unsafe_allow_html=True)
    with col2:
        st.markdown(f"Repository: {repo_id}", unsafe_allow_html=True)
    with col3:
        st.markdown(f"Region: {region}", unsafe_allow_html=True)


# --- Pipeline Configuration ---
# One entry per option-menu tab; keeps the dispatch below data-driven instead
# of duplicating the header/card rendering for every pipeline.
PIPELINES = {
    "Recon Pipeline": {
        "title": "Reconciliation Pipeline",
        "description": (
            "Fynd Finance recon pipeline that contains queries related to "
            "payouts, disbursement & collection."
        ),
        "project_id": "fynd-db",
        "region": "asia-south1",
        "repo_id": "finance_recon_pipeline_asia",
        "workspace_id": "recon_pipeline",
    },
    "Partner Collection": {
        # Typo fix: "pipleine" -> "pipeline" in the displayed description.
        "title": "Partner Collection Pipeline",
        "description": (
            "PG partner pipeline that contains queries related to Delhivery "
            "Partner's awb wise details."
        ),
        "project_id": "fynd-db",
        "region": "asia-south1",
        "repo_id": "partner_collection_pipeline",
        "workspace_id": "partner_collection",
    },
}

config = PIPELINES.get(selected)
if config is not None:
    _render_pipeline_header(
        title=config["title"],
        description=config["description"],
        project_id=config["project_id"],
        repo_id=config["repo_id"],
        region=config["region"],
    )
    # Display UI components
    display_pipeline_ui(
        project_id=config["project_id"],
        region=config["region"],
        repo_id=config["repo_id"],
        workspace_id=config["workspace_id"],
    )

# Display footer (typo fix: "respository" -> "repository")
st.markdown(
    "© 2025 Finance-pipeline-repository: fynd-db. All Rights Reserved.",
    unsafe_allow_html=True,
)