Source code for dask_awkward.lib.io.parquet

from __future__ import annotations

import abc
import itertools
import logging
import math
import operator
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Literal, TypeVar, cast

import awkward as ak
import awkward.operations.ak_from_parquet as ak_from_parquet
import dask
from awkward.forms.form import Form
from dask.base import tokenize
from dask.blockwise import BlockIndex
from dask.highlevelgraph import HighLevelGraph
from fsspec import AbstractFileSystem
from fsspec.core import get_fs_token_paths, url_to_fs

from dask_awkward.layers.layers import AwkwardMaterializedLayer
from dask_awkward.lib.core import Array, Scalar, map_partitions, new_scalar_object
from dask_awkward.lib.io.columnar import ColumnProjectionMixin
from dask_awkward.lib.io.io import from_map
from dask_awkward.lib.unproject_layout import unproject_layout

if TYPE_CHECKING:
    pass

log = logging.getLogger(__name__)


T = TypeVar("T")


def report_failure(exception, *args, **kwargs):
    return ak.Array(
        [
            {
                "columns": [],
                "args": [repr(a) for a in args],
                "kwargs": [[k, repr(v)] for k, v in kwargs.items()],
                "exception": type(exception).__name__,
                "message": str(exception),
            }
        ]
    )


def report_success(columns, *args, **kwargs):
    return ak.Array(
        [
            {
                "columns": columns,
                "args": [repr(a) for a in args],
                "kwargs": [[k, repr(v)] for k, v in kwargs.items()],
                "exception": None,
                "message": None,
            }
        ]
    )

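# A minimal sketch (not part of the library) of what a single report record
# carries; the file path below is hypothetical.
def _example_report_record():
    rec = report_failure(OSError("No such file"), "/data/part0.parquet")
    # One record with fields: columns, args, kwargs, exception, message.
    print(rec.fields)
    print(rec[0].exception)  # -> "OSError"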

class FromParquetFn(ColumnProjectionMixin):
    def __init__(
        self,
        *,
        fs: AbstractFileSystem,
        form: Any,
        listsep: str = "list.item",
        unnamed_root: bool = False,
        original_form: Form | None = None,
        report: bool = False,
        allowed_exceptions: tuple[type[BaseException], ...] = (OSError,),
        behavior: Mapping | None = None,
        attrs: Mapping[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        self.fs = fs
        self.form = form
        self.listsep = listsep
        self.unnamed_root = unnamed_root
        self.columns = self.form.columns(self.listsep)
        if self.unnamed_root:
            self.columns = [f".{c}" for c in self.columns]
        self.original_form = original_form
        self.report = report
        self.allowed_exceptions = allowed_exceptions
        self.behavior = behavior
        self.attrs = attrs
        self.kwargs = kwargs

    @abc.abstractmethod
    def __call__(self, *args, **kwargs): ...

    @abc.abstractmethod
    def project_columns(self, columns): ...

    @property
    def return_report(self) -> bool:
        return self.report

    @property
    def use_optimization(self) -> bool:
        return "parquet" in dask.config.get(
            "awkward.optimization.columns-opt-formats",
            default=[],
        )

    def __repr__(self) -> str:
        s = (
            "\nFromParquetFn(\n"
            f"  form={repr(self.form)}\n"
            f"  listsep={self.listsep}\n"
            f"  unnamed_root={self.unnamed_root}\n"
            f"  columns={self.columns}\n"
            f"  behavior={self.behavior}\n"
        )
        for key, val in self.kwargs.items():
            s += f"  {key}={val}\n"
        s = f"{s})"
        return s

    def __str__(self) -> str:
        return self.__repr__()

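# A minimal sketch of enabling the column-projection optimization that
# use_optimization checks for, using the standard dask config mechanism
# (dask is already imported at module level).
def _example_enable_column_opt():
    with dask.config.set({"awkward.optimization.columns-opt-formats": ["parquet"]}):
        # Any dask-awkward graph built here may have unused Parquet
        # columns pruned at optimization time.
        ...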

class FromParquetFileWiseFn(FromParquetFn):
    def __init__(
        self,
        *,
        fs: AbstractFileSystem,
        form: Any,
        listsep: str = "list.item",
        unnamed_root: bool = False,
        original_form: Form | None = None,
        behavior: Mapping | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            fs=fs,
            form=form,
            listsep=listsep,
            unnamed_root=unnamed_root,
            original_form=original_form,
            behavior=behavior,
            **kwargs,
        )

    def read_fn(self, source: Any) -> Any:
        layout = ak_from_parquet._load(
            [source],
            parquet_columns=self.columns,
            subrg=[None],
            subform=self.form,
            highlevel=False,
            fs=self.fs,
            behavior=self.behavior,
            attrs=self.attrs,
            **self.kwargs,
        )
        return ak.Array(
            unproject_layout(self.original_form, layout),
            attrs=self.attrs,
            behavior=self.behavior,
        )

    def __call__(self, *args, **kwargs):
        source = args[0]
        if self.return_report:
            try:
                result = self.read_fn(source)
                return result, report_success(self.columns, source)
            except self.allowed_exceptions as err:
                return self.mock_empty(), report_failure(err, source)

        return self.read_fn(source)

    def project_columns(self, columns):
        return FromParquetFileWiseFn(
            fs=self.fs,
            form=self.form.select_columns(columns),
            listsep=self.listsep,
            unnamed_root=self.unnamed_root,
            original_form=self.form,
            report=self.report,
            attrs=self.attrs,
            behavior=self.behavior,
            **self.kwargs,
        )

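# A minimal sketch of the form narrowing performed by project_columns:
# select_columns keeps only the requested fields, so only those columns are
# requested from the Parquet files, and unproject_layout later restores the
# original form. Field names here are hypothetical.
def _example_select_columns():
    form = ak.Array([{"x": 1, "y": 2.0}]).layout.form
    projected = form.select_columns(["x"])
    print(projected.columns())  # only the "x" column remains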

class FromParquetFragmentWiseFn(FromParquetFn):
    def __init__(
        self,
        *,
        fs: AbstractFileSystem,
        form: Any,
        listsep: str = "list.item",
        unnamed_root: bool = False,
        original_form: Form | None = None,
        behavior: Mapping | None = None,
        attrs: Mapping[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            fs=fs,
            form=form,
            listsep=listsep,
            unnamed_root=unnamed_root,
            original_form=original_form,
            behavior=behavior,
            attrs=attrs,
            **kwargs,
        )

    def __call__(self, pair: Any) -> ak.Array:
        subrg, source = pair
        if isinstance(subrg, int):
            subrg = [[subrg]]
        layout = ak_from_parquet._load(
            [source],
            parquet_columns=self.columns,
            subrg=subrg,
            subform=self.form,
            highlevel=False,
            fs=self.fs,
            behavior=self.behavior,
            attrs=self.attrs,
            **self.kwargs,
        )
        return ak.Array(
            unproject_layout(self.original_form, layout),
            behavior=self.behavior,
            attrs=self.attrs,
        )

    def project_columns(self, columns):
        return FromParquetFragmentWiseFn(
            fs=self.fs,
            form=self.form.select_columns(columns),
            listsep=self.listsep,
            unnamed_root=self.unnamed_root,
            original_form=self.form,
            report=self.report,
            behavior=self.behavior,
            attrs=self.attrs,
            **self.kwargs,
        )


def from_parquet(
    path: str | list[str],
    *,
    columns: str | list[str] | None = None,
    max_gap: int = 64_000,
    max_block: int = 256_000_000,
    footer_sample_size: int = 1_000_000,
    generate_bitmasks: bool = False,
    highlevel: bool = True,
    behavior: Mapping | None = None,
    attrs: Mapping[str, Any] | None = None,
    ignore_metadata: bool = True,
    scan_files: bool = False,
    split_row_groups: bool | None = False,
    storage_options: dict[str, Any] | None = None,
    report: bool = False,
) -> Array | tuple[Array, Array]:
    """Create an Array collection from a Parquet dataset.

    See :func:`ak.from_parquet` for more information.

    Parameters
    ----------
    path
        Local directory containing Parquet files, remote URL directory
        containing Parquet files, or explicit list of Parquet files,
        passed to fsspec for resolution. May contain glob patterns.
    columns
        See :func:`ak.from_parquet`
    max_gap
        See :func:`ak.from_parquet`
    max_block
        See :func:`ak.from_parquet`
    footer_sample_size
        See :func:`ak.from_parquet`
    generate_bitmasks
        See :func:`ak.from_parquet`
    highlevel
        Argument specific to awkward-array that is always ``True`` for
        dask-awkward.
    behavior
        See :func:`ak.from_parquet`
    attrs
        See :func:`ak.from_parquet`
    ignore_metadata
        If ``True``, ignore the Parquet ``_metadata`` file (if it exists).
    scan_files
        Scan files when parsing metadata.
    split_row_groups
        If ``True``, each row group becomes a partition; if ``False``,
        each file becomes a partition. If ``None``, it is treated as
        ``True`` when a ``_metadata`` file exists and ``ignore_metadata``
        is ``False``, otherwise as ``False``.
    storage_options
        Storage options passed to fsspec.
    report
        If ``True``, also return a report collection recording, for each
        partition, the arguments of the read and any allowed exception
        that was caught.

    Returns
    -------
    Array | tuple[Array, Array]
        Collection represented by the Parquet data on disk, or, if
        ``report`` is ``True``, a ``(data, report)`` pair of collections.

    """
    if not highlevel:
        raise ValueError("dask-awkward only supports highlevel=True")

    fs, token, paths = get_fs_token_paths(
        path,
        mode="rb",
        storage_options=storage_options,
    )
    label = "from-parquet"
    token = tokenize(
        token,
        paths,
        columns,
        max_gap,
        max_block,
        footer_sample_size,
        generate_bitmasks,
        behavior,
        ignore_metadata,
        scan_files,
        split_row_groups,
        attrs,
    )

    (
        parquet_columns,
        subform,
        actual_paths,
        fs,
        subrg,
        row_counts,
        metadata,
    ) = ak_from_parquet.metadata(
        path,
        storage_options,
        row_groups=None,
        columns=columns,
        ignore_metadata=ignore_metadata,
        scan_files=scan_files,
    )

    listsep = "list.item"
    unnamed_root = False
    for c in parquet_columns:
        if ".list.element" in c:
            listsep = "list.element"
            break
        if c.startswith("."):
            unnamed_root = True

    if split_row_groups is None:
        split_row_groups = row_counts is not None and len(row_counts) > 1

    if split_row_groups is False or subrg is None:
        # file-wise
        return from_map(
            FromParquetFileWiseFn(
                fs=fs,
                form=subform,
                listsep=listsep,
                unnamed_root=unnamed_root,
                max_gap=max_gap,
                max_block=max_block,
                footer_sample_size=footer_sample_size,
                generate_bitmasks=generate_bitmasks,
                behavior=behavior,
                attrs=attrs,
                report=report,
            ),
            actual_paths,
            label=label,
            token=token,
        )
    else:
        # row-group wise
        if set(subrg) == {None}:
            rgs_paths = {path: 0 for path in actual_paths}
            for i in range(metadata.num_row_groups):
                fp = metadata.row_group(i).column(0).file_path
                rgs_path = [p for p in rgs_paths if fp in p][
                    0
                ]  # returns 1st if fp is empty
                rgs_paths[rgs_path] += 1
            subrg = [list(range(rgs_paths[_])) for _ in actual_paths]

        rgs = [metadata.row_group(i) for i in range(metadata.num_row_groups)]
        divisions = [0] + list(
            itertools.accumulate([rg.num_rows for rg in rgs], operator.add)
        )
        pairs = []
        for isubrg, path in zip(subrg, actual_paths):
            pairs.extend([(irg, path) for irg in isubrg])
        return cast(
            Array,
            from_map(
                FromParquetFragmentWiseFn(
                    fs=fs,
                    form=subform,
                    listsep=listsep,
                    unnamed_root=unnamed_root,
                    max_gap=max_gap,
                    max_block=max_block,
                    footer_sample_size=footer_sample_size,
                    generate_bitmasks=generate_bitmasks,
                    behavior=behavior,
                    attrs=attrs,
                ),
                pairs,
                label=label,
                token=token,
                divisions=tuple(divisions),
            ),
        )
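

# Usage sketch (paths hypothetical). File-wise partitioning is the default;
# split_row_groups=True yields one partition per row group, and report=True
# additionally returns a per-partition read report.
def _example_from_parquet():
    arr = from_parquet("/data/dataset/*.parquet")  # one partition per file
    arr_rg = from_parquet("/data/dataset", split_row_groups=True)  # per row group
    data, rep = from_parquet("/data/dataset", report=True)  # (data, report) pair
    print(arr.npartitions, arr_rg.npartitions)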
def _metadata_file_from_data_files(path_list, fs, out_path):
    """
    Aggregate _metadata and _common_metadata from data files

    Maybe only used in testing (similar to fastparquet's merge)

    path_list: list[str]
        Input data files
    fs: AbstractFileSystem instance
    out_path: str
        Root directory of the dataset
    """
    import pyarrow.parquet as pq

    meta = None
    out_path = out_path.rstrip("/")
    for path in path_list:
        assert path.startswith(out_path)
        with fs.open(path, "rb") as f:
            _meta = pq.ParquetFile(f).metadata
        _meta.set_file_path(path[len(out_path) + 1 :])
        if meta:
            meta.append_row_groups(_meta)
        else:
            meta = _meta
    _write_metadata(fs, out_path, meta)


def _metadata_file_from_metas(fs, out_path, *metas):
    """Aggregate metadata from arrow objects and write"""
    meta = metas[0]
    for _meta in metas[1:]:
        meta.append_row_groups(_meta)
    _write_metadata(fs, out_path, meta)


def _write_metadata(fs, out_path, meta):
    """Output the dataset-level _metadata file"""
    metadata_path = "/".join([out_path, "_metadata"])
    with fs.open(metadata_path, "wb") as fil:
        meta.write_metadata_file(fil)


class _ToParquetFn:
    def __init__(
        self,
        fs: AbstractFileSystem,
        path: str,
        npartitions: int,
        prefix: str | None = None,
        storage_options: dict | None = None,
        **kwargs: Any,
    ):
        self.fs = fs
        self.path = path
        self.prefix = prefix
        self.zfill = math.ceil(math.log(npartitions, 10))
        self.storage_options = storage_options
        self.fs.mkdirs(self.path, exist_ok=True)
        self.protocol = (
            self.fs.protocol
            if isinstance(self.fs.protocol, str)
            else self.fs.protocol[0]
        )
        self.kwargs = kwargs

    def __call__(self, data, block_index):
        filename = f"part{str(block_index[0]).zfill(self.zfill)}.parquet"
        if self.prefix is not None:
            filename = f"{self.prefix}-{filename}"
        filename = self.fs.unstrip_protocol(f"{self.path}{self.fs.sep}{filename}")
        return ak.to_parquet(
            data, filename, **self.kwargs, storage_options=self.storage_options
        )
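

# A minimal sketch of the part-file naming scheme used by _ToParquetFn:
# block indices are zero-padded to ceil(log10(npartitions)) digits.
def _example_part_names():
    npartitions = 12
    zfill = math.ceil(math.log(npartitions, 10))  # ceil(log10(12)) == 2
    print(f"part{str(3).zfill(zfill)}.parquet")  # -> part03.parquet
    # With prefix="data", _ToParquetFn would produce "data-part03.parquet".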
def to_parquet(
    array: Array,
    destination: str,
    list_to32: bool = False,
    string_to32: bool = True,
    bytestring_to32: bool = True,
    emptyarray_to: Any | None = None,
    categorical_as_dictionary: bool = False,
    extensionarray: bool = False,
    count_nulls: bool = True,
    compression: str | dict | None = "zstd",
    compression_level: int | dict | None = None,
    row_group_size: int | None = 64 * 1024 * 1024,
    data_page_size: int | None = None,
    parquet_flavor: Literal["spark"] | None = None,
    parquet_version: Literal["1.0"] | Literal["2.4"] | Literal["2.6"] = "2.4",
    parquet_page_version: Literal["1.0"] | Literal["2.0"] = "1.0",
    parquet_metadata_statistics: bool | dict = True,
    parquet_dictionary_encoding: bool | dict = False,
    parquet_byte_stream_split: bool | dict = False,
    parquet_coerce_timestamps: Literal["ms"] | Literal["us"] | None = None,
    parquet_old_int96_timestamps: bool | None = None,
    parquet_compliant_nested: bool = False,
    parquet_extra_options: dict | None = None,
    storage_options: dict[str, Any] | None = None,
    write_metadata: bool = False,
    compute: bool = True,
    prefix: str | None = None,
) -> Scalar | None:
    """Write data to Parquet format.

    This will create one output file per partition.

    See the documentation for :func:`ak.to_parquet` for more
    information; there are many optional function arguments that are
    described in that documentation.

    Parameters
    ----------
    array
        The :obj:`dask_awkward.Array` collection to write to disk.
    destination
        Where to store the output; this can be a local filesystem path
        or a remote filesystem path.
    list_to32
        See :func:`ak.to_parquet`
    string_to32
        See :func:`ak.to_parquet`
    bytestring_to32
        See :func:`ak.to_parquet`
    emptyarray_to
        See :func:`ak.to_parquet`
    categorical_as_dictionary
        See :func:`ak.to_parquet`
    extensionarray
        See :func:`ak.to_parquet`
    count_nulls
        See :func:`ak.to_parquet`
    compression
        See :func:`ak.to_parquet`
    compression_level
        See :func:`ak.to_parquet`
    row_group_size
        See :func:`ak.to_parquet`
    data_page_size
        See :func:`ak.to_parquet`
    parquet_flavor
        See :func:`ak.to_parquet`
    parquet_version
        See :func:`ak.to_parquet`
    parquet_page_version
        See :func:`ak.to_parquet`
    parquet_metadata_statistics
        See :func:`ak.to_parquet`
    parquet_dictionary_encoding
        See :func:`ak.to_parquet`
    parquet_byte_stream_split
        See :func:`ak.to_parquet`
    parquet_coerce_timestamps
        See :func:`ak.to_parquet`
    parquet_old_int96_timestamps
        See :func:`ak.to_parquet`
    parquet_compliant_nested
        See :func:`ak.to_parquet`
    parquet_extra_options
        See :func:`ak.to_parquet`
    storage_options
        Storage options passed to ``fsspec``.
    write_metadata
        If ``True``, also write an aggregated ``_metadata`` file for the
        dataset.
    compute
        If ``True``, immediately compute the result (write data to
        disk). If ``False`` a Scalar collection will be returned such
        that ``compute`` can be explicitly called.
    prefix
        An additional prefix for output files. If ``None`` all parts
        inside the destination directory will be named
        ``"partN.parquet"``; if defined, the names will be
        ``f"{prefix}-partN.parquet"``.

    Returns
    -------
    Scalar | None
        If ``compute`` is ``False`` a :obj:`dask_awkward.Scalar` object
        is returned such that it can be computed later. If ``compute``
        is ``True``, the collection is immediately computed (and data
        will be written to disk) and ``None`` is returned.

    Examples
    --------

    >>> import awkward as ak
    >>> import dask_awkward as dak
    >>> a = ak.Array([{"a": [1, 2, 3]}, {"a": [4, 5]}])
    >>> d = dak.from_awkward(a, npartitions=2)
    >>> d.npartitions
    2
    >>> dak.to_parquet(d, "/tmp/my-output", prefix="data")
    >>> import os
    >>> os.listdir("/tmp/my-output")
    ['data-part0.parquet', 'data-part1.parquet']

    """
    # TODO options we need:
    #  - byte stream split for floats if compression is not None or lzma
    #  - partitioning
    #  - dict encoding always off

    fs, path = url_to_fs(destination, **(storage_options or {}))
    name = f"write-parquet-{tokenize(fs, array, destination)}"

    map_res = map_partitions(
        _ToParquetFn(
            fs=fs,
            path=path,
            npartitions=array.npartitions,
            prefix=prefix,
            list_to32=list_to32,
            string_to32=string_to32,
            bytestring_to32=bytestring_to32,
            emptyarray_to=emptyarray_to,
            categorical_as_dictionary=categorical_as_dictionary,
            extensionarray=extensionarray,
            count_nulls=count_nulls,
            compression=compression,
            compression_level=compression_level,
            row_group_size=row_group_size,
            data_page_size=data_page_size,
            parquet_flavor=parquet_flavor,
            parquet_version=parquet_version,
            parquet_page_version=parquet_page_version,
            parquet_metadata_statistics=parquet_metadata_statistics,
            parquet_dictionary_encoding=parquet_dictionary_encoding,
            parquet_byte_stream_split=parquet_byte_stream_split,
            parquet_coerce_timestamps=parquet_coerce_timestamps,
            parquet_old_int96_timestamps=parquet_old_int96_timestamps,
            parquet_compliant_nested=parquet_compliant_nested,
            parquet_extra_options=parquet_extra_options,
        ),
        array,
        BlockIndex((array.npartitions,)),
        label="to-parquet",
        meta=array._meta,
    )
    map_res.dask.layers[map_res.name].annotations = {"ak_output": True}

    dsk = {}
    if write_metadata:
        final_name = name + "-metadata"
        dsk[(final_name, 0)] = (_metadata_file_from_metas, fs, path) + tuple(
            map_res.__dask_keys__()
        )
    else:
        final_name = name + "-finalize"
        dsk[(final_name, 0)] = (lambda *_: None, map_res.__dask_keys__())
    graph = HighLevelGraph.from_collections(
        final_name,
        AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]),
        dependencies=[map_res],
    )
    out = new_scalar_object(graph, final_name, dtype="f8")
    if compute:
        out.compute()
        return None
    else:
        return out
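

# Usage sketch (path hypothetical): with compute=False the write is deferred
# and returned as a Scalar that can be computed later, e.g. together with
# other collections.
def _example_deferred_write(arr: Array) -> None:
    result = to_parquet(arr, "/tmp/out", compute=False)
    # Nothing is written until the returned Scalar is computed:
    result.compute()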