import multiprocessing as mp
import random
import sys
import numpy as np
[docs]def parmap1(f, X, *args, tuple_X=False, nprocs=mp.cpu_count()):
"""
Parallelism over data.
This function mimics ``multiprocessing.Pool.map`` to allow extra arguments to be used
for the function ``f``.
Parameters
----------
f: function
The function that operates on the data.
X: list
Data to be parallelized.
args: args
Extra positional arguments needed by the function ``f``.
tuple_X: bool
This depends on ``X``. It should be set to ``True`` if multiple arguments are
parallelized and set to ``False`` if only one argument is parallelized. See
``Example`` below.
nprocs: int
Number of processors to use.
Return
------
list
A list of results, corresponding to ``X``.
Note
----
The data is put into a job queue, a worker process gets a piece of the data to work
on, the worker pushes the result back to the manager through another queue, and then
get another piece of data until the job queue is empty. So, in principle, there will
not be idle worker and it should be faster than :meth:`kliff.parallel.parmap2`.
Warning
-------
This is implemented using ``multiprocessing.Queue``, which requires the function``f``
to be picklable. If it is not the case (e.g. use KIM library functions), use
:meth:`kliff.parallel.parmap2` that is based on ``multiprocessing.Pipe``.
Example
-------
>>> def func(x, y, z=1):
>>> return x+y+z
>>> X = range(3)
>>> Y = range(3)
>>> parmap1(func, X, 1, nprocs=2) # [2,3,4]
>>> parmap1(func, X, 1, 1, nprocs=2) # [2,3,4]
>>> parmap1(func, zip(X, Y), tuple_X=True, nprocs=2) # [1,3,5]
>>> parmap1(func, zip(X, Y), 1, tuple_X=True, nprocs=2) # [1,3,5]
"""
ctx = get_context()
q_in = ctx.Queue(nprocs)
q_out = ctx.Queue()
processes = []
for _ in range(nprocs):
p = ctx.Process(target=_func1, args=(f, q_in, q_out))
p.daemon = True
p.start()
processes.append(p)
N = 0
for i, x in enumerate(X):
N += 1
if tuple_X:
ix = (i, *x)
else:
ix = (i, x)
q_in.put((ix, args))
[q_in.put((None, None)) for _ in range(nprocs)]
results = [q_out.get() for _ in range(N)]
[p.join() for p in processes]
return [r for i, r in sorted(results)]
def _func1(f, q_in, q_out):
while True:
ix, args = q_in.get()
if ix is None:
break
i = ix[0]
x = ix[1:]
y = f(*x, *args)
q_out.put((i, y))
def parmap2(f, X, *args, tuple_X=False, nprocs=mp.cpu_count()):
    """
    Parallelism over data.

    This is to mimic ``multiprocessing.Pool.map``, which requires the function ``f``
    to be picklable. This function does not have this restriction (as long as the
    processes are started with the ``fork`` start method) and allows extra
    arguments to be used for the function ``f``.

    Parameters
    ----------
    f: function
        The function that operates on the data. It is called as ``f(x, *args)``, or as
        ``f(*x, *args)`` when ``tuple_X`` is ``True``.
    X: iterable
        Data to be parallelized.
    args: args
        Extra positional arguments needed by the function ``f``.
    tuple_X: bool
        This depends on ``X``. It should be set to ``True`` if multiple arguments are
        parallelized and set to ``False`` if only one argument is parallelized. See
        ``Example`` below.
    nprocs: int
        Number of worker processes to use; must be at least 1. The default is
        ``multiprocessing.cpu_count()`` evaluated when this module is imported.

    Returns
    -------
    list
        A list of results, corresponding to ``X``.

    Raises
    ------
    ValueError
        If ``nprocs`` is smaller than 1.
    RuntimeError
        If a worker process exits without sending back its results (e.g. because
        ``f`` raised). The worker's own traceback is printed by ``multiprocessing``.

    Note
    ----
    This function is implemented using ``multiprocessing.Pipe``. The data is
    subdivided into ``nprocs`` groups and then each group of data is distributed to a
    process. The results from each group are then assembled together. The data is
    shuffled to balance the load in each process. The return values of ``f`` are sent
    back through the pipe and therefore must be picklable. See
    :meth:`kliff.parallel.parmap1` for another implementation that uses
    ``multiprocessing.Queue``.

    Example
    -------
    >>> def func(x, y, z=1):
    ...     return x+y+z
    >>> X = range(3)
    >>> Y = range(3)
    >>> parmap2(func, X, 1, nprocs=2)  # [2,3,4]
    >>> parmap2(func, X, 1, 1, nprocs=2)  # [2,3,4]
    >>> parmap2(func, zip(X, Y), tuple_X=True, nprocs=2)  # [1,3,5]
    >>> parmap2(func, zip(X, Y), 1, tuple_X=True, nprocs=2)  # [1,3,5]
    """
    if nprocs < 1:
        raise ValueError(f"`nprocs` must be at least 1; got {nprocs}.")

    ctx = get_context()

    # Tag every item with its position so results can be put back in order.
    if tuple_X:
        pairs = [(i, *x) for i, x in enumerate(X)]
    else:
        pairs = [(i, x) for i, x in enumerate(X)]

    # Shuffle to balance the load. A private RNG leaves the caller's global
    # `random` state untouched; the final order does not depend on the shuffle.
    random.Random().shuffle(pairs)

    # Split into `nprocs` groups whose sizes differ by at most one. Plain list
    # slicing keeps every item an ordinary Python object. Converting to a numpy
    # array would coerce mixed types (an int index next to a str item becomes a
    # str, so the final sort goes lexicographic) and rejects ragged items.
    groups = [pairs[k::nprocs] for k in range(nprocs)]

    processes = []
    managers = []
    for group in groups:
        manager_end, worker_end = ctx.Pipe(duplex=False)
        p = ctx.Process(target=_func2, args=(f, group, args, worker_end))
        p.daemon = True
        p.start()
        # Drop the parent's copy of the sending end immediately. Then `recv` raises
        # EOFError if the worker dies without sending, instead of blocking forever.
        # Because this happens before the next fork, later workers do not
        # inherit this end either.
        worker_end.close()
        processes.append(p)
        managers.append(manager_end)

    results = []
    try:
        for p, manager_end in zip(processes, managers):
            try:
                results.extend(manager_end.recv())
            except EOFError:
                p.join()
                raise RuntimeError(
                    f"Worker process {p.name} exited with code {p.exitcode} "
                    "before sending its results."
                ) from None
    except BaseException:
        # Do not leave the remaining workers running after a failure.
        for p in processes:
            if p.is_alive():
                p.terminate()
        raise
    finally:
        for manager_end in managers:
            manager_end.close()
        for p in processes:
            p.join()

    return [r for _, r in sorted(results, key=lambda pair: pair[0])]
def _func2(f, iX, args, worker_end):
results = []
for ix in iX:
i = ix[0]
x = ix[1:]
results.append((i, f(*x, *args)))
worker_end.send(results)
def get_MPI_world_size():
    """
    Return the number of processes in ``MPI.COMM_WORLD``.

    Returns
    -------
    int
        ``MPI.COMM_WORLD.Get_size()`` when ``mpi4py`` can be imported, otherwise 1.
        A run without ``mpi4py`` is thus treated as a single-process world.
    """
    try:
        from mpi4py import MPI
    except ImportError:
        return 1
    return MPI.COMM_WORLD.Get_size()
def get_context():
    """
    Return the ``multiprocessing`` context used by :func:`parmap1` and :func:`parmap2`.

    On macOS (``sys.platform == "darwin"``) the ``"fork"`` start method is requested
    explicitly. On every other platform the interpreter's default context is
    returned.
    """
    # NOTE(review): presumably the override exists because Python 3.8 made "spawn"
    # the macOS default. Python 3.14 switched the default on other POSIX platforms
    # to "forkserver", which pickles `f` and would break parmap2's no-pickling
    # promise there. Consider requesting "fork" wherever it is available.
    start_method = "fork" if sys.platform == "darwin" else None
    return mp.get_context(start_method)