Upload loaders.py with huggingface_hub
loaders.py +73 -0
loaders.py CHANGED
@@ -1,10 +1,21 @@
+import os
+from tempfile import TemporaryDirectory
 from typing import Mapping, Optional, Sequence, Union

 from datasets import load_dataset as hf_load_dataset
+from tqdm import tqdm

 from .operator import SourceOperator
 from .stream import MultiStream

+try:
+    import ibm_boto3
+    from ibm_botocore.client import ClientError
+
+    ibm_boto3_available = True
+except ImportError:
+    ibm_boto3_available = False
+

 class Loader(SourceOperator):
     pass
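The first hunk makes ibm_boto3 an optional dependency: the module still imports when the IBM COS SDK is missing, and the ibm_boto3_available flag only triggers a failure later, inside LoadFromIBMCloud.verify() in the second hunk. Below is a minimal, self-contained sketch of that pattern; the require_ibm_boto3 helper is hypothetical and not part of the diff, and the install command is taken from the assertion message further down.

# Sketch of the optional-dependency guard added in the first hunk.
try:
    import ibm_boto3  # provided by `pip install ibm-cos-sdk`

    ibm_boto3_available = True
except ImportError:
    ibm_boto3_available = False


def require_ibm_boto3() -> None:
    # Hypothetical helper, not part of the diff: fail early with an
    # actionable message instead of waiting for an AssertionError.
    if not ibm_boto3_available:
        raise ImportError(
            "ibm_boto3 is required for LoadFromIBMCloud; "
            "install it with `pip install ibm-cos-sdk`"
        )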
@@ -23,3 +34,65 @@ class LoadHF(Loader):
         )

         return MultiStream.from_iterables(dataset)
+
+
+class LoadFromIBMCloud(Loader):
+    endpoint_url_env: str
+    aws_access_key_id_env: str
+    aws_secret_access_key_env: str
+    bucket_name: str
+    data_dir: str
+    data_files: Sequence[str]
+
+    def _download_from_cos(self, cos, bucket_name, item_name, local_file):
+        print(f"Downloading {item_name} from {bucket_name} COS to {local_file}")
+        try:
+            response = cos.Object(bucket_name, item_name).get()
+            size = response["ContentLength"]
+        except Exception as e:
+            raise Exception(f"Unabled to access {item_name} in {bucket_name} in COS", e)
+
+        progress_bar = tqdm(total=size, unit="iB", unit_scale=True)
+
+        def upload_progress(chunk):
+            progress_bar.update(chunk)
+
+        try:
+            cos.Bucket(bucket_name).download_file(item_name, local_file, Callback=upload_progress)
+            print("\nDownload Successful")
+        except Exception as e:
+            raise Exception(f"Unabled to download {item_name} in {bucket_name}", e)
+
+    def prepare(self):
+        super().prepare()
+        self.endpoint_url = os.getenv(self.endpoint_url_env)
+        self.aws_access_key_id = os.getenv(self.aws_access_key_id_env)
+        self.aws_secret_access_key = os.getenv(self.aws_secret_access_key_env)
+
+    def verify(self):
+        super().verify()
+        assert (
+            ibm_boto3_available
+        ), f"Please install ibm_boto3 in order to use the LoadFromIBMCloud loader (using `pip install ibm-cos-sdk`) "
+        assert self.endpoint_url is not None, f"Please set the {self.endpoint_url_env} environmental variable"
+        assert self.aws_access_key_id is not None, f"Please set {self.aws_access_key_id_env} environmental variable"
+        assert (
+            self.aws_secret_access_key is not None
+        ), f"Please set {self.aws_secret_access_key_env} environmental variable"
+
+    def process(self):
+        cos = ibm_boto3.resource(
+            "s3",
+            aws_access_key_id=self.aws_access_key_id,
+            aws_secret_access_key=self.aws_secret_access_key,
+            endpoint_url=self.endpoint_url,
+        )
+
+        with TemporaryDirectory() as temp_directory:
+            for data_file in self.data_files:
+                self._download_from_cos(
+                    cos, self.bucket_name, self.data_dir + "/" + data_file, temp_directory + "/" + data_file
+                )
+            dataset = hf_load_dataset(temp_directory, streaming=False)
+
+        return MultiStream.from_iterables(dataset)
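For orientation, here is a hedged usage sketch of the new LoadFromIBMCloud loader. It assumes the operator base class accepts its declared fields as constructor keyword arguments and that prepare(), verify(), and process() may be called explicitly; in the framework they are normally driven for you. Every environment-variable name, bucket, path, and file name below is a placeholder, not a value from the diff.

import os

from loaders import LoadFromIBMCloud  # assumed import path for the module in this diff

# Placeholder credentials and names; nothing here comes from the diff.
# The loader stores the *names* of environment variables and resolves
# them in prepare(), so secrets stay out of source code.
os.environ.setdefault("COS_ENDPOINT_URL", "https://s3.us-south.cloud-object-storage.appdomain.cloud")
os.environ.setdefault("COS_ACCESS_KEY_ID", "<access-key-id>")
os.environ.setdefault("COS_SECRET_ACCESS_KEY", "<secret-access-key>")

loader = LoadFromIBMCloud(
    endpoint_url_env="COS_ENDPOINT_URL",
    aws_access_key_id_env="COS_ACCESS_KEY_ID",
    aws_secret_access_key_env="COS_SECRET_ACCESS_KEY",
    bucket_name="my-bucket",                # placeholder bucket
    data_dir="my-dataset",                  # object prefix inside the bucket
    data_files=["train.csv", "test.csv"],   # objects under data_dir
)

# prepare() resolves the env vars, verify() checks that ibm_boto3 is
# installed and the credentials are set, and process() downloads the files
# into a temporary directory, loads them with datasets.load_dataset, and
# wraps the resulting splits in a MultiStream.
loader.prepare()
loader.verify()
multi_stream = loader.process()

Because the files are loaded from a temporary directory, datasets.load_dataset infers the builder from the file extensions it finds there, so the placeholder file names above assume a format it recognizes (CSV in this sketch).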