Multiprocessing in Python
Most of the codes I develop run in parallel using MPI (Message Passing Interface) using the python wrapper,
mpi4py. There is a reason why highly scalable programs use this approach, and that is because each processor handles its own chunk of memory and communicates with other processors only when it's needed. PETSc, for example, is a behemoth computing framework entirely written in the MPI computing philosophy. Despite MPI's efficiency, there are some barriers:
- MPICH or OpenMPI must be already compiled on the system
- Python needs
mpi4pyto communicate in parallel
- Programming with MPI is sometimes extremely tedious
- The problem may not benefit from distributed memory
If any of these are true, then OpenMP is a simple alternative. The OpenMP philosophy is to use a “master” process to spawn a bunch of worker processes that each share the same memory allocation. Clearly, this is approach is unsuitable if computation spans more than one computation node. On the positive side, most programming languages already have some implementation of OpenMP without the need for additional software. We explore such an implementation withihn the
multiprocessing module in Python.
There are 2 main objects in the multiprocessing module, which can be imported as:
from multiprocessing import Pool, Queue
I have found
Queue to be the most intuitive. It sets up a queue of tasks to be executed by each processor. When one task completes, the free processor takes another task from the queue until there are no tasks remaining.
from multiprocessing import Queue, Process, cpu_count n = n_evaluations processes =  q_in = Queue(1) q_in = Queue() nprocs = cpu_count() # initialise the processes for i in range(nprocs): p = Process(target=function, args=args, kwargs=kwargs) processes.append(p) for p in processes: p.daemon = True p.start() # put items in the queue sent = [q_in.put((i, var)) for i in range(n)] # get the results for i in range(len(sent)): i, res = q_out.get() # wait until each processor has finished [p.join() for p in processes]
nprocsto the number of processes (the number of CPUs) and pass
ito the queue to reconstruct the ordering of the results.
In the above example, the
Queue object can only be defined from the
__main__ namespace. However, after some tinkering I got this to work within a Python
from multiprocessing import Queue, Process, cpu_count class UptownFunc: def __init__(self): pass def _func_queue(self, func, q_in, q_out, var, *args, **kwargs): """ Retrive processes from the queue """ while True: pos, var = q_in.get() if pos is None: break pass_args = [var] pass_args.extend(args) res = func(*pass_args, **kwargs) q_out.put((pos, res)) return def parallelise_function(self, var, func, *args, **kwargs): """ Split evaluations of func across processors """ n = len(var) processes =  q_in = Queue(1) q_out = Queue() nprocs = cpu_count() for i in range(nprocs): pass_args = [func, q_in, q_out] pass_args.extend(args) p = Process(target=self._func_queue,\ args=tuple(pass_args),\ kwargs=kwargs) processes.append(p) for p in processes: p.daemon = True p.start() # put items in the queue sent = [q_in.put((i, var[i])) for i in range(n)] [q_in.put((None, None)) for _ in range(nprocs)] # get the results results =  ordering =  for j in range(len(sent)): i, res = q_out.get() results.append(res) ordering.append(i) # wait until each processor has finished [p.join() for p in processes] # reorder results return results[ordering]
And that's it! Pass any function to
parallelise_function and supply a list of
var to evaulate in parallel along with optional arguments and keywords.
If this looks familiar it's because I've taken it directly from PyCurious: a tool to calculate the Curie depth from windows of the magnetic anomaly. The above code snippet parallelises the computation of Curie depth across each window.