Run in parallel
How to choose your preferred parallelisation backend, how to parallelise any function using parallel.map, parallel.imap, and parallel.as_completed as standalone utilities, and how to enable parallel execution in app pipelines with parallel=True and par_kw.
Data-level parallelism
scinexus supports parallel computation for the common case where the same calculation needs to be applied to many independent data items. A master process splits the work among available CPU cores, each worker processes its share, and results are collected.
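The split, process and collect pattern described above can be sketched with the standard library alone. This illustrates the idea and is not how scinexus distributes work: split_into_chunks and scatter_gather are hypothetical helpers, and threads (concurrent.futures.ThreadPoolExecutor) stand in for worker processes so the sketch runs anywhere. CPU-bound Python code needs processes to use several cores.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(items, n_chunks):
    """Split items into at most n_chunks contiguous chunks of near-equal size."""
    items = list(items)
    size, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks take one additional item each.
        end = start + size + (1 if i < extra else 0)
        if end > start:  # skip empty chunks when there are fewer items than chunks
            chunks.append(items[start:end])
        start = end
    return chunks


def scatter_gather(func, items, n_workers=4):
    """Master side: split the work, let each worker process one chunk, collect."""

    def work(chunk):
        # Worker side: apply func to every item in its share of the data.
        return [func(x) for x in chunk]

    chunks = split_into_chunks(items, n_workers)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        per_chunk = list(pool.map(work, chunks))  # one list per chunk, in chunk order
    # Flatten the per-chunk lists back into a single list in input order.
    return [result for chunk_result in per_chunk for result in chunk_result]


print(scatter_gather(lambda x: x * x, range(10), n_workers=3))
```

Contiguous chunks keep the collected results in input order; the trade-off is that one slow chunk holds up the final result.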
Warning
Parallelism is not always faster. You should only expect a performance gain when the computation time per task significantly exceeds the overhead of distributing work. If individual tasks are very fast, the overhead of inter-process communication can dominate, and the parallel run may be slower than a serial one.
If individual output files are small, storing results in a single file (e.g. a .sqlitedb database) is more efficient than writing many small files.
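One rough way to reason about the overhead point assumes a compute time t per task, a fixed overhead o per task for distributing it (pickling, inter-process messaging), and n tasks that divide evenly over p workers. The serial run then takes about n·t, the parallel run about n(t + o)/p, so the speedup is roughly p·t/(t + o). The hypothetical helper below evaluates this simplified model. It ignores pool start-up cost and uneven task sizes, so treat it as a sanity check rather than a prediction.

```python
def estimated_speedup(task_seconds, overhead_seconds, workers):
    """Speedup under a simplified model (an assumption, not a measurement).

    Serial time per task:   task_seconds
    Parallel time per task: (task_seconds + overhead_seconds) / workers
    """
    serial = task_seconds
    parallel = (task_seconds + overhead_seconds) / workers
    return serial / parallel


# Heavy tasks: the overhead is negligible and speedup approaches the worker count.
print(estimated_speedup(task_seconds=2.0, overhead_seconds=0.01, workers=4))
# Tiny tasks: the overhead dominates and the "parallel" run is slower than serial.
print(estimated_speedup(task_seconds=0.001, overhead_seconds=0.01, workers=4))
```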
Choosing a parallel backend
scinexus supports three parallel backends. The default uses only the Python standard library and requires no extra installs.
| Backend | Install | Best for |
|---|---|---|
| "multiprocess" | included (standard library only) | scripts, CI, environments where you control dependencies |
| "loky" | pip install "scinexus[loky]" | Jupyter notebooks, interactive sessions, long-running pools |
| "mpi" | pip install "scinexus[mpi]" | HPC clusters with multiple nodes |
Set the backend once, typically at the top of your script or notebook:
```python
import scinexus

scinexus.set_parallel_backend("loky")
```
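If the same code runs both in notebooks and from the command line, a small helper can pick a backend name following the table above. This is a hypothetical sketch: suggest_backend is not part of scinexus, and checking for ipykernel in sys.modules is only a heuristic for running inside a Jupyter kernel.

```python
import importlib.util
import sys


def suggest_backend(multi_node=False):
    """Hypothetical helper: pick a scinexus backend name following the table."""
    if multi_node:
        return "mpi"
    # Heuristic: ipykernel is imported when code runs inside a Jupyter kernel.
    in_notebook = "ipykernel" in sys.modules
    # find_spec checks whether loky is installed without importing it.
    has_loky = importlib.util.find_spec("loky") is not None
    if in_notebook and has_loky:
        return "loky"
    return "multiprocess"


print(suggest_backend())
```

You would then call scinexus.set_parallel_backend(suggest_backend()) once at start-up.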
Note
The "loky" backend uses the loky library, which provides reusable process pools and robust pickling via cloudpickle. This makes it the recommended choice for Jupyter notebooks, where the standard library's ProcessPoolExecutor can fail to serialise closures and lambda functions.
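To check whether a function would survive standard-library pickling before handing it to a process pool, you can call pickle.dumps directly. The picklable and make_scaler names are hypothetical, for illustration only. The standard pickle module serialises functions by reference (by their importable name), which is why lambdas and closures fail; cloudpickle, used by loky, can serialise them by value instead.

```python
import math
import pickle


def picklable(obj):
    """Return True if the standard-library pickle module can serialise obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exception type for unpicklable functions varies between Python versions.
        return False
    return True


def make_scaler(factor):
    def scale(x):  # a closure: defined inside another function
        return x * factor

    return scale


print(picklable(math.sqrt))  # importable by name, so pickled by reference
print(picklable(lambda x: x + 1))  # a lambda has no importable name
print(picklable(make_scaler(2)))  # a closure is a local object
```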
Getting a specific backend without changing the default
If your code requires a particular backend, pass the backend argument to get_parallel_backend. This returns an instance of the requested backend without changing the global default, so other packages that depend on the current setting are unaffected:
```python
from scinexus import get_parallel_backend

backend = get_parallel_backend(backend="loky")
```
Parallel computation on a single computer
Using app.apply_to()
If you have a composed app with a writer, use apply_to() with the parallel and par_kw keyword arguments:
```python
result = app.apply_to(dstore, parallel=True, par_kw=dict(max_workers=4))
```
Using app.as_completed()
If you have a composed app without a writer, use as_completed(). This returns a generator, so wrap it with list() or iterate over it:
```python
results = list(app.as_completed(dstore, parallel=True, par_kw=dict(max_workers=4)))
```
Using scinexus.parallel directly
For parallelising any function (not just apps), use the functions in scinexus.parallel.
parallel.as_completed -- results in completion order
Returns results as they finish, so their order may differ from the input order. It also tends to balance work across compute nodes better than imap or map.
```python
from scinexus import parallel

# is_prime and PRIMES are defined in the complete example below
result = list(parallel.as_completed(is_prime, PRIMES, max_workers=4))
```
The first argument is the function to call; the second is the iterable of inputs. Each input element is passed as a single argument to the function, and the inputs are split into chunks across workers automatically.
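The difference between completion order and input order can be seen with the standard library's concurrent.futures.as_completed. This is used here as an analogue of parallel.as_completed, not as its implementation. Note that the stdlib version yields futures rather than results. slow_square is a hypothetical task whose run time shrinks as its input grows.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x):
    # Larger inputs sleep for less time, so they tend to finish first.
    time.sleep(0.05 * (3 - x))
    return x * x


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_square, x) for x in range(3)]
    # as_completed yields each future as soon as it finishes.
    completion_order = [future.result() for future in as_completed(futures)]
    # Iterating over the original list of futures gives input order instead.
    input_order = [future.result() for future in futures]

print(completion_order)
print(input_order)
```

Because completion order is not input order, a function that returns its input alongside its output (for example an (item, result) tuple) makes it easy to match results back to inputs when using parallel.as_completed.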
Note
If you don't specify max_workers, all available CPUs are used.
parallel.imap -- preserving input order (generator)
Returns results in the same order as the input, yielding one at a time:
```python
from scinexus import parallel

# process_item, items and handle stand for your own function, inputs and consumer
for result in parallel.imap(process_item, items, max_workers=4):
    handle(result)
```
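An order-preserving generator such as imap has to hold back results that finish early until all earlier inputs are done. A common way to do this is a reorder buffer over (index, result) pairs that arrive in any order. This is a sketch of the general technique, not a description of how scinexus implements imap, and in_input_order is a hypothetical name.

```python
def in_input_order(indexed_results):
    """Yield results in index order from (index, result) pairs that arrive in any order."""
    pending = {}  # results that finished before all earlier ones did
    next_index = 0
    for index, result in indexed_results:
        pending[index] = result
        # Release every result whose predecessors have all arrived.
        while next_index in pending:
            yield pending.pop(next_index)
            next_index += 1


# Simulated completion order: input 2 finishes first, input 1 last.
arrivals = [(2, "c"), (0, "a"), (3, "d"), (1, "b")]
print(list(in_input_order(arrivals)))
```

A single slow early item delays delivery of every later result, even those already computed, whereas completion-order iteration hands each result over as soon as it is ready.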
parallel.map -- preserving input order (list)
Same as imap but returns a list:
```python
from scinexus import parallel

results = parallel.map(process_item, items, max_workers=4)
```
Complete example
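```python
import math

from scinexus import parallel


def is_prime(n):
    # Handle 0, 1 and 2 explicitly so the even-number shortcut below is correct
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    # Only odd divisors up to the integer square root need to be tested
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


PRIMES = [
    112272535095293,
    112582705942171,
    115280095190773,
    115797848077099,
    117450548693743,
    993960000099397,
]

# The guard stops worker processes from re-running this block when they import the module
if __name__ == "__main__":
    results = parallel.map(is_prime, PRIMES, max_workers=4)
    for number, prime in zip(PRIMES, results):
        print(f"{number} is prime: {prime}")
```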
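Checking the worker function serially on small inputs before parallelising is cheap, and it catches bugs that are harder to diagnose inside worker processes. This sketch repeats is_prime (with explicit handling of n < 3) and compares it against a brute-force definition. is_prime_brute is a hypothetical helper used only for the check.

```python
import math


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


def is_prime_brute(n):
    # Definition-based check: no divisor between 2 and n - 1.
    return n >= 2 and all(n % d != 0 for d in range(2, n))


# Any disagreement on small inputs points to a bug in the fast version.
mismatches = [n for n in range(200) if is_prime(n) != is_prime_brute(n)]
print(mismatches)
```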
Parallel computation on multiple computers (MPI)
On systems with multiple nodes (e.g. an HPC cluster), use MPI via the mpi4py library. This requires an MPI implementation (e.g. OpenMPI) and the mpi4py Python package. Install mpi4py either directly:

```shell
pip install mpi4py
```

or as part of the scinexus mpi extra, with pip install "scinexus[mpi]".
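To confirm that mpi4py is available before switching backends, importlib.util.find_spec can look for the package without importing it. mpi4py_available is a hypothetical helper. MPI.Get_library_version() reports which MPI implementation mpi4py is linked against.

```python
import importlib.util


def mpi4py_available():
    """Return True if mpi4py can be found on the import path (without importing it)."""
    return importlib.util.find_spec("mpi4py") is not None


if mpi4py_available():
    from mpi4py import MPI

    # Reports the MPI implementation (e.g. Open MPI or MPICH) mpi4py is linked to.
    print(MPI.Get_library_version())
else:
    print('mpi4py not found; install it with: pip install "scinexus[mpi]"')
```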
Set the backend to MPI:
```python
import scinexus

scinexus.set_parallel_backend("mpi")
```
Or pass use_mpi=True to any of the parallel functions:
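```python
from scinexus import parallel

# PBS_NCPUS is the number of workers, read from the PBS job environment
# (see the MPI script structure section)
results = parallel.map(is_prime, PRIMES, use_mpi=True, max_workers=PBS_NCPUS)
```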
Or with app pipelines:
```python
result = app.apply_to(dstore, parallel=True, par_kw=dict(use_mpi=True, max_workers=4))
```
To run an MPI script, invoke it via mpiexec:
```shell
mpiexec -n $PBS_NCPUS python3 -m mpi4py.futures my_script.py
```
Note
You can use MPI for parallel execution on a single computer too. This can be useful for testing your code locally before migrating to a larger system.
MPI script structure
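MPI scripts must guard the main logic behind an if __name__ == "__main__": block:

```python
import os

from scinexus import parallel

# Number of CPUs allocated to this job by the PBS scheduler
PBS_NCPUS = int(os.environ["PBS_NCPUS"])


def process(data):
    ...  # replace with your per-item computation


if __name__ == "__main__":
    my_data = range(100)  # placeholder inputs; replace with your own iterable
    results = parallel.map(process, my_data, use_mpi=True, max_workers=PBS_NCPUS)
```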
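Reading the worker count with int(os.environ["PBS_NCPUS"]) raises KeyError outside a PBS job, for example when testing the script on a single computer as the note above suggests. A hypothetical helper with a fallback to the local CPU count keeps the same script usable in both places. Other schedulers (e.g. Slurm) set different environment variables, so the variable name is a parameter.

```python
import os


def worker_count(env_var="PBS_NCPUS"):
    """Number of workers from a scheduler variable, else this machine's CPU count."""
    value = os.environ.get(env_var)
    if value:
        return int(value)
    # Outside a PBS job (e.g. testing locally) fall back to the local CPU count.
    return os.cpu_count() or 1


print(worker_count())
```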
Custom backends
You can integrate any parallel engine by subclassing Parallel:
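```python
from scinexus.parallel import Parallel, set_parallel_backend


class DaskBackend(Parallel):
    def __init__(self, client):
        self._client = client

    def imap(self, f, s, max_workers=None, **kwargs):
        # max_workers is ignored: the dask cluster decides how many tasks run at once
        futures = self._client.map(f, list(s))
        # gather returns the results in input order
        yield from self._client.gather(futures)

    def as_completed(self, f, s, max_workers=None, **kwargs):
        from dask.distributed import as_completed

        futures = self._client.map(f, list(s))
        # yield each result as soon as its task finishes
        for future in as_completed(futures):
            yield future.result()

    def is_master_process(self):
        from dask.distributed import get_worker

        try:
            get_worker()  # raises ValueError when not called from a dask worker
            return False
        except ValueError:
            return True

    def get_rank(self):
        return 0

    def get_size(self):
        # total number of worker threads across the cluster
        return sum(self._client.nthreads().values())


if __name__ == "__main__":
    from dask.distributed import Client

    client = Client()  # starts a local dask cluster
    set_parallel_backend(DaskBackend(client))
```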
Warning
The dask backend above is a hypothetical, untested example. Using it would also require installing "dask[distributed]".
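For debugging a pipeline without any parallel machinery, a backend that runs every task in the current process can help. The sketch mirrors the five methods used in the dask example. It is written as a plain class so it runs without scinexus; to register it you would subclass scinexus.parallel.Parallel and pass an instance to set_parallel_backend. SerialBackend is a hypothetical name, and the sketch assumes those five methods are all that Parallel requires.

```python
class SerialBackend:
    """Runs every task in the current process, in input order (hypothetical)."""

    def imap(self, f, s, max_workers=None, **kwargs):
        # max_workers is accepted for interface compatibility but ignored.
        for item in s:
            yield f(item)

    def as_completed(self, f, s, max_workers=None, **kwargs):
        # Serial execution completes tasks in input order.
        yield from self.imap(f, s, max_workers=max_workers, **kwargs)

    def is_master_process(self):
        return True  # there are no worker processes

    def get_rank(self):
        return 0

    def get_size(self):
        return 1


backend = SerialBackend()
print(list(backend.imap(abs, [-1, 2, -3])))
```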