Implementing a special type of multiprocessing queue in Python
Imagine an inverted binary tree with nodes A, B, C, D, E, F on level 0, nodes G, H, I on level 1, node J on level 2, and node K on level 3.
Level 1: G = func(A,B), H = func(C,D), I = func(E,F)
Level 2: J = func(G,H)
Level 3: K = func(J,I).
Each pair of nodes on level 0 must be processed in order. The pairs on level 1 can be processed in any order, but their results must be combined on the next level as shown, and so forth until we end up with the final result, K.
The actual problem is a computational geometry problem in which a sequence of solids are fused together. A is adjacent to B which is adjacent to C, and so on. The resulting fuse of A and B (G) is adjacent to the fuse of C and D (H). The resulting fuse of J and I (K) is the final result. Thus you can't fuse G and I since they are not adjacent. If the number of nodes on a level is not a power of 2, you end up with a dangling entity that must be processed one level further.
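The level-by-level structure, including the dangling node that is carried up a level, can be written as a plain sequential reduction. This is only a reference sketch: tree_reduce and fuse are hypothetical names, and fuse merely records the nesting instead of fusing solids.

```python
def tree_reduce(func, items):
    """Reduce items level by level: fuse (0, 1), (2, 3), ... on each level.

    An element left over at the end of a level with an odd count is
    carried up unchanged to the next level, like node I in the question.
    """
    level = list(items)
    if not level:
        raise ValueError("need at least one item")
    while len(level) > 1:
        next_level = [func(level[i], level[i + 1])
                      for i in range(0, len(level) - 1, 2)]
        if len(level) % 2 == 1:
            next_level.append(level[-1])  # the dangling entity
        level = next_level
    return level[0]


def fuse(a, b):
    # Hypothetical stand-in for the expensive geometric fuse: it just records the nesting.
    return "(%s%s)" % (a, b)


print(tree_reduce(fuse, "ABCDEF"))  # (((AB)(CD))(EF)), i.e. K = func(J, I)
```

Here "(AB)", "(CD)" and "(EF)" play the roles of G, H and I, "((AB)(CD))" is J, and the printed string is K.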
Since the fuse process is computationally expensive and memory intensive but highly parallelizable, I would like to use the Python multiprocessing package and some form of queue. After calculating G = func(A,B), I would like to push the result G onto the queue for the subsequent J = func(G,H) computation. When the queue is empty, the last result is the final result. Keep in mind that results will not necessarily arrive on the mp.Queue in the order the jobs were started, since I = func(E,F) may finish before H = func(C,D).
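If strict levels are acceptable, a simpler baseline sidesteps the ordering problem: Pool.starmap returns its results in input order no matter which worker finishes first, so each level can be fused in parallel and the next level built directly from that list. This is a swapped-in technique, not the queue-based design the question asks for, and its cost is that every level waits for its slowest fuse. parallel_tree_reduce and fuse are hypothetical names, and fuse only nests strings.

```python
from multiprocessing import Pool


def fuse(a, b):
    # Hypothetical stand-in for the expensive fuse; top-level so it can be pickled.
    return "(%s%s)" % (a, b)


def parallel_tree_reduce(items, processes=None):
    """Level-synchronous version: all fuses of one level run in parallel."""
    level = list(items)
    if not level:
        raise ValueError("need at least one item")
    with Pool(processes) as pool:
        while len(level) > 1:
            pairs = [(level[i], level[i + 1])
                     for i in range(0, len(level) - 1, 2)]
            # starmap returns results in the order of `pairs`, even if the
            # workers finish out of order, so neighbours stay neighbours.
            next_level = pool.starmap(fuse, pairs)
            if len(level) % 2 == 1:
                next_level.append(level[-1])  # dangling entity moves up
            level = next_level
    return level[0]


if __name__ == "__main__":
    print(parallel_tree_reduce("ABCDEF"))
```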
I have come up with a few (bad) solutions but I'm sure there is an elegant solution just beyond my grasp. Suggestions?
I couldn't come up with a smart design for a queue, but you can easily replace the queue with one more process, which in my example I called WorkerManager. This process gathers results from all Worker processes and starts a new worker only when two adjacent data packs are waiting to be processed. This way, you never try to join non-adjacent results, so you can ignore "levels" and start computing the next pair as soon as it's ready.
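The pairing rule on its own can be tried without any processes. The following is a minimal single-process sketch, not the answer's code: AdjacentPairer and simulate are hypothetical names, ranges are plain (start, end, data) tuples instead of Result objects, and a random pick from a list stands in for the unordered result queue. It also indexes waiting ranges by both endpoints in two dicts rather than keeping a sorted list, a substitution chosen so that each arrival needs only a couple of dictionary lookups.

```python
import random


class AdjacentPairer:
    """Hypothetical helper: holds finished ranges (start, end, data) that
    are waiting for a neighbour, indexed by both endpoints."""

    def __init__(self):
        self._by_start = {}  # start -> waiting range
        self._by_end = {}    # end -> waiting range

    def add(self, item):
        """Return (left, right) if item touches a waiting range, else store it."""
        start, end, _ = item
        left = self._by_end.pop(start, None)   # waiting range ending where item starts
        if left is not None:
            del self._by_start[left[0]]
            return left, item
        right = self._by_start.pop(end, None)  # waiting range starting where item ends
        if right is not None:
            del self._by_end[right[1]]
            return item, right
        self._by_start[start] = item
        self._by_end[end] = item
        return None


def simulate(n, seed=0):
    """Feed n leaf ranges in random order; fuse pairs the moment they touch."""
    if n < 1:
        raise ValueError("need at least one leaf")
    rng = random.Random(seed)
    in_flight = [(i, i + 1, str(i)) for i in range(n)]  # results not yet received
    pairer = AdjacentPairer()
    while True:
        # Receive an arbitrary finished result, as from an unordered queue.
        item = in_flight.pop(rng.randrange(len(in_flight)))
        if item[0] == 0 and item[1] == n:
            return item  # spans the whole input: this is the final result
        pair = pairer.add(item)
        if pair is not None:
            left, right = pair
            # "Start a worker": its fused result will arrive later, at random.
            in_flight.append((left[0], right[1], "(%s, %s)" % (left[2], right[2])))


if __name__ == "__main__":
    print(simulate(6, seed=1))
```

Whatever the arrival order, only touching ranges are ever fused, so the leaves always stay in left-to-right order. The full multiprocessing version from the answer follows.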
```python
from multiprocessing import Process, Queue


class Result(object):
    '''Result covering the range from start to end.'''
    def __init__(self, start, end, data):
        self.start = start
        self.end = end
        self.data = data


class Worker(Process):
    '''Joins two adjacent results into one result.'''
    def __init__(self, result_queue, pair):
        super(Worker, self).__init__()
        self.result_queue = result_queue
        self.pair = pair

    def run(self):
        left, right = self.pair
        # Put the real (expensive) computation here; this one just nests strings.
        result = Result(left.start, right.end,
                        '(%s, %s)' % (left.data, right.data))
        self.result_queue.put(result)


class WorkerManager(Process):
    '''
    Takes results from result_queue, pairs adjacent ones
    and assigns workers to process them.
    Returns the final result into final_queue.
    '''
    def __init__(self, result_queue, final_queue, start, end):
        super(WorkerManager, self).__init__()
        self._result_queue = result_queue
        self._final_queue = final_queue
        self._start = start
        self._end = end
        self._results = []  # finished results waiting for a neighbour, sorted by start
        self._workers = []  # started workers, joined once the final result is out

    def run(self):
        while True:
            result = self._result_queue.get()
            self._add_result(result)
            if self._has_final_result():
                self._final_queue.put(self._get_final_result())
                break
            pair = self._find_adjacent_pair()
            if pair:
                self._start_worker(pair)
        # Every worker's output has already been read, so these joins
        # do not block on unread queue data.
        for worker in self._workers:
            worker.join()

    def _add_result(self, result):
        self._results.append(result)
        self._results.sort(key=lambda r: r.start)

    def _has_final_result(self):
        return (len(self._results) == 1
                and self._results[0].start == self._start
                and self._results[0].end == self._end)

    def _get_final_result(self):
        return self._results[0]

    def _find_adjacent_pair(self):
        # Return (and remove) the first two waiting results that touch, if any.
        for i in range(len(self._results) - 1):
            left, right = self._results[i], self._results[i + 1]
            if left.end == right.start:
                self._results = self._results[:i] + self._results[i + 2:]
                return left, right
        return None

    def _start_worker(self, pair):
        worker = Worker(self._result_queue, pair)
        worker.start()
        self._workers.append(worker)


if __name__ == '__main__':
    DATA = [Result(i, i + 1, str(i)) for i in range(6)]
    result_queue = Queue()
    final_queue = Queue()
    start = 0
    end = len(DATA)
    man = WorkerManager(result_queue, final_queue, start, end)
    man.start()
    for res in DATA:
        result_queue.put(res)
    final = final_queue.get()
    man.join()
    print(final.start)
    # 0
    print(final.end)
    # 6
    print(final.data)
    # For example:
    # (((0, 1), (2, 3)), (4, 5))
```
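For my example, I used a simple Worker that returns the given data in parentheses, separated by a comma, but you could put any computation in there. In my case, the final result was (((0, 1), (2, 3)), (4, 5)), which means that the algorithm computed (0, 1) and (2, 3) before computing ((0, 1), (2, 3)), and then joined that result with (4, 5). Because workers can finish in any order, another run may nest the pairs differently, for example ((0, 1), ((2, 3), (4, 5))), but the left-to-right order of the data is always preserved. I hope this is what you were looking for.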
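One practical drawback of the code above is that it starts a fresh process for every pair, and nothing caps how many fuses run at once, which may matter for a memory-intensive fuse. A possible variation, sketched under the assumption that Python 3's concurrent.futures is acceptable, keeps the same "fuse whatever adjacent pair is ready" rule but runs the fuses on a fixed-size ProcessPoolExecutor and waits with wait(..., return_when=FIRST_COMPLETED). The names fuse and reduce_adjacent are hypothetical, and ranges are (start, end, data) tuples rather than Result objects.

```python
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait


def fuse(left, right):
    # Hypothetical stand-in for the expensive fuse. It must be a top-level
    # function so it can be pickled and sent to the worker processes.
    start, _, left_data = left
    _, end, right_data = right
    return start, end, "(%s, %s)" % (left_data, right_data)


def reduce_adjacent(leaves, max_workers=None):
    """Fuse contiguous (start, end, data) ranges into one, pairing
    adjacent ranges as soon as both are available."""
    if not leaves:
        raise ValueError("need at least one leaf")
    waiting = sorted(leaves, key=lambda r: r[0])  # finished, not yet paired
    running = set()                               # fusions in progress
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        while True:
            # Submit every pair of waiting ranges that touch.
            i = 0
            while i < len(waiting) - 1:
                left, right = waiting[i], waiting[i + 1]
                if left[1] == right[0]:
                    running.add(pool.submit(fuse, left, right))
                    del waiting[i:i + 2]
                else:
                    i += 1
            if not running:
                break
            # Block until at least one fusion finishes, in whatever order.
            done, running = wait(running, return_when=FIRST_COMPLETED)
            waiting.extend(future.result() for future in done)
            waiting.sort(key=lambda r: r[0])
    if len(waiting) != 1:
        raise ValueError("leaves were not contiguous")
    return waiting[0]


if __name__ == "__main__":
    print(reduce_adjacent([(i, i + 1, str(i)) for i in range(6)], max_workers=2))
```

Here max_workers bounds how many fuses run concurrently. As with the answer's code, the nesting of the result can vary between runs; only the left-to-right order of the leaves is fixed.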