Elron committed on
Commit
64c3236
·
verified ·
1 Parent(s): be4a716

Upload loaders.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. loaders.py +31 -16
loaders.py CHANGED
@@ -22,19 +22,18 @@ LoadFromKaggle: loads datasets from the kaggle.com community site
22
  LoadFromIBMCloud: loads a dataset from the IBM cloud.
23
  ------------------------
24
  """
25
- import importlib
26
  import itertools
27
  import os
28
  import tempfile
29
  from pathlib import Path
30
  from tempfile import TemporaryDirectory
31
- from typing import Dict, Mapping, Optional, Sequence, Union
32
 
33
  import pandas as pd
34
  from datasets import load_dataset as hf_load_dataset
35
  from tqdm import tqdm
36
 
37
- from .dataclass import InternalField
38
  from .logging_utils import get_logger
39
  from .operator import SourceOperator
40
  from .settings_utils import get_settings
@@ -43,20 +42,13 @@ from .stream import MultiStream, Stream
43
  logger = get_logger()
44
  settings = get_settings()
45
 
46
- try:
47
- import ibm_boto3
48
-
49
- ibm_boto3_available = True
50
- except ImportError:
51
- ibm_boto3_available = False
52
-
53
 
54
  class Loader(SourceOperator):
55
  # The loader_limit is an optional parameter used to control the maximum number of instances to load from the source.
56
  # It is usually provided to the loader via the recipe (see standard.py)
57
  # The loader can use this value to limit the amount of data downloaded from the source
58
  # to reduce loading time. However, this may not always be possible, so the
59
- # loader may ingore this. In any case, the recipe, will limit the number of instances in the returned
60
  # stream, after load is complete.
61
  loader_limit: int = None
62
  streaming: bool = False
@@ -92,7 +84,24 @@ class LoadHF(Loader):
92
  Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
93
  ] = None
94
  streaming: bool = True
 
95
  _cache: dict = InternalField(default=None)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
 
97
  def stream_dataset(self):
98
  if self._cache is None:
@@ -114,6 +123,9 @@ class LoadHF(Loader):
114
  f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment vairable: UNITXT_ALLOW_UNVERIFIED_CODE."
115
  ) from e
116
 
 
 
 
117
  if self.split is not None:
118
  dataset = {self.split: dataset}
119
 
@@ -143,6 +155,10 @@ class LoadHF(Loader):
143
  raise ValueError(
144
  f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment vairable: UNITXT_ALLOW_UNVERIFIED_CODE."
145
  ) from e
 
 
 
 
146
  if self.split is None:
147
  for split in dataset.keys():
148
  dataset[split] = dataset[split].to_iterable_dataset()
@@ -241,13 +257,10 @@ class MissingKaggleCredentialsError(ValueError):
241
  # TODO write how to obtain kaggle credentials
242
  class LoadFromKaggle(Loader):
243
  url: str
 
244
 
245
  def verify(self):
246
  super().verify()
247
- if importlib.util.find_spec("opendatasets") is None:
248
- raise ImportError(
249
- "Please install opendatasets in order to use the LoadFromKaggle loader (using `pip install opendatasets`) "
250
- )
251
  if not os.path.isfile("kaggle.json"):
252
  raise MissingKaggleCredentialsError(
253
  "Please obtain kaggle credentials https://christianjmills.com/posts/kaggle-obtain-api-key-tutorial/ and save them to local ./kaggle.json file"
@@ -283,6 +296,7 @@ class LoadFromIBMCloud(Loader):
283
  # 3. Mapping: split -> file_names, e.g. {"test" : ["test1.json", "test2.json"], "train": ["train.json"]}
284
  data_files: Union[Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
285
  caching: bool = True
 
286
 
287
  def _download_from_cos(self, cos, bucket_name, item_name, local_file):
288
  logger.info(f"Downloading {item_name} from {bucket_name} COS")
@@ -337,7 +351,6 @@ class LoadFromIBMCloud(Loader):
337
 
338
  def verify(self):
339
  super().verify()
340
- assert ibm_boto3_available, "Please install ibm_boto3 in order to use the LoadFromIBMCloud loader (using `pip install ibm-cos-sdk`) "
341
  assert (
342
  self.endpoint_url is not None
343
  ), f"Please set the {self.endpoint_url_env} environmental variable"
@@ -351,6 +364,8 @@ class LoadFromIBMCloud(Loader):
351
  raise NotImplementedError("LoadFromKaggle cannot load with streaming.")
352
 
353
  def process(self):
 
 
354
  cos = ibm_boto3.resource(
355
  "s3",
356
  aws_access_key_id=self.aws_access_key_id,
 
22
  LoadFromIBMCloud: loads a dataset from the IBM cloud.
23
  ------------------------
24
  """
 
25
  import itertools
26
  import os
27
  import tempfile
28
  from pathlib import Path
29
  from tempfile import TemporaryDirectory
30
+ from typing import Dict, List, Mapping, Optional, Sequence, Union
31
 
32
  import pandas as pd
33
  from datasets import load_dataset as hf_load_dataset
34
  from tqdm import tqdm
35
 
36
+ from .dataclass import InternalField, OptionalField
37
  from .logging_utils import get_logger
38
  from .operator import SourceOperator
39
  from .settings_utils import get_settings
 
42
  logger = get_logger()
43
  settings = get_settings()
44
 
 
 
 
 
 
 
 
45
 
46
  class Loader(SourceOperator):
47
  # The loader_limit is an optional parameter used to control the maximum number of instances to load from the source.
48
  # It is usually provided to the loader via the recipe (see standard.py)
49
  # The loader can use this value to limit the amount of data downloaded from the source
50
  # to reduce loading time. However, this may not always be possible, so the
51
+ # loader may ignore this. In any case, the recipe will limit the number of instances in the returned
52
  # stream, after load is complete.
53
  loader_limit: int = None
54
  streaming: bool = False
 
84
  Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
85
  ] = None
86
  streaming: bool = True
87
+ filtering_lambda: Optional[str] = None
88
  _cache: dict = InternalField(default=None)
89
+ requirements_list: List[str] = OptionalField(default_factory=list)
90
+
91
+ def verify(self):
92
+ for requirement in self.requirements_list:
93
+ if requirement not in self._requirements_list:
94
+ self._requirements_list.append(requirement)
95
+ super().verify()
96
+
97
+ def filtered_load(self, dataset):
98
+ logger.info(f"\nLoading filtered by: {self.filtering_lambda};")
99
+ return MultiStream(
100
+ {
101
+ name: dataset[name].filter(eval(self.filtering_lambda))
102
+ for name in dataset
103
+ }
104
+ )
105
 
106
  def stream_dataset(self):
107
  if self._cache is None:
 
123
  f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment vairable: UNITXT_ALLOW_UNVERIFIED_CODE."
124
  ) from e
125
 
126
+ if self.filtering_lambda is not None:
127
+ dataset = self.filtered_load(dataset)
128
+
129
  if self.split is not None:
130
  dataset = {self.split: dataset}
131
 
 
155
  raise ValueError(
156
  f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment vairable: UNITXT_ALLOW_UNVERIFIED_CODE."
157
  ) from e
158
+
159
+ if self.filtering_lambda is not None:
160
+ dataset = self.filtered_load(dataset)
161
+
162
  if self.split is None:
163
  for split in dataset.keys():
164
  dataset[split] = dataset[split].to_iterable_dataset()
 
257
  # TODO write how to obtain kaggle credentials
258
  class LoadFromKaggle(Loader):
259
  url: str
260
+ _requirements_list: List[str] = ["opendatasets"]
261
 
262
  def verify(self):
263
  super().verify()
 
 
 
 
264
  if not os.path.isfile("kaggle.json"):
265
  raise MissingKaggleCredentialsError(
266
  "Please obtain kaggle credentials https://christianjmills.com/posts/kaggle-obtain-api-key-tutorial/ and save them to local ./kaggle.json file"
 
296
  # 3. Mapping: split -> file_names, e.g. {"test" : ["test1.json", "test2.json"], "train": ["train.json"]}
297
  data_files: Union[Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
298
  caching: bool = True
299
+ _requirements_list: List[str] = ["ibm_boto3"]
300
 
301
  def _download_from_cos(self, cos, bucket_name, item_name, local_file):
302
  logger.info(f"Downloading {item_name} from {bucket_name} COS")
 
351
 
352
  def verify(self):
353
  super().verify()
 
354
  assert (
355
  self.endpoint_url is not None
356
  ), f"Please set the {self.endpoint_url_env} environmental variable"
 
364
  raise NotImplementedError("LoadFromKaggle cannot load with streaming.")
365
 
366
  def process(self):
367
+ import ibm_boto3
368
+
369
  cos = ibm_boto3.resource(
370
  "s3",
371
  aws_access_key_id=self.aws_access_key_id,