File size: 2,590 Bytes
3bdba8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10f0966
 
 
 
 
 
3bdba8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10f0966
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import json
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient.http import MediaIoBaseDownload


class GoogleCloudStorage:
    """Convenience wrapper around a ``google.cloud.storage`` client.

    The client is built from a service-account key supplied as a JSON
    string; every method addresses a blob by bucket name and blob name.
    """

    def __init__(self, service_account_key_string):
        """Create the storage client from a service-account key JSON string.

        The parsed key must contain a ``project_id`` entry, which is passed
        to the client as its project.
        """
        key_info = json.loads(service_account_key_string)
        self.client = storage.Client(
            credentials=service_account.Credentials.from_service_account_info(key_info),
            project=key_info['project_id'],
        )

    def _blob_handle(self, bucket_name, blob_name):
        """Return the blob object for ``blob_name`` inside ``bucket_name``."""
        return self.client.bucket(bucket_name).blob(blob_name)

    def check_file_exists(self, bucket_name, file_name):
        """Return the result of the blob's ``exists()`` check."""
        return self._blob_handle(bucket_name, file_name).exists()

    def upload_file(self, bucket_name, destination_blob_name, file_path):
        """Upload the local file at ``file_path`` and print a confirmation."""
        self._blob_handle(bucket_name, destination_blob_name).upload_from_filename(file_path)
        print(f"File {file_path} uploaded to {destination_blob_name} in GCS.")

    def upload_file_as_string(self, bucket_name, destination_blob_name, content):
        """Upload ``content`` via ``upload_from_string`` and print a confirmation."""
        self._blob_handle(bucket_name, destination_blob_name).upload_from_string(content)
        print(f"String content uploaded to {destination_blob_name} in GCS.")

    def upload_json_string(self, bucket_name, destination_blob_name, json_data):
        """Upload ``json_data`` with the ``application/json`` content type."""
        self._blob_handle(bucket_name, destination_blob_name).upload_from_string(
            json_data,
            content_type='application/json',
        )
        print(f"JSON string uploaded to {destination_blob_name} in GCS.")

    def download_as_string(self, bucket_name, source_blob_name):
        """Return the blob's content as produced by ``download_as_text()``."""
        return self._blob_handle(bucket_name, source_blob_name).download_as_text()

    def make_blob_public(self, bucket_name, blob_name):
        """Call ``make_public()`` on the blob and print its public URL."""
        blob = self._blob_handle(bucket_name, blob_name)
        blob.make_public()
        print(f"Blob {blob_name} is now publicly accessible at {blob.public_url}")

    def get_public_url(self, bucket_name, blob_name):
        """Return the blob's ``public_url`` attribute."""
        return self._blob_handle(bucket_name, blob_name).public_url

    def upload_image_and_get_public_url(self, bucket_name, file_name, file_path):
        """Upload a local file, make the blob public, and return its public URL."""
        # Delegate to the individual steps, in this order, so each one still
        # prints its own status line.
        self.upload_file(bucket_name, file_name, file_path)
        self.make_blob_public(bucket_name, file_name)
        public_url = self.get_public_url(bucket_name, file_name)
        return public_url

    def delete_blob(self, bucket_name, blob_name):
        """Delete the blob and print a confirmation."""
        self._blob_handle(bucket_name, blob_name).delete()
        print(f"Blob {blob_name} deleted from {bucket_name}.")