Which strategy to use with multiprocessing in Python?
I am completely new to multiprocessing. I have been reading the documentation for the multiprocessing module. I read about Pool, threads, queues, etc., but I am completely lost.
What I want to do with multiprocessing is convert my humble HTTP downloader to work with multiple workers. What I do at the moment is: download a page, parse the page to get the interesting links, and continue until all interesting links are downloaded. Now I want to implement this with multiprocessing, but I have no idea how to organize this workflow. I had two thoughts about this. First, I thought about having two queues: one queue for links that need to be downloaded, the other for links that need to be parsed. One worker downloads the pages and adds them to the queue of items to be parsed; another process parses a page and adds the links it finds interesting to the other queue. The problems I expect from this approach are: first of all, why download one page at a time and parse one page at a time? Moreover, how does one process know that items will still be added to the queue later, after it has exhausted all items currently in the queue?
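To make the two-queue idea concrete, here is a rough sketch of what I have in mind. The names downloader and parser, the example URL, and the stubbed-out download and link extraction are just placeholders, and it does not solve the shutdown problem I describe above; the main process just sleeps for a while.

import multiprocessing as mp
import time

def downloader(download_queue, parse_queue):
    # Pull a url, "download" it, and hand the page off to the parsers.
    while True:
        url = download_queue.get()
        page = 'contents of {0}'.format(url)   # stand-in for the real download
        parse_queue.put(page)

def parser(download_queue, parse_queue):
    # Pull a page, extract links, and feed them back to the downloaders.
    while True:
        page = parse_queue.get()
        for link in []:                        # stand-in for real link extraction
            download_queue.put(link)

if __name__ == '__main__':
    download_queue = mp.Queue()
    parse_queue = mp.Queue()
    download_queue.put('http://example.com/')
    workers = ([mp.Process(target=downloader, args=(download_queue, parse_queue)) for _ in range(2)]
               + [mp.Process(target=parser, args=(download_queue, parse_queue)) for _ in range(2)])
    for p in workers:
        p.daemon = True      # workers die with the main process
        p.start()
    time.sleep(5)            # crude stand-in for "wait until the crawl is finished"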
The other approach I thought about is this: have a function that can be called with a URL as an argument. This function downloads the document and starts parsing it for links. Every time it encounters an interesting link, it instantly spawns a new thread running an identical function to itself. The problems I have with this approach are: how do I keep track of all the processes spawned all around, how do I know if there are still processes running, and how do I limit the maximum number of processes?
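The closest I have come to making this second idea manageable is to drop the self-spawning and instead use a fixed mp.Pool, with the parent process resubmitting whatever links come back. The function fetch and its fake link generation below are just placeholders, and I am not sure whether this is a reasonable way to do it:

import multiprocessing as mp

def fetch(url):
    # Download and parse `url`; return the links it contains (faked here).
    if url.count('/') < 5:
        return ['{0}/{1}'.format(url, i) for i in range(2)]
    return []

if __name__ == '__main__':
    pool = mp.Pool(processes=4)   # the pool size is the hard cap on concurrency
    seen = set()                  # urls already submitted
    pending = []                  # AsyncResult objects for work still in flight

    def submit(url):
        if url not in seen:
            seen.add(url)
            pending.append(pool.apply_async(fetch, (url,)))

    submit('http://example.com')
    while pending:
        for link in pending.pop().get():   # blocks until that task finishes
            submit(link)
    pool.close()
    pool.join()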
So I am completely lost. Can anyone suggest a good strategy, and perhaps show some example code of how to go about it?
Here is one approach, using multiprocessing. (Many thanks to @Voo for suggesting many improvements to the code.)
import multiprocessing as mp
import logging
import Queue
import time

logger = mp.log_to_stderr(logging.DEBUG)  # or,
# logger = mp.log_to_stderr(logging.WARN)  # uncomment this to silence debug and info messages

def worker(url_queue, seen):
    while True:
        url = url_queue.get()
        if url not in seen:
            logger.info('downloading {u}'.format(u=url))
            seen[url] = True
            # Replace this with code to download url
            # urllib2.urlopen(...)
            time.sleep(0.5)
            content = url
            logger.debug('parsing {c}'.format(c=content))
            # replace this with code that finds interesting links and
            # puts them in url_queue
            for i in range(3):
                if content < 5:
                    u = 2*content + i - 1
                    logger.debug('adding {u} to url_queue'.format(u=u))
                    time.sleep(0.5)
                    url_queue.put(u)
        else:
            logger.debug('skipping {u}; seen before'.format(u=url))
        url_queue.task_done()

if __name__ == '__main__':
    num_workers = 4
    url_queue = mp.JoinableQueue()
    manager = mp.Manager()
    seen = manager.dict()

    # prime the url queue with at least one url
    url_queue.put(1)
    downloaders = [mp.Process(target=worker, args=(url_queue, seen))
                   for i in range(num_workers)]
    for p in downloaders:
        p.daemon = True
        p.start()
    url_queue.join()
- A pool of (4) worker processes is created.
- There is a JoinableQueue, called url_queue.
- Each worker gets a url from the url_queue, finds new urls and adds them to the url_queue.
- Only after adding new items does it call url_queue.task_done().
- The main process calls url_queue.join(). This blocks the main process until task_done has been called for every task in the url_queue.
- Since the worker processes have the daemon attribute set to True, they too end when the main process ends.
All the components used in this example are also explained in Doug Hellmann's excellent Python Module of the Week tutorial on multiprocessing.
What you're describing is essentially graph traversal. Most graph traversal algorithms (those that are more sophisticated than depth-first) keep track of two sets of nodes; in your case, the nodes are URLs.
The first set is called the "closed set", and represents all of the nodes that have already been visited and processed. If, while you're processing a page, you find a link that happens to be in the closed set, you can ignore it; it's already been handled.
The second set is unsurprisingly called the "open set", and includes all of the nodes that have been found but not yet processed.
The basic mechanism is to start by putting the root node into the open set (the closed set is initially empty, since no nodes have been processed yet) and start working. Each worker takes a single node from the open set, copies it to the closed set, processes the node, and adds any nodes it discovers back to the open set (so long as they aren't already in either the open or closed sets). Once the open set is empty (and no workers are still processing nodes), the graph has been completely traversed.
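As a minimal single-process sketch of that idea, where get_neighbours stands in for "download the page and return the links on it":

def traverse(root, get_neighbours):
    open_set = set([root])    # discovered, not yet processed
    closed_set = set()        # already processed
    while open_set:
        node = open_set.pop()
        closed_set.add(node)
        for neighbour in get_neighbours(node):
            # ignore anything already handled or already queued
            if neighbour not in closed_set and neighbour not in open_set:
                open_set.add(neighbour)
    return closed_set

# e.g. traverse(1, lambda n: [2*n, 2*n + 1] if n < 8 else [])
# visits the "urls" 1..15 exactly once each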
Actually implementing this in multiprocessing probably means that you'll have a master task that keeps track of the open and closed sets. When a worker in the worker pool indicates that it is ready for work, the master takes care of moving a node from the open set to the closed set and handing it to that worker. The workers can then pass all of the nodes they find back to the master without worrying about whether they are already closed; the master will ignore nodes that are already closed.
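One possible way to arrange that with a plain worker pool, where the master process owns both sets and the workers simply report back what they find (process_node and the fake links are stand-ins for the real download-and-parse):

import multiprocessing as mp

def process_node(url):
    # Worker: download/parse `url` and return the links it finds (faked here).
    if len(url) < 28:
        return ['{0}/{1}'.format(url, i) for i in range(2)]
    return []

if __name__ == '__main__':
    pool = mp.Pool(4)
    open_set = set(['http://example.com'])
    closed_set = set()
    while open_set:
        # The master moves a batch of nodes from the open set to the closed
        # set and farms them out to the pool.
        batch = list(open_set)
        open_set.clear()
        closed_set.update(batch)
        for links in pool.map(process_node, batch):
            # Workers report every link they find; the master ignores the
            # ones that are already closed.
            open_set.update(link for link in links if link not in closed_set)
    pool.close()
    pool.join()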