File size: 12,757 Bytes
f786e3b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d79bb48
7e3fef8
cb669f3
d79bb48
 
cb669f3
9564cbf
e3ab2c6
9564cbf
e3ab2c6
cb669f3
e3ab2c6
d79bb48
26a73a2
18ed1aa
9564cbf
26a73a2
d79bb48
18ed1aa
 
cb669f3
 
d79bb48
7e3fef8
cb669f3
 
 
 
 
e3ab2c6
 
7e3fef8
 
 
 
 
f786e3b
7e3fef8
87d48ff
e3ab2c6
 
 
 
 
 
7e3fef8
 
 
 
87d48ff
e3ab2c6
 
7e3fef8
d79bb48
87d48ff
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7e3fef8
 
 
 
 
d79bb48
18ed1aa
 
 
 
 
 
 
 
 
 
6b15472
18ed1aa
 
 
 
 
 
7e3fef8
 
 
 
 
e3ab2c6
 
cb669f3
 
9564cbf
 
 
 
87d48ff
9564cbf
7e3fef8
9564cbf
 
 
87d48ff
 
 
 
 
 
 
 
9564cbf
7e3fef8
87d48ff
7e3fef8
 
9564cbf
 
 
d79bb48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87d48ff
 
 
d79bb48
 
 
 
 
 
 
 
 
 
 
 
 
 
cb669f3
 
 
 
 
7e3fef8
d79bb48
 
 
 
 
 
 
cb669f3
 
d79bb48
cb669f3
 
 
7e3fef8
cb669f3
7e3fef8
 
 
 
 
 
 
 
 
 
 
 
 
d79bb48
7e3fef8
 
 
cb669f3
 
 
 
 
 
 
7e3fef8
 
 
d79bb48
cb669f3
7e3fef8
 
 
cb669f3
 
 
 
 
 
d79bb48
 
 
 
 
cb669f3
 
 
7e3fef8
 
 
 
cb669f3
7e3fef8
 
cb669f3
 
 
87d48ff
 
cb669f3
 
 
 
 
 
 
 
56803e8
 
 
 
 
 
d79bb48
 
 
 
 
 
 
 
 
 
 
 
7e3fef8
 
 
 
 
 
 
18ed1aa
 
 
 
 
 
 
 
 
 
 
 
d79bb48
 
 
 
 
 
 
cb669f3
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
"""This section describes unitxt loaders.

Loaders: Generators of Unitxt Multistreams from existing date sources
==============================================================

Unitxt is all about readily preparing of any given data source for feeding into any given language model, and then,
postprocessing the model's output, preparing it for any given evaluator.

Through that journey, the data advances in the form of Unitxt Multistream, undergoing a sequential application
of various off the shelf operators (i.e, picked from Unitxt catalog), or operators easily implemented by inheriting.
The journey starts by a Unitxt Loeader bearing a Multistream from the given datasource.
A loader, therefore, is the first item on any Unitxt Recipe.

Unitxt catalog contains several loaders for the most popular datasource formats.
All these loaders inherit from Loader, and hence, implementing a loader to expand over a new type of datasource, is
straight forward.

Operators in Unitxt catalog:
LoadHF : loads from Huggingface dataset.
LoadCSV: loads from csv (comma separated value) files
LoadFromKaggle: loads datasets from the kaggle.com community site
LoadFromIBMCloud: loads a dataset from the IBM cloud.
------------------------
"""
import importlib
import itertools
import os
import tempfile
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, Mapping, Optional, Sequence, Union

import pandas as pd
from datasets import load_dataset as hf_load_dataset
from tqdm import tqdm

from .logging_utils import get_logger
from .operator import SourceOperator
from .settings_utils import get_settings
from .stream import MultiStream, Stream

logger = get_logger()
settings = get_settings()

try:
    import ibm_boto3

    # from ibm_botocore.client import ClientError

    ibm_boto3_available = True
except ImportError:
    ibm_boto3_available = False


class Loader(SourceOperator):
    # The loader_limit an optional parameter used to control the maximum number of instances to load from the the source.
    # It is usually provided to the loader via the recipe (see standard.py)
    # The loader can use this value to limit the amount of data downloaded from the source
    # to reduce loading time.  However, this may not always be possible, so the
    # loader may ingore this.  In any case, the recipe, will limit the number of instances in the returned
    # stream, after load is complete.
    loader_limit: int = None
    streaming: bool = False


class LoadHF(Loader):
    path: str
    name: Optional[str] = None
    data_dir: Optional[str] = None
    split: Optional[str] = None
    data_files: Optional[
        Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
    ] = None
    streaming: bool = False

    def process(self):
        try:
            with tempfile.TemporaryDirectory() as dir_to_be_deleted:
                try:
                    dataset = hf_load_dataset(
                        self.path,
                        name=self.name,
                        data_dir=self.data_dir,
                        data_files=self.data_files,
                        streaming=self.streaming,
                        cache_dir=None if self.streaming else dir_to_be_deleted,
                        split=self.split,
                        trust_remote_code=settings.allow_unverified_code,
                    )
                except ValueError as e:
                    if "trust_remote_code" in str(e):
                        raise ValueError(
                            f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment vairable: UNITXT_ALLOW_UNVERIFIED_CODE."
                        ) from e
            if self.split is not None:
                dataset = {self.split: dataset}
        except (
            NotImplementedError
        ):  # streaming is not supported for zipped files so we load without streaming
            with tempfile.TemporaryDirectory() as dir_to_be_deleted:
                try:
                    dataset = hf_load_dataset(
                        self.path,
                        name=self.name,
                        data_dir=self.data_dir,
                        data_files=self.data_files,
                        streaming=False,
                        keep_in_memory=True,
                        cache_dir=dir_to_be_deleted,
                        split=self.split,
                        trust_remote_code=settings.allow_unverified_code,
                    )
                except ValueError as e:
                    if "trust_remote_code" in str(e):
                        raise ValueError(
                            f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment vairable: UNITXT_ALLOW_UNVERIFIED_CODE."
                        ) from e
            if self.split is None:
                for split in dataset.keys():
                    dataset[split] = dataset[split].to_iterable_dataset()
            else:
                dataset = {self.split: dataset}

        return MultiStream.from_iterables(dataset)


class LoadCSV(Loader):
    files: Dict[str, str]
    chunksize: int = 1000

    def stream_csv(self, file):
        for chunk in pd.read_csv(file, chunksize=self.chunksize):
            for _index, row in chunk.iterrows():
                yield row.to_dict()

    def process(self):
        if self.streaming:
            return MultiStream(
                {
                    name: Stream(generator=self.stream_csv, gen_kwargs={"file": file})
                    for name, file in self.files.items()
                }
            )

        return MultiStream(
            {
                name: pd.read_csv(file).to_dict("records")
                for name, file in self.files.items()
            }
        )


class MissingKaggleCredentialsError(ValueError):
    pass


# TODO write how to obtain kaggle credentials
class LoadFromKaggle(Loader):
    url: str

    def verify(self):
        super().verify()
        if importlib.util.find_spec("opendatasets") is None:
            raise ImportError(
                "Please install opendatasets in order to use the LoadFromKaggle loader (using `pip install opendatasets`) "
            )
        if not os.path.isfile("kaggle.json"):
            raise MissingKaggleCredentialsError(
                "Please obtain kaggle credentials https://christianjmills.com/posts/kaggle-obtain-api-key-tutorial/ and save them to local ./kaggle.json file"
            )

        if self.streaming:
            raise NotImplementedError("LoadFromKaggle cannot load with streaming.")

    def prepare(self):
        super().prepare()
        from opendatasets import download

        self.downloader = download

    def process(self):
        with TemporaryDirectory() as temp_directory:
            self.downloader(self.url, temp_directory)
            dataset = hf_load_dataset(temp_directory, streaming=False)

        return MultiStream.from_iterables(dataset)


class LoadFromIBMCloud(Loader):
    endpoint_url_env: str
    aws_access_key_id_env: str
    aws_secret_access_key_env: str
    bucket_name: str
    data_dir: str = None

    # Can be either:
    # 1. a list of file names, the split of each file is determined by the file name pattern
    # 2. Mapping: split -> file_name, e.g. {"test" : "test.json", "train": "train.json"}
    # 3. Mapping: split -> file_names, e.g. {"test" : ["test1.json", "test2.json"], "train": ["train.json"]}
    data_files: Union[Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
    caching: bool = True

    def _download_from_cos(self, cos, bucket_name, item_name, local_file):
        logger.info(f"Downloading {item_name} from {bucket_name} COS")
        try:
            response = cos.Object(bucket_name, item_name).get()
            size = response["ContentLength"]
            body = response["Body"]
        except Exception as e:
            raise Exception(
                f"Unabled to access {item_name} in {bucket_name} in COS", e
            ) from e

        if self.loader_limit is not None:
            if item_name.endswith(".jsonl"):
                first_lines = list(
                    itertools.islice(body.iter_lines(), self.loader_limit)
                )
                with open(local_file, "wb") as downloaded_file:
                    for line in first_lines:
                        downloaded_file.write(line)
                        downloaded_file.write(b"\n")
                logger.info(
                    f"\nDownload successful limited to {self.loader_limit} lines"
                )
                return

        progress_bar = tqdm(total=size, unit="iB", unit_scale=True)

        def upload_progress(chunk):
            progress_bar.update(chunk)

        try:
            cos.Bucket(bucket_name).download_file(
                item_name, local_file, Callback=upload_progress
            )
            logger.info("\nDownload Successful")
        except Exception as e:
            raise Exception(
                f"Unabled to download {item_name} in {bucket_name}", e
            ) from e

    def prepare(self):
        super().prepare()
        self.endpoint_url = os.getenv(self.endpoint_url_env)
        self.aws_access_key_id = os.getenv(self.aws_access_key_id_env)
        self.aws_secret_access_key = os.getenv(self.aws_secret_access_key_env)
        root_dir = os.getenv("UNITXT_IBM_COS_CACHE", None) or os.getcwd()
        self.cache_dir = os.path.join(root_dir, "ibmcos_datasets")

        if not os.path.exists(self.cache_dir):
            Path(self.cache_dir).mkdir(parents=True, exist_ok=True)

    def verify(self):
        super().verify()
        assert ibm_boto3_available, "Please install ibm_boto3 in order to use the LoadFromIBMCloud loader (using `pip install ibm-cos-sdk`) "
        assert (
            self.endpoint_url is not None
        ), f"Please set the {self.endpoint_url_env} environmental variable"
        assert (
            self.aws_access_key_id is not None
        ), f"Please set {self.aws_access_key_id_env} environmental variable"
        assert (
            self.aws_secret_access_key is not None
        ), f"Please set {self.aws_secret_access_key_env} environmental variable"
        if self.streaming:
            raise NotImplementedError("LoadFromKaggle cannot load with streaming.")

    def process(self):
        cos = ibm_boto3.resource(
            "s3",
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            endpoint_url=self.endpoint_url,
        )
        local_dir = os.path.join(
            self.cache_dir,
            self.bucket_name,
            self.data_dir,
            f"loader_limit_{self.loader_limit}",
        )
        if not os.path.exists(local_dir):
            Path(local_dir).mkdir(parents=True, exist_ok=True)
        if isinstance(self.data_files, Mapping):
            data_files_names = list(self.data_files.values())
            if not isinstance(data_files_names[0], str):
                data_files_names = list(itertools.chain(*data_files_names))
        else:
            data_files_names = self.data_files

        for data_file in data_files_names:
            local_file = os.path.join(local_dir, data_file)
            if not self.caching or not os.path.exists(local_file):
                # Build object key based on parameters. Slash character is not
                # allowed to be part of object key in IBM COS.
                object_key = (
                    self.data_dir + "/" + data_file
                    if self.data_dir is not None
                    else data_file
                )
                with tempfile.NamedTemporaryFile() as temp_file:
                    # Download to  a temporary file in same file partition, and then do an atomic move
                    self._download_from_cos(
                        cos,
                        self.bucket_name,
                        object_key,
                        local_dir + "/" + os.path.basename(temp_file.name),
                    )
                    os.rename(
                        local_dir + "/" + os.path.basename(temp_file.name),
                        local_dir + "/" + data_file,
                    )

        if isinstance(self.data_files, list):
            dataset = hf_load_dataset(local_dir, streaming=False)
        else:
            dataset = hf_load_dataset(
                local_dir, streaming=False, data_files=self.data_files
            )

        return MultiStream.from_iterables(dataset)