Multiprocessing Queue in Python
I'm trying to use a queue with the multiprocessing library in Python. After executing the code below, the print statements work, but the processes do not quit after I call join on the queue and are still alive. How can I terminate the remaining processes?
Thanks!
def MultiprocessTest(self):
    print "Starting multiprocess."
    print "Number of CPUs", multiprocessing.cpu_count()

    num_procs = 4
    def do_work(message):
        print "work", message, "completed"

    def worker():
        while True:
            item = q.get()
            do_work(item)
            q.task_done()

    q = multiprocessing.JoinableQueue()
    for i in range(num_procs):
        p = multiprocessing.Process(target=worker)
        p.daemon = True
        p.start()

    source = ['hi', 'there', 'how', 'are', 'you', 'doing']
    for item in source:
        q.put(item)
    print "q close"
    q.join()
    #q.close()
    print "Finished everything...."
    print "num active children:", multiprocessing.active_children()
Try this:
import multiprocessing

num_procs = 4

def do_work(message):
    print("work %s completed" % message)

def worker():
    # iter(q.get, None) keeps calling q.get() until it returns the None sentinel
    for item in iter(q.get, None):
        do_work(item)
        q.task_done()
    q.task_done()  # account for the sentinel itself

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
    procs.append(multiprocessing.Process(target=worker))
    procs[-1].daemon = True
    procs[-1].start()

source = ['hi', 'there', 'how', 'are', 'you', 'doing']
for item in source:
    q.put(item)

q.join()  # wait until every real item has been processed

for p in procs:
    q.put(None)  # one sentinel per worker

q.join()  # wait until every sentinel has been consumed

for p in procs:
    p.join()

print("Finished everything....")
print("num active children: %s" % multiprocessing.active_children())
Your workers need a sentinel to terminate; otherwise they will just sit in the blocking q.get() call forever. Note that sleeping in a loop that polls the queue, instead of calling join() on the processes, lets you display status information and so on.
My preferred template is:
import multiprocessing
from time import sleep

num_procs = 4  # same value as in the question

def worker(q, nameStr):
    print('Worker %s started' % nameStr)
    while True:
        item = q.get()
        if item is None:  # detect sentinel
            break
        print('%s processed %s' % (nameStr, item))  # do something useful
        q.task_done()
    print('Worker %s Finished' % nameStr)
    q.task_done()  # account for the sentinel

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
    nameStr = 'Worker_' + str(i)
    p = multiprocessing.Process(target=worker, args=(q, nameStr))
    p.daemon = True
    p.start()
    procs.append(p)

source = ['hi', 'there', 'how', 'are', 'you', 'doing']
for item in source:
    q.put(item)

for i in range(num_procs):
    q.put(None)  # send termination sentinel, one for each process

while not q.empty():  # wait for processing to finish
    sleep(1)  # manage timeouts and status updates etc.
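As a rough Python 3 sketch of the same sentinel idea, the parent below polls Process.is_alive() rather than q.empty(), so it can print a status line on each pass and still knows for certain when every worker has exited. The names echo_worker and run_with_status, the poll_interval parameter, and the optional ctx argument (a multiprocessing context) are made up for this illustration, and a plain Queue is used in place of JoinableQueue because nothing here calls q.join().

```python
import multiprocessing
import time


def echo_worker(q, name):
    """Consume items until the None sentinel arrives (hypothetical worker)."""
    while True:
        item = q.get()
        if item is None:  # sentinel: stop this worker
            break
        print("%s processed %s" % (name, item))  # do something useful


def run_with_status(items, num_procs=4, poll_interval=0.1, ctx=None):
    """Feed items to workers and poll their liveness instead of blocking in join().

    Returns the list of worker exit codes.
    """
    ctx = ctx or multiprocessing.get_context()
    q = ctx.Queue()
    procs = [ctx.Process(target=echo_worker, args=(q, "Worker_%d" % i))
             for i in range(num_procs)]
    for p in procs:
        p.start()
    for item in items:
        q.put(item)
    for _ in procs:
        q.put(None)  # one sentinel per worker

    # Status loop: runs until every worker has consumed its sentinel and exited.
    while any(p.is_alive() for p in procs):
        print("%d worker(s) still running" % sum(p.is_alive() for p in procs))
        time.sleep(poll_interval)

    for p in procs:
        p.join()  # returns immediately, the processes have already exited
    return [p.exitcode for p in procs]


if __name__ == "__main__":
    print(run_with_status(["hi", "there", "how", "are", "you", "doing"]))
```

Polling is_alive() sidesteps the question of whether q.empty() can be trusted, at the cost of the parent waking up every poll_interval seconds.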
Here is a sentinel-free method for the relatively simple case where you put a number of tasks on a JoinableQueue, then launch worker processes that consume the tasks and exit once they read the queue "dry". The trick is to use JoinableQueue.get_nowait() instead of get(). get_nowait(), as the name implies, tries to get a value from the queue in a non-blocking manner, and if there is nothing to get, a queue.Empty exception is raised. The worker handles this exception by exiting.
Rudimentary code to illustrate the principle:
import multiprocessing as mp
from queue import Empty

def worker(q):
    while True:
        try:
            work = q.get_nowait()
            # ... do something with `work`
            q.task_done()
        except Empty:
            break  # completely done

# main
worknum = 4
jq = mp.JoinableQueue()

# fill up the task queue
# let's assume `tasks` contains some sort of data
# that your workers know how to process
for task in tasks:
    jq.put(task)

procs = [mp.Process(target=worker, args=(jq,)) for _ in range(worknum)]
for p in procs:
    p.start()

for p in procs:
    p.join()
The advantage is that you do not need to put the "poison pills" on the queue so the code is a bit shorter.
IMPORTANT: in more complex situations where producers and consumers use the same queue in an "interleaved" manner and the workers may have to wait for new tasks to come along, the "poison pill" approach should be used. My suggestion above is for simple cases where the workers "know" that if the task queue is empty, there is no point hanging around any more.
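One caveat worth keeping in mind with get_nowait(): the multiprocessing documentation notes that after an object is put on an empty queue there may be a very small delay before get_nowait() can return it without raising queue.Empty. A slightly more forgiving variant, sketched below under the same "fill the queue first" assumption, uses get(timeout=...) so a worker only gives up after the queue has stayed empty for a while. The names patient_worker and run_until_dry, the shared done_count counter, and the ctx argument are invented for this sketch.

```python
import multiprocessing
from queue import Empty


def patient_worker(task_q, done_count, timeout):
    """Consume tasks until the queue stays empty for `timeout` seconds."""
    while True:
        try:
            work = task_q.get(timeout=timeout)
        except Empty:
            break  # nothing arrived within the timeout: assume we are done
        # ... do something with `work` ...
        with done_count.get_lock():
            done_count.value += 1  # record one completed task
        task_q.task_done()


def run_until_dry(tasks, num_procs=4, timeout=0.5, ctx=None):
    """Return how many tasks the workers reported as processed."""
    ctx = ctx or multiprocessing.get_context()
    task_q = ctx.JoinableQueue()
    done_count = ctx.Value("i", 0)  # shared integer counter

    # Fill up the task queue before any worker starts.
    for task in tasks:
        task_q.put(task)

    procs = [ctx.Process(target=patient_worker,
                         args=(task_q, done_count, timeout))
             for _ in range(num_procs)]
    for p in procs:
        p.start()

    task_q.join()  # every task has been acknowledged with task_done()
    for p in procs:
        p.join()   # workers exit once their get() times out
    return done_count.value


if __name__ == "__main__":
    print(run_until_dry(["a", "b", "c", "d", "e"]))
```

The trade-off is that each worker lingers for up to timeout seconds after the last task, so this only suits cases where that small delay is acceptable.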
You have to drain the queue before joining the processes, but q.empty() is unreliable.
The best way to drain the queue is to count the number of successful gets, or to loop until you receive a sentinel value, much as you would when reading from a socket over a reliable network.
The code below may not be very relevant, but I am posting it for your comments and feedback so we can learn together. Thank you!
import multiprocessing

def boss(q, nameStr):
    source = range(1024)
    for item in source:
        q.put(nameStr + ' ' + str(item))
    q.put(None)  # send termination sentinel, one for each process

def worker(q, nameStr):
    while True:
        item = q.get()
        if item is None:  # detect sentinel
            break
        print('%s processed %s' % (nameStr, item))  # do something useful

q = multiprocessing.Queue()
procs = []
num_procs = 4
# Pair each worker with a boss so the queue receives exactly one sentinel per worker.
for i in range(num_procs):
    nameStr = 'ID_' + str(i)
    p = multiprocessing.Process(target=worker, args=(q, nameStr))
    procs.append(p)
    p = multiprocessing.Process(target=boss, args=(q, nameStr))
    procs.append(p)

for j in procs:
    j.start()
for j in procs:
    j.join()
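The "count the number of successful gets" idea mentioned above might look roughly like this in Python 3. It assumes each task produces exactly one result, so the parent can call get() once per submitted item and only then join the workers, without ever consulting q.empty(). The names square_worker and collect_by_count, the squaring placeholder, and the ctx argument are assumptions made for the illustration.

```python
import multiprocessing


def square_worker(task_q, result_q):
    """Read tasks until the None sentinel, emitting one result per task."""
    for item in iter(task_q.get, None):
        result_q.put(item * item)  # placeholder for real work


def collect_by_count(items, num_procs=4, ctx=None):
    """Drain the result queue by counting gets, then join the workers."""
    ctx = ctx or multiprocessing.get_context()
    items = list(items)
    task_q = ctx.Queue()
    result_q = ctx.Queue()
    procs = [ctx.Process(target=square_worker, args=(task_q, result_q))
             for _ in range(num_procs)]
    for p in procs:
        p.start()

    for item in items:
        task_q.put(item)
    for _ in procs:
        task_q.put(None)  # one sentinel per worker

    # Exactly one get() per submitted item: the result queue is fully
    # drained before any join() is attempted.
    results = [result_q.get() for _ in items]

    for p in procs:
        p.join()
    return sorted(results)


if __name__ == "__main__":
    print(collect_by_count(range(10)))
```

Draining before joining matters because a process that has put items on a queue may not exit until those items have been flushed to the underlying pipe, so joining first can deadlock.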