Fast way to read interleaved data?
I've got a file containing several channels of data. The file is sampled at a base rate, and each channel is sampled at that base rate divided by some number -- it seems to always be a power of 2, though I don't think that's important.
So, if I have channels a, b, and c, sampled at divders of 1, 2, and 4, my stream will look like:
a0 b0 c0 a1 a2 b1 a3 a4 b2 c1 a5 ...
For added fun, the channels can independently be floats or ints (though I know for each one), and the data stream does not necessarily end on a power of 2: the example stream would be valid without further extension. The values are sometimes big and sometimes little-endian, though I know what I'm dealing with up-front.
I've got code that properly unpacks these and fills numpy arrays with the correct values, but it's slow: it looks something like (hope I'm not glossing over too much; just giving an idea of the algorithm):
for sample_num in range(total_samples):
channels_to_sample = [ch for ch in all_channels if ch.samples_for(sample_num)]
format_str = ... # build format string from channels_to_sample
data = struct.unpack( my_file.read( ... ) ) # read and unpack the data
# iterate over data tuple and put values in chann开发者_运维问答els_to_sample
for val, ch in zip(data, channels_to_sample):
ch.data[sample_num / ch.divider] = val
And it's slow -- a few seconds to read a 20MB file on my laptop. Profiler tells me I'm spending a bunch of time in Channel#samples_for()
-- which makes sense; there's a bit of conditional logic there.
My brain feels like there's a way to do this in one fell swoop instead of nesting loops -- maybe using indexing tricks to read the bytes I want into each array? The idea of building one massive, insane format string also seems like a questionable road to go down.
Update
Thanks to those who responded. For what it's worth, the numpy indexing trick reduced the time required to read my test data from about 10 second to about 0.2 seconds, for a speedup of 50x.
The best way to really improve the performance is to get rid of the Python loop over all samples and let NumPy do this loop in compiled C code. This is a bit tricky to achieve, but it is possible.
First, you need a bit of preparation. As pointed out by Justin Peel, the pattern in which the samples are arranged repeats after some number of steps. If d_1, ..., d_k are the divisors for your k data streams and b_1, ..., b_k are the sample sizes of the streams in bytes, and lcm is the least common multiple of these divisors, then
N = lcm*sum(b_1/d_1+...+b_k/d_k)
will be the number of bytes which the pattern of streams will repeat after. If you have figured out which stream each of the first N bytes belongs to, you can simply repeat this pattern.
You can now build the array of stream indices for the first N bytes by something similar to
stream_index = []
for sample_num in range(lcm):
stream_index += [i for i, ch in enumerate(all_channels)
if ch.samples_for(sample_num)]
repeat_count = [b[i] for i in stream_index]
stream_index = numpy.array(stream_index).repeat(repeat_count)
Here, d
is the sequence d_1, ..., d_k and b
is the sequence b_1, ..., b_k.
Now you can do
data = numpy.fromfile(my_file, dtype=numpy.uint8).reshape(-1, N)
streams = [data[:,stream_index == i].ravel() for i in range(k)]
You possibly need to pad the data a bit at the end to make the reshape()
work.
Now you have all the bytes belonging to each stream in separate NumPy arrays. You can reinterpret the data by simply assigning to the dtype
attribute of each stream. If you want the first stream to be intepreted as big endian integers, simply write
streams[0].dtype = ">i"
This won't change the data in the array in any way, just the way it is interpreted.
This may look a bit cryptic, but should be much better performance-wise.
Replace channel.samples_for(sample_num)
with a iter_channels(channels_config)
iterator that keeps some internal state and lets you read the file in one pass. Use it like this:
for (chan, sample_data) in izip(iter_channels(), data):
decoded_data = chan.decode(sample_data)
To implement the iterator, think of a base clock with a period of one. The periods of the various channels are integers. Iterate the channels in order, and emit a channel if the clock modulo its period is zero.
for i in itertools.count():
for chan in channels:
if i % chan.period == 0:
yield chan
The grouper()
recipe along with itertools.izip()
should be of some help here.
精彩评论