"""Streamlit page for uploading a CSV/Excel file into a BigQuery table."""

import json

import pandas as pd
import streamlit as st
from google.cloud import bigquery

from html_templates import (
    BigQuery_upload_title,
    button_styles,
    logo,
    table_id_placeholder,
    tooltip_message_bq,
    uploader,
)
from utils import authenticate_bigquery, load_gcp_credentials

st.markdown(logo, unsafe_allow_html=True)
st.logo("alerter_4.jpeg")
st.markdown(BigQuery_upload_title, unsafe_allow_html=True)

# Vertical spacing below the title
st.write("")
st.write("")

# Tooltip explaining the expected input
st.markdown(tooltip_message_bq, unsafe_allow_html=True)

# Input for the destination BigQuery table ID (the HTML template above acts
# as the visible label, so the built-in label is collapsed)
st.markdown(table_id_placeholder, unsafe_allow_html=True)
table_id = st.text_input(
    "BigQuery table ID",
    key="table_id_name",
    label_visibility="collapsed",
    help="Enter the BigQuery table ID where you want to upload the data. "
    "Example: project_id.dataset_id.table_id",
)

# Retrieve the service account credentials and build a BigQuery client
gcp_credentials = load_gcp_credentials()
if gcp_credentials:
    bigquery_creds = authenticate_bigquery()
    client = bigquery.Client(credentials=bigquery_creds)

    st.markdown(uploader, unsafe_allow_html=True)
    uploaded_file = st.file_uploader(
        "Upload file",
        type=["csv", "xlsx"],
        label_visibility="collapsed",
        help="Upload the CSV/Excel file you need to upload in the BQ table",
    )

    if uploaded_file is not None:
        # Some browsers report CSVs as application/vnd.ms-excel, so decide
        # by file extension rather than by MIME type.
        if uploaded_file.name.lower().endswith(".xlsx"):
            df = pd.read_excel(uploaded_file)
        else:
            df = pd.read_csv(uploaded_file)

        # Preview the DataFrame
        st.write(df)

        # Serialize to newline-delimited JSON; this also normalizes values
        # (e.g. NaN -> null) into JSON-safe types.
        try:
            json_data = df.to_json(orient="records", lines=True)
        except Exception as e:
            st.error(f"Error converting DataFrame to JSON: {e}")
            st.stop()

        # Submit to BigQuery
        st.markdown(button_styles, unsafe_allow_html=True)
        if st.button("Submit"):
            if not table_id:
                st.error("Please enter a BigQuery table ID before submitting.")
                st.stop()
            try:
                # Append rows to the destination table as newline-delimited JSON
                job_config = bigquery.LoadJobConfig(
                    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
                    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
                )

                # Parse each NDJSON line back into a dict for the client API
                json_lines = [json.loads(line) for line in json_data.splitlines()]

                # Load the rows into BigQuery and wait for the job to finish
                job = client.load_table_from_json(
                    json_lines, table_id, job_config=job_config
                )
                job.result()

                records_inserted = job.output_rows
                record_word = "record" if records_inserted == 1 else "records"
                st.success(f"{records_inserted} {record_word} inserted into BigQuery")
            except Exception as e:
                st.error(f"Error uploading data to BigQuery: {e}")
else:
    st.error("Could not load GCP credentials; check the service account configuration.")
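
# ---------------------------------------------------------------------------
# For reference: a minimal sketch of the two helpers this page imports from
# utils.py, reconstructed from how they are called above. The function names
# match the imports, but the bodies below are assumptions (e.g. reading the
# service-account JSON from st.secrets under a hypothetical
# "gcp_service_account" key), not the project's actual implementation.
#
# from google.oauth2 import service_account
# import streamlit as st
#
# def load_gcp_credentials():
#     """Return the raw service-account info dict, or None if unavailable."""
#     try:
#         return dict(st.secrets["gcp_service_account"])  # assumed secret key
#     except (KeyError, FileNotFoundError):
#         return None
#
# def authenticate_bigquery():
#     """Build google-auth credentials from the service-account info."""
#     return service_account.Credentials.from_service_account_info(
#         load_gcp_credentials()
#     )
# ---------------------------------------------------------------------------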