Why do map() and multiprocessing.Pool.map() give different results in Python?
I had a strange problem. I have a file of the format:
START
1
2
STOP
lllllllll
START
3
5
6
STOP
I want to read the lines between START and STOP as blocks, and use my_f to process each block.
    import itertools

    def block_generator(file):
        with open(file) as lines:
            for line in lines:
                # strip() drops the trailing newline before comparing
                if line.strip() == 'START':
                    block = itertools.takewhile(lambda x: x.strip() != 'STOP', lines)
                    yield block
In my main function I tried to use map() to get the work done. It worked:

    blocks = block_generator(file)
    map(my_f, blocks)

This gives me what I want. But when I tried the same thing with multiprocessing.Pool.map(), it raised an error saying that takewhile() takes 2 arguments but was given 0.

    blocks = block_generator(file)
    p = multiprocessing.Pool(4)
    p.map(my_f, blocks)
Is this a bug?
- The file has more than 1000000 blocks, each with fewer than 100 lines.
- I accept the answer from untubu.
- But maybe I will simply split the file and run n instances of my original script, without multiprocessing, to process the pieces, then cat the results together. This way you can never be wrong as long as the script works on a small file.
How about:
    import itertools
    import multiprocessing

    def grouper(n, iterable, fillvalue=None):
        # Source: http://docs.python.org/library/itertools.html#recipes
        "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
        # izip_longest is the Python 2 name (zip_longest in Python 3)
        return itertools.izip_longest(*[iter(iterable)]*n, fillvalue=fillvalue)

    def block_generator(file):
        with open(file) as lines:
            for line in lines:
                if line.strip() == 'START':
                    # list() reads the block now, while the file is at the right place
                    block = list(itertools.takewhile(lambda x: x.strip() != 'STOP', lines))
                    yield block

    blocks = block_generator(file)
    p = multiprocessing.Pool(4)
    for chunk in grouper(100, blocks, fillvalue=''):
        p.map(my_f, chunk)
Using grouper will limit the amount of the file consumed by p.map. Thus the whole file need not be read into memory (fed into the task queue) at once.
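As a rough sketch of the same chunking idea for Python 3 (where izip_longest became zip_longest), the variant below uses itertools.islice instead, so the last chunk is simply shorter rather than padded with a fill value. The name chunked is made up for this illustration and is not part of itertools.

```python
import itertools

def chunked(iterable, n):
    """Yield lists of up to n items, pulling from iterable only as needed."""
    iterator = iter(iterable)
    while True:
        # islice takes at most n items and leaves the rest untouched
        chunk = list(itertools.islice(iterator, n))
        if not chunk:
            return
        yield chunk

chunks = list(chunked(range(7), 3))
print(chunks)
```

Each chunk could then be handed to p.map(my_f, chunk), as in the loop above, so only one chunk's worth of blocks is read from the file at a time.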
I claim above that when you call p.map(func, iterator), the entire iterator is consumed immediately to fill a task queue. The pool workers then get tasks from the queue and work on the jobs concurrently.
If you look inside pool.py and trace through the definitions, you will see that the _handle_tasks thread gets items from self._taskqueue and enumerates each one at once:

    for i, task in enumerate(taskseq):
        ...
        put(task)

The conclusion is that the iterator passed to p.map gets consumed at once. There is no waiting for one task to end before the next task is taken from the queue.
As further corroboration, run this demonstration code:
    import multiprocessing as mp
    import time
    import logging

    def foo(x):
        time.sleep(1)
        return x*x

    def blocks():
        for x in range(1000):
            # log once every 100 items pulled from the generator
            if x % 100 == 0:
                logger.info('Got here')
            yield x

    logger = mp.log_to_stderr(logging.DEBUG)
    logger.setLevel(logging.DEBUG)
    pool = mp.Pool()
    print(pool.map(foo, blocks()))
You will see the Got here message printed 10 times almost immediately, and then a long pause due to the time.sleep(1) call in foo. This shows that the iterator is fully consumed long before the pool processes get around to finishing the tasks.
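The same effect can be checked without relying on timing. This is a minimal sketch, assuming CPython 3 and using multiprocessing.pool.ThreadPool (the same map() interface, but with threads instead of processes) so that the tasks can see a list living in the parent's memory. The names numbers and seen_so_far are hypothetical.

```python
from multiprocessing.pool import ThreadPool

consumed = []  # every item pulled from the generator so far

def numbers(n):
    for x in range(n):
        consumed.append(x)
        yield x

def seen_so_far(x):
    # Report how many items had already been pulled when this task ran
    return len(consumed)

with ThreadPool(2) as pool:
    counts = pool.map(seen_so_far, numbers(20))

print(counts)
```

If map() pulled items lazily, the early tasks would report small numbers. Instead each task should report that all 20 items were already consumed before it started.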
Basically, when you iterate over a file the way you are, each line you read moves the file pointer ahead by one line.
So, when you do

    block=itertools.takewhile(lambda x:x!='STOP',lines)

every time the iterator returned by takewhile gets a new item from lines, it moves the file pointer.
It's generally bad to advance an iterator you're already looping over in a for loop. However, the for loop is suspended temporarily on every yield, and map exhausts the takewhile before continuing the for loop, so you get the desired behavior.
When the for loop and the takewhiles run at the same time, the file pointer is rapidly moved to the end, and you get an error.
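The shared-iterator effect can be reproduced without a file or a pool. The sketch below is only an illustration, assuming the lines have already been stripped of newlines and using a plain list in place of the file. It compares consuming each block as soon as it is yielded (what map() did in the question) with collecting all the blocks first (what Pool.map does before dispatching tasks).

```python
import itertools

def block_generator(lines):
    lines = iter(lines)  # one shared iterator, like an open file
    for line in lines:
        if line == 'START':
            yield itertools.takewhile(lambda x: x != 'STOP', lines)

data = ['START', '1', '2', 'STOP', 'lllllllll',
        'START', '3', '5', '6', 'STOP']

# Each block is exhausted before the for loop resumes
in_order = [list(block) for block in block_generator(data)]

# The for loop runs to the end first, then the blocks are read
pending = list(block_generator(data))
too_late = [list(block) for block in pending]

print(in_order)
print(too_late)
```

In the second case the for loop has already eaten every line, so each takewhile finds the shared iterator exhausted and comes back empty.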
Try this instead; it should be faster than wrapping the takewhile in a list:
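    import itertools
    import multiprocessing
    from functools import partial

    def block_generator(filename):
        with open(filename) as infile:
            # readline() instead of "for line in infile", so tell() stays reliable
            for line in iter(infile.readline, ''):
                if line.strip() == 'START':
                    yield infile.tell()

    def my_f_wrapper(pos, filename):
        # each task opens its own handle and jumps to the start of its block
        with open(filename) as infile:
            infile.seek(pos)
            block = itertools.takewhile(lambda x: x.strip() != 'STOP', infile)
            return my_f(block)

    p = multiprocessing.Pool(4)
    blocks = block_generator(filename)
    # imap takes a single iterable, so bind filename with partial
    results = list(p.imap(partial(my_f_wrapper, filename=filename), blocks))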
Basically, you want each my_f to operate independently on the file, so you need to open the file independently for each one.
I can't think of a way that doesn't require the file to be iterated over twice, once by the for loop and once by the takewhiles all put together, while still processing the file in parallel. In your original version, the takewhiles advanced the file pointer for the for loop, so it was very efficient.
If you weren't iterating over lines, but just bytes, I'd recommend using mmap for this, but it would make things a lot more complicated if you're working with lines of text.
Edit: An alternative would be to have block_generator go through the file and find all the positions of START and STOP, then feed them in pairs to the wrapper. That way, the wrapper wouldn't have to compare the lines to STOP; it would just have to use tell() on the file to make sure it wasn't at STOP. I'm not sure whether or not this would be faster.
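One way the offset-pair idea might look is sketched below for Python 3. It assumes the file is opened in binary mode so that offsets are plain byte counts, and the helper names block_offsets and read_block are invented for this example. It makes no claim about speed; it only shows that a worker can fetch its block from a (start, stop) pair without comparing lines to STOP.

```python
import os
import tempfile

def block_offsets(filename):
    """Yield (start, stop) byte offsets of the lines between START and STOP."""
    with open(filename, 'rb') as infile:
        start = None
        # readline() keeps tell() meaningful; iterating a text-mode file
        # with a for loop disables tell() in Python 3
        for line in iter(infile.readline, b''):
            if line.strip() == b'START':
                start = infile.tell()
            elif line.strip() == b'STOP' and start is not None:
                yield start, infile.tell() - len(line)
                start = None

def read_block(filename, start, stop):
    """Read one block using its own file handle, as a pool worker would."""
    with open(filename, 'rb') as infile:
        infile.seek(start)
        return infile.read(stop - start).decode().splitlines()

# Small sample file in the question's format
sample = b"START\n1\n2\nSTOP\nlllllllll\nSTART\n3\n5\n6\nSTOP\n"
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(sample)

offsets = list(block_offsets(tmp.name))
blocks = [read_block(tmp.name, start, stop) for start, stop in offsets]
os.remove(tmp.name)

print(offsets)
print(blocks)
```

In a real run the (start, stop) pairs would be what gets sent to the pool, with each worker calling something like read_block before my_f.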