Source code for dask_awkward.lib.io.io

from __future__ import annotations

import logging
import math
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass
from functools import partial
from typing import TYPE_CHECKING, Any, cast

import awkward as ak
import dask.config
import numpy as np
from awkward.types.numpytype import primitive_to_dtype
from awkward.typetracer import length_zero_if_typetracer
from dask.base import flatten, tokenize
from dask.highlevelgraph import HighLevelGraph
from dask.local import identity
from dask.utils import funcname, is_integer, parse_bytes
from fsspec.utils import infer_compression

from dask_awkward.layers.layers import (
    AwkwardBlockwiseLayer,
    AwkwardInputLayer,
    AwkwardMaterializedLayer,
    AwkwardTreeReductionLayer,
    ImplementsMocking,
    ImplementsReport,
    IOFunctionWithMocking,
    io_func_implements_mocking,
    io_func_implements_report,
)
from dask_awkward.lib.core import (
    Array,
    empty_typetracer,
    map_partitions,
    new_array_object,
    typetracer_array,
)
from dask_awkward.lib.io.columnar import ColumnProjectionMixin
from dask_awkward.utils import first, second

if TYPE_CHECKING:
    from dask.array.core import Array as DaskArray
    from dask.bag.core import Bag as DaskBag
    from dask.dataframe.core import DataFrame as DaskDataFrame
    from dask.delayed import Delayed
    from fsspec.spec import AbstractFileSystem


logger = logging.getLogger(__name__)


class FromAwkwardFn(ColumnProjectionMixin):
    def __init__(
        self,
        arr: ak.Array,
        behavior: Mapping | None = None,
        attrs: Mapping[str, Any] | None = None,
    ) -> None:
        self.arr = arr
        self.form = arr.layout.form
        self.behavior = behavior if behavior else arr.behavior
        self.attrs = attrs if attrs else arr.attrs

    @property
    def use_optimization(self):
        return True

    def __call__(self, *args, **kwargs):
        start, stop = args[0]
        arr = cast(ak.Array, self.arr[start:stop])
        return ak.Array(arr, behavior=self.behavior, attrs=self.attrs)

    def project_columns(self, columns):
        return type(self)(self.arr, self.behavior, self.attrs)


[docs] def from_awkward( source: ak.Array, npartitions: int, behavior: Mapping | None = None, label: str | None = None, attrs: Mapping[str, Any] | None = None, ) -> Array: """Create an Array collection from a concrete :class:`awkward.Array` object. Parameters ---------- source : ak.Array The concrete awkward array. npartitions : int The total number of partitions for the collection. behavior : dict, optional Custom ak.behavior for the output array. label : str, optional Label for the task. attrs : mapping, optional Custom attributes for the output array. Returns ------- Array Resulting awkward array collection. Examples -------- >>> import dask_awkward as dak >>> import awkward as ak >>> a = ak.Array([[1, 2, 3], [4], [5, 6, 7, 8]]) >>> c = dak.from_awkward(a, npartitions=3) >>> c.partitions[[0, 1]].compute() <Array [[1, 2, 3], [4]] type='2 * var * int64'> """ nrows = len(source) if nrows == 0: locs: tuple[None, ...] | tuple[int, ...] = (None, None) else: chunksize = int(math.ceil(nrows / npartitions)) locs = tuple(list(range(0, nrows, chunksize)) + [nrows]) starts_stops = list(zip(locs[:-1], locs[1:])) meta = typetracer_array(source) return cast( Array, from_map( FromAwkwardFn(source, behavior=behavior, attrs=attrs), starts_stops, label=label or "from-awkward", token=tokenize(source, npartitions), divisions=locs, meta=meta, ), )
class _FromListsFn: def __init__(self, behavior: Mapping | None, attrs: Mapping[str, Any] | None): self.behavior = behavior self.attrs = attrs def __call__(self, x: list) -> ak.Array: return ak.Array(x, behavior=self.behavior, attrs=self.attrs)
[docs] def from_lists( source: list, behavior: Mapping | None = None, attrs: Mapping[str, Any] | None = None, ) -> Array: """Create an Array collection from a list of lists. Parameters ---------- source : list[list[Any]] List of lists, each outer list will become a partition in the collection. behavior : dict, optional Custom ak.behavior for the output array. attrs : mapping, optional Custom attributes for the output array. Returns ------- Array Resulting Array collection. Examples -------- >>> import dask_awkward as dak >>> a = [[1, 2, 3], [4]] >>> b = [[5], [6, 7, 8]] >>> c = dak.from_lists([a, b]) >>> c dask.awkward<from-lists, npartitions=2> >>> c.compute() <Array [[1, 2, 3], [4], [5], [6, 7, 8]] type='4 * var * int64'> """ lists = list(source) divs = (0, *np.cumsum(list(map(len, lists)))) return cast( Array, from_map( _FromListsFn(behavior=behavior, attrs=attrs), lists, meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)), divisions=divs, label="from-lists", ), )
[docs] def from_delayed( source: list[Delayed] | Delayed, meta: ak.Array | None = None, behavior: Mapping | None = None, divisions: tuple[int, ...] | tuple[None, ...] | None = None, prefix: str = "from-delayed", attrs: Mapping[str, Any] | None = None, ) -> Array: """Create an Array collection from a set of :class:`~dask.delayed.Delayed` objects. Parameters ---------- source : list[dask.delayed.Delayed] | dask.delayed.Delayed List of :py:class:`~dask.delayed.Delayed` objects (or a single object). Each Delayed object represents a single partition in the resulting awkward array. meta : ak.Array, optional Metadata (typetracer array) if known, if ``None`` the first partition (first element of the list of ``Delayed`` objects) will be computed to determine the metadata. behavior : dict, optional Custom ak.behavior for the output array. divisions : tuple[int | None, ...], optional Partition boundaries (if known). prefix : str Prefix for the keys in the task graph. attrs : mapping, optional Custom attributes for the output array. Returns ------- Array Resulting Array collection. """ from dask.delayed import Delayed parts = [source] if isinstance(source, Delayed) else source name = f"{prefix}-{tokenize(parts)}" dsk = AwkwardMaterializedLayer( {(name, i): part.key for i, part in enumerate(parts)}, previous_layer_names=[parts[0].key], ) if divisions is None: divs: tuple[int, ...] | tuple[None, ...] = (None,) * (len(parts) + 1) else: divs = divisions if len(divs) != len(parts) + 1: raise ValueError("divisions must be a tuple of length len(source) + 1") hlg = HighLevelGraph.from_collections(name, dsk, dependencies=parts) return new_array_object( hlg, name=name, meta=meta, behavior=behavior, divisions=divs, attrs=attrs )
[docs] def to_delayed(array: Array, optimize_graph: bool = True) -> list[Delayed]: """Convert Arrray the collection to a list of :class:`~dask.delayed.Delayed` objects. One dask.delayed.Delayed object per partition. Parameters ---------- optimize_graph : bool If ``True`` the task graph associated with the collection will be optimized before conversion to the list of Delayed objects. Returns ------- list[dask.delayed.Delayed] List of delayed objects (one per partition). """ from dask.delayed import Delayed keys = array.__dask_keys__() graph = array.__dask_graph__() layer = array.__dask_layers__()[0] if optimize_graph: graph = array.__dask_optimize__(graph, keys) layer = f"delayed-{array.name}" graph = HighLevelGraph.from_collections(layer, graph, dependencies=()) return [Delayed(k, graph, layer=layer) for k in keys]
[docs] def to_dask_bag(array: Array) -> DaskBag: """Convert Array collection to a :class:`dask.bag.Bag` collection.""" from dask.bag.core import Bag return Bag(array.dask, array.name, array.npartitions)
[docs] def to_dask_array( array: Array, *, dtype: Any = None, optimize_graph: bool = True, ) -> DaskArray: """Convert Array collection to a :class:`dask.array.Array` collection. This conversion requires the awkward array to have a rectilinear shape (that is, no lists of variable length lists). Parameters ---------- array : Array The dask awkward array collection. dtype : DType NumPy dtype for the resulting array. optimize_graph : bool Optimize the graph associated with `array` (the ``dask_awkward.Array``) before converting to ``dask.array.Array``. Returns ------- dask.array.Array The new :py:class:`dask.array.Array` collection. """ from dask.array.core import new_da_object if array._meta is None: raise ValueError("Array metadata required for determining dtype") ndim = array.ndim if optimize_graph: keys = array.__dask_keys__() graph = array.__dask_graph__() layer = array.__dask_layers__()[0] graph = array.__dask_optimize__(graph, keys) hlg = HighLevelGraph.from_collections(layer, graph, dependencies=()) array = new_array_object( hlg, name=layer, divisions=array.divisions, meta=array._meta, ) if ndim == 1: new = map_partitions(ak.to_numpy, array, meta=empty_typetracer()) graph = new.dask dtype = dtype or primitive_to_dtype(array._meta.layout.form.type.primitive) if array.known_divisions: divs = np.array(array.divisions) chunks: tuple[tuple[float, ...], ...] = (tuple(divs[1:] - divs[:-1]),) else: chunks = ((np.nan,) * array.npartitions,) return new_da_object( graph, new.name, meta=None, chunks=chunks, dtype=dtype, ) else: # assert ndim > 1 content = array._meta.layout.form.type.content no_primitive = not hasattr(content, "primitive") while no_primitive: content = content.content no_primitive = not hasattr(content, "primitive") dtype = dtype or primitive_to_dtype(content.primitive) name = f"to-dask-array-{tokenize(array)}" nan_tuples_innerdims = ((np.nan,),) * (ndim - 1) chunks = ((np.nan,) * array.npartitions, *nan_tuples_innerdims) zeros = (0,) * (ndim - 1) # eventually convert to HLG (if possible) llg = { (name, i, *zeros): (ak.to_numpy, k) for i, k in enumerate(flatten(array.__dask_keys__())) } graph = HighLevelGraph.from_collections( name, AwkwardMaterializedLayer( llg, previous_layer_names=[array.name], ), dependencies=[array], ) return new_da_object(graph, name, meta=None, chunks=chunks, dtype=dtype)
[docs] def from_dask_array( array: DaskArray, behavior: Mapping | None = None, attrs: Mapping[str, Any] | None = None, ) -> Array: """Convert a Dask Array collection to a Dask Awkard Array collection. Parameters ---------- array : dask.array.Array Array to convert. behavior : dict, optional Custom ak.behavior for the output array. attrs : mapping, optional Custom attributes for the output array. Returns ------- Array The Awkward Array Dask collection. Examples -------- >>> import dask.array as da >>> import dask_awkward as dak >>> x = da.ones(1000, chunks=250) >>> y = dak.from_dask_array(x) >>> y dask.awkward<from-dask-array, npartitions=4> """ from dask.blockwise import blockwise as dask_blockwise token = tokenize(array) name = f"from-dask-array-{token}" meta = typetracer_array(ak.from_numpy(array._meta)) pairs = (array.name, "i") numblocks = {array.name: array.numblocks} layer = dask_blockwise( ak.from_numpy, name, "i", *pairs, numblocks=numblocks, concatenate=True, ) layer = AwkwardBlockwiseLayer.from_blockwise(layer) hlg = HighLevelGraph.from_collections(name, layer, dependencies=[array]) if np.any(np.isnan(array.chunks)): return new_array_object( hlg, name, npartitions=array.npartitions, meta=meta, behavior=behavior, attrs=attrs, ) else: divs = (0, *np.cumsum(array.chunks)) return new_array_object( hlg, name, divisions=divs, meta=meta, behavior=behavior, attrs=attrs )
[docs] def to_dataframe( array: Array, optimize_graph: bool = True, **kwargs: Any, ) -> DaskDataFrame: """Convert :class:`dask_awkward.Array` collection to :class:`~dask.dataframe.DataFrame`. Parameters ---------- array : dask_awkward.Array Array collection to be converted. optimize_graph : bool If ``True``, optimize the Array collection task graph before converting to DataFrame collection. **kwargs : Any Additional arguments passed to :func:`ak.to_dataframe`. Returns ------- dask.dataframe.DataFrame Resulting DataFrame collection. """ import dask from dask.dataframe.core import DataFrame as DaskDataFrame from dask.dataframe.core import new_dd_object if optimize_graph: (array,) = dask.optimize(array) intermediate = map_partitions( ak.to_dataframe, array, meta=empty_typetracer(), label="to-dataframe", **kwargs, ) meta = ak.to_dataframe(length_zero_if_typetracer(array._meta), **kwargs) return cast( DaskDataFrame, new_dd_object( intermediate.dask, intermediate.name, meta, intermediate.divisions, ), )
class PackedArgCallable: """Wrap a callable such that packed arguments can be unrolled. Inspired by dask.dataframe.io.io._PackedArgCallable. """ def __init__( self, func: Callable, args: tuple[Any, ...] | None = None, kwargs: dict[str, Any] | None = None, packed: bool = False, ): self.func = func self.args = args self.kwargs = kwargs self.packed = packed def __call__(self, packed_arg): if not self.packed: packed_arg = (packed_arg,) return self.func( *packed_arg, *(self.args or []), **(self.kwargs or {}), )
[docs] def from_map( func: Callable, *iterables: Iterable, args: tuple[Any, ...] | None = None, label: str | None = None, token: str | None = None, divisions: tuple[int, ...] | tuple[None, ...] | None = None, meta: ak.Array | None = None, **kwargs: Any, ) -> Array | tuple[Array, Array]: """Create an Array collection from a custom mapping. Parameters ---------- func : Callable Function used to create each partition. *iterables : Iterable Iterable objects to map to each output partition. All iterables must be the same length. This length determines the number of partitions in the output collection (only one element of each iterable will be passed to `func` for each partition). args : tuple Tuple of positional arguments to append after mapped arguments. label : str, optional String to use as the function-name label in the output collection-key names. token : str, optional String to use as the "token" in the output collection-key names. divisions : tuple[int, ...] | tuple[None, ...], optional Partition boundaries (if known). meta : Array, optional Collection metadata array, if known (the awkward-array type tracer) **kwargs : Any Keyword arguments passed to `func`. Returns ------- Array Array collection. """ if not callable(func): raise ValueError("`func` argument must be `callable`") lengths = set() iters: list[Iterable] = list(iterables) for i, iterable in enumerate(iters): if not isinstance(iterable, Iterable): raise ValueError( f"All elements of `iterables` must be Iterable, got {type(iterable)}" ) try: lengths.add(len(iterable)) # type: ignore except (AttributeError, TypeError): iters[i] = list(iterable) lengths.add(len(iters[i])) # type: ignore if len(lengths) == 0: raise ValueError("`from_map` requires at least one Iterable input") elif len(lengths) > 1: raise ValueError("All `iterables` must have the same length") if lengths == {0}: raise ValueError("All `iterables` must have a non-zero length") # Check for `produces_tasks` and `creation_info` produces_tasks = kwargs.pop("produces_tasks", False) # creation_info = kwargs.pop("creation_info", None) if produces_tasks or len(iters) == 1: if len(iters) > 1: # Tasks are not detected correctly when they are "packed" # within an outer list/tuple raise ValueError( "Multiple iterables not supported when produces_tasks=True" ) inputs = list(iters[0]) packed = False else: # Structure inputs such that the tuple of arguments pair each 0th, # 1st, 2nd, ... elements together; for example: # from_map(f, [1, 2, 3], [4, 5, 6]) --> [f(1, 4), f(2, 5), f(3, 6)] inputs = list(zip(*iters)) packed = True # Define collection name label = label or funcname(func) token = token or tokenize(func, iters, meta, **kwargs) name = f"{label}-{token}" # Define io_func # FIXME: projection etc. if packed or args or kwargs: func = PackedArgCallable( func, args=args, kwargs=kwargs, packed=packed, ) # Special `io_func` implementations can implement mocking and optionally # support buffer projection. if io_func_implements_mocking(func): io_func = func array_meta = cast(ImplementsMocking, func).mock() # If we know the meta, we can spoof mocking elif meta is not None: io_func = IOFunctionWithMocking(meta, func) array_meta = meta # Without `meta`, the meta will be computed by executing the graph else: io_func = func array_meta = None dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func) hlg = HighLevelGraph.from_collections(name, dsk) if divisions is not None: result = new_array_object(hlg, name, meta=array_meta, divisions=divisions) else: result = new_array_object(hlg, name, meta=array_meta, npartitions=len(inputs)) if io_func_implements_report(io_func): if cast(ImplementsReport, io_func).return_report: res = result.map_partitions( first, meta=array_meta, label=label, output_divisions=1 ) concat_fn = partial( ak.concatenate, axis=0, ) split_every = dask.config.get("awkward.aggregation.split-every", 8) rep_trl_label = f"{label}-report" rep_trl_token = tokenize(result, second, concat_fn, split_every) rep_trl_name = f"{rep_trl_label}-{rep_trl_token}" rep_trl_tree_node_name = f"{rep_trl_label}-tree-node-{rep_trl_token}" rep_part = result.map_partitions( second, meta=empty_typetracer(), label=f"{label}-partitioned-report" ) rep_trl = AwkwardTreeReductionLayer( name=rep_trl_name, name_input=rep_part.name, npartitions_input=rep_part.npartitions, concat_func=concat_fn, tree_node_func=identity, finalize_func=identity, split_every=split_every, tree_node_name=rep_trl_tree_node_name, ) rep_graph = HighLevelGraph.from_collections( rep_trl_name, rep_trl, dependencies=[rep_part] ) rep = new_array_object( rep_graph, rep_trl_name, meta=empty_typetracer(), npartitions=len(rep_trl.output_partitions), ) return res, rep return result
@dataclass class _BytesReadingInstructions: fs: AbstractFileSystem path: str compression: str | None offset: int | None length: int | None delimiter: bytes def expand(self): return ( self.fs, self.path, self.compression, self.offset, self.length, self.delimiter, ) def _bytes_with_sample( fs: AbstractFileSystem, paths: list[str], compression: str | None, delimiter: bytes, not_zero: bool, blocksize: str | int | None, sample: str | int | bool, ) -> tuple[list[list[_BytesReadingInstructions]], bytes]: """Generate instructions for reading bytes from paths in a filesystem. This function is for internal use in from_json and from_text; we create a set of instructions to lazily read bytes from files on disk. Parameters ---------- fs : AbstractFileSystem Filesystem where data lives. paths : list[str] Path to the data. compression : str, optional Compression of the data. delimiter : bytes, optional Delimiter to create chunks on not_zero : bool If ``True`` skip forward 1 byte when seeking for the first delimiter (dropping header). blocksize : str | int Size for each chunk of bytes. sample : str | int | bool Size of sample to eagerly read and return (if False return ``b""``). Returns ------- list[list[_BytesReadingInstructions]] list of lists of instructions (outer list of paths, inner list for chunks of the path). bytes Sample bytes. """ if blocksize is not None: if isinstance(blocksize, str): blocksize = parse_bytes(blocksize) if not is_integer(blocksize): raise TypeError("blocksize must be an integer") blocksize = int(blocksize) if compression == "infer": compression = infer_compression(paths[0]) if blocksize is None: offsets = [[0]] * len(paths) lengths: list = [[None]] * len(paths) else: offsets = [] lengths = [] for path in paths: if compression is not None: raise ValueError( "Cannot do chunked reads on compressed files. " "To read, set blocksize=None" ) size = fs.info(path)["size"] if size is None: raise ValueError( "Backing filesystem couldn't determine file size, cannot " "do chunked reads. To read, set blocksize=None." ) elif size == 0: # skip empty offsets.append([]) lengths.append([]) else: # shrink blocksize to give same number of parts if size % blocksize and size > blocksize: blocksize1 = size / (size // blocksize) else: blocksize1 = blocksize place = 0 off = [0] length = [] # figure out offsets, spreading around spare bytes while size - place > (blocksize1 * 2) - 1: place += blocksize1 off.append(int(place)) length.append(off[-1] - off[-2]) length.append(size - off[-1]) if not_zero: off[0] = 1 length[0] -= 1 offsets.append(off) lengths.append(length) out = [] for path, offset, length in zip(paths, offsets, lengths): values = [ _BytesReadingInstructions( fs, path, compression, offs, leng, delimiter, ) for offs, leng in zip(offset, length) ] out.append(values) sample_bytes = b"" if sample: sample_size = parse_bytes(sample) if isinstance(sample, str) else sample with fs.open(paths[0], compression=compression) as f: # read block without seek (because we start at zero) sample_buff = f.read(sample_size) while True: new = f.read(sample_size) if not new: break if delimiter in new: sample_buff = sample_buff + new.split(delimiter, 1)[0] + delimiter break sample_buff = sample_buff + new sample_bytes = sample_buff rfind = sample_bytes.rfind(delimiter) if rfind > 0: sample_bytes = sample_bytes[:rfind] return out, sample_bytes