"""COS utitities."""
import logging
import os
import tempfile
from io import BufferedReader
from typing import List, Optional, Tuple
from urllib.parse import urlparse
import boto3
from boto3_type_annotations.s3 import Bucket
from botocore.client import Config
logger = logging.getLogger(__name__)


def connect_bucket(s3_uri: str) -> Tuple[Bucket, List[str]]:
    """Connect to a bucket given an S3 URI.

    Args:
        s3_uri (str): an S3 URI of the form s3://access:secret@host/bucket/optional/key.

    Returns:
        Tuple[Bucket, List[str]]: the bucket resource and the key split into its
            components (possibly an empty list).
    """
    parsed_uri = urlparse(s3_uri)
    # parse bucket name and key components, where the key components can be an empty list
    _, bucket_name, *split_key = parsed_uri.path.split("/")
    # parse credentials and host
    credentials, host = parsed_uri.netloc.split("@")
    # extract access and secret keys
    access, secret = credentials.split(":")
    # establish the connection
    connection = boto3.resource(
        "s3",
        endpoint_url="http://{}".format(host),
        aws_access_key_id=access,
        aws_secret_access_key=secret,
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )
    return connection.Bucket(bucket_name), split_key
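
# A hedged usage sketch (placeholder credentials and host, not real values):
#
#     bucket, split_key = connect_bucket("s3://ACCESS:SECRET@s3.example.com/my-bucket/some/prefix")
#     # bucket is a boto3 Bucket resource for "my-bucket"; split_key == ["some", "prefix"]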


def ensure_filepath_from_uri(file_uri: str) -> str:
    """Get a file on the local storage.

    In case the provided file_uri is an S3 URI, downloads the
    file and returns the local path.

    Args:
        file_uri (str): a URI, either filesystem or S3.

    Returns:
        str: the path to the file on the local filesystem.
    """
    if file_uri.startswith("s3://"):
        try:
            bucket, split_key = connect_bucket(file_uri)
            path = os.path.join(*split_key)
            # create a file handle for storing the file locally
            a_file = tempfile.NamedTemporaryFile(delete=False)
            # make sure we close the file
            a_file.close()
            # download the file
            bucket.download_file(path, a_file.name)
            return a_file.name
        except Exception:
            message = "Getting file from COS failed for the provided URI: {}".format(
                file_uri
            )
            logger.exception(message)
            raise RuntimeError(message)
    else:
        logger.debug(f"Searching for {file_uri}")
        if os.path.exists(file_uri):
            return file_uri
        else:
            message = "File not found on local filesystem."
            logger.error(message)
            raise RuntimeError(message)
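
# A hedged usage sketch (the URI and path below are made-up examples):
#
#     local_path = ensure_filepath_from_uri("s3://ACCESS:SECRET@s3.example.com/my-bucket/model.ckpt")
#     # for a local file, the path is returned unchanged if it exists
#     local_path = ensure_filepath_from_uri("/tmp/model.ckpt")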


# COS configuration
COS_BUCKET_URI = os.environ.get(
    "COS_BUCKET_URI", os.path.join(os.getcwd(), "artifacts")
)
COS_UPLOAD_POLICY = os.environ.get("COS_UPLOAD_POLICY", "public-read-write")
# results prefix
RESULTS_PREFIX = "results"
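
# NOTE: the helpers below rely on a module-level BUCKET resource that is not
# defined in this snippet. A minimal sketch of how it could be created,
# assuming COS_BUCKET_URI is set to a full s3://access:secret@host/bucket URI
# (the local-path default above would not parse), is:
#
#     BUCKET, _ = connect_bucket(COS_BUCKET_URI)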


def download_from_key(key: str, file_path: Optional[str] = None) -> None:
    """Download a single file from COS.

    If no file_path is given, the object key is used as the relative local path.

    Args:
        key (str): S3 key.
        file_path (str, optional): path of the downloaded file. Defaults to None.
    """
    file_path = key if file_path is None else file_path
    # create parent directories if the target path contains any
    directory = os.path.dirname(file_path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    BUCKET.download_file(key, file_path)


def upload_to_key(file_path: str, key: str) -> None:
    """Upload local file to COS.

    Args:
        file_path (str): Local filepath.
        key (str): S3 key.
    """
    BUCKET.upload_file(file_path, key)


def fileobject_to_key(readable_binary: BufferedReader, key: str) -> None:
    """Upload readable, binary file from handle to COS.

    Args:
        readable_binary (BufferedReader): filehandle, e.g. opened in 'rb' mode.
        key (str): S3 key.
    """
    BUCKET.upload_fileobj(readable_binary, key)
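
# A hedged usage sketch (the local path and key are made-up examples):
#
#     with open("/tmp/model.ckpt", "rb") as fp:
#         fileobject_to_key(fp, "results/model.ckpt")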


def delete_from_key(key_or_prefix: str) -> None:
    """Delete all objects matching the given key or key prefix from COS.

    Args:
        key_or_prefix (str): S3 key or key prefix.
    """
    BUCKET.objects.filter(Prefix=key_or_prefix).delete()


def string_to_key(string: str, key: str) -> None:
    """Upload string as object to COS.

    Args:
        string (str): object to be stored.
        key (str): S3 key.
    """
    BUCKET.put_object(Key=key, Body=string.encode())


def bytes_to_key(some_bytes: bytes, key: str) -> None:
    """Upload bytes as object to COS.

    Args:
        some_bytes (bytes): object to be stored.
        key (str): S3 key.
    """
    BUCKET.put_object(Key=key, Body=some_bytes)


def string_from_key(key: str) -> str:
    """Get object from COS as string.

    Args:
        key (str): S3 key.

    Returns:
        str: the object content decoded as a UTF-8 string.
    """
    return BUCKET.Object(key).get()["Body"].read().decode("utf-8")


def bytes_from_key(key: str) -> bytes:
    """Get object from COS as bytes.

    Args:
        key (str): S3 key.

    Returns:
        bytes: object.
    """
    return BUCKET.Object(key).get()["Body"].read()
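
# A hedged round-trip usage sketch of the helpers above (keys and content are
# made-up examples):
#
#     string_to_key("hello world", RESULTS_PREFIX + "/example.txt")
#     assert string_from_key(RESULTS_PREFIX + "/example.txt") == "hello world"
#     download_from_key(RESULTS_PREFIX + "/example.txt", "/tmp/example.txt")
#     delete_from_key(RESULTS_PREFIX)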