So far we've been dealing with Dask's high-level interface and the task graph. Here we'll briefly talk about how task graphs get executed. For a longer discussion of this topic, take a look at this notebook.
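As a rough illustration of what "executing a task graph" means, here is a toy, single-threaded executor for a Dask-style graph dictionary. It is a sketch, not Dask's scheduler: the names `inc`, `graph`, and `execute` are made up for this example, and it assumes every key is a string.

```python
from operator import add, mul


def inc(x):
    return x + 1


# A Dask-style graph: each key maps either to a literal value or to a
# task tuple of the form (callable, arg1, arg2, ...). Arguments that
# are keys of the graph refer to the results of other tasks.
graph = {
    "a": 1,
    "b": (inc, "a"),
    "c": (mul, "b", 10),
    "d": (add, "b", "c"),
}


def execute(graph, key, cache=None):
    """Recursively compute `key`, reusing results that are already cached."""
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]

    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Resolve arguments that name other keys before calling the function.
        # Assumption for this sketch: keys are always strings.
        resolved = [
            execute(graph, arg, cache) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        result = func(*resolved)
    else:
        # Not a task tuple, so treat it as a literal value.
        result = task

    cache[key] = result
    return result


result = execute(graph, "d")
print(result)
```

A real scheduler does much more than this: it runs independent tasks in parallel, decides which worker gets which task, and releases intermediate results once nothing depends on them anymore.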
Let's start by creating a local cluster:
from dask.distributed import Client
client = Client()
client
Client: Client-37c30bf5-f632-11eb-9e3b-80fa5b765b9a
Connection method: Cluster object | Cluster type: LocalCluster
Dashboard: http://127.0.0.1:8787/status

LocalCluster: c6080768
Status: running | Using processes: True
Workers: 4 | Total threads: 12 | Total memory: 31.08 GiB

Scheduler: Scheduler-0522c2b7-07c7-4a46-ab96-e7abfe959fe9
Comm: tcp://127.0.0.1:44903 | Started: Just now

Workers (3 threads and 7.77 GiB memory each):
Comm: tcp://127.0.0.1:41231 | Dashboard: http://127.0.0.1:41349/status | Nanny: tcp://127.0.0.1:44927 | Local directory: /home/julia/talks/inside-dask/dask-worker-space/worker-6d11svey
Comm: tcp://127.0.0.1:43837 | Dashboard: http://127.0.0.1:42991/status | Nanny: tcp://127.0.0.1:42707 | Local directory: /home/julia/talks/inside-dask/dask-worker-space/worker-b55hkde8
Comm: tcp://127.0.0.1:43305 | Dashboard: http://127.0.0.1:36265/status | Nanny: tcp://127.0.0.1:34749 | Local directory: /home/julia/talks/inside-dask/dask-worker-space/worker-xp6qugyy
Comm: tcp://127.0.0.1:40871 | Dashboard: http://127.0.0.1:39141/status | Nanny: tcp://127.0.0.1:37863 | Local directory: /home/julia/talks/inside-dask/dask-worker-space/worker-7h7_am_c
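Let's use the example from the previous notebook to trigger a computation. Building `output` only constructs the task graph; calling `compute()` is what sends it to the cluster.
import dask.dataframe as dd
ddf = dd.read_csv("data/*", parse_dates=["timestamp"]).set_index("timestamp")
mean_temperature = ddf.temperature.mean()
output = (ddf.temperature - mean_temperature).resample("1M").agg(["min", "max"])
output.compute()  # execute the graph on the cluster's workers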
You can also run a function directly on every worker, outside the task scheduling system. The result is a dictionary keyed by worker address.
import os
client.run(os.listdir, "data")
{'tcp://127.0.0.1:40871': ['4.part', '2.part', '5.part', '1.part', '7.part', '8.part', '3.part', '0.part', '6.part'], 'tcp://127.0.0.1:41231': ['4.part', '2.part', '5.part', '1.part', '7.part', '8.part', '3.part', '0.part', '6.part'], 'tcp://127.0.0.1:43305': ['4.part', '2.part', '5.part', '1.part', '7.part', '8.part', '3.part', '0.part', '6.part'], 'tcp://127.0.0.1:43837': ['4.part', '2.part', '5.part', '1.part', '7.part', '8.part', '3.part', '0.part', '6.part']}
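Worker plugins let you run custom code at points in a worker's lifecycle: when the plugin is set up on the worker (`setup`), when the worker shuts down (`teardown`), and whenever a task on the worker changes state (`transition`). The plugin below only uses `setup`.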
import os
from distributed import WorkerPlugin


class CopyFile(WorkerPlugin):
    """A WorkerPlugin to copy a local file to workers.

    Parameters
    ----------
    filepath: str
        A path to the file to copy to workers

    Examples
    --------
    >>> client.register_worker_plugin(CopyFile(".env"))
    """

    def __init__(self, filepath):
        """
        Initialize the plugin by reading in the data from the given file.
        """
        self.filename = os.path.basename(filepath)
        with open(filepath, "rb") as f:
            self.data = f.read()

    async def setup(self, worker):
        with open(self.filename, "wb+") as f:
            f.write(self.data)
        return os.listdir()
NOTE: This is slightly goofy because this is a LocalCluster, so every worker already shares my filesystem. It will also only work on my machine, since I put a picture of a cat a few directories up.
client.register_worker_plugin(CopyFile("../../cat.jpg"))
{'tcp://127.0.0.1:40871': {'status': 'OK'}, 'tcp://127.0.0.1:41231': {'status': 'OK'}, 'tcp://127.0.0.1:43305': {'status': 'OK'}, 'tcp://127.0.0.1:43837': {'status': 'OK'}}
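To show how the lifecycle hooks fit together, here is a minimal sketch of a variant that also cleans up after itself in `teardown`. The class name `CopyFileWithCleanup`, the file `settings.txt`, and the stand-in worker object are all hypothetical. The sketch assumes the worker exposes its scratch directory as `worker.local_directory`, and the import fallback exists only so the hooks can be tried by hand in an environment without `distributed` installed.

```python
import os
import tempfile
from types import SimpleNamespace

try:
    from distributed import WorkerPlugin
except ImportError:
    # Fallback so the sketch can be tried without distributed installed.
    WorkerPlugin = object


class CopyFileWithCleanup(WorkerPlugin):
    """Hypothetical variant of CopyFile that removes its copy on shutdown."""

    def __init__(self, filepath):
        # Read the file once on the client; the bytes travel with the plugin.
        self.filename = os.path.basename(filepath)
        with open(filepath, "rb") as f:
            self.data = f.read()
        self.target = None

    def setup(self, worker):
        # Write into the worker's scratch directory rather than the cwd.
        self.target = os.path.join(worker.local_directory, self.filename)
        with open(self.target, "wb") as f:
            f.write(self.data)

    def teardown(self, worker):
        # Remove the copy when the worker shuts down.
        if self.target and os.path.exists(self.target):
            os.remove(self.target)


# Exercise the hooks by hand, using a stand-in for the worker object.
with tempfile.TemporaryDirectory() as src_dir, tempfile.TemporaryDirectory() as worker_dir:
    source = os.path.join(src_dir, "settings.txt")
    with open(source, "wb") as f:
        f.write(b"hello")

    plugin = CopyFileWithCleanup(source)
    fake_worker = SimpleNamespace(local_directory=worker_dir)

    plugin.setup(fake_worker)
    copied = os.listdir(worker_dir)

    plugin.teardown(fake_worker)
    remaining = os.listdir(worker_dir)

print(copied, remaining)
```

On a real cluster you would not call the hooks yourself; registering the plugin with the client, as done above for `CopyFile`, lets each worker call them at the appropriate time.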