You can manually apply a function to each block by iterating over the array's blocks.
import dask.array as da
arr = da.random.random(size=(1_000, 1_000), chunks=(250, 500))
result = 0
for block in arr.blocks:
    result += block.sum()
result.visualize()
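As a quick sanity check, the looped result should agree with the array's built-in reduction. Here is a small sketch, assuming arr and result from above are still in scope:

import numpy as np

# The hand-rolled block loop and the built-in sum should match
# (up to floating-point rounding from the different addition order).
assert np.allclose(result.compute(), arr.sum().compute())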
For dataframes this looks very similar. Here we compute a groupby aggregation on each partition and add the results together with a generator expression.
import dask
ddf = dask.datasets.timeseries()
result = sum(partition.groupby("name").sum() for partition in ddf.partitions)
result.visualize()
result.visualize(optimize_graph=True)
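Each term in that sum corresponds to one partition, so the graph is as wide as ddf.npartitions. Computing the result collapses the lazy sum into a concrete pandas DataFrame, as this small sketch shows:

# One groupby-sum chain per partition.
print(ddf.npartitions)

# Materialize the combined result as a pandas DataFrame.
print(result.compute())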
Another approach to this problem is to repeatedly add neighboring results together in pairs until only one is left, a tree reduction.
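The pairing logic is easiest to see on plain integers first. This toy loop (not Dask-specific) carries the first element forward on odd-length rounds and halves the list on every pass, so it runs about log2(n) times:

vals = [1, 2, 3, 4, 5]  # odd length on the first round
while len(vals) > 1:
    start = len(vals) % 2  # carry the first element forward when odd
    vals = [
        *vals[:start],
        *[vals[i] + vals[i + 1] for i in range(start, len(vals), 2)],
    ]
print(vals[0])  # 15

Applied to the dataframe partitions, the same loop looks like this: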
outputs = [partition.groupby("name").sum() for partition in ddf.partitions]
while len(outputs) > 1:
    start = len(outputs) % 2  # 1 if odd, 0 if even
    outputs = [
        *outputs[:start],
        *[outputs[i] + outputs[i + 1] for i in range(start, len(outputs), 2)],
    ]
output = outputs[0]
output.visualize()
That graph optimizes to the same graph as our original approach.
output.visualize(optimize_graph=True)
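Both strategies compute the same values; only the shape of the task graph differs. As a final check, here is a sketch assuming result and output from above are still in scope, allowing for floating-point rounding from the different addition order:

import pandas as pd

# The linear and tree-shaped sums agree within floating-point tolerance.
pd.testing.assert_frame_equal(result.compute(), output.compute())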