| import io |
| import json |
| import warnings |
| from typing import Literal |
|
|
| import fsspec |
|
|
| from .core import url_to_fs |
| from .spec import AbstractBufferedFile |
| from .utils import merge_offset_ranges |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
class AlreadyBufferedFile(AbstractBufferedFile):
    """Buffered file whose required byte ranges are fully pre-loaded.

    Instances are created with the "parts" (`KnownPartsOfAFile`) cache
    already populated, so every read is served from memory.
    """

    def _fetch_range(self, start, end):
        # Should never be called: all needed ranges were transferred
        # up front into the "parts" cache.
        raise NotImplementedError
|
|
|
|
def open_parquet_files(
    path: list[str],
    mode: Literal["rb"] = "rb",
    fs: None | fsspec.AbstractFileSystem = None,
    metadata=None,
    columns: None | list[str] = None,
    row_groups: None | list[int] = None,
    storage_options: None | dict = None,
    engine: str = "auto",
    max_gap: int = 64_000,
    max_block: int = 256_000_000,
    footer_sample_size: int = 1_000_000,
    filters: None | list[list[list[str]]] = None,
    **kwargs,
):
    """
    Return a list of file-like objects, one per matching parquet file.

    The specified parquet `engine` will be used to parse the
    footer metadata, and determine the required byte ranges
    from each file. The target paths will then be opened with
    the "parts" (`KnownPartsOfAFile`) caching strategy.

    Note that this method is intended for usage with remote
    file systems, and is unlikely to improve parquet-read
    performance on local file systems.

    Parameters
    ----------
    path: str or list[str]
        Target file path(s). A single string may also be a glob
        pattern (containing ``"*"``) or a directory path (ending
        with ``"/"``), in which case it is expanded to the matching
        ".parquet"/".parq" files.
    mode: str, optional
        Mode option to be passed through to `fs.open`. Default is "rb".
    metadata: Any, optional
        Parquet metadata object. Object type must be supported
        by the backend parquet engine. For now, only the "fastparquet"
        engine supports an explicit `ParquetFile` metadata object.
        If a metadata object is supplied, the remote footer metadata
        will not need to be transferred into local memory.
    fs: AbstractFileSystem, optional
        Filesystem object to use for opening the file. If nothing is
        specified, an `AbstractFileSystem` object will be inferred.
    engine : str, default "auto"
        Parquet engine to use for metadata parsing. Allowed options
        include "fastparquet", "pyarrow", and "auto". The specified
        engine must be installed in the current environment. If
        "auto" is specified, and both engines are installed,
        "fastparquet" will take precedence over "pyarrow".
    columns: list, optional
        List of all column names that may be read from the file.
    row_groups : list, optional
        List of all row-groups that may be read from the file. This
        may be a list of row-group indices (integers), or it may be
        a list of `RowGroup` metadata objects (if the "fastparquet"
        engine is used).
    storage_options : dict, optional
        Used to generate an `AbstractFileSystem` object if `fs` was
        not specified.
    max_gap : int, optional
        Neighboring byte ranges will only be merged when their
        inter-range gap is <= `max_gap`. Default is 64KB.
    max_block : int, optional
        Neighboring byte ranges will only be merged when the size of
        the aggregated range is <= `max_block`. Default is 256MB.
    footer_sample_size : int, optional
        Number of bytes to read from the end of the path to look
        for the footer metadata. If the sampled bytes do not contain
        the footer, a second read request will be required, and
        performance will suffer. Default is 1MB.
    filters : list[list], optional
        List of filters to apply to prevent reading row groups, of the
        same format as accepted by the loading engines. Ignored if
        ``row_groups`` is specified.
    **kwargs :
        Optional key-word arguments to pass to `fs.open`
    """

    # Infer the filesystem from the (first) path when not given explicitly
    if fs is None:
        path0 = path
        if isinstance(path, (list, tuple)):
            path = path[0]
        fs, path = url_to_fs(path, **(storage_options or {}))
    else:
        path0 = path

    # An empty column selection means "all columns"
    if columns is not None and len(columns) == 0:
        columns = None

    # Resolve the engine name into an engine instance
    engine = _set_engine(engine)

    # Expand the input into a concrete list of parquet file paths
    if isinstance(path0, (list, tuple)):
        paths = path0
    elif "*" in path:
        paths = fs.glob(path)
    elif path0.endswith("/"):
        paths = [
            fn
            for fn in fs.find(path, withdirs=False, detail=False)
            if fn.endswith((".parquet", ".parq"))
        ]
    else:
        paths = [path]

    # Fetch the known byte ranges needed to read `columns` and/or
    # `row_groups` from each path
    data = _get_parquet_byte_ranges(
        paths,
        fs,
        metadata=metadata,
        columns=columns,
        row_groups=row_groups,
        engine=engine,
        max_gap=max_gap,
        max_block=max_block,
        footer_sample_size=footer_sample_size,
        filters=filters,
    )

    # Construct one pre-cached file object per path. The reported file
    # "size" only needs to cover the largest known byte range; use 0 for
    # a (degenerate) path with no cached ranges rather than crashing.
    options = kwargs.pop("cache_options", {}).copy()
    files = []
    for fn in data:
        ranges = data[fn]
        files.append(
            AlreadyBufferedFile(
                fs=None,
                path=fn,
                mode=mode,
                cache_type="parts",
                cache_options={
                    **options,
                    "data": ranges,
                },
                size=max((end for _, end in ranges), default=0),
                **kwargs,
            )
        )
    return files
|
|
|
|
def open_parquet_file(*args, **kwargs):
    """Create a file tailored to reading specific parts of a parquet file.

    See ``open_parquet_files`` for the accepted arguments. The difference
    is that this function always returns a single ``AlreadyBufferedFile``,
    whereas ``open_parquet_files`` returns a list of files, even when one
    or zero parquet files match.
    """
    files = open_parquet_files(*args, **kwargs)
    return files[0]
|
|
|
|
def _get_parquet_byte_ranges(
    paths,
    fs,
    metadata=None,
    columns=None,
    row_groups=None,
    max_gap=64_000,
    max_block=256_000_000,
    footer_sample_size=1_000_000,
    engine="auto",
    filters=None,
):
    """Get a dictionary of the known byte ranges needed
    to read a specific column/row-group selection from a
    Parquet dataset. Each value in the output dictionary
    is intended for use as the `data` argument for the
    `KnownPartsOfAFile` caching strategy of a single path.
    """

    # Resolve a lazily-specified engine name into an engine instance
    if isinstance(engine, str):
        engine = _set_engine(engine)

    if metadata is not None:
        # Use the provided parquet metadata object to avoid transferring
        # any remote footer metadata
        return _get_parquet_byte_ranges_from_metadata(
            metadata,
            fs,
            engine,
            columns=columns,
            row_groups=row_groups,
            max_gap=max_gap,
            max_block=max_block,
            filters=filters,
        )

    # Get file sizes up front (needed to locate each footer)
    file_sizes = fs.sizes(paths)

    # Populate global paths/starts/ends lists of required byte ranges
    result = {}
    data_paths = []
    data_starts = []
    data_ends = []
    add_header_magic = True
    if columns is None and row_groups is None and filters is None:
        # No selection at all: every byte of every file is required,
        # so no footer parsing (or header-magic injection) is needed
        for i, path in enumerate(paths):
            result[path] = {}
            data_paths.append(path)
            data_starts.append(0)
            data_ends.append(file_sizes[i])
        add_header_magic = False
    else:
        # Sample the trailing `footer_sample_size` bytes of each file,
        # hoping this covers the complete footer metadata
        footer_starts = []
        footer_ends = []
        for i, path in enumerate(paths):
            footer_ends.append(file_sizes[i])
            sample_size = max(0, file_sizes[i] - footer_sample_size)
            footer_starts.append(sample_size)
        footer_samples = fs.cat_ranges(paths, footer_starts, footer_ends)

        # The last 8 bytes of a parquet file hold the footer-metadata
        # length (4-byte little-endian int) followed by the b"PAR1"
        # magic. Work out whether any sample was too small, and if so
        # fetch the missing footer prefixes in one extra pass.
        missing_footer_starts = footer_starts.copy()
        large_footer = 0
        for i, path in enumerate(paths):
            footer_size = int.from_bytes(footer_samples[i][-8:-4], "little")
            real_footer_start = file_sizes[i] - (footer_size + 8)
            if real_footer_start < footer_starts[i]:
                missing_footer_starts[i] = real_footer_start
                large_footer = max(large_footer, (footer_size + 8))
        if large_footer:
            warnings.warn(
                f"Not enough data was used to sample the parquet footer. "
                f"Try setting footer_sample_size >= {large_footer}."
            )
            # Only issue the second transfer when at least one footer was
            # truncated; otherwise every requested range would be empty
            # and the call would be a wasted remote round trip
            for i, block in enumerate(
                fs.cat_ranges(
                    paths,
                    missing_footer_starts,
                    footer_starts,
                )
            ):
                footer_samples[i] = block + footer_samples[i]
                footer_starts[i] = missing_footer_starts[i]

        # Let the engine calculate the required byte ranges for each path
        for i, path in enumerate(paths):
            path_data_starts, path_data_ends = engine._parquet_byte_ranges(
                columns,
                row_groups=row_groups,
                footer=footer_samples[i],
                footer_start=footer_starts[i],
                filters=filters,
            )

            data_paths += [path] * len(path_data_starts)
            data_starts += path_data_starts
            data_ends += path_data_ends
            # The footer sample is already in local memory; seed the
            # result with it so it is never re-fetched
            result.setdefault(path, {})[(footer_starts[i], file_sizes[i])] = (
                footer_samples[i]
            )

        # Merge adjacent/nearby offset ranges to minimize request count
        data_paths, data_starts, data_ends = merge_offset_ranges(
            data_paths,
            data_starts,
            data_ends,
            max_gap=max_gap,
            max_block=max_block,
            sort=False,
        )

    # Transfer the data byte-ranges into local memory
    _transfer_ranges(fs, result, data_paths, data_starts, data_ends)

    # Add b"PAR1" magic bytes for any path missing a cached header
    if add_header_magic:
        _add_header_magic(result)

    return result
|
|
|
|
def _get_parquet_byte_ranges_from_metadata(
    metadata,
    fs,
    engine,
    columns=None,
    row_groups=None,
    max_gap=64_000,
    max_block=256_000_000,
    filters=None,
):
    """Simplified version of `_get_parquet_byte_ranges` for
    the case that an engine-specific `metadata` object is
    provided, and the remote footer metadata does not need to
    be transferred before calculating the required byte ranges.
    """

    # The engine can compute the raw byte ranges directly from `metadata`
    range_paths, range_starts, range_ends = engine._parquet_byte_ranges(
        columns, row_groups=row_groups, metadata=metadata, filters=filters
    )

    # Coalesce near-adjacent ranges to cut down on transfer requests
    range_paths, range_starts, range_ends = merge_offset_ranges(
        range_paths,
        range_starts,
        range_ends,
        max_gap=max_gap,
        max_block=max_block,
        sort=False,
    )

    # Pull every required range into local memory, grouped by path
    result = {fn: {} for fn in set(range_paths)}
    _transfer_ranges(fs, result, range_paths, range_starts, range_ends)

    # Make sure each cached file begins with the parquet magic bytes
    _add_header_magic(result)

    return result
|
|
|
|
| def _transfer_ranges(fs, blocks, paths, starts, ends): |
| |
| ranges = (paths, starts, ends) |
| for path, start, stop, data in zip(*ranges, fs.cat_ranges(*ranges)): |
| blocks[path][(start, stop)] = data |
|
|
|
|
| def _add_header_magic(data): |
| |
| for path in list(data.keys()): |
| add_magic = True |
| for k in data[path]: |
| if k[0] == 0 and k[1] >= 4: |
| add_magic = False |
| break |
| if add_magic: |
| data[path][(0, 4)] = b"PAR1" |
|
|
|
|
| def _set_engine(engine_str): |
| |
| if engine_str == "auto": |
| try_engines = ("fastparquet", "pyarrow") |
| elif not isinstance(engine_str, str): |
| raise ValueError( |
| "Failed to set parquet engine! " |
| "Please pass 'fastparquet', 'pyarrow', or 'auto'" |
| ) |
| elif engine_str not in ("fastparquet", "pyarrow"): |
| raise ValueError(f"{engine_str} engine not supported by `fsspec.parquet`") |
| else: |
| try_engines = [engine_str] |
|
|
| |
| |
| for engine in try_engines: |
| try: |
| if engine == "fastparquet": |
| return FastparquetEngine() |
| elif engine == "pyarrow": |
| return PyarrowEngine() |
| except ImportError: |
| pass |
|
|
| |
| |
| raise ImportError( |
| f"The following parquet engines are not installed " |
| f"in your python environment: {try_engines}." |
| f"Please install 'fastparquert' or 'pyarrow' to " |
| f"utilize the `fsspec.parquet` module." |
| ) |
|
|
|
|
class FastparquetEngine:
    # The FastparquetEngine checks (on initialization) that fastparquet
    # can be imported, and provides a `_parquet_byte_ranges` method to
    # map a column/row-group selection onto file byte ranges.

    def __init__(self):
        import fastparquet as fp

        self.fp = fp

    def _row_group_filename(self, row_group, pf):
        # Map a row-group to the data file it lives in
        # (delegates to fastparquet's ParquetFile).
        return pf.row_group_filename(row_group)

    def _parquet_byte_ranges(
        self,
        columns,
        row_groups=None,
        metadata=None,
        footer=None,
        footer_start=None,
        filters=None,
    ):
        """Return the byte ranges covering the requested selection.

        Returns ``(paths, starts, ends)`` when a `metadata` object is
        given (ranges may span multiple data files), otherwise
        ``(starts, ends)`` for the single file whose footer bytes were
        supplied in `footer`.
        """
        # Use the provided ParquetFile metadata if available; otherwise
        # parse it from the sampled footer bytes
        pf = metadata
        data_paths, data_starts, data_ends = [], [], []
        if filters and row_groups:
            raise ValueError("filters and row_groups cannot be used together")
        if pf is None:
            pf = self.fp.ParquetFile(io.BytesIO(footer))

        # Convert the columns to a set of top-level column names, and
        # add any index columns recorded in the pandas metadata so that
        # index data is always transferred
        column_set = None if columns is None else {c.split(".", 1)[0] for c in columns}
        if column_set is not None and hasattr(pf, "pandas_metadata"):
            md_index = [
                ind
                for ind in pf.pandas_metadata.get("index_columns", [])
                # Skip dict entries (presumably RangeIndex descriptors,
                # which carry no on-disk data — TODO confirm)
                if not isinstance(ind, dict)
            ]
            column_set |= set(md_index)

        # Work out which row-groups to scan: filters take precedence,
        # then explicit row-group metadata objects, then indices
        if filters:
            from fastparquet.api import filter_row_groups

            # Let fastparquet select the row-groups matching `filters`
            row_group_indices = None
            row_groups = filter_row_groups(pf, filters)
        elif row_groups and not isinstance(row_groups[0], int):
            # Row-group metadata objects were supplied directly
            row_group_indices = None
        else:
            # Row-group indices were supplied (or none at all);
            # enumerate all row-groups and filter by index below
            row_group_indices = row_groups
            row_groups = pf.row_groups

        # Loop through the column chunks of each selected row-group
        # and collect the required byte ranges
        for r, row_group in enumerate(row_groups):
            # Skip row-groups not in the requested index selection
            if row_group_indices is None or r in row_group_indices:
                # Find the data-file path that holds this row-group
                fn = self._row_group_filename(row_group, pf)

                for column in row_group.columns:
                    name = column.meta_data.path_in_schema[0]
                    # Skip columns outside the requested selection
                    if column_set is None or name in column_set:
                        # Prefer the dictionary-page offset (it precedes
                        # the data pages when present)
                        file_offset0 = column.meta_data.dictionary_page_offset
                        if file_offset0 is None:
                            file_offset0 = column.meta_data.data_page_offset
                        num_bytes = column.meta_data.total_compressed_size
                        # Only keep the part of the range that precedes
                        # the already-cached footer sample
                        if footer_start is None or file_offset0 < footer_start:
                            data_paths.append(fn)
                            data_starts.append(file_offset0)
                            data_ends.append(
                                min(
                                    file_offset0 + num_bytes,
                                    footer_start or (file_offset0 + num_bytes),
                                )
                            )

        if metadata:
            # The metadata in this call may map to multiple
            # file paths. Need to include `data_paths`
            return data_paths, data_starts, data_ends
        return data_starts, data_ends
|
|
|
|
class PyarrowEngine:
    # The PyarrowEngine checks (on initialization) that pyarrow can be
    # imported, and provides a `_parquet_byte_ranges` method to map a
    # column/row-group selection onto file byte ranges.

    def __init__(self):
        import pyarrow.parquet as pq

        self.pq = pq

    def _row_group_filename(self, row_group, metadata):
        # Pyarrow metadata does not carry per-row-group file paths,
        # so multi-file metadata objects are unsupported here.
        raise NotImplementedError

    def _parquet_byte_ranges(
        self,
        columns,
        row_groups=None,
        metadata=None,
        footer=None,
        footer_start=None,
        filters=None,
    ):
        """Return ``(starts, ends)`` byte ranges for the requested
        columns/row-groups of the single file whose footer bytes
        were supplied in `footer`.

        Raises ``ValueError`` if a `metadata` object is passed
        (unsupported for pyarrow), and ``NotImplementedError`` if
        `filters` are requested.
        """
        if metadata is not None:
            raise ValueError("metadata input not supported for PyarrowEngine")
        if filters:
            raise NotImplementedError

        # Parse the footer-metadata sample with pyarrow
        data_starts, data_ends = [], []
        md = self.pq.ParquetFile(io.BytesIO(footer)).metadata

        # Convert the columns to a set, and add any index columns
        # recorded in the pandas metadata so index data is always
        # transferred
        column_set = None if columns is None else set(columns)
        if column_set is not None:
            schema = md.schema.to_arrow_schema()
            has_pandas_metadata = (
                schema.metadata is not None and b"pandas" in schema.metadata
            )
            if has_pandas_metadata:
                md_index = [
                    ind
                    for ind in json.loads(
                        schema.metadata[b"pandas"].decode("utf8")
                    ).get("index_columns", [])
                    # Skip dict entries (presumably RangeIndex descriptors,
                    # which carry no on-disk data — TODO confirm)
                    if not isinstance(ind, dict)
                ]
                column_set |= set(md_index)

        # Loop through the column chunks of each selected row-group
        # and collect the required byte ranges
        for r in range(md.num_row_groups):
            # Skip row-groups not in the requested index selection
            if row_groups is None or r in row_groups:
                row_group = md.row_group(r)
                for c in range(row_group.num_columns):
                    column = row_group.column(c)
                    name = column.path_in_schema
                    # Match either the full dotted path or its top-level
                    # prefix against the requested column set
                    split_name = name.split(".")[0]
                    if (
                        column_set is None
                        or name in column_set
                        or split_name in column_set
                    ):
                        # Prefer the dictionary-page offset (it precedes
                        # the data pages when present)
                        file_offset0 = column.dictionary_page_offset
                        if file_offset0 is None:
                            file_offset0 = column.data_page_offset
                        num_bytes = column.total_compressed_size
                        # Only keep the part of the range that precedes
                        # the already-cached footer sample
                        if file_offset0 < footer_start:
                            data_starts.append(file_offset0)
                            data_ends.append(
                                min(file_offset0 + num_bytes, footer_start)
                            )
        return data_starts, data_ends
|
|