Spaces:
Sleeping
Sleeping
| import os | |
| from io import BytesIO | |
| import pandas as pd | |
| from azure.storage.blob import ( | |
| BlobServiceClient, | |
| __version__, | |
| ) | |
| import json | |
| from azure.storage.blob import BlobServiceClient | |
class AzureBlob:
    """Convenience wrapper around an Azure ``BlobServiceClient``.

    Most lookup helpers address blobs by the naming scheme
    ``{client}/{view}/{table}...`` and treat the blob whose *name* sorts last
    as the "latest" one.
    NOTE(review): this presumes blob names end in a lexicographically sortable
    date stamp (e.g. ``table_20240131.parquet``) -- confirm against the writer.
    """

    # Table prefixes whose listing also matches sibling tables that must be
    # ignored (e.g. "so_" also matches "so_week..." and "so_daily...").
    _EXCLUDED_TABLE_PREFIXES = {
        'so_': ('so_week', 'so_daily'),
        'stock_move_': ('stock_move_line',),
        'product_': ('product_detail',),
    }

    def __init__(self, str_connection: str):
        """Create the service client from a storage connection string."""
        self.str_connection = str_connection
        self.blob_service_client = BlobServiceClient.from_connection_string(str_connection)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _select_latest_blob(container_client, str_prefix, str_suffix="", excluded_prefixes=()):
        """Return the listed blob whose name sorts last.

        Args:
            container_client: Object exposing ``list_blobs(name_starts_with=...)``.
            str_prefix: Prefix passed to the listing call.
            str_suffix: Required name ending; the empty string accepts any name.
            excluded_prefixes: Name prefixes of blobs to ignore.

        Raises:
            IndexError: If no blob matches (same exception type the previous
                ``sorted(...)[0]`` raised, now with a descriptive message).
        """
        excluded = tuple(excluded_prefixes)
        # str.startswith(()) is always False, so an empty exclusion list
        # filters nothing.
        candidates = [
            blob
            for blob in container_client.list_blobs(name_starts_with=str_prefix)
            if blob.name.endswith(str_suffix) and not blob.name.startswith(excluded)
        ]
        latest = max(candidates, key=lambda blob: blob.name, default=None)
        if latest is None:
            raise IndexError(
                f"No blob found with prefix '{str_prefix}' and suffix '{str_suffix}'"
            )
        return latest

    @staticmethod
    def _read_into_memory(container_client, str_blob_name) -> BytesIO:
        """Download one blob into a ``BytesIO`` rewound to position 0."""
        buffer = BytesIO()
        container_client.get_blob_client(str_blob_name).download_blob().readinto(buffer)
        buffer.seek(0)
        return buffer

    @staticmethod
    def _date_from_name(str_blob_name: str) -> str:
        """Return the text between the last ``_`` and the following ``.``."""
        return str_blob_name.split('_')[-1].split('.')[0]

    def _excluded_prefixes(self, str_client, str_view, str_table):
        """Return the full sibling-table prefixes to ignore for ``str_table``."""
        return tuple(
            f"{str_client}/{str_view}/{sibling}"
            for sibling in self._EXCLUDED_TABLE_PREFIXES.get(str_table, ())
        )

    def _latest_table_blob(self, container_client, str_client, str_view, str_table,
                           str_suffix, excluded_prefixes=()):
        """Select the latest ``{client}/{view}/{table}*{suffix}`` blob."""
        return self._select_latest_blob(
            container_client,
            f"{str_client}/{str_view}/{str_table}",
            str_suffix,
            excluded_prefixes,
        )

    # ------------------------------------------------------------------
    # Client accessors
    # ------------------------------------------------------------------
    def get_container_client(self, str_container: str):
        """Return the container client for ``str_container``."""
        return self.blob_service_client.get_container_client(str_container)

    def get_blob_client(self, str_container: str, str_blob: str):
        """Return the blob client for ``str_blob`` inside ``str_container``."""
        return self.blob_service_client.get_blob_client(container=str_container, blob=str_blob)

    # ------------------------------------------------------------------
    # Container / blob management
    # ------------------------------------------------------------------
    def create_container(self, str_container: str) -> None:
        """Create ``str_container`` unless it already exists."""
        container_client = self.get_container_client(str_container)
        if not container_client.exists():
            container_client.create_container()
            print('Created container:', str_container)

    def get_blob_list(self, str_container: str):
        """Return the result of ``list_blobs()`` for the whole container."""
        return self.get_container_client(str_container).list_blobs()

    def delete_blob(self, str_container: str, str_blob: str) -> None:
        """Delete ``str_blob`` from ``str_container``."""
        self.get_blob_client(str_container, str_blob).delete_blob()

    def copy_blob(self, str_container: str, str_new_container: str, str_blob: str) -> None:
        """Start a copy of ``str_blob`` into ``str_new_container`` (same blob name).

        The copy is only *started* here. A ``"pending"`` status means the copy
        was accepted and is still running, so it is reported separately instead
        of being announced as a failure.
        """
        source_client = self.get_blob_client(str_container, str_blob)
        target_client = self.get_blob_client(str_new_container, str_blob)
        copy_operation = target_client.start_copy_from_url(source_client.url)
        copy_status = copy_operation['copy_status']
        if copy_status == "success":
            print(f"Copied {str_blob} to {str_new_container}.")
        elif copy_status == "pending":
            print(f"Copy of {str_blob} to {str_new_container} is pending.")
        else:
            print(f"Failed to copy {str_blob}.")

    # ------------------------------------------------------------------
    # Uploads
    # ------------------------------------------------------------------
    def upload_file(self, str_container: str, str_blob: str, str_filename: str,
                    str_filepath: str) -> None:
        """Upload the file at ``str_filepath`` unless the blob already exists.

        ``str_filename`` is only used in the progress messages.
        """
        blob_client = self.get_blob_client(str_container, str_blob)
        if blob_client.exists():
            print(f'\nBlob already exists:\n\t{str_filename}')
            return
        print("\nUploading to Azure Storage as blob:\n\t" + str_filename)
        with open(file=str_filepath, mode="rb") as data:
            blob_client.upload_blob(data)

    def upload_from_memory(self, str_container: str, str_blob: str, data) -> None:
        """Upload in-memory ``data`` unless the blob already exists."""
        blob_client = self.get_blob_client(str_container, str_blob)
        if blob_client.exists():
            print(f'\nBlob already exists:\n\t{str_blob}')
            return
        print("\nUploading to Azure Storage as blob:\n\t" + str_blob)
        blob_client.upload_blob(data)

    def overwrite_blob(self, str_container: str, str_blob: str, data) -> None:
        """Upload in-memory ``data``, replacing any existing blob."""
        self.get_blob_client(str_container, str_blob).upload_blob(data, overwrite=True)

    def overwrite_file(self, str_container: str, str_blob: str, str_filename: str,
                       str_filepath: str) -> None:
        """Upload the file at ``str_filepath``, replacing any existing blob.

        ``str_filename`` is unused; it is kept so existing callers keep working.
        """
        blob_client = self.get_blob_client(str_container, str_blob)
        with open(file=str_filepath, mode="rb") as data:
            blob_client.upload_blob(data, overwrite=True)

    # ------------------------------------------------------------------
    # Downloads / "latest blob" lookups
    # ------------------------------------------------------------------
    def get_latest_parquet(self, str_container: str, str_client: str, str_view: str,
                           str_table: str) -> BytesIO:
        """Download the latest ``.parquet`` blob of a table into memory.

        Sibling tables sharing the prefix (see ``_EXCLUDED_TABLE_PREFIXES``)
        are ignored.

        Raises:
            IndexError: If no matching blob exists.
        """
        print("Start running get_latest_parquet")
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_table_blob(
            container_client, str_client, str_view, str_table, '.parquet',
            self._excluded_prefixes(str_client, str_view, str_table),
        )
        blob_contents = self._read_into_memory(container_client, latest_blob.name)
        print(f"Found parquet file:\t{latest_blob.name}")
        return blob_contents

    def download_blob(self, str_container: str, str_blob: str, str_filepath: str) -> None:
        """Download ``str_blob`` to the local path ``str_filepath``."""
        blob_client = self.get_blob_client(str_container, str_blob)
        with open(str_filepath, "wb") as my_blob:
            blob_client.download_blob().readinto(my_blob)
        print(f"Downloaded blob:\t{str_blob}")

    def get_latest_parquet_date(self, str_container: str, str_client: str, str_view: str,
                                str_table: str) -> str:
        """Return the date token of the latest ``.parquet`` blob of a table.

        Uses the same sibling-table exclusions as ``get_latest_parquet`` so the
        date always describes the blob that method would download.

        Raises:
            IndexError: If no matching blob exists.
        """
        print("Start running get_latest_parquet_date")
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_table_blob(
            container_client, str_client, str_view, str_table, '.parquet',
            self._excluded_prefixes(str_client, str_view, str_table),
        )
        latest_parquet_date = self._date_from_name(latest_blob.name)
        print(f"The latest parquet date is\t{latest_parquet_date}")
        return latest_parquet_date

    def get_latest_json_date(self, str_container: str, str_client: str, str_view: str,
                             str_table: str) -> str:
        """Return the date token of the latest ``.json`` blob of a table.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_table_blob(
            container_client, str_client, str_view, str_table, '.json'
        )
        latest_json_date = self._date_from_name(latest_blob.name)
        print(f"The latest json date is\t{latest_json_date}")
        return latest_json_date

    def get_latest_json(self, str_container: str, str_client: str, str_view: str,
                        str_table: str) -> BytesIO:
        """Download the latest ``.json`` blob of a table into memory.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_table_blob(
            container_client, str_client, str_view, str_table, '.json'
        )
        blob_contents = self._read_into_memory(container_client, latest_blob.name)
        print(f"Found json file:\t{latest_blob.name}")
        return blob_contents

    def get_latest_csv(self, str_container: str, str_client: str, str_view: str,
                       str_table: str) -> BytesIO:
        """Download the latest ``.csv`` blob of a table into memory.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_table_blob(
            container_client, str_client, str_view, str_table, '.csv'
        )
        blob_contents = self._read_into_memory(container_client, latest_blob.name)
        print(f"Found csv file:\t{latest_blob.name}")
        return blob_contents

    def get_latest_sql(self, str_container: str, str_client: str, str_view: str,
                       str_table: str) -> BytesIO:
        """Download the last-sorting blob whose name starts with ``{table}.sql``.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        # The ".sql" extension is part of the listing prefix, so no suffix
        # filter is applied afterwards.
        latest_blob = self._latest_table_blob(
            container_client, str_client, str_view, f"{str_table}.sql", ''
        )
        blob_contents = self._read_into_memory(container_client, latest_blob.name)
        print(f"Found sql file:\t{latest_blob.name}")
        return blob_contents

    def get_analytic(self, str_container: str, str_view: str, str_table: str) -> BytesIO:
        """Download the latest ``.json`` blob under ``{view}/{table}`` into memory.

        For ``membership_tag`` the ``membership_tag_config_change_log`` blobs,
        which share the prefix, are ignored.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        excluded = ()
        if str_table == 'membership_tag':
            excluded = (f"{str_view}/membership_tag_config_change_log",)
        latest_blob = self._select_latest_blob(
            container_client, f"{str_view}/{str_table}", '.json', excluded
        )
        blob_contents = self._read_into_memory(container_client, latest_blob.name)
        print(f"Found json file:\t{latest_blob.name}")
        return blob_contents

    def get_last_modified(self, str_container: str, str_client: str, str_view: str,
                          str_table: str, str_format: str):
        """Return ``last_modified`` of the latest ``.{str_format}`` blob of a table.

        No sibling-table exclusions are applied here.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_table_blob(
            container_client, str_client, str_view, str_table, f".{str_format}"
        )
        last_modified = latest_blob.last_modified
        print(f"Last modified date:\t{last_modified}")
        return last_modified

    def get_file_list(self, str_container: str, str_client: str, str_view: str,
                      str_table: str, str_format: str) -> list:
        """Return all listed blobs of a table whose name ends in ``.{str_format}``.

        The blobs are returned in listing order; no sibling-table exclusions
        are applied.
        """
        container_client = self.get_container_client(str_container)
        list_blob = container_client.list_blobs(
            name_starts_with=f"{str_client}/{str_view}/{str_table}"
        )
        return [blob for blob in list_blob if blob.name.endswith(f".{str_format}")]