Optimization#
When you ask Dask to compute a collection (with the compute method on the collection, the dask.compute() function, etc.), Dask will (by default) optimize the task graph before beginning to execute the graph (the optimize_graph argument exists to toggle this behavior, but setting it to False is really meant for debugging). Core Dask has a number of optimizations implemented that we benefit from downstream in dask-awkward. You can read more about Dask optimization in general in this section of the Dask docs.
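For example, a minimal sketch (the path and field name here are hypothetical) showing how graph optimization can be skipped for debugging:

import dask_awkward as dak

ds = dak.from_parquet("/path/to/data")  # hypothetical path
result = ds.some_field * 2              # hypothetical field name

# Default behavior: optimize the task graph, then execute it.
result.compute()

# Debugging only: execute the graph exactly as it was built.
result.compute(optimize_graph=False)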
dask-awkward Optimizations#
There are two optimizations implemented in the dask-awkward code. One is the layer-chains optimization, which fuses adjacent task graph layers together (if they are compatible with each other). This is a relatively simple optimization that just simplifies the task graph. The other optimization is the columns (or “necessary columns”) optimization, which is a bit more technical and is described in a follow-up section.
One can configure which optimizations to run at compute time; more information can be found in the configuration section of the docs.
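As a hedged sketch (the path and field name are hypothetical; the awkward.optimization.enabled key appears again later on this page), one can adjust this behavior for a single compute with dask.config.set:

import dask.config
import dask_awkward as dak

ds = dak.from_parquet("/path/to/data")  # hypothetical path
result = ds.some_field * 2              # hypothetical field name

# Turn the dask-awkward optimizations off for this compute only.
with dask.config.set({"awkward.optimization.enabled": False}):
    result.compute()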
Necessary Columns#
We have one dask-awkward specific optimization that targets efficient data access from disk: the “necessary columns” optimization. This optimization executes the task graph without operating on real data. The data-less execution of the graph determines which parts of a dataset sitting on disk actually need to be read in order to successfully complete the compute.
Once we’ve determined which parts of the data are necessary, we can pass that information to awkward’s input functions at the data-reading layers of our task graph. With Parquet, this is the columns= argument of ak.from_parquet(). With JSON, we construct a JSONSchema that contains only the necessary parts of the data, and we pass that to the schema= argument of ak.from_json().
Note
Two file formats are supported by the necessary columns optimization: Parquet and JSON. The optimization is on by default when reading Parquet, but it is opt-in for JSON. One can control via configuration which formats use the columns optimization when reading from disk. For example, the following code snippet shows how to opt in to using the necessary columns optimization via JSONSchema:
import dask_awkward as dak
import dask.config
ds = dak.from_json("/path/to/data")
thing = dak.max(ds.field1, axis=1)
with dask.config.set({"awkward.optimization.columns-opt-formats": ["json"]}):
    thing.compute()
Let’s look at a simple example dataset: an awkward array with two top-level fields (foo and bar), with one field having two subfields (bar.x and bar.y). Imagine this dataset is going to be read off disk in Parquet format. In this format we’ll have a column of integers for foo, a column of variable-length lists of integers for bar.x, and a column of floats for bar.y.
[
{"foo": 5, "bar": {"x": [-1, -2], "y": -2.2}},
{"foo": 6, "bar": {"x": [-3], "y": 3.3}},
{"foo": 7, "bar": {"x": [-5, -6, -7], "y": -4.4}},
{"foo": 8, "bar": {"x": [8, 9, 10, 11, 12], "y": 5.5}},
...
]
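For completeness, a small sketch of how a dataset with this layout might be produced with awkward itself (the output file name is hypothetical):

import awkward as ak

records = [
    {"foo": 5, "bar": {"x": [-1, -2], "y": -2.2}},
    {"foo": 6, "bar": {"x": [-3], "y": 3.3}},
    {"foo": 7, "bar": {"x": [-5, -6, -7], "y": -4.4}},
    {"foo": 8, "bar": {"x": [8, 9, 10, 11, 12], "y": 5.5}},
]
array = ak.Array(records)

# The resulting Parquet file contains the columns foo, bar.x, and bar.y.
ak.to_parquet(array, "/path/to/data/part0.parquet")  # hypothetical file name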
If our task graph is of the form:
>>> import dask_awkward as dak
>>> ds = dak.from_parquet("/path/to/data")
>>> result = ds.bar.x / ds.foo
We have five layers in the graph:
1. Reading parquet from the path /path/to/data
2. Access the field foo
3. Access the field bar
4. Access the field x from bar
5. Array division
We can see this at the REPL by inspecting the .dask property of the collection:
>>> result.dask
HighLevelGraph with 5 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x134a4fc10>
0. read-parquet-f4e4296edcc1309191080cae9018ab4c
1. foo-791f3e559c4061a8c9df2e87a0524069
2. bar-edf7073f1aab48e986099f7c67e81be9
3. x-47d0bdfde8d53e07444a58204428ff2f
4. divide-b85b7c773695128b08311b3a75b0002b
Notice that we never actually need the bar.y column of floats. Upon calling result.compute(), step (1) in our list of layers above (reading parquet) will be updated such that the parquet read will only grab foo and bar.x.
Note
This is done by replacing the original input layer with a new layer instance that will pass the named argument columns=["foo", "bar.x"] to the concrete awkward ak.from_parquet() function at compute time.
You can see which columns are determined to be necessary by calling dask_awkward.report_necessary_columns() on the collection of interest (it returns a mapping that pairs each input layer with the list of necessary columns):
>>> import dask_awkward as dak
>>> dak.report_necessary_columns(result)
{"some-layer-name": ["foo", "bar.x"]}
The optimization is performed by relying on upstream Awkward-Array typetracers. It is possible for this optimization to fail. The default configuration is such that a warning will be raised if the optimization fails. If you’d instead like to silence the warning or raise an exception, the configuration parameter can be adjusted. Here are the options for the awkward.optimization.on-fail configuration parameter:
- "pass": fail silently; the optimization is skipped (can reduce performance by reading unnecessary data from disk).
- "raise": fail by raising an exception; this will stop the process at compute time.
- "warn" (the default): fail with a warning but let the compute continue without the necessary columns optimization (can reduce performance by reading unnecessary data from disk).
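For example, a short sketch (reusing the result collection from the example above) of switching to the stricter behavior:

>>> import dask.config
>>> with dask.config.set({"awkward.optimization.on-fail": "raise"}):
...     result.compute()
...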
One can also use the columns= argument (with from_parquet(), for example) to manually define which columns should be read from disk. The report_necessary_columns() function can be used to determine how one should use the columns= argument. Using our above example, we write:
>>> import dask_awkward as dak
>>> import dask.config
>>> ds = dak.from_parquet("/path/to/data", columns=["bar.x", "foo"])
>>> result = ds.bar.x / ds.foo
>>> with dask.config.set({"awkward.optimization.enabled": False}):
... result.compute()
...
With this code we can save a little bit of overhead by not running the necessary columns optimization, since we have already defined the minimal set of columns by hand (one should be sure about what is actually needed when using this workflow).
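Put together, one possible workflow (a sketch based on the example above; the layer name in the report is illustrative) is to inspect the report once and then hard-code the resulting column list:

>>> import dask_awkward as dak
>>> ds = dak.from_parquet("/path/to/data")
>>> result = ds.bar.x / ds.foo
>>> dak.report_necessary_columns(result)  # run once to learn the column list
{"some-layer-name": ["foo", "bar.x"]}
>>> # subsequent runs: pass that list by hand via columns= as shown above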
Note
Under the hood, the columns optimization is implemented as a buffers optimization; dask-awkward determines the buffers necessary to read from a columnar source before translating these to column names. Some IO sources might not support report_necessary_columns(), e.g. if the source directly reads buffers from a container. For these IO sources, report_necessary_buffers() can be used instead.