Source code for dask_awkward.lib.io.parquet

from __future__ import annotations

import abc
import itertools
import logging
import math
import operator
from typing import Any, Sequence

import awkward as ak
import fsspec
from awkward.forms.form import Form
from awkward.operations import ak_from_parquet, to_arrow_table
from awkward.operations.ak_from_parquet import _load
from dask.base import tokenize
from dask.blockwise import BlockIndex
from dask.highlevelgraph import HighLevelGraph
from fsspec import AbstractFileSystem
from fsspec.core import get_fs_token_paths

from dask_awkward.lib.core import (
    Array,
    Scalar,
    map_partitions,
    new_scalar_object,
    typetracer_array,
)
from dask_awkward.lib.io.io import from_map
from dask_awkward.lib.unproject_layout import unproject_layout

log = logging.getLogger(__name__)


class _FromParquetFn:
    """Common state and interface for the parquet-reading partition functions.

    Instances are callables that map one partition descriptor to an
    awkward array.  Subclasses provide ``__call__`` and
    ``project_columns``.  Note that this class does not derive from
    ``abc.ABC``, so the ``abstractmethod`` markers below document the
    intended interface but are not enforced at instantiation time.

    Parameters
    ----------
    fs : AbstractFileSystem
        Filesystem used to open the parquet data.
    schema : Any
        Object exposing ``columns(listsep)``; used to derive the list of
        parquet column names to read.
    listsep : str
        Separator passed to ``schema.columns``.
    unnamed_root : bool
        If True, every derived column name is prefixed with ``"."``.
    original_form : Form, optional
        Stored as-is for use by subclasses.
    """

    def __init__(
        self,
        *,
        fs: AbstractFileSystem,
        schema: Any,
        listsep: str = "list.item",
        unnamed_root: bool = False,
        original_form: Form | None = None,
    ) -> None:
        self.fs = fs
        self.schema = schema
        self.listsep = listsep
        self.unnamed_root = unnamed_root
        self.columns = self.schema.columns(self.listsep)
        if self.unnamed_root:
            # Column names get a leading "." when the root is flagged unnamed.
            self.columns = [f".{c}" for c in self.columns]
        self.original_form = original_form

    @abc.abstractmethod
    def __call__(self, source: Any) -> ak.Array:
        """Read the partition described by ``source``."""
        ...

    @abc.abstractmethod
    def project_columns(
        self,
        columns: Sequence[str] | None,
        original_form: Form | None = None,
    ) -> _FromParquetFn:
        """Return a function object restricted to ``columns``.

        The second parameter is spelled ``original_form`` to match the
        signature implemented by the concrete subclasses.
        """
        ...

    def __repr__(self) -> str:
        s = (
            "\nFromParquetFn(\n"
            f"  schema={repr(self.schema)}\n"
            f"  listsep={self.listsep}\n"
            f"  unnamed_root={self.unnamed_root}\n"
            f"  columns={self.columns}\n"
        )
        return s

    def __str__(self) -> str:
        return self.__repr__()


class _FromParquetFileWiseFn(_FromParquetFn):
    """Partition function that reads one whole parquet file per call."""

    def __init__(
        self,
        *,
        fs: AbstractFileSystem,
        schema: Any,
        listsep: str = "list.item",
        unnamed_root: bool = False,
        original_form: Form | None = None,
    ) -> None:
        # Nothing file-wise specific to store; defer entirely to the base.
        super().__init__(
            fs=fs,
            schema=schema,
            listsep=listsep,
            unnamed_root=unnamed_root,
            original_form=original_form,
        )

    def __call__(self, source: Any) -> Any:
        """Load the file at ``source`` and unproject it onto ``original_form``."""
        loaded = _file_to_partition(source, self.fs, self.columns, self.schema)
        restored_layout = unproject_layout(self.original_form, loaded.layout)
        return ak.Array(restored_layout)

    def project_columns(
        self,
        columns: Sequence[str] | None,
        original_form: Form | None = None,
    ) -> _FromParquetFileWiseFn:
        """Return a reader limited to ``columns`` (``self`` when ``columns`` is None)."""
        if columns is None:
            return self

        projected_schema = self.schema.select_columns(columns)
        projected_fn = _FromParquetFileWiseFn(
            fs=self.fs,
            schema=projected_schema,
            listsep=self.listsep,
            unnamed_root=self.unnamed_root,
            original_form=original_form,
        )

        log.debug(f"project_columns received: {columns}")
        log.debug(f"new schema is {repr(projected_schema)}")
        log.debug(
            f"new schema columns are: {projected_schema.columns(self.listsep)}"
        )
        log.debug(projected_fn)

        return projected_fn


class _FromParquetFragmentWiseFn(_FromParquetFn):
    """Partition function that reads selected row groups of a parquet file."""

    def __init__(
        self,
        *,
        fs: AbstractFileSystem,
        schema: Any,
        listsep: str = "list.item",
        unnamed_root: bool = False,
        original_form: Form | None = None,
    ) -> None:
        super().__init__(
            fs=fs,
            schema=schema,
            listsep=listsep,
            unnamed_root=unnamed_root,
            original_form=original_form,
        )

    def __call__(self, pair: Any) -> ak.Array:
        """Read one fragment.

        Parameters
        ----------
        pair : tuple
            ``(subrg, source)``: the row-group selection and the file to
            read it from.  A bare integer ``subrg`` is wrapped as
            ``[[subrg]]`` before being handed to ``_file_to_partition``.
        """
        subrg, source = pair
        if isinstance(subrg, int):
            subrg = [[subrg]]
        array = _file_to_partition(
            source,
            self.fs,
            self.columns,
            self.schema,
            subrg=subrg,
        )
        return ak.Array(unproject_layout(self.original_form, array.layout))

    def project_columns(
        self,
        columns: Sequence[str] | None,
        original_form: Form | None = None,
    ) -> _FromParquetFragmentWiseFn:
        """Return a reader limited to ``columns`` (``self`` when ``columns`` is None)."""
        if columns is None:
            return self
        return _FromParquetFragmentWiseFn(
            fs=self.fs,
            schema=self.schema.select_columns(columns),
            # Forward the list separator: omitting it would silently reset
            # the projected reader to the default "list.item" and produce
            # column names that do not match a "list.element" dataset.
            listsep=self.listsep,
            unnamed_root=self.unnamed_root,
            original_form=original_form,
        )


def from_parquet(
    path: Any,
    storage_options: dict | None = None,
    ignore_metadata: bool = True,
    scan_files: bool = False,
    columns: Sequence[str] | None = None,
    filters: Any | None = None,
    split_row_groups: Any | None = None,
) -> Array:
    """Create an Array collection from a Parquet dataset.

    Parameters
    ----------
    path : str
        Location of data, including protocol (e.g. ``s3://``)
    storage_options : dict
        For creating filesystem (see ``fsspec`` documentation).
    ignore_metadata : bool
        Ignore parquet metadata associated with the input dataset (the
        ``_metadata`` file).
    scan_files : bool
        Forwarded unchanged to ``ak_from_parquet.metadata``.
    columns : list[str], optional
        Select columns to load
    filters : list[list[tuple]], optional
        Parquet-style filters for excluding row groups based on column
        statistics. NOTE: in this function the value only contributes
        to the task token; it is not passed on to the reader.
    split_row_groups: bool, optional
        If True, each row group becomes a partition. If False, each
        file becomes a partition. If None, row groups are split when
        the dataset metadata reports more than one row count, else
        files are used.

    Returns
    -------
    Array
        Array collection from the parquet dataset.

    """
    fs, tok, paths = get_fs_token_paths(
        path, mode="rb", storage_options=storage_options
    )
    label = "from-parquet"
    token = tokenize(
        tok, paths, ignore_metadata, columns, filters, scan_files, split_row_groups
    )

    # same as ak_metadata_from_parquet
    results = ak_from_parquet.metadata(
        path,
        storage_options,
        row_groups=None,
        columns=columns,
        ignore_metadata=ignore_metadata,
        scan_files=scan_files,
    )
    parquet_columns, subform, actual_paths, fs, subrg, row_counts, metadata = results

    # Detect the two naming properties independently.  A single loop that
    # stops at the first ".list.element." column can exit before it ever
    # looks at the leading-dot marker of that same column.
    listsep = "list.item"
    if any(".list.element." in c for c in parquet_columns):
        listsep = "list.element"
    unnamed_root = any(c.startswith(".") for c in parquet_columns)

    if split_row_groups is None:
        split_row_groups = row_counts is not None and len(row_counts) > 1

    meta = ak.Array(
        subform.length_zero_array(highlevel=False).to_typetracer(forget_length=True)
    )

    if split_row_groups is False or subrg is None:
        # file-wise
        return from_map(
            _FromParquetFileWiseFn(
                fs=fs,
                schema=subform,
                listsep=listsep,
                unnamed_root=unnamed_root,
            ),
            actual_paths,
            label=label,
            token=token,
            meta=typetracer_array(meta),
        )

    # row-group wise
    # ``all(... is None)`` instead of ``set(subrg) == {None}``: building a
    # set fails with TypeError when the entries are (unhashable) lists.
    if subrg and all(entry is None for entry in subrg):
        # No explicit selection: count the row groups belonging to each file
        # and select all of them.
        rgs_paths = {p: 0 for p in actual_paths}
        for i in range(metadata.num_row_groups):
            fp = metadata.row_group(i).column(0).file_path
            # returns 1st if fp is empty
            rgs_path = [p for p in rgs_paths if fp in p][0]
            rgs_paths[rgs_path] += 1
        subrg = [list(range(rgs_paths[p])) for p in actual_paths]

    rgs = [metadata.row_group(i) for i in range(metadata.num_row_groups)]
    # Cumulative row counts over the row groups give the partition boundaries.
    divisions = [0] + list(
        itertools.accumulate([rg.num_rows for rg in rgs], operator.add)
    )

    # One (row_group, file) pair per output partition.
    pairs = []
    for isubrg, file_path in zip(subrg, actual_paths):
        pairs.extend([(irg, file_path) for irg in isubrg])

    return from_map(
        _FromParquetFragmentWiseFn(
            fs=fs,
            schema=subform,
            listsep=listsep,
            unnamed_root=unnamed_root,
        ),
        pairs,
        label=label,
        token=token,
        divisions=tuple(divisions),
        meta=typetracer_array(meta),
    )


def _file_to_partition(path, fs, columns, schema, subrg=None):
    """Read one parquet file (or a selection of its row groups) to awkward.

    Parameters
    ----------
    path : str
        File to read; passed to ``_load`` as the single entry of
        ``actual_paths``.
    fs : AbstractFileSystem
        Filesystem used to open ``path``.
    columns : list[str]
        Parquet column names to read.
    schema : Any
        Passed to ``_load`` as ``subform``.
    subrg : optional
        Row-group selection; any falsy value is replaced by ``[None]``.
    """
    return _load(
        actual_paths=[path],
        fs=fs,
        parquet_columns=columns,
        subrg=subrg or [None],
        footer_sample_size=2**15,
        max_gap=2**10,
        max_block=2**22,
        generate_bitmasks=False,
        metadata=None,
        highlevel=True,
        subform=schema,
        behavior=None,
    )


def _metadata_file_from_data_files(path_list, fs, out_path):
    """
    Aggregate the metadata of data files into a ``_metadata`` file

    Maybe only used in testing

    (similar to fastparquet's merge)

    path_list: list[str]
        Input data files
    fs: AbstractFileSystem instance
    out_path: str
        Root directory of the dataset
    """
    import pyarrow.parquet as pq

    meta = None
    out_path = out_path.rstrip("/")
    for path in path_list:
        assert path.startswith(out_path)
        with fs.open(path, "rb") as f:
            _meta = pq.ParquetFile(f).metadata
        # Record the file location relative to the dataset root.
        _meta.set_file_path(path[len(out_path) + 1 :])
        # Explicit None test: the first file seeds the aggregate regardless
        # of how the metadata object evaluates in a boolean context.
        if meta is None:
            meta = _meta
        else:
            meta.append_row_groups(_meta)
    _write_metadata(fs, out_path, meta)


def _metadata_file_from_metas(fs, out_path, *metas):
    """Aggregate metadata from arrow objects and write

    The row groups of every later entry are appended to ``metas[0]``
    (which is modified in place) before it is written out.
    """
    meta = metas[0]
    for _meta in metas[1:]:
        meta.append_row_groups(_meta)
    _write_metadata(fs, out_path, meta)


def _write_metadata(fs, out_path, meta):
    """Write ``meta`` to the ``_metadata`` file under ``out_path``.

    The file is written exactly once; a second, identical write of the
    same path would only repeat the I/O.
    """
    metadata_path = "/".join([out_path, "_metadata"])
    with fs.open(metadata_path, "wb") as fil:
        meta.write_metadata_file(fil)


def _write_partition(
    data,
    path,
    fs,
    filename,
    return_metadata=False,
    compression=None,
    head=False,
):
    """Write one partition to a parquet file and optionally report its metadata.

    Parameters
    ----------
    data
        Array for this partition; converted with ``to_arrow_table``.
    path : str
        Dataset root.
    fs : AbstractFileSystem
        Filesystem to write to.
    filename : str
        Relative path of the output file within the dataset.
    return_metadata : bool
        Whether the file metadata is returned (needed when making a
        global ``_metadata`` file).
    compression : optional
        Passed to ``pyarrow.parquet.write_table``.
    head : bool
        Whether this is the first piece; if so the arrow schema is
        included in the returned dictionary.

    Returns
    -------
    list
        ``[{"meta": ..., ["schema": ...]}]`` when ``return_metadata`` is
        true, otherwise an empty list.

    Notes
    -----
    ``partition_on`` (must be a top-level leaf, i.e. a simple column) and
    ``custom_metadata`` are not implemented.
    """
    import pyarrow.parquet as pq

    t = to_arrow_table(
        data,
        list_to32=True,
        string_to32=True,
        bytestring_to32=True,
        categorical_as_dictionary=True,
        extensionarray=False,
    )
    md_list = []
    with fs.open(fs.sep.join([path, filename]), "wb") as fil:
        pq.write_table(
            t,
            fil,
            compression=compression,
            metadata_collector=md_list,
        )

    if not return_metadata:
        return []

    # Return the metadata needed to write global _metadata
    _meta = md_list[0]
    _meta.set_file_path(filename)
    d = {"meta": _meta}
    if head:
        # Only return schema if this is the "head" partition
        d["schema"] = t.schema
    return [d]


class _ToParquetFn:
    """Callable that writes one partition of a collection to parquet.

    Constructing an instance creates the output directory
    (``fs.mkdirs(path, exist_ok=True)``).
    """

    def __init__(
        self,
        fs: AbstractFileSystem,
        path: Any,
        return_metadata: bool = False,
        compression: Any | None = None,
        head: Any | None = None,
        npartitions: int | None = None,
        prefix: str | None = None,
    ):
        self.fs = fs
        self.path = path
        self.return_metadata = return_metadata
        self.compression = compression
        self.head = head
        self.prefix = prefix
        # Width used to zero-pad the partition index in file names.
        self.zfill = (
            math.ceil(math.log(npartitions, 10)) if npartitions is not None else 1
        )
        self.fs.mkdirs(self.path, exist_ok=True)

    def __call__(self, data, block_index):
        """Write ``data`` as ``[prefix-]part<index>.parquet`` under ``self.path``."""
        filename = f"part{str(block_index[0]).zfill(self.zfill)}.parquet"
        if self.prefix is not None:
            filename = f"{self.prefix}-{filename}"
        return _write_partition(
            data,
            self.path,
            self.fs,
            filename,
            return_metadata=self.return_metadata,
            compression=self.compression,
            head=self.head,
        )


def _collect_partition_metadata(fs, path, *parts):
    """Unwrap per-partition write results and write the aggregate ``_metadata``.

    Each entry of ``parts`` is the list returned by one partition write:
    a list of dictionaries carrying the file metadata under ``"meta"``.
    """
    metas = [entry["meta"] for part in parts for entry in part]
    _metadata_file_from_metas(fs, path, *metas)


def to_parquet(
    data: Array,
    path: Any,
    storage_options: dict[str, Any] | None = None,
    write_metadata: bool = False,
    compute: bool = True,
    prefix: str | None = None,
) -> Scalar | None:
    """Write data to Parquet format.

    Parameters
    ----------
    data : dask_awkward.Array
        Array to write to parquet.
    path : str
        Root directory of location to write to
    storage_options : dict
        Arguments to pass to fsspec for creating the filesystem (see
        ``fsspec`` documentation).
    write_metadata : bool
        Whether to also write an aggregate ``_metadata`` file built
        from the metadata of every written partition.
    compute : bool
        Whether to immediately start writing or to return the dask
        collection which can be computed at the user's discretion.
    prefix : str, optional
        If given, each partition file is named
        ``{prefix}-part<index>.parquet`` instead of
        ``part<index>.parquet``.

    Returns
    -------
    None or dask_awkward.Scalar
        If `compute` is ``False``, a :py:class:`dask_awkward.Scalar`
        representing the process will be returned, if `compute` is
        ``True`` then the return is ``None``.

    """
    # TODO options we need:
    #  - compression per data type or per leaf column ("path.to.leaf": "zstd" format)
    #  - byte stream split for floats if compression is not None or lzma
    #  - partitioning
    #  - parquet 2 for full set of time and int types
    #  - v2 data page (for possible later fastparquet implementation)
    #  - dict encoding always off
    fs, _ = fsspec.core.url_to_fs(path, **(storage_options or {}))
    name = f"write-parquet-{tokenize(fs, data, path)}"

    map_res = map_partitions(
        _ToParquetFn(
            fs,
            path=path,
            # Partitions must hand back their file metadata when an aggregate
            # ``_metadata`` file is requested; otherwise each write returns an
            # empty list and there is nothing to aggregate.
            return_metadata=write_metadata,
            npartitions=data.npartitions,
            prefix=prefix,
        ),
        data,
        BlockIndex((data.npartitions,)),
        label="to-parquet",
        meta=data._meta,
    )
    map_res.dask.layers[map_res.name].annotations = {"ak_output": True}

    dsk = {}
    if write_metadata:
        final_name = name + "-metadata"
        # The partition results are lists of dicts, so they are unwrapped
        # before being aggregated into the ``_metadata`` file.
        dsk[(final_name, 0)] = (_collect_partition_metadata, fs, path) + tuple(
            map_res.__dask_keys__()
        )
    else:
        final_name = name + "-finalize"
        dsk[(final_name, 0)] = (lambda *_: None, map_res.__dask_keys__())

    graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=[map_res])
    out = new_scalar_object(graph, final_name, meta=None)
    if compute:
        out.compute()
        return None
    return out