Spaces:
Running
Running
"""COS utilities."""
import logging | |
import os | |
import tempfile | |
from io import BufferedReader | |
from typing import List, Optional, Tuple | |
from urllib.parse import urlparse | |
import boto3 | |
from boto3_type_annotations.s3 import Bucket | |
from botocore.client import Config | |
logger = logging.getLogger(__name__) | |
def connect_bucket(s3_uri: str) -> Tuple[Bucket, List[str]]:
    """Connect to the bucket referenced by an S3 URI.

    The URI is expected to have the form
    ``s3://<access>:<secret>@<host>/<bucket>[/<key part>/...]``.

    Args:
        s3_uri (str): URI carrying the credentials, the host, the bucket name
            and, optionally, an object key.

    Returns:
        Tuple[Bucket, List[str]]: the bucket resource and the object key split
            on "/" (an empty list when the URI stops at the bucket name).

    Raises:
        ValueError: if the URI has no bucket name, or if its network location
            is not of the form ``<access>:<secret>@<host>``.
    """
    parsed_uri = urlparse(s3_uri)
    # parse bucket and path, where path can be empty list
    _, bucket_name, *split_key = parsed_uri.path.split("/")
    # credentials and host are separated by the LAST "@", so a secret that
    # itself contains "@" is still parsed correctly
    credentials, host = parsed_uri.netloc.rsplit("@", 1)
    # the access key ends at the FIRST ":", so a secret containing ":" survives
    access, secret = credentials.split(":", 1)
    # establish connection
    connection = boto3.resource(
        "s3",
        endpoint_url="http://{}".format(host),
        aws_access_key_id=access,
        aws_secret_access_key=secret,
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )
    return connection.Bucket(bucket_name), split_key
def ensure_filepath_from_uri(file_uri: str) -> str:
    """
    Get a file on the local storage.

    In case the file_uri provided is a S3 URI, downloads the
    file to a temporary file and returns the local path.

    Args:
        file_uri (str): a uri, either filesystem or S3.

    Returns:
        str: the path to the file on the local filesystem.

    Raises:
        RuntimeError: if the download from S3 fails, or if a non-S3 uri does
            not exist on the local filesystem.
    """
    import posixpath

    if file_uri.startswith("s3://"):
        local_path = None
        try:
            bucket, split_key = connect_bucket(file_uri)
            # S3 keys are "/"-separated whatever the local platform separator is
            path = posixpath.join(*split_key)
            # reserve a local file name and close the handle right away, so the
            # download can write to that path
            with tempfile.NamedTemporaryFile(delete=False) as a_file:
                local_path = a_file.name
            # download the file
            bucket.download_file(path, local_path)
            return local_path
        except Exception as error:
            # the temporary file was created with delete=False, so it has to be
            # removed here or it would be left behind after a failed download
            if local_path is not None:
                try:
                    os.remove(local_path)
                except OSError:
                    pass  # best-effort cleanup; report the original failure
            # NOTE(review): file_uri embeds the access key and secret, so this
            # message writes credentials to the log -- consider redacting.
            message = "Getting file from COS failed for the provided URI: {}".format(
                file_uri
            )
            logger.exception(message)
            raise RuntimeError(message) from error

    logger.debug(f"Searching for {file_uri}")
    if os.path.exists(file_uri):
        return file_uri

    message = "File not found on local filesystem."
    logger.error(message)
    raise RuntimeError(message)
# COS settings, read from the environment when the module is imported.
# Without COS_BUCKET_URI set, the value falls back to an "artifacts" folder
# under the current working directory.
COS_BUCKET_URI = os.getenv("COS_BUCKET_URI", os.path.join(os.getcwd(), "artifacts"))
COS_UPLOAD_POLICY = os.getenv("COS_UPLOAD_POLICY", "public-read-write")
# prefix for results
RESULTS_PREFIX = "results"
def download_from_key(key: str, file_path: Optional[str] = None) -> None:
    """Download a single file from COS.

    If no file_path is given, object name is taken as relative local path.
    Missing parent directories of the target path are created.

    Args:
        key (str): S3 key.
        file_path (str, optional): Path of downloaded file. Defaults to None.
    """
    file_path = key if file_path is None else file_path
    parent_dir = os.path.dirname(file_path)
    # a bare file name has no parent directory, and os.makedirs("") raises
    # FileNotFoundError, so only create directories when there is one
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    BUCKET.download_file(key, file_path)
def upload_to_key(file_path: str, key: str) -> None:
    """Send a file from the local filesystem to COS.

    Args:
        file_path (str): location of the file on the local filesystem.
        key (str): S3 key under which the file is stored.
    """
    send_file = BUCKET.upload_file
    send_file(file_path, key)
def fileobject_to_key(readable_binary: BufferedReader, key: str) -> None:
    """Send the content of an open binary file handle to COS.

    Args:
        readable_binary (BufferedReader): readable handle, e.g. a file
            opened in 'rb' mode.
        key (str): S3 key under which the content is stored.
    """
    send_fileobj = BUCKET.upload_fileobj
    send_fileobj(readable_binary, key)
def delete_from_key(key_or_prefix: str) -> None:
    """Remove from COS every object selected by the given prefix.

    Args:
        key_or_prefix (str): S3 key, or key prefix, passed as the ``Prefix``
            filter of the bucket's object collection.
    """
    # NOTE(review): an empty prefix presumably selects every object in the
    # bucket -- confirm callers never pass "".
    matching_objects = BUCKET.objects.filter(Prefix=key_or_prefix)
    matching_objects.delete()
def string_to_key(string: str, key: str) -> None:
    """Store a string as an object in COS.

    The string is encoded with ``str.encode()`` defaults before being sent.

    Args:
        string (str): text to store.
        key (str): S3 key under which the text is stored.
    """
    payload = string.encode()
    BUCKET.put_object(Key=key, Body=payload)
def bytes_to_key(some_bytes: bytes, key: str) -> None:
    """Store raw bytes as an object in COS.

    Args:
        some_bytes (bytes): content to store.
        key (str): S3 key under which the content is stored.
    """
    BUCKET.put_object(Body=some_bytes, Key=key)
def string_from_key(key: str) -> str:
    """Fetch an object from COS and decode it as UTF-8 text.

    Args:
        key (str): S3 key of the object.

    Returns:
        str: the object content decoded as UTF-8.
    """
    response = BUCKET.Object(key).get()
    raw_content = response["Body"].read()
    return raw_content.decode("utf-8")
def bytes_from_key(key: str) -> bytes:
    """Fetch an object from COS as raw bytes.

    Args:
        key (str): S3 key of the object.

    Returns:
        bytes: the object content as read from the response body.
    """
    response = BUCKET.Object(key).get()
    body_stream = response["Body"]
    return body_stream.read()