Python: Something like `map` that works on threads [closed]
I was sure there was something like this in the standard library, but it seems I was wrong.
I have a bunch of URLs that I want to urlopen in parallel. I want something like the built-in map function, except the work is done in parallel by a bunch of threads. Is there a good module that does this?
There is a map method on multiprocessing.Pool, but that does the work in multiple processes. If multiple processes aren't your dish, you can use multiprocessing.dummy, which exposes the same interface but uses threads.
import multiprocessing.dummy
import urllib.request

# multiprocessing.dummy mirrors the multiprocessing API,
# but is backed by threads instead of processes.
p = multiprocessing.dummy.Pool(5)

def f(post):
    return urllib.request.urlopen('http://stackoverflow.com/questions/%d' % post)

print(p.map(f, range(3329361, 3329361 + 5)))
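Incidentally, multiprocessing.dummy.Pool hands you the same object as multiprocessing.pool.ThreadPool, so you can also spell it directly:

from multiprocessing.pool import ThreadPool

p = ThreadPool(5)  # equivalent to multiprocessing.dummy.Pool(5)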
Someone recommended I use the futures package for this. I tried it and it seems to be working.

http://pypi.python.org/pypi/futures

(That package is the backport of what is now concurrent.futures in the standard library, since Python 3.2.)
Here's an example, written against the concurrent.futures API:

"Download many URLs in parallel."
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
    # submit() schedules each call and immediately returns a Future.
    future_list = [executor.submit(load_url, url, 30) for url in URLS]
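To actually get the pages back (and surface any download errors), call result() on each future; by the time the with block exits, they have all finished. Continuing from future_list and URLS above:

for url, future in zip(URLS, future_list):
    try:
        data = future.result()  # re-raises any exception from load_url
    except Exception as exc:
        print('%s generated an exception: %s' % (url, exc))
    else:
        print('%s is %d bytes' % (url, len(data)))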
Here is my implementation of threaded map:

import sys
from threading import Thread
from queue import Queue, Empty

def thread_map(f, iterable, pool=None):
    """
    Just like [f(x) for x in iterable], but each f(x) runs in a separate thread.

    :param f: function to apply to each item
    :param iterable: items to process
    :param pool: size of the thread pool; unbounded (one thread per item) by default
    :return: list of results
    """
    res = {}
    if pool is None:
        # One thread per item; store each result (or exception info) by index.
        def target(arg, num):
            try:
                res[num] = f(arg)
            except Exception:
                res[num] = sys.exc_info()
        threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)]
    else:
        # Fixed-size pool: workers pull numbered items off a queue until it is empty.
        class WorkerThread(Thread):
            def run(self):
                while True:
                    try:
                        num, arg = queue.get(block=False)
                        try:
                            res[num] = f(arg)
                        except Exception:
                            res[num] = sys.exc_info()
                    except Empty:
                        break
        queue = Queue()
        for i, arg in enumerate(iterable):
            queue.put((i, arg))
        threads = [WorkerThread() for _ in range(pool)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [res[i] for i in range(len(res))]
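For the question's use case, a sketch of how this might be called (assumes Python 3's urllib.request; failed fetches come back as sys.exc_info() tuples rather than raising):

from urllib.request import urlopen

urls = ['http://stackoverflow.com/questions/%d' % n
        for n in range(3329361, 3329361 + 5)]

# Fetch all pages using at most 5 threads at a time.
pages = thread_map(lambda url: urlopen(url).read(), urls, pool=5)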
The queue module (Queue in Python 2) might help you. Use one thread that put()s all the URLs into the queue, while the worker threads simply get() the URLs one by one.

Python Docs: queue — A synchronized queue class
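A minimal sketch of that producer/consumer pattern (the worker count and URLs here are placeholders):

import threading
import queue
import urllib.request

url_queue = queue.Queue()

def worker():
    while True:
        url = url_queue.get()
        if url is None:  # sentinel: tells this worker to exit
            break
        try:
            data = urllib.request.urlopen(url).read()
            print('%s: %d bytes' % (url, len(data)))
        except Exception as exc:
            print('%s failed: %s' % (url, exc))

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

for url in ['http://www.bbc.co.uk/', 'http://www.cnn.com/']:
    url_queue.put(url)
for _ in threads:
    url_queue.put(None)  # one sentinel per worker
for t in threads:
    t.join()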
I'd wrap it up in a function (untested):

import threading
import urllib.request
import queue

def openurl(url, result_queue):
    def starter():
        try:
            result = urllib.request.urlopen(url)
        except Exception as exc:
            # Capture the exception and defer it: calling raiser()
            # re-raises it in the consumer's thread.
            def raiser():
                raise exc
            result_queue.put((url, raiser))
        else:
            result_queue.put((url, lambda: result))
    threading.Thread(target=starter).start()

myurls = ...  # the list of urls
myqueue = queue.Queue()

for url in myurls:
    openurl(url, myqueue)

for _ in myurls:
    url, getresult = myqueue.get()
    try:
        result = getresult()
    except Exception as exc:
        print('exception raised: ' + str(exc))
    else:
        ...  # do stuff with result
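The point of handing back a raiser closure rather than the exception object itself is that the consumer treats success and failure uniformly: calling getresult() either returns the response or re-raises the original exception in the consuming thread. This is essentially what Future.result() in concurrent.futures does for you.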