""" Google Cloud Util Functions: - Upload to Google Storage Bucket - Delete from Google Storage Bucket - Generate signed URL """ import os import datetime from google.cloud import storage from google.cloud.storage.blob import Blob ## Google Storage Client client = storage.Client() def upload_blob(bucket_name, source_file_name, destination_blob_name, content_type='', algo_unique_key=''): """Uploading File to Google Storage Bucket Args: bucket_name (str): Google Storage Bucket Name source_file_name (str): Local Absolute Filpath to upload destination_blob_name (str): File Name used to store in bucket content_type (str, optional): The content type of the file being uploaded algo_unique_key (str, optional): [description]. Defaults to ''. Algorithmia Data Source Bucket Unique Key Returns: [str]: Google Storage Object URI, if algorithmia key is given the same is modified. """ bucket = client.get_bucket(bucket_name) blob = bucket.blob(destination_blob_name) blob.upload_from_filename(source_file_name, content_type=content_type) data_uri = blob.self_link if algo_unique_key!="": return os.path.join("gs+{}://{}".format(algo_unique_key, bucket_name), data_uri.split("/")[-1]) return os.path.join("gs://{}".format(bucket_name), data_uri.split("/")[-1]) def delete_blob(bucket_name, blob_name): """Deletes a blob from the bucket.""" # bucket_name = "your-bucket-name" # blob_name = "your-object-name" bucket = client.bucket(bucket_name) blob = bucket.blob(blob_name) blob.delete() print("Blob {} deleted.".format(blob_name)) def download_video(bucket_name, filename, output_filename): bucket = client.get_bucket(bucket_name) # Create a blob object from the filepath blob = bucket.blob(filename) # Download the file to a destination blob.download_to_filename(output_filename) return output_filename def generate_signed_url(output_uri): expiration_time = datetime.timedelta(minutes=5) blob = Blob.from_string(output_uri, client=client) signed_url = blob.generate_signed_url(expiration=expiration_time, version='v4', response_disposition='attachment') return signed_url