Source code for dask_mpi.initialize

import asyncio
import atexit
import sys

import dask
from distributed import Nanny, Scheduler
from distributed.utils import import_term

from .exceptions import WorldTooSmallException
from .execute import send_close_signal


def initialize(
    interface=None,
    nthreads=1,
    local_directory=None,
    memory_limit="auto",
    nanny=False,
    dashboard=True,
    dashboard_address=":8787",
    protocol=None,
    worker_class="distributed.Worker",
    worker_options=None,
    comm=None,
    exit=True,
):
    """
    Initialize a Dask cluster using mpi4py

    Using mpi4py, MPI rank 0 launches the Scheduler, MPI rank 1 passes through
    to the client script, and all other MPI ranks launch workers.  All MPI
    ranks other than MPI rank 1 block while their event loops run.

    In normal operation these ranks exit once rank 1 ends. If exit=False is
    set they instead return a bool indicating whether they are the client and
    should execute more client code, or a worker/scheduler who should not.  In
    this case the user is responsible for the client calling
    send_close_signal when work is complete, and checking the returned value
    to choose further actions.

    Parameters
    ----------
    interface : str
        Network interface like 'eth0' or 'ib0'
    nthreads : int
        Number of threads per worker
    local_directory : str or None
        Directory to place worker files
    memory_limit : int, float, or 'auto'
        Number of bytes before spilling data to disk.  This can be an
        integer (nbytes), float (fraction of total memory), or 'auto'.
    nanny : bool
        Start workers in nanny process for management (deprecated, use
        worker_class instead).  When True, a DeprecationWarning is emitted on
        the worker ranks and ``distributed.Nanny`` is used as the worker
        class, overriding ``worker_class``.
    dashboard : bool
        Enable Bokeh visual diagnostics
    dashboard_address : str
        Bokeh port for visual diagnostics
    protocol : str
        Protocol like 'inproc' or 'tcp'
    worker_class : str
        Class to use when creating workers
    worker_options : dict
        Options to pass to workers.  These are merged last, so they override
        the options derived from the other parameters.
    comm : mpi4py.MPI.Intracomm
        Optional MPI communicator to use instead of COMM_WORLD
    exit : bool
        Whether to call sys.exit on the workers and schedulers when the event
        loop completes.

    Returns
    -------
    is_client : bool
        Only returned if exit=False. Indicates whether this rank should
        continue to run client code (True), or if it acts as a scheduler or
        worker (False).  Rank 1 always returns True regardless of ``exit``.

    Raises
    ------
    WorldTooSmallException
        If the communicator has fewer than 3 ranks.
    """
    if comm is None:
        # Imported lazily so that mpi4py is only needed when no communicator
        # is supplied by the caller.
        from mpi4py import MPI

        comm = MPI.COMM_WORLD

    world_size = comm.Get_size()
    if world_size < 3:
        raise WorldTooSmallException(
            f"Not enough MPI ranks to start cluster, found {world_size}, "
            "needs at least 3, one each for the scheduler, client and a worker."
        )

    rank = comm.Get_rank()

    if not worker_options:
        worker_options = {}

    if rank == 0:
        # Rank 0: run the scheduler and publish its address to every rank.

        async def run_scheduler():
            async with Scheduler(
                interface=interface,
                protocol=protocol,
                dashboard=dashboard,
                dashboard_address=dashboard_address,
            ) as scheduler:
                comm.bcast(scheduler.address, root=0)
                comm.Barrier()
                await scheduler.finished()

        asyncio.run(run_scheduler())
        if exit:
            sys.exit()
        else:
            return False

    else:
        # All other ranks: receive the scheduler address broadcast by rank 0
        # and make it the default for clients/workers created on this rank.
        scheduler_address = comm.bcast(None, root=0)
        dask.config.set(scheduler_address=scheduler_address)
        comm.Barrier()

        if rank == 1:
            # Rank 1: the client.  Fall through to the user's script.
            if exit:
                # Shut the cluster down when the client script terminates.
                atexit.register(send_close_signal)
            return True
        else:
            # Remaining ranks: run a worker until it finishes.
            if nanny:
                # Warn rather than raise: the option is deprecated but should
                # keep working.  stacklevel=2 attributes the warning to the
                # caller of initialize().
                import warnings

                warnings.warn(
                    "Option nanny=True is deprecated, use "
                    "worker_class='distributed.Nanny' instead",
                    DeprecationWarning,
                    stacklevel=2,
                )

            async def run_worker():
                WorkerType = import_term(worker_class)
                if nanny:
                    WorkerType = Nanny
                opts = {
                    "interface": interface,
                    "protocol": protocol,
                    "nthreads": nthreads,
                    "memory_limit": memory_limit,
                    "local_directory": local_directory,
                    "name": rank,
                    **worker_options,
                }
                async with WorkerType(**opts) as worker:
                    await worker.finished()

            asyncio.run(run_worker())
            if exit:
                sys.exit()
            else:
                return False