paccmann / cos.py
jannisborn's picture
update
ec53722 unverified
raw
history blame
4.69 kB
"""COS utitities."""
import logging
import os
import tempfile
from io import BufferedReader
from typing import List, Optional, Tuple
from urllib.parse import urlparse
import boto3
from boto3_type_annotations.s3 import Bucket
from botocore.client import Config
logger = logging.getLogger(__name__)
def connect_bucket(s3_uri: str) -> Tuple[Bucket, List[str]]:
    """Open a connection to the bucket referenced by an S3 URI.

    The URI is expected to embed credentials and the endpoint host, i.e.
    ``s3://access:secret@host/bucket/key/parts``.

    Args:
        s3_uri (str): S3 URI carrying credentials, host, bucket and key.

    Returns:
        Tuple[Bucket, List[str]]: the bucket handle and the object key
        split on "/" (possibly an empty list when no key is present).
    """
    parsed = urlparse(s3_uri)
    # path looks like "/<bucket>/<key>/..."; the split's leading empty
    # string (before the first "/") is discarded
    _, bucket_name, *key_parts = parsed.path.split("/")
    # netloc carries "<access>:<secret>@<host>"
    creds, endpoint_host = parsed.netloc.split("@")
    access_key, secret_key = creds.split(":")
    resource = boto3.resource(
        "s3",
        endpoint_url=f"http://{endpoint_host}",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )
    return resource.Bucket(bucket_name), key_parts
def ensure_filepath_from_uri(file_uri: str) -> str:
    """
    Get a file on the local storage.

    In case the file_uri provided is a S3 URI, downloads the
    file and returns the local path.

    Args:
        file_uri (str): a uri, either filesystem or S3.

    Returns:
        str: the path to the file on the local filesystem.

    Raises:
        RuntimeError: if the S3 download fails, or if the local file
            does not exist.
    """
    # getLogger returns the module's shared logger instance, so logging
    # behavior is unchanged
    log = logging.getLogger(__name__)
    if file_uri.startswith("s3://"):
        try:
            bucket, split_key = connect_bucket(file_uri)
            path = os.path.join(*split_key)
            # create a file handle for storing the file locally
            a_file = tempfile.NamedTemporaryFile(delete=False)
            # close immediately so the download can reopen/write the path
            # on all platforms
            a_file.close()
            # download the file
            bucket.download_file(path, a_file.name)
            return a_file.name
        except Exception as exception:
            message = "Getting file from COS failed for the provided URI: {}".format(
                file_uri
            )
            log.exception(message)
            # chain the original exception so the root cause is not lost
            raise RuntimeError(message) from exception
    else:
        log.debug(f"Searching for {file_uri}")
        if os.path.exists(file_uri):
            return file_uri
        else:
            message = "File not found on local filesystem."
            log.error(message)
            raise RuntimeError(message)
# COS configuration
# Bucket URI read from the environment; falls back to a local "artifacts"
# directory under the current working directory when COS_BUCKET_URI is unset.
COS_BUCKET_URI = os.environ.get(
    "COS_BUCKET_URI", os.path.join(os.getcwd(), "artifacts")
)
# Upload policy name; presumably a canned S3 ACL — not applied anywhere in
# this chunk, verify against the uploading caller.
COS_UPLOAD_POLICY = os.environ.get("COS_UPLOAD_POLICY", "public-read-write")
# results prefix
# Key prefix under which result objects are stored in the bucket.
RESULTS_PREFIX = "results"
# NOTE(review): the helpers below use a module-level BUCKET that is not
# defined in this chunk — presumably created via connect_bucket elsewhere;
# confirm before relying on import-time side effects.
def download_from_key(key: str, file_path: Optional[str] = None) -> None:
    """Download a single file from COS.

    If no file_path is given, the object key is taken as the relative
    local path.

    Args:
        key (str): S3 key.
        file_path (str, optional): Path of downloaded file. Defaults to None.
    """
    file_path = key if file_path is None else file_path
    # os.makedirs raises FileNotFoundError on an empty path, which happens
    # whenever file_path has no directory component — only create the
    # directory when there is one.
    directory = os.path.dirname(file_path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    BUCKET.download_file(key, file_path)
def upload_to_key(file_path: str, key: str) -> None:
    """Store a local file in COS under the given key.

    Args:
        file_path (str): Local filepath.
        key (str): S3 key.
    """
    # keyword form of Bucket.upload_file(Filename, Key)
    BUCKET.upload_file(Filename=file_path, Key=key)
def fileobject_to_key(readable_binary: BufferedReader, key: str) -> None:
    """Stream a readable binary file handle to COS.

    Args:
        readable_binary (BufferedReader): filehandle, e.g. opened in 'rb' mode.
        key (str): S3 key.
    """
    # keyword form of Bucket.upload_fileobj(Fileobj, Key)
    BUCKET.upload_fileobj(Fileobj=readable_binary, Key=key)
def delete_from_key(key_or_prefix: str) -> None:
    """Delete every object whose key starts with the given prefix.

    Args:
        key_or_prefix (str): S3 uri including object name prefix.
    """
    matching_objects = BUCKET.objects.filter(Prefix=key_or_prefix)
    matching_objects.delete()
def string_to_key(string: str, key: str) -> None:
    """Store a string as an object in COS.

    Args:
        string (str): object to be stored.
        key (str): S3 key.
    """
    # str.encode defaults to UTF-8; spelled out here for clarity
    payload = string.encode("utf-8")
    BUCKET.put_object(Key=key, Body=payload)
def bytes_to_key(some_bytes: bytes, key: str) -> None:
    """Store raw bytes as an object in COS.

    Args:
        some_bytes (bytes): object to be stored.
        key (str): S3 key.
    """
    BUCKET.put_object(Body=some_bytes, Key=key)
def string_from_key(key: str) -> str:
    """Fetch an object from COS and decode it as UTF-8 text.

    Args:
        key (str): S3 key.

    Returns:
        str: object.
    """
    response = BUCKET.Object(key).get()
    raw = response["Body"].read()
    return raw.decode("utf-8")
def bytes_from_key(key: str) -> bytes:
    """Fetch an object from COS as raw bytes.

    Args:
        key (str): S3 key.

    Returns:
        bytes: object.
    """
    body = BUCKET.Object(key).get()["Body"]
    return body.read()