What if you don't have an array or dataframe? Instead of having blocks where the function is applied to each block, you can decorate functions with @delayed
and have the functions themselves be lazy.
NOTE: For this example we will create a fake dataset and store it on disk. You can ignore this bit.
import numpy as np
import dask
import dask.array as da

# Synthetic hourly time series spanning 2010-2019, one partition per year.
ddf = dask.datasets.timeseries(
    start="2010-01-01", end="2020-01-01", freq="1H", partition_freq="1Y"
)
# Seasonal (sinusoidal) annual cycle derived from the day-of-year; computing
# chunk sizes up front lets the result participate in further dask.array math.
annual_cycle = np.sin(
    2 * np.pi * (ddf.index.dayofyear.values / 365.25 - 0.28)
).compute_chunk_sizes()
# Base temperature + seasonal swing + gaussian noise, attached as a column.
ddf["temperature"] = 10 + 15 * annual_cycle + 3 * da.random.normal(size=annual_cycle.size)
ddf.to_csv("data")
['/home/julia/talks/inside-dask/data/0.part', '/home/julia/talks/inside-dask/data/1.part', '/home/julia/talks/inside-dask/data/2.part', '/home/julia/talks/inside-dask/data/3.part', '/home/julia/talks/inside-dask/data/4.part', '/home/julia/talks/inside-dask/data/5.part', '/home/julia/talks/inside-dask/data/6.part', '/home/julia/talks/inside-dask/data/7.part', '/home/julia/talks/inside-dask/data/8.part']
This example matches the one in Not Delayed, but this one uses Dask.
import os
import random
import shutil
import time

import pandas as pd

import dask

# Start from a clean output directory. Using shutil instead of the notebook
# shell magic `!rm -rf` keeps this cell portable (plain Python, any OS).
shutil.rmtree("transformed_data_lazy", ignore_errors=True)
os.mkdir("transformed_data_lazy")
@dask.delayed
def read_a_file(filename):
    """Lazily read one CSV partition from the ``data`` directory.

    Parameters
    ----------
    filename : str
        Name of one partition file (e.g. ``"0.part"``) inside ``data/``.

    Returns
    -------
    pandas.DataFrame indexed by the parsed ``timestamp`` column.
    """
    time.sleep(random.random())  # simulate slow I/O
    # BUG FIX: interpolate the filename — the original read a literal,
    # nonexistent path and ignored its argument entirely.
    df = pd.read_csv(f"data/{filename}", parse_dates=["timestamp"], index_col="timestamp")
    return df
@dask.delayed
def do_a_transformation(df):
    """Add a Fahrenheit column derived from the Celsius ``temperature``.

    Mutates and returns the same DataFrame with a new ``temperature_F``
    column; sleeps briefly to simulate real compute time.
    """
    time.sleep(random.random())  # simulate compute time
    celsius = df["temperature"]
    df["temperature_F"] = celsius * 9/5 + 32
    return df
@dask.delayed
def write_it_back_out(df, filename):
    """Lazily write *df* to ``transformed_data_lazy/<filename>``.

    Parameters
    ----------
    df : pandas.DataFrame
        Transformed frame to persist.
    filename : str
        Partition file name (e.g. ``"0.part"``).

    Returns
    -------
    str path of the written CSV.
    """
    time.sleep(random.random())  # simulate slow I/O
    # BUG FIX: interpolate the filename — the original wrote every partition
    # to the same literal placeholder path, clobbering all but the last.
    path = f"transformed_data_lazy/{filename}"
    df.to_csv(path)
    return path
filenames = os.listdir("data")
# Build the lazy task graph: read -> transform -> write for every partition.
# Nothing executes yet; each element of `outputs` is a Delayed path.
outputs = [
    write_it_back_out(do_a_transformation(read_a_file(filename)), filename)
    for filename in filenames
]
# Execute the whole graph at once so independent partitions run in parallel.
dask.compute(outputs)
(['transformed_data_lazy/4.part', 'transformed_data_lazy/2.part', 'transformed_data_lazy/5.part', 'transformed_data_lazy/1.part', 'transformed_data_lazy/7.part', 'transformed_data_lazy/8.part', 'transformed_data_lazy/3.part', 'transformed_data_lazy/0.part', 'transformed_data_lazy/6.part'],)
# Render the task graph of the delayed outputs (requires graphviz).
dask.visualize(outputs)
Of course objects can also be converted to delayed. Here we convert from a dask.array to a numpy.array of delayed objects.
import dask.array as da
# A 1000x1000 random array split into a 4x2 grid of (250, 500) chunks.
arr = da.random.random(size=(1_000, 1_000), chunks=(250, 500))
# to_delayed() returns a NumPy object array with one Delayed per chunk,
# mirroring the chunk grid, as the repr below shows.
arr_delayed = arr.to_delayed()
arr_delayed
array([[Delayed(('random_sample-a6cb6df82add15c8da77411c0a69bfb0', 0, 0)), Delayed(('random_sample-a6cb6df82add15c8da77411c0a69bfb0', 0, 1))], [Delayed(('random_sample-a6cb6df82add15c8da77411c0a69bfb0', 1, 0)), Delayed(('random_sample-a6cb6df82add15c8da77411c0a69bfb0', 1, 1))], [Delayed(('random_sample-a6cb6df82add15c8da77411c0a69bfb0', 2, 0)), Delayed(('random_sample-a6cb6df82add15c8da77411c0a69bfb0', 2, 1))], [Delayed(('random_sample-a6cb6df82add15c8da77411c0a69bfb0', 3, 0)), Delayed(('random_sample-a6cb6df82add15c8da77411c0a69bfb0', 3, 1))]], dtype=object)
Delayed objects can be used like blocks, but they don't have any sense of what they represent, so there are fewer guard rails.
# Summing one chunk via its Delayed object gives the same number as summing
# the corresponding typed Array block.
arr_delayed[0, 1].sum().compute()
62527.91871535222
arr.blocks[0, 1].sum().compute()
62527.91871535222
# Delayed carries no type information, so a nonsensical operation still
# builds a lazy task — it would only fail when computed.
arr_delayed[0, 1] + "a"
Delayed('add-2c85d7f9ba40b6a2b9404ceb02cb2202')
# The Array block knows its dtype and rejects the same operation eagerly.
arr.blocks[0, 1] + "a"
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-8-808812e8336d> in <module> ----> 1 arr.blocks[0, 1] + "a" TypeError: unsupported operand type(s) for +: 'Array' and 'str'