Elron commited on
Commit
7e3fef8
·
1 Parent(s): 8287b2e

Upload loaders.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. loaders.py +87 -18
loaders.py CHANGED
@@ -1,3 +1,5 @@
 
 
1
  import os
2
  from tempfile import TemporaryDirectory
3
  from typing import Dict, Mapping, Optional, Sequence, Union
@@ -11,7 +13,7 @@ from .stream import MultiStream, Stream
11
 
12
  try:
13
  import ibm_boto3
14
- from ibm_botocore.client import ClientError
15
 
16
  ibm_boto3_available = True
17
  except ImportError:
@@ -19,6 +21,13 @@ except ImportError:
19
 
20
 
21
  class Loader(SourceOperator):
 
 
 
 
 
 
 
22
  pass
23
 
24
 
@@ -26,14 +35,41 @@ class LoadHF(Loader):
26
  path: str
27
  name: Optional[str] = None
28
  data_dir: Optional[str] = None
29
- data_files: Optional[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]] = None
 
 
 
30
  streaming: bool = True
31
  cached = False
32
 
33
  def process(self):
34
- dataset = hf_load_dataset(
35
- self.path, name=self.name, data_dir=self.data_dir, data_files=self.data_files, streaming=self.streaming
36
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
 
38
  return MultiStream.from_iterables(dataset)
39
 
@@ -44,12 +80,15 @@ class LoadCSV(Loader):
44
 
45
  def load_csv(self, file):
46
  for chunk in pd.read_csv(file, chunksize=self.chunksize):
47
- for index, row in chunk.iterrows():
48
  yield row.to_dict()
49
 
50
  def process(self):
51
  return MultiStream(
52
- {name: Stream(generator=self.load_csv, gen_kwargs={"file": file}) for name, file in self.files.items()}
 
 
 
53
  )
54
 
55
 
@@ -58,16 +97,33 @@ class LoadFromIBMCloud(Loader):
58
  aws_access_key_id_env: str
59
  aws_secret_access_key_env: str
60
  bucket_name: str
61
- data_dir: str
62
  data_files: Sequence[str]
63
 
64
  def _download_from_cos(self, cos, bucket_name, item_name, local_file):
65
- print(f"Downloading {item_name} from {bucket_name} COS to {local_file}")
66
  try:
67
  response = cos.Object(bucket_name, item_name).get()
68
  size = response["ContentLength"]
 
69
  except Exception as e:
70
- raise Exception(f"Unabled to access {item_name} in {bucket_name} in COS", e)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
 
72
  progress_bar = tqdm(total=size, unit="iB", unit_scale=True)
73
 
@@ -75,10 +131,14 @@ class LoadFromIBMCloud(Loader):
75
  progress_bar.update(chunk)
76
 
77
  try:
78
- cos.Bucket(bucket_name).download_file(item_name, local_file, Callback=upload_progress)
79
- print("\nDownload Successful")
 
 
80
  except Exception as e:
81
- raise Exception(f"Unabled to download {item_name} in {bucket_name}", e)
 
 
82
 
83
  def prepare(self):
84
  super().prepare()
@@ -88,11 +148,13 @@ class LoadFromIBMCloud(Loader):
88
 
89
  def verify(self):
90
  super().verify()
 
 
 
 
91
  assert (
92
- ibm_boto3_available
93
- ), f"Please install ibm_boto3 in order to use the LoadFromIBMCloud loader (using `pip install ibm-cos-sdk`) "
94
- assert self.endpoint_url is not None, f"Please set the {self.endpoint_url_env} environmental variable"
95
- assert self.aws_access_key_id is not None, f"Please set {self.aws_access_key_id_env} environmental variable"
96
  assert (
97
  self.aws_secret_access_key is not None
98
  ), f"Please set {self.aws_secret_access_key_env} environmental variable"
@@ -107,8 +169,15 @@ class LoadFromIBMCloud(Loader):
107
 
108
  with TemporaryDirectory() as temp_directory:
109
  for data_file in self.data_files:
 
 
 
 
 
 
 
110
  self._download_from_cos(
111
- cos, self.bucket_name, self.data_dir + "/" + data_file, temp_directory + "/" + data_file
112
  )
113
  dataset = hf_load_dataset(temp_directory, streaming=False)
114
 
 
1
+ import itertools
2
+ import logging
3
  import os
4
  from tempfile import TemporaryDirectory
5
  from typing import Dict, Mapping, Optional, Sequence, Union
 
13
 
14
  try:
15
  import ibm_boto3
16
+ # from ibm_botocore.client import ClientError
17
 
18
  ibm_boto3_available = True
19
  except ImportError:
 
21
 
22
 
23
  class Loader(SourceOperator):
24
+ # The loader_limit an optional parameter used to control the maximum number of instances to load from the the source.
25
+ # It is usually provided to the loader via the recipe (see standard.py)
26
+ # The loader can use this value to limit the amount of data downloaded from the source
27
+ # to reduce loading time. However, this may not always be possible, so the
28
+ # loader may ingore this. In any case, the recipe, will limit the number of instances in the returned
29
+ # stream after, after load is complete.
30
+ loader_limit: int = None
31
  pass
32
 
33
 
 
35
  path: str
36
  name: Optional[str] = None
37
  data_dir: Optional[str] = None
38
+ split: Optional[str] = None
39
+ data_files: Optional[
40
+ Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
41
+ ] = None
42
  streaming: bool = True
43
  cached = False
44
 
45
  def process(self):
46
+ try:
47
+ dataset = hf_load_dataset(
48
+ self.path,
49
+ name=self.name,
50
+ data_dir=self.data_dir,
51
+ data_files=self.data_files,
52
+ streaming=self.streaming,
53
+ split=self.split,
54
+ )
55
+ if self.split is not None:
56
+ dataset = {self.split: dataset}
57
+ except (
58
+ NotImplementedError
59
+ ): # streaming is not supported for zipped files so we load without streaming
60
+ dataset = hf_load_dataset(
61
+ self.path,
62
+ name=self.name,
63
+ data_dir=self.data_dir,
64
+ data_files=self.data_files,
65
+ streaming=False,
66
+ split=self.split,
67
+ )
68
+ if self.split is None:
69
+ for split in dataset.keys():
70
+ dataset[split] = dataset[split].to_iterable_dataset()
71
+ else:
72
+ dataset = {self.split: dataset}
73
 
74
  return MultiStream.from_iterables(dataset)
75
 
 
80
 
81
  def load_csv(self, file):
82
  for chunk in pd.read_csv(file, chunksize=self.chunksize):
83
+ for _index, row in chunk.iterrows():
84
  yield row.to_dict()
85
 
86
  def process(self):
87
  return MultiStream(
88
+ {
89
+ name: Stream(generator=self.load_csv, gen_kwargs={"file": file})
90
+ for name, file in self.files.items()
91
+ }
92
  )
93
 
94
 
 
97
  aws_access_key_id_env: str
98
  aws_secret_access_key_env: str
99
  bucket_name: str
100
+ data_dir: str = None
101
  data_files: Sequence[str]
102
 
103
  def _download_from_cos(self, cos, bucket_name, item_name, local_file):
104
+ logging.info(f"Downloading {item_name} from {bucket_name} COS")
105
  try:
106
  response = cos.Object(bucket_name, item_name).get()
107
  size = response["ContentLength"]
108
+ body = response["Body"]
109
  except Exception as e:
110
+ raise Exception(
111
+ f"Unabled to access {item_name} in {bucket_name} in COS", e
112
+ ) from e
113
+
114
+ if self.loader_limit is not None:
115
+ if item_name.endswith(".jsonl"):
116
+ first_lines = list(
117
+ itertools.islice(body.iter_lines(), self.loader_limit)
118
+ )
119
+ with open(local_file, "wb") as downloaded_file:
120
+ for line in first_lines:
121
+ downloaded_file.write(line)
122
+ downloaded_file.write(b"\n")
123
+ logging.info(
124
+ f"\nDownload successful limited to {self.loader_limit} lines"
125
+ )
126
+ return
127
 
128
  progress_bar = tqdm(total=size, unit="iB", unit_scale=True)
129
 
 
131
  progress_bar.update(chunk)
132
 
133
  try:
134
+ cos.Bucket(bucket_name).download_file(
135
+ item_name, local_file, Callback=upload_progress
136
+ )
137
+ logging.info("\nDownload Successful")
138
  except Exception as e:
139
+ raise Exception(
140
+ f"Unabled to download {item_name} in {bucket_name}", e
141
+ ) from e
142
 
143
  def prepare(self):
144
  super().prepare()
 
148
 
149
  def verify(self):
150
  super().verify()
151
+ assert ibm_boto3_available, "Please install ibm_boto3 in order to use the LoadFromIBMCloud loader (using `pip install ibm-cos-sdk`) "
152
+ assert (
153
+ self.endpoint_url is not None
154
+ ), f"Please set the {self.endpoint_url_env} environmental variable"
155
  assert (
156
+ self.aws_access_key_id is not None
157
+ ), f"Please set {self.aws_access_key_id_env} environmental variable"
 
 
158
  assert (
159
  self.aws_secret_access_key is not None
160
  ), f"Please set {self.aws_secret_access_key_env} environmental variable"
 
169
 
170
  with TemporaryDirectory() as temp_directory:
171
  for data_file in self.data_files:
172
+ # Build object key based on parameters. Slash character is not
173
+ # allowed to be part of object key in IBM COS.
174
+ object_key = (
175
+ self.data_dir + "/" + data_file
176
+ if self.data_dir is not None
177
+ else data_file
178
+ )
179
  self._download_from_cos(
180
+ cos, self.bucket_name, object_key, temp_directory + "/" + data_file
181
  )
182
  dataset = hf_load_dataset(temp_directory, streaming=False)
183