Sometimes the task graph that Dask constructs is sub-optimal. For instance, if you aggregate data and then use that aggregate in a later operation, and the intermediate data requires a lot of memory, it can make sense to manipulate the task graph. Read more about manipulating task graphs.
A naive approach to normalizing by the mean temperature:
import dask.dataframe as dd
# Index by timestamp so the time-based resample below works.
ddf = dd.read_csv("data/*", parse_dates=["timestamp"]).set_index("timestamp")
# Lazy scalar: the mean of the whole temperature column.
mean_temperature = ddf.temperature.mean()
# Center the temperature by its mean, then aggregate to monthly min/max.
output = (ddf.temperature - mean_temperature).resample("1M").agg(["min", "max"])
output.visualize()
If we instead use bind, the task graph is restructured to first run all the way through the mean_temperature calculation, and only then start on the full computation.
NOTE: An alternative approach would be to call mean_temperature.compute() and pass the evaluated value into the final operation. The important difference is that using bind keeps the operation fully lazy (all part of one task graph).
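For comparison, a minimal sketch of that eager alternative (the variable names mean_value and output_eager are illustrative, not part of the example above):

# Eager alternative: materialize the mean up front, which triggers a full pass
# over the data before the final task graph is built.
mean_value = ddf.temperature.mean().compute()
output_eager = (ddf.temperature - mean_value).resample("1M").agg(["min", "max"])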
from dask.graph_manipulation import bind
# bind makes temperature_b depend on mean_temperature completing first,
# without changing the values it produces.
temperature_b = bind(ddf.temperature, mean_temperature)
output_b = (temperature_b - mean_temperature).resample("1M").agg(["min", "max"])
output_b.visualize()
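Whichever graph you build, the computed result is the same monthly min/max of the centered temperature; only the ordering of the work differs. A quick check, assuming the data files above, might look like:

# Trigger the computation; bind changes the graph structure, not the values.
result = output_b.compute()
print(result.head())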