Is there a multithreaded map() function? [closed]
I have a function that is side-effect free. I would like to run it for every element in an array and return an array with all of the results.
Does Python have something to generate all of the values?
Try the Pool.map function from multiprocessing:
http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers
It's not multithreaded per se, but that's actually an advantage for CPU-bound work: in CPython, the GIL lets only one thread execute Python bytecode at a time, so threads give little speedup there.
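A minimal sketch of what that looks like, assuming a trivial side-effect-free function (the name square and the pool size of 4 are placeholders for illustration, not part of the original answer):

```python
from multiprocessing import Pool


def square(x):
    """Side-effect-free worker; defined at module level so it can be pickled."""
    return x * x


if __name__ == "__main__":
    # The guard matters on platforms that start workers by re-importing
    # this module; without it each worker would try to create its own pool.
    with Pool(processes=4) as pool:
        # pool.map blocks until every result is ready and keeps input order.
        results = pool.map(square, range(10))
    print(results)
```

Because arguments and results are pickled and sent between processes, this tends to pay off only when each call does a meaningful amount of work.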
Try concurrent.futures.ThreadPoolExecutor.map from the Python standard library (new in version 3.2).
Similar to map(func, *iterables) except:
- the iterables are collected immediately rather than lazily;
- func is executed asynchronously and several calls to func may be made concurrently.
A simple example (modified from ThreadPoolExecutor Example):
import concurrent.futures
import urllib.request

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
]

# Retrieve a single page and return its contents as bytes
def load_url(url, timeout):
    try:
        # urlopen itself can raise (DNS failure, timeout, HTTP error),
        # so it has to sit inside the try block as well.
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()
    except Exception:
        # You may need a better error handler.
        return b''

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # map yields results in the same order as URLS
    pages = list(executor.map(lambda url: load_url(url, 60), URLS))

print('Done.')
Python now has the concurrent.futures module, which is the simplest way of getting map to work with either multiple threads or multiple processes.
https://docs.python.org/3/library/concurrent.futures.html
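As a rough sketch of why this is convenient: both executor classes share the same map interface, so switching between threads and processes can be a one-argument change. The helper parallel_map and the function square below are hypothetical names used only for illustration.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(x):
    """Side-effect-free worker; module-level so a process pool can pickle it."""
    return x * x


def parallel_map(func, items, executor_cls=ThreadPoolExecutor, max_workers=4):
    """Apply func to every item using the given executor class, keeping order."""
    with executor_cls(max_workers=max_workers) as executor:
        return list(executor.map(func, items))


if __name__ == "__main__":
    # Threads: a reasonable default for I/O-bound functions.
    print(parallel_map(square, range(5)))
    # Processes: same call, different executor, for CPU-bound functions.
    print(parallel_map(square, range(5), executor_cls=ProcessPoolExecutor))
```

With a process pool, func and its arguments must be picklable, which rules out lambdas and locally defined functions.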
You can use Python's multiprocessing package (http://docs.python.org/library/multiprocessing.html). The cloud package, available from PiCloud (http://www.picloud.com), also offers a multiprocessing map() function, which can offload your map to the cloud.
Below is my map_parallel function. It works just like map, except it can run each element in parallel in a separate thread (but see note below). This answer builds upon another SO answer.
import logging
import sys
import threading
import traceback
from queue import Queue, Empty

def map_parallel(f, iterable, max_parallel=10):
    """Just like map(f, iterable), but runs up to max_parallel calls in threads."""
    # Put all of the items in the queue, keep track of order.
    total_items = 0
    queue = Queue()
    for i, arg in enumerate(iterable):
        queue.put((i, arg))
        total_items += 1
    # No point in creating more thread objects than necessary.
    if max_parallel > total_items:
        max_parallel = total_items

    # The worker thread.
    res = {}
    errors = {}
    class Worker(threading.Thread):
        def run(self):
            # Stop picking up new items as soon as any worker has failed.
            while not errors:
                try:
                    num, arg = queue.get(block=False)
                except Empty:
                    break
                try:
                    res[num] = f(arg)
                except Exception:
                    errors[num] = sys.exc_info()

    # Create the threads.
    threads = [Worker() for _ in range(max_parallel)]
    # Start the threads.
    for t in threads:
        t.start()
    # Wait for the threads to finish.
    for t in threads:
        t.join()

    if errors:
        if len(errors) > 1:
            logging.warning("map_parallel multiple errors: %d:\n%s",
                            len(errors), errors)
        # Just raise the first one.
        item_i = min(errors.keys())
        exc_type, value, tb = errors[item_i]
        # Print the original traceback
        logging.info("map_parallel exception on item %s/%s:\n%s",
                     item_i, total_items, "".join(traceback.format_tb(tb)))
        raise value
    return [res[i] for i in range(len(res))]
NOTE: One thing to be careful of is exceptions. Like normal map, the above function raises an exception if one of its sub-threads raises an exception, and will stop iteration. However, due to the parallel nature, there's no guarantee that the earliest element will raise the first exception.
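For comparison, here is a small sketch of how the standard library's ThreadPoolExecutor.map surfaces exceptions. This is a different mechanism from the function above: an exception is re-raised when its result is retrieved from the iterator, so failures appear in input order. The names reciprocal and collect are hypothetical, chosen only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def reciprocal(x):
    """Raises ZeroDivisionError for x == 0."""
    return 1 / x


def collect(func, items):
    """Return (results gathered before the first failure, the exception or None)."""
    results = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        try:
            # Results come back in input order; a failed call re-raises its
            # exception at the point where its result would be yielded.
            for value in executor.map(func, items):
                results.append(value)
        except ZeroDivisionError as exc:
            return results, exc
    return results, None


if __name__ == "__main__":
    print(collect(reciprocal, [1, 2, 0, 4]))
```

Results for elements before the failing one are still delivered, while anything after it is lost once the exception propagates.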
Maybe try the Unladen Swallow Python 3 implementation? That might be a major project, and not guaranteed to be stable, but if you're inclined it could work. Then list or set comprehensions seem like the proper functional structure to use.