import json

from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient.http import MediaIoBaseDownload


class GoogleCloudStorage:
    """Convenience wrapper around a ``google.cloud.storage.Client``.

    Each public method addresses one object ("blob") by bucket name and
    blob name and delegates to the matching ``Blob`` call. Methods that
    change remote state also print a short confirmation line to stdout.
    """

    def __init__(self, service_account_key_string):
        """Create a storage client from a service-account key.

        Args:
            service_account_key_string: JSON text of a service-account key.
                It is parsed with ``json.loads`` and must contain a
                ``project_id`` entry, which is used as the client's project.

        Raises:
            KeyError: If the parsed key has no ``project_id`` entry.
        """
        key_info = json.loads(service_account_key_string)
        creds = service_account.Credentials.from_service_account_info(key_info)
        project_id = key_info['project_id']
        self.client = storage.Client(credentials=creds, project=project_id)

    def _blob_handle(self, bucket_name, blob_name):
        """Return the ``Blob`` object for *blob_name* inside *bucket_name*."""
        return self.client.bucket(bucket_name).blob(blob_name)

    def check_file_exists(self, bucket_name, file_name):
        """Return the result of ``Blob.exists()`` for the given object."""
        return self._blob_handle(bucket_name, file_name).exists()

    def upload_file(self, bucket_name, destination_blob_name, file_path):
        """Upload the local file at *file_path* to *destination_blob_name*.

        Prints a confirmation message once the upload call returns.
        """
        target = self._blob_handle(bucket_name, destination_blob_name)
        target.upload_from_filename(file_path)
        print(f"File {file_path} uploaded to {destination_blob_name} in GCS.")

    def upload_file_as_string(self, bucket_name, destination_blob_name, content):
        """Upload in-memory *content* to *destination_blob_name*.

        *content* is handed unchanged to ``Blob.upload_from_string`` with no
        explicit content type. Prints a confirmation message and returns
        ``None``.
        """
        target = self._blob_handle(bucket_name, destination_blob_name)
        target.upload_from_string(content)
        print(f"String content uploaded to {destination_blob_name} in GCS.")

    def upload_json_string(self, bucket_name, destination_blob_name, json_data):
        """Upload *json_data* with content type ``application/json``.

        *json_data* is passed through as-is; it is not validated or
        serialized here. Prints a confirmation message afterwards.
        """
        target = self._blob_handle(bucket_name, destination_blob_name)
        target.upload_from_string(json_data, content_type='application/json')
        print(f"JSON string uploaded to {destination_blob_name} in GCS.")

    def download_as_string(self, bucket_name, source_blob_name):
        """Return the object's content as given by ``Blob.download_as_text()``."""
        source = self._blob_handle(bucket_name, source_blob_name)
        return source.download_as_text()

    def make_blob_public(self, bucket_name, blob_name):
        """Call ``Blob.make_public()`` on the object and print its public URL."""
        target = self._blob_handle(bucket_name, blob_name)
        target.make_public()
        print(f"Blob {blob_name} is now publicly accessible at {target.public_url}")

    def get_public_url(self, bucket_name, blob_name):
        """Return the ``public_url`` attribute of the object's ``Blob`` handle.

        This method only reads the attribute; it does not itself alter the
        object's access settings.
        """
        return self._blob_handle(bucket_name, blob_name).public_url

    def upload_image_and_get_public_url(self, bucket_name, file_name, file_path):
        """Upload a local file, make it public, and return its public URL.

        Chains ``upload_file``, ``make_blob_public`` and ``get_public_url``
        in that order, so both of their confirmation messages are printed.
        """
        self.upload_file(bucket_name, file_name, file_path)
        self.make_blob_public(bucket_name, file_name)
        public_url = self.get_public_url(bucket_name, file_name)
        return public_url

    def delete_blob(self, bucket_name, blob_name):
        """Delete the object via ``Blob.delete()`` and print a confirmation."""
        doomed = self._blob_handle(bucket_name, blob_name)
        doomed.delete()
        print(f"Blob {blob_name} deleted from {bucket_name}.")