Multiprocessing and pickling functions

1 September 2010 21:42

I've had the good fortune to get to experiment with Python's excellent multiprocessing module recently. It's an interesting case of a standard library module largely designed to take advantage of a performance characteristic of some operating systems; namely, highly efficient forking on Unix-style systems. It exists on Windows too, but as a developer you have to be aware of the relatively high cost of creating new processes there.

The multiprocessing module is Python's answer to the parallelisation problem. Because of the GIL, CPython can only have one thread actually executing at any instant. (Some Python implementations manage to avoid the GIL, but at a performance cost and, perhaps more significantly, at the cost of compatibility with existing C extension modules.) But since threaded programming is hard, using separate processes, with a simple but explicit method of transferring data between them, is very useful.

I found multiprocessing very easy to work with. After a while manually scheduling tasks into processes and polling them to see whether they had completed, I figured that using multiprocessing.Pool would do most of the work for me, and most importantly give me callbacks instead of polling. Here's a simple example of using multiprocessing.Pool:

from multiprocessing import Pool

def f(x):
    return x**2

def printer(x):
    print x

pool = Pool()

pool.map_async(f, range(10), callback=printer)
pool.close()
pool.join() # Essential otherwise main process can exit before results returned

This example works. What I was initially doing didn't. The reason was that the library I was writing didn't have a fixed list of functions to pass to map_async. Because of that, I had to dynamically produce a function to use as a callback, as I couldn't work out what to do with a result without knowing which function it came from.

[It would have been possible to create a Pool for one function at a time, and record that function in a shared variable, but that's pretty ugly too. Besides, each function took some other arguments besides the one being iterated over by map, so I needed a closure over those arguments anyway.]
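For concreteness, the kind of thing I wanted looked roughly like this (a sketch with made-up names, not my actual library code); it's the dynamically created mapper that turns out to be the trouble:

from multiprocessing import Pool

def scale(x, factor):
    return x * factor

def makeMapper(f, *extraArgs):
    # Close over the extra, fixed arguments so that map_async only has
    # to supply the one value being iterated over.
    def mapper(x):
        return f(x, *extraArgs)
    return mapper

def printer(results):
    print results

pool = Pool()
# This is the call that blows up: mapper was created dynamically, so it
# can't be pickled for sending to the worker processes, and we get the
# PicklingError described below.
pool.map_async(makeMapper(scale, 3), range(10), callback=printer)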

So what's the problem with dynamically producing a function? Well, map_async passes the function to the subprocess that executes it by pickling it. Python functions can't truly be pickled; so long as they are importable in the standard way they appear to be picklable, but they are really passed by name. So the problem I had was that I got errors like this:

pickle.PicklingError: Can't pickle <function b at 0x5014f0>: it's not found as pickletest.b

What's going on here is that the function I dynamically created, b, which has acquired the module name pickletest (presumably because that's where the function that created it lives), isn't actually importable as pickletest.b. A read of the pickle module source showed that what matters is:

  1. the module (__module__ on the function) is importable
  2. the module contains the function with name matching the function's __name__ attribute
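Both rules are easy to check directly. Here's a minimal demonstration (the function names are made up):

import pickle

def topLevel(x):
    return x + 1

def makeNested():
    def nested(x):
        return x + 1
    return nested

# topLevel satisfies both rules: its __module__ ('__main__' here) is
# importable and has an attribute matching its __name__, so pickling
# succeeds -- though only the module and name are actually stored.
print pickle.dumps(topLevel)

# The nested function also claims __module__ == '__main__', but there is
# no attribute called 'nested' there, so pickle gives up.
try:
    pickle.dumps(makeNested())
except pickle.PicklingError, e:
    print e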

Clearly, if I create a function in the main process that doesn't exist in the subprocess, that isn't going to work. But if I create it before forking and creating the worker pool, it seems a bit silly to be prevented from using it just because it can't be pickled.

The solution I came up with was to create a module to use as a namespace to hold the functions I dynamically generated. The module has to be a physical file (not created by instantiating types.ModuleType), but what it contains can be added at run-time (before forking), and the rules above for what can be pickled are still met.
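The namespace modules themselves can be (almost) empty files; all that matters is that they exist on disk and can be imported. Something like this, saved alongside the main script, is enough (mapfn and resultfn are just the names used in the example below):

# mapfn.py: an empty namespace module; mapper functions are attached to
# it at run time, before the worker pool is forked.

# resultfn.py: likewise, holds the dynamically created result callbacks.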

Note that, due to the way Python's imports work, if the module is already imported then re-running the import doesn't actually do anything (though it does seem to have to be imported once: importing a module created from types.ModuleType fails). So, putting all that together, I came up with something along these lines:

from multiprocessing import Pool
import mapfn
import resultfn

def square(x):
    return x**2

def cube(x):
    return x**3

def makeContextFunctions(*fns):
    for f in fns:
        newName = f.__name__

        # Bind f as a default argument so each mapper keeps its own
        # function even if several are processed in one call.
        def mapper(x, f=f):
            return f(x)
        mapper.__name__ = newName
        mapper.__module__ = 'mapfn'      # pretend it lives in mapfn...
        setattr(mapfn, newName, mapper)  # ...and make that true
        f.mapper = mapper

        def resultFn(x, f=f):
            print "%s returned result %s" % (f.__name__, x)
        resultFn.__name__ = newName
        resultFn.__module__ = 'resultfn'
        setattr(resultfn, newName, resultFn)
        f.resultFn = resultFn

makeContextFunctions(square)
makeContextFunctions(cube)

pool = Pool()

pool.map_async(square.mapper, range(10), callback=square.resultFn)
pool.map_async(cube.mapper, range(10), callback=cube.resultFn)
pool.close()
pool.join() # Essential otherwise main process can exit before results returned

What happens here is that after dynamically creating functions I set their __name__ and __module__ attributes, set the functions to be attributes of a module, and, to make finding them easier, actually set them as attributes of the function which they are based on.

One problem with the code above is that the function names I am dealing with are not guaranteed to be unique, since the functions come from many modules. I tried to replicate the module structure the functions were found in, but that failed due to pickle being unable to import dynamically created modules. I ended up encoding the full function path by replacing - with -- and . with -_-.
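The encoding itself is just string substitution; something along these lines would do it (encodePath and decodePath are illustrative names, not lifted from my actual code):

def encodePath(f):
    # Flatten 'package.module.function' into a single attribute name:
    # double any existing '-' first, then turn '.' into '-_-'. Python
    # names can't really contain '-', so the doubling is mostly belt
    # and braces, but it keeps the encoding reversible.
    path = "%s.%s" % (f.__module__, f.__name__)
    return path.replace('-', '--').replace('.', '-_-')

def decodePath(name):
    # Invert encodePath: restore the dots, then the single dashes.
    return name.replace('-_-', '.').replace('--', '-')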

Finally, I realised that the code responsible for doing the dance above to make pickling work shouldn't be mixed up with the functions that make the mapper and callback functions. So I ended up writing a function which took three functions as arguments, the latter two of which expect the first as an argument. And it worked first time. It should provide me with good code for interview questions!
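A sketch of the shape it took, with illustrative names (makePicklable, makeMapper and makeResultFn) and using the encodePath helper sketched above: the first argument is the real function, the other two are factories which each take it and return the mapper and the callback, leaving makePicklable to do nothing but the naming-and-module dance.

import mapfn
import resultfn

def makePicklable(f, makeMapper, makeResultFn):
    # The factories build the per-function wrapper and callback; this
    # function only does the renaming and module-attachment needed to
    # keep pickle happy.
    name = encodePath(f)

    mapper = makeMapper(f)
    mapper.__name__ = name
    mapper.__module__ = 'mapfn'
    setattr(mapfn, name, mapper)
    f.mapper = mapper

    resultFn = makeResultFn(f)
    resultFn.__name__ = name
    resultFn.__module__ = 'resultfn'
    setattr(resultfn, name, resultFn)
    f.resultFn = resultFn

Used in place of makeContextFunctions above, the calling code stays the same: pool.map_async(square.mapper, range(10), callback=square.resultFn).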
