
Python: How can I check the number of pending tasks in a multiprocessing.Pool?

I have a small pool of workers (4) and a very large list of tasks (~5000). I'm using a pool and sending the tasks with map_async(). Because the tasks I'm running are fairly long, I'm forcing a chunksize of 1 so that one long task can't hold up some shorter ones.

What I'd like to do is periodically check how many tasks are left to be submitted. I know at most 4 will be active; I'm concerned with how many are left to process.

I've googled around and I can't find anybody doing this.

Some simple code to help:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break


Looks like jobs._number_left is what you want. The leading underscore indicates that it is an internal value that may change at the whim of the developers, but it seems to be the only way to get that info.
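
For instance, a minimal sketch of how that could plug into your polling loop. Keep in mind that _number_left is undocumented and counts remaining chunks, which only equals remaining tasks because you set chunksize=1:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    jobs = pool.map_async(mytask, [1, 2, 3, 4, 5, 3, 2, 3, 4, 5, 2, 3, 2, 3, 4, 5, 6, 4], chunksize=1)
    pool.close()

    while not jobs.ready():
        # Private attribute: number of chunks not yet finished.
        # With chunksize=1, that is the number of unfinished tasks.
        print("We're not done yet, %s tasks to go!" % jobs._number_left)
        jobs.wait(2)

    pool.join()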


You can check the number of pending jobs by looking at the Pool._cache attribute, assuming you are using apply_async. This is where each ApplyResult is stored until it is available, so its length equals the number of ApplyResults still pending.

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()


No airtight way that I know of, but if you use the Pool.imap_unordered() function instead of map_async, you can intercept the elements as they are processed.

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Return the result of the work here so the loop below can collect it.
    return num

pool = multiprocessing.Pool(process_count)
jobs = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

I'm subtracting process_count because you can pretty much assume that all processes will be busy, with two exceptions: 1) if you use an iterator, there may not be any further items left to consume and process, and 2) you may have fewer than 4 items left. I didn't code for the first exception, but it should be pretty easy to do if you need to. Anyway, your example uses a list, so you shouldn't have that problem.

Edit: I also realize you're using a while loop, which makes it look like you want to update something periodically, say every half second or so. The code I gave as an example won't do it that way. I'm not sure if that's a problem.
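
If it is, here's a rough sketch of one way to keep periodic updates with imap_unordered: drain the results in a background thread and let the main thread poll a counter on whatever interval you like. This is just an illustration, not something from the original code:

import multiprocessing
import threading
import time

def mytask(num):
    time.sleep(num)
    return num

if __name__ == '__main__':
    items = [1, 2, 3, 4, 5, 3, 2, 3, 4, 5, 2, 3, 2, 3, 4, 5, 6, 4]
    pool = multiprocessing.Pool(4)
    finished = []

    def consume():
        # Drain results as they complete; appending from one thread while
        # another only reads the list's length is fine for a progress counter.
        for result in pool.imap_unordered(mytask, items):
            finished.append(result)

    worker = threading.Thread(target=consume)
    worker.start()
    while worker.is_alive():
        print("Finished %d of %d tasks" % (len(finished), len(items)))
        time.sleep(0.5)
    worker.join()

    pool.close()
    pool.join()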


I have similar requirements: track progress, perform interim work based on the results, and be able to stop all processing cleanly at an arbitrary time. The way I've dealt with it is to send tasks one at a time with apply_async. A heavily simplified version of what I do:

import multiprocessing
import time

maxProcesses = 4
# A plain multiprocessing.Queue can't be passed as an argument to pool
# workers; a Manager().Queue() proxy can.
q = multiprocessing.Manager().Queue()
pool = multiprocessing.Pool(maxProcesses)
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait until already-sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

    time.sleep(0.1)  # avoid busy-waiting between polls

Note that I use a Queue instead of returning the results.
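
For completeness, a minimal sketch of the helpers the loop above assumes. mytask, processResults, and stopNowBooleanFunc are the snippet's own placeholder names; these bodies are purely illustrative:

def mytask(item, q):
    # Do the real work here, then report the result back via the queue
    # instead of returning it, so the main loop can pick it up as it polls.
    result = item * 2  # placeholder computation
    q.put(result)

def processResults(aresult):
    # Interim work based on each completed result goes here.
    print("got result:", aresult)

def stopNowBooleanFunc():
    # Return True when processing should stop early (e.g. a flag file or
    # a signal); always False in this sketch.
    return False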
