File size: 4,685 Bytes
ec53722
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""COS utitities."""
import logging
import os
import tempfile
from io import BufferedReader
from typing import List, Optional, Tuple
from urllib.parse import urlparse

import boto3
from boto3_type_annotations.s3 import Bucket
from botocore.client import Config

logger = logging.getLogger(__name__)


def connect_bucket(s3_uri: str) -> Tuple[Bucket, List[str]]:
    """Connect to a bucket from a credential-embedding S3 URI.

    The URI is expected in the form
    ``s3://access:secret@host/bucket/key/parts``.

    Args:
        s3_uri (str): URI embedding credentials, host, bucket and key.

    Returns:
        Tuple[Bucket, List[str]]: the connected bucket resource and the
        key split into path components (possibly an empty list).
    """
    parsed_uri = urlparse(s3_uri)
    # parse bucket and path, where path can be empty list
    _, bucket_name, *split_key = parsed_uri.path.split("/")
    # rsplit on the last "@" so secrets containing "@" still parse
    credentials, host = parsed_uri.netloc.rsplit("@", 1)
    # split only on the first ":" so secrets containing ":" stay intact
    access, secret = credentials.split(":", 1)
    # establish connection
    connection = boto3.resource(
        "s3",
        # NOTE(review): endpoint is hard-coded to plain HTTP — confirm TLS
        # is not required for the target COS instance.
        endpoint_url="http://{}".format(host),
        aws_access_key_id=access,
        aws_secret_access_key=secret,
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )
    return connection.Bucket(bucket_name), split_key


def ensure_filepath_from_uri(file_uri: str) -> str:
    """
    Get a file on the local storage.

    In case the file_uri provided is an S3 URI, downloads the
    file and returns the local path.

    Args:
        file_uri (str): a URI, either filesystem or S3.

    Returns:
        str: the path to the file on the local filesystem.

    Raises:
        RuntimeError: if the S3 download fails or the local file
            does not exist.
    """
    if file_uri.startswith("s3://"):
        try:
            bucket, split_key = connect_bucket(file_uri)
            path = os.path.join(*split_key)
            # create a file handle for storing the file locally;
            # delete=False so the path survives closing the handle
            a_file = tempfile.NamedTemporaryFile(delete=False)
            # make sure we close the file
            a_file.close()
            # download the file
            bucket.download_file(path, a_file.name)
            return a_file.name
        except Exception as exc:
            message = "Getting file from COS failed for the provided URI: {}".format(
                file_uri
            )
            logger.exception(message)
            # chain the original error so the root cause stays visible
            raise RuntimeError(message) from exc
    else:
        logger.debug(f"Searching for {file_uri}")
        if os.path.exists(file_uri):
            return file_uri
        else:
            message = "File not found on local filesystem."
            logger.error(message)
            raise RuntimeError(message)


# COS configuration
# Bucket URI for artifacts; falls back to a local "artifacts" directory
# under the current working directory when the env var is unset.
COS_BUCKET_URI = os.environ.get(
    "COS_BUCKET_URI", os.path.join(os.getcwd(), "artifacts")
)
# Canned ACL name applied on uploads (S3 canned-ACL vocabulary).
COS_UPLOAD_POLICY = os.environ.get("COS_UPLOAD_POLICY", "public-read-write")
# results prefix
RESULTS_PREFIX = "results"
# NOTE(review): the BUCKET object used by the helpers below is not defined
# in this chunk — presumably created elsewhere via
# connect_bucket(COS_BUCKET_URI); verify against the rest of the file.


def download_from_key(key: str, file_path: Optional[str] = None) -> None:
    """Download a single file from COS.

    If no file_path is given, the key is taken as the relative local path.

    Args:
        key (str): S3 key.
        file_path (str, optional): path of the downloaded file. Defaults to None.
    """
    file_path = key if file_path is None else file_path
    directory = os.path.dirname(file_path)
    # os.makedirs("") raises FileNotFoundError, so guard against keys
    # with no directory component.
    if directory:
        os.makedirs(directory, exist_ok=True)
    BUCKET.download_file(key, file_path)


def upload_to_key(file_path: str, key: str) -> None:
    """Upload a local file to COS.

    Args:
        file_path (str): local filepath.
        key (str): S3 key under which the file is stored.
    """
    BUCKET.upload_file(file_path, key)


def fileobject_to_key(readable_binary: BufferedReader, key: str) -> None:
    """Upload a readable, binary file from a handle to COS.

    Args:
        readable_binary (BufferedReader): filehandle, e.g. opened in 'rb' mode.
        key (str): S3 key under which the content is stored.
    """
    BUCKET.upload_fileobj(readable_binary, key)


def delete_from_key(key_or_prefix: str) -> None:
    """Delete all objects matching the given prefix from COS.

    Note that this is a prefix match, so an exact key deletes that object
    and any objects whose keys extend it.

    Args:
        key_or_prefix (str): S3 key or object-name prefix.
    """
    BUCKET.objects.filter(Prefix=key_or_prefix).delete()


def string_to_key(string: str, key: str) -> None:
    """Upload a string as an object to COS.

    The string is encoded with the default codec (UTF-8) before upload.

    Args:
        string (str): object content to be stored.
        key (str): S3 key under which the content is stored.
    """
    BUCKET.put_object(Key=key, Body=string.encode())


def bytes_to_key(some_bytes: bytes, key: str) -> None:
    """Upload bytes as an object to COS.

    Args:
        some_bytes (bytes): object content to be stored.
        key (str): S3 key under which the content is stored.
    """
    BUCKET.put_object(Key=key, Body=some_bytes)


def string_from_key(key: str) -> str:
    """Fetch an object from COS and decode it as UTF-8 text.

    Args:
        key (str): S3 key.

    Returns:
        str: the decoded object content.
    """
    response = BUCKET.Object(key).get()
    raw = response["Body"].read()
    return raw.decode("utf-8")


def bytes_from_key(key: str) -> bytes:
    """Fetch an object from COS as raw bytes.

    Args:
        key (str): S3 key.

    Returns:
        bytes: the object content.
    """
    body = BUCKET.Object(key).get()["Body"]
    return body.read()