
multiprocessing Pool.imap broken?

I've tried both the multiprocessing included in the python2.6 Ubuntu package (__version__ says 0.70a1) and the latest from PyPI (2.6.2.1). In both cases I don't know how to use imap correctly - it causes the entire interpreter to stop responding to ctrl-C's (map works fine though). pdb shows next() is hanging on the condition variable wait() call in IMapIterator, so nobody is waking us up. Any hints? Thanks in advance.

$ cat /tmp/go3.py
import multiprocessing as mp
print mp.Pool(1).map(abs, range(3))
print list(mp.Pool(1).imap(abs, range(3)))

$ python /tmp/go3.py
[0, 1, 2]
^C^C^C^C^C^\Quit


First notice that this works:

import multiprocessing as mp
import multiprocessing.util as util
pool=mp.Pool(1)
print list(pool.imap(abs, range(3)))

The difference is that pool does not get finalized when the call to pool.imap() ends.

In contrast,

print(list(mp.Pool(1).imap(abs, range(3))))

causes the Pool instance to be finalized soon after the imap call ends. Because no reference to the Pool remains, its Finalize object (stored as self._terminate in the Pool class) gets called. This sets in motion a sequence of calls that tears down the task handler thread, the result handler thread, the worker subprocesses, etc.

This all happens so quickly that, on most runs at least, the task sent to the task handler does not complete.
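
One way to avoid the race is to keep a reference to the pool for as long as results are being read, and then shut it down explicitly. The following is only a minimal sketch: it assumes Python 3 syntax (the snippets above are Python 2.6), and the helper name collect_abs is made up for illustration.

```python
import multiprocessing as mp


def collect_abs(values):
    """Run abs() over values in a one-worker pool (hypothetical helper)."""
    pool = mp.Pool(1)  # a named reference keeps the pool alive
    try:
        # list() drains the IMapIterator while `pool` is still referenced,
        # so the pool cannot be finalized in the middle of the work.
        return list(pool.imap(abs, values))
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the worker process to exit


if __name__ == "__main__":
    print(collect_abs(range(3)))
```

Calling close() followed by join() is the orderly shutdown path; terminate() would also stop the workers, but without waiting for outstanding tasks.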

Here are the relevant bits of code:

From /usr/lib/python2.6/multiprocessing/pool.py:

class Pool(object):
    def __init__(self, processes=None, initializer=None, initargs=()):
        ...
        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._task_handler, self._result_handler, self._cache),
            exitpriority=15
            )

/usr/lib/python2.6/multiprocessing/util.py:

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        ...
        if obj is not None:
            self._weakref = weakref.ref(obj, self)   

The weakref.ref(obj,self) causes self() to be called when obj is about to be finalized.
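
The callback mechanism can be seen in isolation with plain weakref, without multiprocessing. This is a small sketch, assuming Python 3; Resource, on_finalize and demo are hypothetical names standing in for Pool and the Finalize instance.

```python
import gc
import weakref


class Resource(object):
    """Stand-in for an object like Pool (hypothetical class)."""


def demo():
    events = []

    def on_finalize(dead_ref):
        # weakref passes the (now dead) reference object as the only argument.
        events.append("callback ran")

    obj = Resource()
    ref = weakref.ref(obj, on_finalize)  # the ref itself must stay alive
    del obj       # drop the last strong reference to the object
    gc.collect()  # CPython has already run the callback; helps other runtimes
    return ref() is None, events


if __name__ == "__main__":
    print(demo())
```

In Finalize the callback is the Finalize instance itself, which is why dropping the last reference to the Pool is enough to trigger _terminate_pool.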

I used the debug command util.log_to_stderr(util.SUBDEBUG) to learn the sequence of events. For example:

import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)

print(list(mp.Pool(1).imap(abs, range(3))))

yields

[DEBUG/MainProcess] created semlock with handle 3077013504
[DEBUG/MainProcess] created semlock with handle 3077009408
[DEBUG/MainProcess] created semlock with handle 3077005312
[DEBUG/MainProcess] created semlock with handle 3077001216
[INFO/PoolWorker-1] child process calling self.run()
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x9d6e62c>, <multiprocessing.queues.SimpleQueue object at 0x9cf04cc>, <multiprocessing.queues.SimpleQueue object at 0x9d6e40c>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1217967248)>, <Thread(Thread-2, started daemon -1226359952)>, {0: <multiprocessing.pool.IMapIterator object at 0x9d6eaec>}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
...

and compare that with

import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)
pool=mp.Pool(1)
print list(pool.imap(abs, range(3)))

which yields

[DEBUG/MainProcess] created semlock with handle 3078684672
[DEBUG/MainProcess] created semlock with handle 3078680576
[DEBUG/MainProcess] created semlock with handle 3078676480
[DEBUG/MainProcess] created semlock with handle 3078672384
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[0, 1, 2]
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[SUBDEBUG/MainProcess] calling <Finalize object, callback=_terminate_pool, args=(<Queue.Queue instance at 0xb763e60c>, <multiprocessing.queues.SimpleQueue object at 0xb76c94ac>, <multiprocessing.queues.SimpleQueue object at 0xb763e3ec>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1218274448)>, <Thread(Thread-2, started daemon -1226667152)>, {}), exitprority=15>
...
[DEBUG/MainProcess] finalizing pool


In my case, I was calling pool.imap() without using its return value, and it did not work, whereas pool.map() worked fine. The cause is related to what the previous answer describes: nothing consumed the iterator returned by imap(), so the pool was effectively torn down before the work had run.

The solution was to consume the iterator, for example by wrapping it in list(). list() has to pull every result out of the iterator, so it blocks until all the tasks have been executed. A simplified illustration follows (for now, just pretend it does something useful):

from multiprocessing import Pool
from shutil import copy
from tqdm import tqdm

filedict = { r"C:\src\file1.txt": r"C:\trg\file1_fixed.txt",
             r"C:\src\file2.txt": r"C:\trg\file2_fixed.txt",
             r"C:\src\file3.txt": r"C:\trg\file3_fixed.txt",
             r"C:\src\file4.txt": r"C:\trg\file4_fixed.txt" }

# target function, run in the worker processes
def copyfile(srctrg):
    copy(srctrg[0], srctrg[1])
    return True

# the __main__ guard is needed on Windows, where workers re-import this module
if __name__ == "__main__":

    # a couple of trial processes for illustration
    with Pool(2) as pool:

        # works fine with map, but cannot utilize tqdm() since map blocks and returns a complete list
        pool.map(copyfile, list(filedict.items()))

        # does not work on its own: nothing consumes the iterator, so the copies
        # are not guaranteed to finish before the with block terminates the pool
        tqdm(pool.imap(copyfile, list(filedict.items())))    # NOT WORKING

        # this works, since list() consumes the iterator and waits for every result
        list(tqdm(pool.imap(copyfile, list(filedict.items()))))

In my case, the simple solution was to enclose the entire tqdm(pool.imap(...)) in a list() in order to force the execution.
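
list() is not the only option: anything that iterates over the imap() result inside the with block should have the same effect. Here is a rough sketch without tqdm or file copying, assuming Python 3; run_all is a hypothetical name, and the built-in abs stands in for the real task.

```python
from multiprocessing import Pool


def run_all(values):
    """Consume imap() results one by one inside the with block."""
    done = []
    with Pool(2) as pool:
        # The for loop pulls each result from the iterator, so the block
        # cannot exit (and terminate the pool) before every task has run.
        for result in pool.imap(abs, values):
            done.append(result)
    return done


if __name__ == "__main__":
    print(run_all([-3, -2, -1, 0]))
```

Wrapping the imap() call in tqdm(..., total=len(values)) in the for statement should give a progress bar with a known length, since the iterator returned by imap() has no len() of its own.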
