import os import pandas as pd import streamlit as st from google.cloud import bigquery, dataform_v1beta1 from utils import load_gcp_credentials, upload_to_bigquery, authenticate_bigquery from html_templates import ( logo, BigQuery_upload_title, table_id_placeholder, uploader, button_styles, tooltip_message_bq, recon_title ) import json from google.cloud.dataform_v1beta1.services.dataform import DataformClient from google.cloud.dataform_v1beta1.types import SearchFilesRequest from google.cloud import dataform_v1beta1 from streamlit_option_menu import option_menu # --- Page Configuration --- st.set_page_config( page_title="Pipeline flow", page_icon="💰", layout="wide", initial_sidebar_state="expanded" ) st.markdown(logo, unsafe_allow_html=True) st.logo("alerter_4.jpeg") # --- Custom CSS for better aesthetics --- st.markdown(""" """, unsafe_allow_html=True) st.markdown("
Access and run financial reconciliation pipelines
", unsafe_allow_html=True) # --- GCP Credential Setup --- with st.container(): gcp_credentials = load_gcp_credentials() if not gcp_credentials: st.warning("⚠️ Please upload your GCP credentials to continue.") else: st.write("") st.markdown("", unsafe_allow_html=True) if gcp_credentials: bigquery_creds = authenticate_bigquery() client = bigquery.Client(credentials=bigquery_creds) # --- Tab Selection with styled option menu --- selected = option_menu( menu_title=None, options=["Recon Pipeline", "Partner Collection"], icons=["graph-up", "collection"], menu_icon="cast", default_index=0, orientation="horizontal", styles={ "container": { "padding": "0px", "background-color": "#800000", "border-radius": "8px", "margin-bottom": "20px", "width": "100%", # Ensure container uses full width # Remove any unwanted margin or padding that might create white space "margin": "0px", }, "icon": { "color": "#ffffff", "font-size": "20px" }, "nav-link": { "font-size": "18px", "text-align": "center", "margin": "0px", "padding": "12px 15px", "color": "#ffffff", "background-color": "#800000", "border-radius": "0px", "width": "100%", # Ensure links use full width }, "nav-link-selected": { "background-color": "#a52a2a", "color": "#ffffff", "font-weight": "bold", "border-bottom": "3px solid #ffffff", }, }, ) # --- Common Functions --- def compile_workspace(project_id, region, repo_id, workspace_id, credentials): df_client = dataform_v1beta1.DataformClient(credentials=credentials) parent = f"projects/{project_id}/locations/{region}/repositories/{repo_id}" workspace_path = f"{parent}/workspaces/{workspace_id}" compilation_result = dataform_v1beta1.CompilationResult( git_commitish="HEAD", workspace=workspace_path ) response = df_client.create_compilation_result( parent=parent, compilation_result=compilation_result ) return response.name def run_dataform_workflow(project_id, region, repo_id, compilation_result_name, credentials): df_client = dataform_v1beta1.DataformClient(credentials=credentials) parent = f"projects/{project_id}/locations/{region}/repositories/{repo_id}" invocation = dataform_v1beta1.WorkflowInvocation( compilation_result=compilation_result_name ) run_response = df_client.create_workflow_invocation( parent=parent, workflow_invocation=invocation ) return run_response.name def list_staging_queries(project_id, region, repo_id, workspace_id, credentials): df_client = dataform_v1beta1.DataformClient(credentials=credentials) workspace_path = f"projects/{project_id}/locations/{region}/repositories/{repo_id}/workspaces/{workspace_id}" directory_path = "definitions/staging" request = dataform_v1beta1.QueryDirectoryContentsRequest( workspace=workspace_path, path=directory_path ) query_files = [] response = df_client.query_directory_contents(request=request) for entry in response.directory_entries: if entry.file and entry.file.endswith(".sqlx"): clean_name = entry.file.split('/')[-1].replace(".sqlx", "") query_files.append(clean_name) return query_files def display_pipeline_ui(project_id, region, repo_id, workspace_id): st.markdown("Trigger the complete dataform workflow with all queries in proper sequence.
""", unsafe_allow_html=True) if st.button(" ⚡ Run Complete Workflow", key=f"run_full_{workspace_id}"): try: with st.spinner("Compiling workspace and triggering workflow..."): compilation_result_name = compile_workspace( project_id, region, repo_id, workspace_id, bigquery_creds ) invocation_name = run_dataform_workflow( project_id, region, repo_id, compilation_result_name, bigquery_creds ) st.markdown(f""" """, unsafe_allow_html=True) except Exception as e: st.markdown(f""" """, unsafe_allow_html=True) with col2: st.markdown("### 🔍 Run Individual Query") st.markdown("""Select and run a specific query from the staging directory without running the entire pipeline.
""", unsafe_allow_html=True) try: staging_queries = list_staging_queries(project_id, region, repo_id, workspace_id, bigquery_creds) selected_query = st.selectbox("Select a query to run", staging_queries, key=f"select_{workspace_id}") if st.button(f"▶️ Run Query: {selected_query}", key=f"run_query_{workspace_id}"): try: with st.spinner(f"Running `{selected_query}`..."): compilation_result_name = compile_workspace( project_id, region, repo_id, workspace_id, bigquery_creds ) df_client = dataform_v1beta1.DataformClient(credentials=bigquery_creds) parent = f"projects/{project_id}/locations/{region}/repositories/{repo_id}" invocation = dataform_v1beta1.WorkflowInvocation( compilation_result=compilation_result_name, # included_targets=[ # dataform_v1beta1.Target( # name=selected_query # ) # ] ) run_response = df_client.create_workflow_invocation( parent=parent, workflow_invocation=invocation ) st.markdown(f""" """, unsafe_allow_html=True) except Exception as e: st.markdown(f""" """, unsafe_allow_html=True) except Exception as e: st.markdown(f""" """, unsafe_allow_html=True) st.markdown("Fynd Finance recon pipeline that contains queries related to payouts, disbursement & collection.
""", unsafe_allow_html=True) # Project info in a nice card col1, col2, col3 = st.columns(3) with col1: st.markdown("""fynd-db
finance_recon_pipeline_asia
asia-south1
PG partner pipleine that contains queries related to Delhivery Partner's awb wise details.
""", unsafe_allow_html=True) # Project info in a nice card col1, col2, col3 = st.columns(3) with col1: st.markdown("""fynd-db
partner_collection_pipeline
asia-south1
© 2025 Finance-pipeline-respository: fynd-db. All Rights Reserved.