# Databricks notebook source # pip install google-cloud-bigquery # COMMAND ---------- !pip install google-cloud-storage # COMMAND ---------- # MAGIC %run ./Read_Config # COMMAND ---------- # MAGIC %run ./pg_Connect # COMMAND ---------- from google.cloud import bigquery from google.oauth2 import service_account from pyspark.sql.functions import concat_ws import os import io import random from google.cloud import storage import hashlib import gzip import sys import datetime import pandas as pd # COMMAND ---------- def create_target_bucket(extract_id): query = f"""select storage_bucket_name from lcef_rule.extract_reg where extract_reg_id={extract_id}""" result = execute_sql(query) bucket_name = str(result[0]['storage_bucket_name']) if bucket_name == '' or bucket_name == None: bucket_name = config.get('TARGET_BUCKET') print(f'result-{result}') print(f"result_subc-{bucket_name}") #env = os.environ.get('CICD_ENV') # if env.upper() != 'PROD': # bucket_name= f"""{env.lower()}-{bucket_name}""" return bucket_name.replace('_', '-').replace(' ', '').lower() def file_delivery_suffix(extract_id): query = f"""select file_delivery_suffix from lcef_rule.extract_reg where extract_reg_id={extract_id}""" result = execute_sql(query) #print(f'result-{result}') print(f"result_subc-{result[0]['file_delivery_suffix']}") return True if result[0]['file_delivery_suffix'] == '.csv.gz' else False def get_file_name(extract_id): query = f"""select extract_file_name from lcef_rule.extract_file_type where extract_reg_id={extract_id} AND extract_file_name is not NULL""" result = execute_sql(query) try: file_name = result[0]['extract_file_name'] except IndexError: return f"extract_{extract_id}" if file_name == '' or file_name == None: file_name = f"extract_{extract_id}" print(f'file name result-{result}') return file_name def getExtractinsert(extract_id, num): query = f""" select extract_file_type_id from lcef_rule.extract_reg eg, lcef_rule.extract_file_type eft where eg.extract_reg_id=eft.extract_reg_id and eg.extract_reg_id={extract_id}""" print(query) result = execute_sql(query) print(f"{id} LOGGING: Extract insert results = {result}") return result[num]['extract_file_type_id'] def insertExtractData(id, name, eid): query = f"""insert into lcef_rule.extract_file_type_instance(extract_file_type_id,extract_file_instance_name,extract_instance_id) values({id},'{name}',{eid}) returning extract_file_type_instance_id""" res2 = execute_sql(query) print(f"{id} LOGGING: Extract insertinto results = {res2}") return res2[0]["extract_file_type_instance_id"] # COMMAND ---------- def insertEIData(id, eid, name): if id == '' or id == None or eid == '' or eid == None: return 'Extract id/instance cannot be null' else: query = f"""update lcef_rule.extract_instance set file_delivery_name='{name}' where extract_reg_id={id} and extract_instance_id={eid} returning extract_instance_id""" res2 = execute_sql(query) print(f"{id} LOGGING: Extract insertinto results = {res2}") return res2[0]["extract_instance_id"] def getExtractInstance(id, run_id): query = f"""select extract_instance_id from lcef_rule.extract_instance WHERE composer_run_id ='{run_id}' and extract_reg_id={id}""" result = execute_sql(query) print(f"extract_instance_id-{result[0]['extract_instance_id']}") return result[0]['extract_instance_id'] def getInfosheetInd(extract_id): query = f"""select infosheet_ind from lcef_rule.extract_reg where extract_reg_id={extract_id}""" result = execute_sql(query) print(f"query: {query}") return result[0]['infosheet_ind'] def getSortData(extract_id): query = f"""select column_name, db_column,ecl.column_alias,ecl.column_seq,ecl.sort_rank from lcef_rule.extract_col_list ecl, lcef_rule.extract_file_type eft , lcef_rule.file_column fc where ecl.extract_file_type_id = eft.extract_file_type_id and fc.file_column_id = ecl.file_column_id and eft.extract_reg_id={extract_id}""" result = execute_sql(query) return result def getSplitColumns(extract_id): query = f"""select db_column from lcef_rule.extract_col_list ecl, lcef_rule.extract_file_type eft , lcef_rule.file_column fc where ecl.extract_file_type_id = eft.extract_file_type_id and fc.file_column_id = ecl.file_column_id and eft.extract_reg_id={extract_id} and ecl.file_split_rank = 1""" result = execute_sql(query) return result # COMMAND ---------- # from pyspark.sql import SparkSession # from pyspark.sql.types import * # # # Create a spark session # spark = SparkSession.builder.appName('Empty_Dataframe').getOrCreate() # # # Create an empty RDD # emp_RDD = spark.sparkContext.emptyRDD() # # # Create an expected schema # columns1 = StructType([StructField('concatenated_column',StringType(), True)]) # # # Create an empty RDD with expected schema # empty_df = spark.createDataFrame(data = emp_RDD,schema = columns1) # # # Print the dataframe # print('Dataframe :') # empty_df.show() # COMMAND ---------- #import pandas as pd # ## create an Empty DataFrame object #empty_df = pd.DataFrame() # #print(empty_df) # COMMAND ---------- from pyspark.sql.functions import when, lit def deliver_bq_table(id, run_id, compress=False, infosheet_ind=False): global empty_df bq_location = config.get('BQ_LOCATION') bq_project = config.get('project') source_dataset = config.get('SOURCE_BQ_DATASET') staging_dataset = config.get('STAGING_BQ_DATASET') bloblist = [] project = "gcp-edplcef2-t-all-a7g2" # os.environ.get('LCEF_PROJECT_ID') # gcp_bucket = os.environ.get('TARGET_BUCKET') gcp_bucket = create_target_bucket(id) print(f"BUCKET PATH: {gcp_bucket}") bkt_name = gcp_bucket.split("/")[0] bkt_folder = gcp_bucket.split("/")[1] bkt_subfolder = gcp_bucket.split("/")[2] eid = run_id print(f'eid->{eid}') storage_client = storage.Client( project=project) print(f'bucket exists ->{storage_client.bucket(gcp_bucket).exists()}') if not storage_client.bucket(gcp_bucket).exists(): print(f"gcp_bucket:::{bkt_name}") # storage_client.create_bucket(bkt_name,location=bq_location) print(f'bucket created - {storage_client.bucket(gcp_bucket)}') print(f'compress-{compress}') filename_list = [] extract_info_sql = f""" select ecl.file_column_id, fc.column_name, fc.db_column, fc.column_datatype, fc.column_width, pft.file_type, er.file_delivery_format, er.file_delivery_suffix, er.header_row_ind, er.compress_ind, er.infosheet_ind from lcef_rule.extract_col_list ecl join lcef_rule.file_column fc on fc.file_column_id = ecl.file_column_id join lcef_rule.project_file_type pft on fc.project_file_type_id = pft.project_file_type_id join lcef_rule.extract_file_type eft on eft.extract_file_type_id = ecl.extract_file_type_id join lcef_rule.extract_reg er on er.extract_reg_id = eft.extract_reg_id where eft.extract_reg_id = {id} """ extract_cols = execute_sql(extract_info_sql) extract_tables = {} for col in extract_cols: if col['file_type'] not in extract_tables: extract_tables[col['file_type']] = [] extract_tables[col['file_type']].append(col) ext = '.csv' if extract_cols[0]["file_delivery_suffix"] == None else extract_cols[0]["file_delivery_suffix"] file_delivery_format = ', Comma Delimited' if extract_cols[0]["file_delivery_format"] == None else extract_cols[0][ "file_delivery_format"] header_row_ind = True if extract_cols[0]["header_row_ind"] == None else bool(extract_cols[0]["header_row_ind"]) compress_ind = True if extract_cols[0]["compress_ind"] == None else bool(extract_cols[0]["compress_ind"]) infosheet_ind = False if extract_cols[0]["infosheet_ind"] == None else bool(extract_cols[0]["infosheet_ind"]) for file_type in extract_tables.keys(): db_file_name=get_file_name(id) print(f"{db_file_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}") full_file_name=f"{db_file_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}" print(f'full_file_name-{full_file_name}') table = f"{source_dataset}.{file_type}" print(f"table-{table}") df = spark.read.format('bigquery').option('table', table).load() df.createOrReplaceTempView('temp') # change 1: A DataFrameWriter object is created for writing data in XML format # Save data in XML format df_write_xml = df.write.format("com.databricks.spark.xml").option("rootTag", "root").option("rowTag", "row") # change 2: The file extension of the destination URI for the XML data was changed from .csv to .xml destination_uri_xml = "gs://{}/{}.xml".format(gcp_bucket,full_file_name) # change 3: The data is saved to Google Cloud Storage in XML format using the df_write_xml object df_write_xml.option("header", "true").save(destination_uri_xml) print(f"destination_uri_xml-{destination_uri_xml}") # change 4: A separate DataFrameWriter object is created for writing data in CSV format # Save data in CSV format df_write_csv = df.write.format("csv") # change 5: The file extension of the destination URI for the CSV data was set to .csv destination_uri_csv = "gs://{}/{}.csv".format(gcp_bucket,full_file_name) # change 5: The data is saved to Google Cloud Storage in CSV format using the df_write_csv object df_write_csv.option("header", "true").save(destination_uri_csv) print(f"destination_uri_csv-{destination_uri_csv}") filename_list.append(destination_uri_xml) filename_list.append(destination_uri_csv) return filename_list # COMMAND ---------- def deliver_bq_table_split(id, run_id, compress=False, infosheet_ind=False): global empty_df bq_location = config.get('BQ_LOCATION') bq_project = config.get('project') source_dataset = config.get('SOURCE_BQ_DATASET') staging_dataset = config.get('STAGING_BQ_DATASET') bloblist = [] project = "gcp-edplcef2-t-all-a7g2" # os.environ.get('LCEF_PROJECT_ID') # gcp_bucket = os.environ.get('TARGET_BUCKET') gcp_bucket = create_target_bucket(id) print(f"BUCKET PATH: {gcp_bucket}") bkt_name = gcp_bucket.split("/")[0] bkt_folder = gcp_bucket.split("/")[1] bkt_subfolder = gcp_bucket.split("/")[2] eid = run_id print(f'eid->{eid}') storage_client = storage.Client( project=project) print(f'bucket exists ->{storage_client.bucket(gcp_bucket).exists()}') if not storage_client.bucket(gcp_bucket).exists(): print(f"gcp_bucket:::{bkt_name}") # storage_client.create_bucket(bkt_name,location=bq_location) print(f'bucket created - {storage_client.bucket(gcp_bucket)}') print(f'compress-{compress}') filename_list = [] extract_info_sql = f""" select ecl.file_column_id, fc.column_name, fc.db_column, fc.column_datatype, fc.column_width, pft.file_type, er.file_delivery_format, er.file_delivery_suffix, er.header_row_ind, er.compress_ind, er.infosheet_ind from lcef_rule.extract_col_list ecl join lcef_rule.file_column fc on fc.file_column_id = ecl.file_column_id join lcef_rule.project_file_type pft on fc.project_file_type_id = pft.project_file_type_id join lcef_rule.extract_file_type eft on eft.extract_file_type_id = ecl.extract_file_type_id join lcef_rule.extract_reg er on er.extract_reg_id = eft.extract_reg_id where eft.extract_reg_id = {id} """ extract_cols = execute_sql(extract_info_sql) extract_tables = {} for col in extract_cols: if col['file_type'] not in extract_tables: extract_tables[col['file_type']] = [] extract_tables[col['file_type']].append(col) ext = '.csv' if extract_cols[0]["file_delivery_suffix"] == None else extract_cols[0]["file_delivery_suffix"] file_delivery_format = ', Comma Delimited' if extract_cols[0]["file_delivery_format"] == None else extract_cols[0][ "file_delivery_format"] header_row_ind = True if extract_cols[0]["header_row_ind"] == None else bool(extract_cols[0]["header_row_ind"]) compress_ind = True if extract_cols[0]["compress_ind"] == None else bool(extract_cols[0]["compress_ind"]) infosheet_ind = False if extract_cols[0]["infosheet_ind"] == None else bool(extract_cols[0]["infosheet_ind"]) for file_type in extract_tables.keys(): db_file_name=get_file_name(id) print(f"{db_file_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}") full_file_name=f"{db_file_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}" print(f'full_file_name-{full_file_name}') table = f"{source_dataset}.{file_type}" print(f"table-{table}") df = spark.read.format('bigquery').option('table', table).load() df.createOrReplaceTempView('temp') split_columns=getSplitColumns(id) for split_column in split_columns: split_col=split_column['db_column'] distinct_values=spark.sql(f"""select distinct {split_col} from temp""") distinct_values_list=[row[split_col] for row in distinct_values.collect()] for val in distinct_values_list: split_df=spark.sql(f"""select * from temp where {split_col}='{val}'""") split_df.createOrReplaceTempView('temp_split') # Save data in XML format df_write_xml = split_df.write.format("com.databricks.spark.xml").option("rootTag", "root").option("rowTag", "row") destination_uri_xml = "gs://{}/{}_{}.xml".format(gcp_bucket,full_file_name,val) df_write_xml.option("header", "true").save(destination_uri_xml) print(f"destination_uri_xml-{destination_uri_xml}") # Save data in CSV format df_write_csv = split_df.write.format("csv") destination_uri_csv = "gs://{}/{}_{}.csv".format(gcp_bucket,full_file_name,val) df_write_csv.option("header", "true").save(destination_uri_csv) print(f"destination_uri_csv-{destination_uri_csv}") filename_list.append(destination_uri_xml) filename_list.append(destination_uri_csv) return filename_list # COMMAND ---------- def deliver_bq_table_split_by_column(id, run_id, compress=False, infosheet_ind=False): global empty_df bq_location = config.get('BQ_LOCATION') bq_project = config.get('project') source_dataset = config.get('SOURCE_BQ_DATASET') staging_dataset = config.get('STAGING_BQ_DATASET') bloblist = [] project = "gcp-edplcef2-t-all-a7g2" # os.environ.get('LCEF_PROJECT_ID') # gcp_bucket = os.environ.get('TARGET_BUCKET') gcp_bucket = create_target_bucket(id) print(f"BUCKET PATH: {gcp_bucket}") bkt_name = gcp_bucket.split("/")[0] bkt_folder = gcp_bucket.split("/")[1] bkt_subfolder = gcp_bucket.split("/")[2] eid = run_id print(f'eid->{eid}') storage_client = storage.Client( project=project) print(f'bucket exists ->{storage_client.bucket(gcp_bucket).exists()}') if not storage_client.bucket(gcp_bucket).exists(): print(f"gcp_bucket:::{bkt_name}") # storage_client.create_bucket(bkt_name,location=bq_location) print(f'bucket created - {storage_client.bucket(gcp_bucket)}') print(f'compress-{compress}') filename_list = [] extract_info_sql = f""" select ecl.file_column_id, fc.column_name, fc.db_column, fc.column_datatype, fc.column_width, pft.file_type, er.file_delivery_format, er.file_delivery_suffix, er.header_row_ind, er.compress_ind, er.infosheet_ind from lcef_rule.extract_col_list ecl join lcef_rule.file_column fc on fc.file_column_id = ecl.file_column_id join lcef_rule.project_file_type pft on fc.project_file_type_id = pft.project_file_type_id join lcef_rule.extract_file_type eft on eft.extract_file_type_id = ecl.extract_file_type_id join lcef_rule.extract_reg er on er.extract_reg_id = eft.extract_reg_id where eft.extract_reg_id = {id} """ extract_cols = execute_sql(extract_info_sql) extract_tables = {} for col in extract_cols: if col['file_type'] not in extract_tables: extract_tables[col['file_type']] = [] extract_tables[col['file_type']].append(col) ext = '.csv' if extract_cols[0]["file_delivery_suffix"] == None else extract_cols[0]["file_delivery_suffix"] file_delivery_format = ', Comma Delimited' if extract_cols[0]["file_delivery_format"] == None else extract_cols[0][ "file_delivery_format"] header_row_ind = True if extract_cols[0]["header_row_ind"] == None else bool(extract_cols[0]["header_row_ind"]) compress_ind = True if extract_cols[0]["compress_ind"] == None else bool(extract_cols[0]["compress_ind"]) infosheet_ind = False if extract_cols[0]["infosheet_ind"] == None else bool(extract_cols[0]["infosheet_ind"]) for file_type in extract_tables.keys(): db_file_name=get_file_name(id) print(f"{db_file_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}") full_file_name=f"{db_file_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}" print(f'full_file_name-{full_file_name}') table = f"{source_dataset}.{file_type}" print(f"table-{table}") df = spark.read.format('bigquery').option('table', table).load() df.createOrReplaceTempView('temp') split_columns=getSplitColumns(id) for split_column in split_columns: split_col=split_column['db_column'] distinct_values=spark.sql(f"""select distinct {split_col} from temp""") distinct_values_list=[row[split_col] for row in distinct_values.collect()] for val in distinct_values_list: split_df=spark.sql(f"""select * from temp where {split_col}='{val}'""") split_df.createOrReplaceTempView('temp_split') # Save data in XML format df_write_xml = split_df.write.format("com.databricks.spark.xml").option("rootTag", "root").option("rowTag", "row") destination_uri_xml = "gs://{}/{}_{}.xml".format(gcp_bucket,full_file_name,val) df_write_xml.option("header", "true").save(destination_uri_xml) print(f"destination_uri_xml-{destination_uri_xml}") # Save data in CSV format df_write_csv = split_df.write.format("csv") destination_uri_csv = "gs://{}/{}_{}.csv".format(gcp_bucket,full_file_name,val) df_write_csv.option("header", "true").save(destination_uri_csv) print(f"destination_uri_csv-{destination_uri_csv}") filename_list.append(destination_uri_xml) filename_list.append(destination_uri_csv) return filename_list