
Is there a multithreaded map() function? [closed]

Closed. This question needs to be more focused. It is not currently accepting answers.

Closed 3 years ago.

I have a function that is side-effect free. I would like to run it for every element in an array and return an array with all of the results.

Does Python have something built in, like a parallel map(), to generate all of the values?


Try the Pool.map function from multiprocessing:

http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers

It's not multithreaded per se, but that's actually a good thing, since multithreading of CPU-bound work is severely crippled in Python by the GIL.
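
A minimal sketch (the square function, input list, and pool size are illustrative, not from the question):

import multiprocessing

def square(x):
    # Any side-effect-free function works here.
    return x * x

if __name__ == '__main__':
    # The __main__ guard matters on platforms that spawn worker processes.
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, [1, 2, 3, 4, 5])
    print(results)  # [1, 4, 9, 16, 25]

Pool.map blocks until every result is ready and returns them in input order.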


Try concurrent.futures.ThreadPoolExecutor.map from the Python standard library (new in version 3.2).

Similar to map(func, *iterables) except:

  • the iterables are collected immediately rather than lazily;
  • func is executed asynchronously and several calls to func may be made concurrently.

A simple example (modified from the ThreadPoolExecutor example in the docs):

import concurrent.futures
import urllib.request

URLS = [
  'http://www.foxnews.com/',
  'http://www.cnn.com/',
  'http://europe.wsj.com/',
  'http://www.bbc.co.uk/',
]

# Retrieve a single page and return its contents
def load_url(url, timeout):
    # Do something here. For example:
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        try:
            data = conn.read()
        except Exception:
            # You may need a better error handler.
            return b''
        else:
            return data

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # executor.map returns results in the same order as URLS
    results = list(executor.map(lambda url: load_url(url, 60), URLS))

print('Done.')
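
If the lambda feels awkward, functools.partial can bind the timeout argument instead:

import functools
results = list(executor.map(functools.partial(load_url, timeout=60), URLS))

Either way, executor.map yields results in the order of the input iterable, and any exception that escapes load_url (for example, a failed connection) is re-raised when its result is retrieved.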


Python now has the concurrent.futures module, which is the simplest way of getting map to work with either multiple threads or multiple processes.

https://docs.python.org/3/library/concurrent.futures.html
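
A minimal sketch of both variants (the work function and inputs are illustrative); swapping the executor class is the only change:

import concurrent.futures

def work(x):
    # Stand-in for any side-effect-free function.
    return x * x

def main():
    items = list(range(10))
    # Threads suit I/O-bound work; the GIL limits CPU-bound speedup.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
        print(list(ex.map(work, items)))
    # Processes suit CPU-bound work; the function and args must be picklable.
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
        print(list(ex.map(work, items)))

if __name__ == '__main__':
    main()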


You can use the multiprocessing Python package (http://docs.python.org/library/multiprocessing.html). The cloud Python package, available from PiCloud (http://www.picloud.com), offered a multiprocessing map() function as well, which could offload your map to the cloud (the PiCloud service has since shut down).


Below is my map_parallel function. It works just like map, except that it can process each element in a separate thread, in parallel (but see the note below). This answer builds upon another SO answer.

import sys
import threading
import traceback
import logging
from queue import Queue, Empty

def map_parallel(f, iterable, max_parallel=10):
    """Just like map(f, iterable) but each call is made in a separate thread."""
    # Put all of the items in the queue, keeping track of their order.
    total_items = 0
    queue = Queue()
    for i, arg in enumerate(iterable):
        queue.put((i, arg))
        total_items += 1
    # No point in creating more thread objects than necessary.
    max_parallel = min(max_parallel, total_items)

    # The worker thread.
    res = {}
    errors = {}
    class Worker(threading.Thread):
        def run(self):
            # Stop pulling new work as soon as any thread records an error.
            while not errors:
                try:
                    num, arg = queue.get(block=False)
                    try:
                        res[num] = f(arg)
                    except Exception:
                        errors[num] = sys.exc_info()
                except Empty:
                    break

    # Create the threads.
    threads = [Worker() for _ in range(max_parallel)]
    # Start the threads, then wait for them all to finish.
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    if errors:
        if len(errors) > 1:
            logging.warning("map_parallel multiple errors: %d:\n%s" % (
                len(errors), errors))
        # Just raise the first one.
        item_i = min(errors.keys())
        exc_type, value, tb = errors[item_i]
        # Log the original traceback.
        logging.info("map_parallel exception on item %s/%s:\n%s" % (
            item_i, total_items, "\n".join(traceback.format_tb(tb))))
        raise value
    return [res[i] for i in range(len(res))]

NOTE: One thing to be careful of is exceptions. Like the normal map, the function above raises an exception if one of its sub-threads raises an exception, and stops handing out new work. However, due to the parallel nature, there's no guarantee that the earliest element will raise the first exception.
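
A usage sketch (the fetch function, URLs, and parallelism level are illustrative):

import urllib.request

def fetch(url):
    with urllib.request.urlopen(url, timeout=30) as conn:
        return conn.read()

pages = map_parallel(fetch, ['http://www.bbc.co.uk/', 'http://www.cnn.com/'],
                     max_parallel=5)
print([len(p) for p in pages])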


Maybe try the Unladen Swallow Python 3 implementation? That might be a major undertaking, and it's not guaranteed to be stable, but if you're so inclined it could work. Then list or set comprehensions seem like the proper functional structure to use.
