Dask-MPI with Batch Jobs
Dask, together with Dask Distributed, is a powerful engine for interactive sessions (see Dask-MPI with Interactive Jobs). In many scenarios, however, your work is pre-defined and you do not need an interactive session to execute your tasks. In these cases, running in batch mode is best.
Dask-MPI makes running batch-mode jobs in an MPI environment easy by providing an API to the same functionality created for the dask-mpi Command-Line Interface (CLI). In batch mode, however, the script running your Dask Client must execute in the same environment in which your Dask cluster is constructed, and the Dask cluster should shut down after your Client script has finished.
To make this functionality possible, Dask-MPI provides the initialize() method as part of its Application Program Interface (API). The initialize() function, when run from within an MPI environment (i.e., one created by mpirun or mpiexec), launches the Dask Scheduler on MPI rank 0 and the Dask Workers on MPI ranks 2 and above. On MPI rank 1, the initialize() function “passes through” to the Client script, running the Dask-based Client code the user wishes to execute.
For example, if you have a Dask-based script named myscript.py, you can run it in parallel, using Dask, with the following command.
mpirun -np 4 python myscript.py
This will run the Dask Scheduler on MPI rank 0, the user's Client code on MPI rank 1, and two Workers on MPI ranks 2 and 3. To make this work, the myscript.py script must contain (presumably near the top of the script) the following code.
from dask_mpi import initialize
initialize()  # launches the Scheduler (rank 0) and Workers (ranks 2+); only rank 1 continues past this call

from dask.distributed import Client
client = Client()  # connects to the Scheduler started by initialize()
The Dask Client will automatically detect the location of the Dask Scheduler running on MPI rank 0 and connect to it.
When the Client code is finished executing, the Dask Scheduler and Workers (and, possibly, Nannies) will be terminated.
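Putting it all together, a complete myscript.py might look like the following minimal sketch. The dask.array workload is only a hypothetical example chosen for illustration, not part of Dask-MPI itself.
# myscript.py -- a minimal sketch of a batch-mode Dask-MPI client script
from dask_mpi import initialize

# Start the Scheduler (rank 0) and Workers (ranks 2 and above);
# only rank 1 continues past this call to run the code below.
initialize()

from dask.distributed import Client
import dask.array as da

client = Client()  # connects to the Scheduler started by initialize()

# Hypothetical workload: the mean of a large random array, computed in parallel
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
print("mean =", x.mean().compute())

# When the script ends, the Scheduler and Workers are shut down automatically.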
Tip
Running Batch Jobs with Job Schedulers
It is common in High-Performance Computing (HPC) environments to request the necessary computing resources with a job scheduler, such as LSF, PBS, or Slurm. In such environments, it is advised that the mpirun ... python myscript.py command be placed in a job submission script so that the resources requested from the job scheduler match the resources used by the mpirun command (see the example job script below).
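For example, a Slurm submission script for the four-rank case above might look like the following sketch. The partition name, module name, and environment name are placeholders that depend on your site; adjust them to match your system.
#!/bin/bash
#SBATCH --job-name=dask-mpi-batch
#SBATCH --nodes=1
#SBATCH --ntasks=4            # must match the -np value passed to mpirun
#SBATCH --time=00:30:00
#SBATCH --partition=regular   # placeholder: use your site's partition name

# Placeholder environment setup: load whatever provides MPI and a Python
# environment containing dask, distributed, dask_mpi, and mpi4py.
module load openmpi
source activate my-dask-env

mpirun -np 4 python myscript.py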
For more details on the initialize() method, see the Application Program Interface (API).
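As an illustration, initialize() accepts keyword arguments that control the Scheduler and Workers it launches. The values shown below (an interconnect interface name, per-worker thread count and memory limit, and a spill directory) are assumptions chosen for illustration; consult the API reference for the authoritative list of parameters.
from dask_mpi import initialize

# Placeholder values for illustration only; check the API reference
# for the full set of supported options.
initialize(
    interface="ib0",          # network interface used for scheduler/worker communication
    nthreads=4,               # threads per worker
    memory_limit="8 GiB",     # memory limit per worker
    local_directory="/tmp",   # where workers spill excess data to disk
)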
Connecting to the Dashboard
Because Dask might be initialized on a node other than the login node, simple port forwarding can be insufficient to connect to the dashboard. To find out which node is hosting the dashboard, extend the initialization code with location logging:
from dask_mpi import initialize
initialize()

from dask.distributed import Client
from distributed.scheduler import logger
import socket

client = Client()

# Determine the node and port hosting the dashboard, then log the ssh
# command needed to forward that port through the login node.
host = client.run_on_scheduler(socket.gethostname)
port = client.scheduler_info()['services']['dashboard']
login_node_address = "supercomputer.university.edu"  # Provide address/domain of login node

logger.info(f"ssh -N -L {port}:{host}:{port} {login_node_address}")
Then search the batch job's output file for the logged line and run it in your terminal:
ssh -N -L PORT_NUMBER:node03:PORT_NUMBER supercomputer.university.edu
The Bokeh Dashboard will then be available at localhost:PORT_NUMBER.