Conda Package Management

There are a number of different ways to leverage the capabilities of Conda package management for custom data and post-processing analysis using Python. This tutorial will go over how to set up Conda environments for various applications on Rescale.

Please contact us if you’re having trouble.

Miniconda vs. Anaconda

There are two “types” of Conda environment available on Rescale: one that contains a number of pre-installed packages (Anaconda), and one that is an empty environment (Miniconda). Each has advantages depending on the needs of the user. The Miniconda environment is a much smaller snapshot to load onto the cluster, so start-up times will be shorter. This makes it an ideal choice if only one or two Python packages are required for the analysis. If you are using a custom Python post-processing script in addition to a CFD simulation, for example, then a Miniconda environment should be sufficient. If you know your analysis requires many of the pre-installed packages in the Anaconda environment, the additional cluster start-up time is likely worth it, as you would otherwise need to install all of those packages once the cluster is up and running. Regardless of which environment you select, the Conda commands behave the same, so the commands listed here work for both Miniconda and Anaconda.

To install a package, you can add the following line in the command window of the Rescale UI:

conda install -y <package-name>

The -y flag automatically confirms the installation prompts; without it, the non-interactive install will not complete, so it is important to include it. You can also specify the version of the package in the command with:

conda install -y <package-name>=<version-num>

Conda install instructions from the Anaconda distributor

All packages that are installed this way are now available to be called in a Python script executed on that cluster. For example:

conda install -y numpy=1.15.4
python myscript_with_numpy_calls.py

If you would like to run a Python script for post-processing that uses additional Python packages, you can add Miniconda as an additional software and Conda install all the required packages. For example, if one wanted to add some post-processing visualization with matplotlib to an SU2 run, the “Software Settings” page would contain one command for the SU2 tile and one for the Miniconda tile (the original screenshots are not reproduced here).
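Since those screenshots are not shown here, the commands below sketch what each tile’s command window might contain; the solver invocation and the file names (inv_NACA0012.cfg, plot_results.py) are illustrative placeholders, not values from a specific Rescale template.

For the SU2 tile:

SU2_CFD inv_NACA0012.cfg

For the Miniconda tile:

conda install -y matplotlib
python plot_results.py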

In general, parallelization of Python scripts does not automatically extend to multi-node configurations and is usually restricted to launching multiple serial processes on the same node. To truly take advantage of running large clusters on Rescale, some type of parallel backend needs to be created and managed. Once again, Conda can assist in getting this set up and the required packages installed. One of the primary advantages of the Conda package management system is that it can install and configure packages outside of Python. This becomes very advantageous when installing packages that require compatibility with low-level libraries, including MPI, which is necessary for packages such as mpi4py. Two applications will be explored: one where the parallelization is handled through an ipcluster and is used to accelerate training of a machine learning model, and another with more raw MPI calls using mpi4py.

Before beginning the setup of a Python parallel environment, there are some details regarding how a multi-node cluster is set up on Rescale that should be discussed. Rather than installing all requisite packages on each node individually, it is recommended to clone the Conda environment to the NFS-mounted ~/work/shared directory. To do this, you can add the following commands to the job:

conda create -p ~/work/shared/uenv --clone uenv
conda activate ~/work/shared/uenv
... Run workflow ...
rm -rf ~/work/shared/uenv

NOTE: This is only necessary if more than one node is used for the cluster.

Then, if you run conda install -p ~/work/shared/uenv -y <package-name> afterwards, that package will be available on all compute nodes. For example:

conda create -p ~/work/shared/uenv --clone uenv
conda activate ~/work/shared/uenv
conda install -p ~/work/shared/uenv -y <package-name>
python myscript_with_package-name_calls.py
rm -rf ~/work/shared/uenv

Please note that you do need to add the prefix flag -p ~/work/shared/uenv when Conda installing packages on a multi-node cluster so that they are installed into the correct environment. It is also important to remove the Conda environment from the shared directory once the job is complete (rm -rf ~/work/shared/uenv); otherwise there will be many unnecessary file uploads. Again, if only running on a single node, the prefix flag -p ~/work/shared/uenv is not necessary. Finally, if you clone the virtual environment to the shared directory, you will need to activate it when SSHing into the cluster interactively.
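A minimal sketch of that interactive workflow, assuming you have already SSHed into one of the nodes (the script name is a placeholder):

conda activate ~/work/shared/uenv
python my_interactive_check.py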

The following instructions assume the user will be running on a multi-node cluster, so the Conda environment is located at ~/work/shared/uenv. A few packages are necessary to spin up an ipyparallel backend; the order of installation can matter, so it is recommended to always start with mpi4py and ipython.

conda install -p ~/work/shared/uenv -y mpi4py
conda install -p ~/work/shared/uenv -y ipython
conda install -p ~/work/shared/uenv -y ipyparallel
conda install -p ~/work/shared/uenv -y numpy
conda install -p ~/work/shared/uenv -y joblib

The first step is to create a new default IPython directory on the NFS drive so the parallel configuration files can be passed to all nodes.

export IPYTHONDIR=~/work/shared/.ipython

Now create a new profile. In this case we assume the environment has been cloned into ~/work/shared/uenv, so we call the ipython executable located there.

~/work/shared/uenv/bin/ipython profile create --parallel

To start the controller, use the following command on the head node. It is tempting to set the -n flag equal to the total number of cores available on the cluster, but it should be restricted to the number of cores on the head node. We will add the rest of the cores as individual “engines”.

ipcluster start -n $RESCALE_CORES_PER_NODE --engines=MPI --ip='*'

Once the controller is up, ipengines need to be started on the worker nodes. If this is automated, a sleep command or some type of callback on the successful startup of the controller is needed before the engines are started. The engines are connected to the controller using the information passed through the ipcontroller-engine.json file. One of the primary reasons for exporting the IPYTHONDIR variable was so this file would be available on all nodes by default.
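One simple approach, sketched below, is to poll for the connection file before launching any engines; the two-second interval and 30-attempt limit are arbitrary choices rather than Rescale defaults.

CONNECTION_FILE=~/work/shared/.ipython/profile_default/security/ipcontroller-engine.json
# Wait for the controller to write its connection file before starting any engines
for i in $(seq 1 30); do
  if [ -f "$CONNECTION_FILE" ]; then
    break
  fi
  sleep 2
done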

You need to start a separate engine for each core on each worker node. If you have created the profile using the profile create command described above, the ipcontroller-engine.json file will be located in ~/work/shared/.ipython/profile_default/security/. To start an individual engine, use the command:

ipengine --mpi --file=~/work/shared/.ipython/profile_default/security/ipcontroller-engine.json 

This part should be scripted to automate starting the engines on all the worker nodes; a script to do this is shown below. It takes the machine file on the cluster and starts an engine on each core of each worker node.

#!/bin/bash
headnode=$(head -n 1 $HOME/machinefile)
for host in $(cat $HOME/machinefile); do
  if ! [ "$host" == "$headnode" ]; then
    for i in $(seq 1 $RESCALE_CORES_PER_NODE); do
      ssh "$host" "conda activate ~/work/shared/uenv; nohup ipengine --mpi --file=~/work/shared/.ipython/profile_default/security/ipcontroller-engine.json" >> "engine_output.log" &
      sleep 1
    done
  fi
done

It is important to activate the “shared” virtual environment even when passing the command over SSH. The addition of the nohup command lets you background the process remotely.

Once the ipcluster is up and running, connecting to it should be relatively simple. Here is a snippet that should demonstrate how to connect and run some basic operations.

from ipyparallel import Client
import numpy as np

# Connect to the default profile
rc = Client()

# Create a direct view
dv = rc[:]

# Add 100 sets of 3 numbers in parallel using all engines
dv.map_sync(lambda x, y, z: x + y + z, range(100), range(100), range(100))

# Load balanced view
bview = rc.load_balanced_view()

# Can be used for jobs that take different amounts of time to complete
bview.map_sync(lambda x: sum(x), np.random.random((10, 100000)))

Behind the scenes, many machine learning algorithms are highly parallelizable, and GPUs are often used because they handle these parallel workloads very well. GPUs are also attractive because, logistically, it can be easier to have a single GPU available than an HPC cluster with hundreds of cores. However, with Rescale that resource constraint is eliminated and CPU parallelization is also possible! This is not to suggest CPU parallelization is a better option in general (GPUs are certainly often a good choice), but it is useful to be aware of what is possible. There are a number of functions in the scikit-learn tool set that have built-in support for CPU parallelization. This makes scaling out these models very easy… provided you have the parallel environment configured correctly. Luckily, if you have followed the steps outlined above, you should be able to plug in a couple of lines of code and start running these models on as large a Rescale cluster as you would like. This should look something like:

...
rc = Client()
bview = rc.load_balanced_view()
register_parallel_backend('ipyparallel', lambda: IPythonParallelBackend(view=bview))
with parallel_backend('ipyparallel'):
    search.fit(digits.data, digits.target)
...

Shown below is an example of a Support Vector Classification (SVC) algorithm applied to the classic MNIST-style digits dataset, where the bulk of the computationally intensive work is performed on the ipcluster.

from joblib import Parallel, parallel_backend, register_parallel_backend  # sklearn.externals.joblib is deprecated in recent scikit-learn
from sklearn.model_selection import RandomizedSearchCV
from sklearn.datasets import load_digits
from sklearn.svm import SVC
import numpy as np
from ipyparallel import Client
from ipyparallel.joblib import IPythonParallelBackend 

digits = load_digits() 

rc = Client()
bview = rc.load_balanced_view()
register_parallel_backend('ipyparallel', lambda: IPythonParallelBackend(view=bview)) 
param_space = {
    'C': np.logspace(-6, 6, 300),
    'gamma': np.logspace(-8, 8, 300),
    'tol': np.logspace(-4, -1, 300),
    'class_weight': [None, 'balanced'],
}

model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=10, n_iter=1000, verbose=1, n_jobs=-1)

with parallel_backend('ipyparallel'):
    search.fit(digits.data, digits.target)
  
rc.shutdown()

For this problem, the fit time scales remarkably well with the number of CPUs used for the ipcluster, even across multiple nodes.

Number of Cores    Time to Fit
4 cores            27.4 min
9 cores            11.7 min
18 cores           6.0 min
36 cores*          3.1 min

*run on multiple nodes

It is recommended to script the ipcluster setup process and then call the Python script separately. The example job linked below contains the example discussed, with the entire process scripted.
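A rough outline of such a driver script is sketched below. The sleep durations and the helper file names (start_engines.sh for the worker-node engine launcher shown earlier, train_model.py for the scikit-learn example) are placeholders, not values taken from the example job.

#!/bin/bash
# Sketch of an end-to-end driver for a multi-node ipcluster job
conda create -p ~/work/shared/uenv --clone uenv
conda activate ~/work/shared/uenv
conda install -p ~/work/shared/uenv -y mpi4py
conda install -p ~/work/shared/uenv -y ipython
conda install -p ~/work/shared/uenv -y ipyparallel numpy joblib
conda install -p ~/work/shared/uenv -y scikit-learn   # needed for the SVC example if starting from Miniconda
export IPYTHONDIR=~/work/shared/.ipython
~/work/shared/uenv/bin/ipython profile create --parallel
ipcluster start -n $RESCALE_CORES_PER_NODE --engines=MPI --ip='*' &
sleep 60                  # allow the controller to start (placeholder duration)
./start_engines.sh        # worker-node engine launcher shown earlier (assumed name)
sleep 30                  # allow the engines to register (placeholder duration)
python train_model.py     # the scikit-learn example above (assumed name)
rm -rf ~/work/shared/uenv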



Use mpi4py to Add Rudimentary MPI Calls to Python Scripts

Another way to take advantage of the ability to launch large clusters on Rescale is to build out Python code with functions from mpi4py. This does require the code to be constructed with these calls from the outset. It is less of a plug-and-play solution than an ipcluster, but it can be a good option in certain scenarios. Again, when running on clusters with more than one node, you should follow the instructions for cloning the Conda environment here.

from mpi4py import MPI
import numpy as np
 
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
 
# Point-to-Point Communication 
if rank == 0:
    data = np.arange(1000, dtype='i')
    comm.Send([data, MPI.INT], dest=1, tag=77)
elif rank == 1:
    data = np.empty(1000, dtype='i')
    comm.Recv([data, MPI.INT], source=0, tag=77)
    
# MPI Broadcast      
if rank == 0:
    data = np.arange(100, dtype='i')
else:
    data = np.empty(100, dtype='i')
comm.Bcast(data, root=0)
for i in range(100):
    assert data[i] == i    
 
# Collective I/O with NumPy arrays 
amode = MPI.MODE_WRONLY|MPI.MODE_CREATE
fh = MPI.File.Open(comm, "./datafile.contig", amode)
 
buffer = np.empty(10, dtype=np.intc)  # np.int is deprecated in recent NumPy; use the C int type expected by MPI
buffer[:] = comm.Get_rank()
 
offset = comm.Get_rank()*buffer.nbytes
fh.Write_at_all(offset, buffer)
 
fh.Close()

Only basic examples are presented here, but more information can be found in the mpi4py docs and basic tutorials.

To run the script, you need to invoke it with an mpirun or mpiexec call, for example:

mpiexec -n 4 python script_with_mpi4py.py
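On a multi-node cluster, the machine file that Rescale generates can be passed to mpiexec so the ranks are spread across nodes. The exact flag name varies between MPI implementations; the sketch below assumes an MPICH-style mpiexec from the Conda-installed MPI, and 36 is just an example total core count.

mpiexec -n 36 -machinefile $HOME/machinefile ~/work/shared/uenv/bin/python script_with_mpi4py.py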