|
import os |
|
from tempfile import TemporaryDirectory |
|
from typing import Dict, Mapping, Optional, Sequence, Union |
|
|
|
import pandas as pd |
|
from datasets import load_dataset as hf_load_dataset |
|
from tqdm import tqdm |
|
|
|
from .operator import SourceOperator |
|
from .stream import MultiStream, Stream |
|
|
|
try: |
|
import ibm_boto3 |
|
from ibm_botocore.client import ClientError |
|
|
|
ibm_boto3_available = True |
|
except ImportError: |
|
ibm_boto3_available = False |
|
|
|
|
|
class Loader(SourceOperator): |
|
pass |
|
|
|
|
|
class LoadHF(Loader): |
|
path: str |
|
name: Optional[str] = None |
|
data_dir: Optional[str] = None |
|
data_files: Optional[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]] = None |
|
streaming: bool = True |
|
cached = False |
|
|
|
def process(self): |
|
dataset = hf_load_dataset( |
|
self.path, name=self.name, data_dir=self.data_dir, data_files=self.data_files, streaming=self.streaming |
|
) |
|
|
|
return MultiStream.from_iterables(dataset) |
|
|
|
|
|
class LoadCSV(Loader): |
|
files: Dict[str, str] |
|
chunksize: int = 1000 |
|
|
|
def load_csv(self, file): |
|
for chunk in pd.read_csv(file, chunksize=self.chunksize): |
|
for index, row in chunk.iterrows(): |
|
yield row.to_dict() |
|
|
|
def process(self): |
|
return MultiStream( |
|
{name: Stream(generator=self.load_csv, gen_kwargs={"file": file}) for name, file in self.files.items()} |
|
) |
|
|
|
|
|
class LoadFromIBMCloud(Loader): |
|
endpoint_url_env: str |
|
aws_access_key_id_env: str |
|
aws_secret_access_key_env: str |
|
bucket_name: str |
|
data_dir: str |
|
data_files: Sequence[str] |
|
|
|
def _download_from_cos(self, cos, bucket_name, item_name, local_file): |
|
print(f"Downloading {item_name} from {bucket_name} COS to {local_file}") |
|
try: |
|
response = cos.Object(bucket_name, item_name).get() |
|
size = response["ContentLength"] |
|
except Exception as e: |
|
raise Exception(f"Unabled to access {item_name} in {bucket_name} in COS", e) |
|
|
|
progress_bar = tqdm(total=size, unit="iB", unit_scale=True) |
|
|
|
def upload_progress(chunk): |
|
progress_bar.update(chunk) |
|
|
|
try: |
|
cos.Bucket(bucket_name).download_file(item_name, local_file, Callback=upload_progress) |
|
print("\nDownload Successful") |
|
except Exception as e: |
|
raise Exception(f"Unabled to download {item_name} in {bucket_name}", e) |
|
|
|
def prepare(self): |
|
super().prepare() |
|
self.endpoint_url = os.getenv(self.endpoint_url_env) |
|
self.aws_access_key_id = os.getenv(self.aws_access_key_id_env) |
|
self.aws_secret_access_key = os.getenv(self.aws_secret_access_key_env) |
|
|
|
def verify(self): |
|
super().verify() |
|
assert ( |
|
ibm_boto3_available |
|
), f"Please install ibm_boto3 in order to use the LoadFromIBMCloud loader (using `pip install ibm-cos-sdk`) " |
|
assert self.endpoint_url is not None, f"Please set the {self.endpoint_url_env} environmental variable" |
|
assert self.aws_access_key_id is not None, f"Please set {self.aws_access_key_id_env} environmental variable" |
|
assert ( |
|
self.aws_secret_access_key is not None |
|
), f"Please set {self.aws_secret_access_key_env} environmental variable" |
|
|
|
def process(self): |
|
cos = ibm_boto3.resource( |
|
"s3", |
|
aws_access_key_id=self.aws_access_key_id, |
|
aws_secret_access_key=self.aws_secret_access_key, |
|
endpoint_url=self.endpoint_url, |
|
) |
|
|
|
with TemporaryDirectory() as temp_directory: |
|
for data_file in self.data_files: |
|
self._download_from_cos( |
|
cos, self.bucket_name, self.data_dir + "/" + data_file, temp_directory + "/" + data_file |
|
) |
|
dataset = hf_load_dataset(temp_directory, streaming=False) |
|
|
|
return MultiStream.from_iterables(dataset) |
|
|