Using a list with Python multiprocessing
Can anyone help me out with sharing a list between multiple Python processes? The problem is to get self.ID_List and self.mps_in_process working in the following code.
import time, random
from multiprocessing import Process #, Manager, Array, Queue

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*10)  # simulate data processing
        parent.killMP(id)

class ParamHandler():
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = IDs
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()

    def doMP(self):
        for tmp in range(3):  # nr of concurrent processes
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)
In short, the code processes some data (here simulated by a random sleep in MP_Stuff) for each data id in self.ID_List. self.mps_in_process is used to know which data ids are currently in process (the number of concurrent processes is hardcoded here, but is actually dynamic).
The problem is to share mps_in_process and ID_List across multiple processes. The current code goes into a pretty much endless loop. What goes wrong is actually well described in the multiprocessing documentation:
"if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start() was called."
However, I'm not able to figure out how to get mps_in_process and ID_List working. I cannot use a Queue, because elements are taken out of mps_in_process in random order. I cannot use an Array, because .pop(0) does not work on it. I cannot use Manager().list(), because .remove() and len(ID_List) do not work then. Using threading instead of multiprocessing is no solution, because freeze_support() must be used later.
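For illustration, here is a minimal sketch (separate from the code above, with illustrative names) of the behaviour the documentation describes: a plain list mutated in a child process is only mutated in that child's copy.

import time
from multiprocessing import Process

def worker(ids):
    ids.pop(0)  # mutates only the child's copy of the list
    print("child sees: %s" % ids)

if __name__ == '__main__':
    ids = [1, 2, 3]
    p = Process(target=worker, args=(ids,))
    p.start()
    p.join()
    print("parent still sees: %s" % ids)  # [1, 2, 3] -- the pop never propagates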
Therefore, any help on how to share a list among processes is very welcome!
The Manager is working fine (including len()). The issue with your code is that in your main process you don't wait until the processing ends, so the main process exits and the manager is no longer accessible. Also, I don't know about the atomicity of ListProxy's pop(), so a lock might be handy.
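As a rough sketch of what such a lock could look like (the names here are illustrative, not from the code below), the check-and-pop can be serialized through a lock created by the same Manager:

from multiprocessing import Process, Manager

def take_one(shared_ids, lock):
    with lock:  # serialize the check-and-pop across processes
        if len(shared_ids) > 0:
            id = shared_ids.pop(0)
        else:
            id = None
    if id is not None:
        print("working on %s" % id)

if __name__ == '__main__':
    manager = Manager()
    shared_ids = manager.list([1, 2, 3, 4, 5, 6])
    lock = manager.Lock()
    workers = [Process(target=take_one, args=(shared_ids, lock)) for _ in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()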
The solution is p.join(). However, I am confused why it is enough to do p.join() at the end of doFirstMP. I would be happy if somebody could explain why join on the first p returns after all computation is done and not after the first doMP returns.
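One likely explanation: a process does not fully exit until its non-daemonic children have been joined, which multiprocessing does automatically on shutdown. Since killMP runs inside the child and spawns new processes from there, each process in the chain outlives the processes it spawned, so joining the first child transitively waits for the whole tree. A minimal sketch of that behaviour (illustrative, not the poster's code):

import time
from multiprocessing import Process

def grandchild():
    time.sleep(2)  # simulate late work

def child():
    Process(target=grandchild).start()
    # child's run() returns here, but the child process itself does not
    # exit until its non-daemonic grandchild is joined during shutdown

if __name__ == '__main__':
    start = time.time()
    p = Process(target=child)
    p.start()
    p.join()  # blocks ~2s, not ~0s
    print("joined after %.1f s" % (time.time() - start))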
My code:
import time, random
from multiprocessing import Process, Manager

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*5)  # simulate data processing
        print id, "done"
        parent.killMP(id)

class ParamHandler():
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = Manager().list(IDs)
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()
        p.join()  # without this the main process exits and the Manager dies
        print "joined"

    def doMP(self):
        for tmp in range(3):  # nr of concurrent processes
            print self.ID_List
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        print "kill", kill_id
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)
Unfortunately you have already specified your options. Both Array() and Manager().list() should be able to do it, although you might need a little extra work:
- You can emulate len(ID_List) by storing the count in a Value() and incrementing/decrementing it.
- The remove() can easily be emulated with a loop and a delete after it (albeit slower, of course).
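A rough sketch of that suggestion (illustrative names and layout, assuming a fixed-size Array with a Value as the live-element counter):

from multiprocessing import Array, Value, Lock

ids = Array('i', [1, 2, 3, 4, 5, 6])  # fixed-size shared buffer
count = Value('i', 6)                 # emulates len(ID_List)
lock = Lock()

def remove_id(kill_id):
    # emulate list.remove(): find kill_id, shift the tail left, shrink count
    with lock:
        for i in range(count.value):
            if ids[i] == kill_id:
                for j in range(i, count.value - 1):
                    ids[j] = ids[j + 1]
                count.value -= 1
                return
    raise ValueError("%s not in shared array" % kill_id)

def pop_front():
    # emulate list.pop(0) the same way
    with lock:
        first = ids[0]
        for j in range(count.value - 1):
            ids[j] = ids[j + 1]
        count.value -= 1
        return first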