How to pick random (small) data samples using Map/Reduce?

I want to write a map/reduce job to select a number of random samples from a large dataset based on a row-level condition. I want to minimize the number of intermediate keys.

Pseudocode:

for each row
  if row matches condition
    put row.id in the bucket if the bucket is not yet full

Have you done something like this? Is there a well-known algorithm?

A sample containing sequential rows is also good enough.

Thanks.


Mappers: Output all qualifying values, each with a random integer key.

Single reducer: Output the first N values, throwing away the keys.

The sorter will randomize the mapper output order for you. You don't know how many qualifying values a mapper will find, so each mapper has to output all qualifying values from its partition.
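
A minimal Java sketch of that recipe, under a few assumptions of mine: rows arrive as lines of text, matchesCondition() is a placeholder for the row-level filter, and the sample size N is hard-coded. The job would be configured with a single reducer via job.setNumReduceTasks(1).

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class RandomSample {

    // Emits every qualifying row under a random integer key, so the shuffle
    // delivers the rows to the reducer in a random order.
    public static class RandomKeyMapper
            extends Mapper<LongWritable, Text, IntWritable, Text> {
        private final Random random = new Random();
        private final IntWritable outKey = new IntWritable();

        @Override
        protected void map(LongWritable offset, Text row, Context context)
                throws IOException, InterruptedException {
            if (matchesCondition(row)) {          // placeholder row-level filter
                outKey.set(random.nextInt());
                context.write(outKey, row);
            }
        }

        private boolean matchesCondition(Text row) {
            return true;                          // replace with the real condition
        }
    }

    // Single reducer: pass through the first N values and drop the keys.
    public static class FirstNReducer
            extends Reducer<IntWritable, Text, Text, NullWritable> {
        private static final int N = 1000;        // desired sample size (assumed)
        private int emitted = 0;

        @Override
        protected void reduce(IntWritable key, Iterable<Text> rows, Context context)
                throws IOException, InterruptedException {
            for (Text row : rows) {
                if (emitted++ >= N) {
                    return;                        // sample complete
                }
                context.write(row, NullWritable.get());
            }
        }
    }
}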

In general, I like to build up simple mapper/reducer tools like this which use as much of the Hadoop machinery as possible; I end up reusing them in different tasks.


Karl's approach works just fine, but we can greatly reduce the amount of data produced by the mappers.

Let K be the number of samples you want. We'll assume that this is small enough to hold in memory on one of your nodes. We'll assign a random value to each matching row, and then use a modification of the selection algorithm to find the K smallest values.

In the setup part of each mapper, create a priority queue; a Fibonacci heap is a good choice for this. We'll be using floats as the priorities; if you have a huge amount of data, doubles may be more appropriate to avoid ties. For each row that matches your condition, insert that row into the priority queue with a randomly chosen float between 0 and 1 as the priority. If you have more than K things in your queue, remove the highest-valued item (the opposite of the usual Fibonacci heap convention, which removes the minimum).

Finally, at the end of the mapper, emit everything in your queue. For each item you emit, use the priority as the key (a FloatWritable) and some representation of the corresponding row as the value (the row ID, or perhaps the entire row contents). You will emit only K values per mapper (or fewer, if fewer than K rows in that mapper matched).
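
A sketch of the mapper side, substituting Java's built-in PriorityQueue (a binary heap) for the Fibonacci heap; matchesCondition() again stands in for the row filter, and K is hard-coded for illustration.

import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Random;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopKSampleMapper
        extends Mapper<LongWritable, Text, FloatWritable, Text> {

    private static final int K = 1000;    // desired overall sample size (assumed)

    private static class Entry {
        final float priority;
        final String row;
        Entry(float priority, String row) { this.priority = priority; this.row = row; }
    }

    // Max-heap on the priority: the root is the largest of the (at most) K
    // smallest priorities seen so far, i.e. the next one to evict.
    private final PriorityQueue<Entry> queue = new PriorityQueue<>(
            K + 1, Comparator.comparingDouble((Entry e) -> e.priority).reversed());
    private final Random random = new Random();

    @Override
    protected void map(LongWritable offset, Text row, Context context) {
        if (!matchesCondition(row)) {      // placeholder row-level filter
            return;
        }
        // Copy the row: Hadoop reuses the Text object between map() calls.
        queue.add(new Entry(random.nextFloat(), row.toString()));
        if (queue.size() > K) {
            queue.poll();                  // drop the current highest priority
        }
    }

    // At the end of the mapper, emit everything left in the queue:
    // at most K rows, keyed by their random priorities.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Entry e : queue) {
            context.write(new FloatWritable(e.priority), new Text(e.row));
        }
    }

    private boolean matchesCondition(Text row) {
        return true;                       // replace with the real condition
    }
}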

In your single reducer, Hadoop will automatically scan through the keys in order from lowest to highest. Emit the rows corresponding to the first K keys you see (the K lowest), then quit.
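
And the matching single reducer sketch: the keys arrive from lowest to highest, so it simply passes through the rows attached to the first K keys and then stops (K here must match the mapper's value).

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopKSampleReducer
        extends Reducer<FloatWritable, Text, Text, NullWritable> {

    private static final int K = 1000;    // must match the mapper's K
    private int emitted = 0;

    @Override
    protected void reduce(FloatWritable priority, Iterable<Text> rows, Context context)
            throws IOException, InterruptedException {
        for (Text row : rows) {
            if (emitted++ >= K) {
                return;                    // already have the K rows with the smallest priorities
            }
            context.write(row, NullWritable.get());
        }
    }
}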

This works because each matching row has the same probability of having one of the K smallest float values. We keep track of the K smallest floats for each mapper to make sure we don't miss any, and then send them to the reducer to find the K smallest overall.


Bkkbrad's approach is perhaps the most efficient in that the number of records sent out from each mapper is (at most) K. On the other hand, note that it assumes that the sample itself (i.e. K elements) fits in the memory of a single reducer.

When this is not the case, you may be tempted to use a fully distributed approach in which the mapper assigns each matching row a random integer in {1,..,K} and the reduce phase picks one element for each key (also see this question). The problem with this approach is that, by chance, no row may be assigned to certain keys, in which case the final sample has fewer than K elements. This happens with small probability if K is much smaller than the total number of rows N, but with constant probability if K is a constant fraction of N: for example, when K=N/3 each key is left empty with probability (1-1/K)^N ≈ e^(-N/K) = e^(-3) ≈ 5%, so the sample falls short by roughly 5% in expectation.

A solution that works is the following: suppose we have B buckets, and generate a random ordering of the elements by first putting each element in a random bucket and then generating a random ordering within each bucket. The elements in the first bucket are considered smaller (with respect to the ordering) than the elements in the second bucket, and so on. Then, if we want to pick a sample of size K, we can collect all of the elements in the first j buckets if they contain t < K elements in total, and pick the remaining K-t elements from the next bucket. Here B is a parameter chosen so that N/B elements fit into memory. The key aspect is that buckets can be processed in parallel.
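
To make the selection rule concrete, here is a tiny standalone sketch (with made-up bucket counts, not part of the MapReduce job itself) of how many elements each bucket ends up contributing:

public class BucketPlan {

    // For each bucket, how many of its elements the final sample should contain.
    static int[] plan(int[] bucketCounts, int k) {
        int[] take = new int[bucketCounts.length];
        int t = 0;                                 // elements taken from earlier buckets
        for (int j = 0; j < bucketCounts.length; j++) {
            if (t >= k) {
                take[j] = 0;                       // sample already complete
            } else {
                take[j] = Math.min(bucketCounts[j], k - t);
                t += take[j];
            }
        }
        return take;
    }

    public static void main(String[] args) {
        // e.g. B = 4 buckets with these matching-row counts and K = 10:
        int[] counts = {4, 3, 6, 5};
        System.out.println(java.util.Arrays.toString(plan(counts, 10)));
        // prints [4, 3, 3, 0]: first two buckets in full, 3 sampled from the third, none from the fourth
    }
}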

Mapper: Output all qualifying rows, each with a random key (j, r), where j is a random integer in {1,..,B} and r is a random float. In addition, keep track, for each 1<=j<=B, of the number of elements assigned a bucket index less than j, and transmit this information to the reducers.

Shuffle: Partition over j, and secondary sort over r.
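
One way to wire this shuffle up in Hadoop, sketched with a hypothetical composite key class holding (j, r): the mapper would emit a BucketKey per qualifying row, a custom partitioner routes records by j alone, the key's compareTo provides the (j, r) sort order, and a grouping comparator groups by j so each reduce() call sees one whole bucket with its values already sorted by r. The driver would register these with job.setMapOutputKeyClass(BucketKey.class), job.setPartitionerClass(BucketPartitioner.class) and job.setGroupingComparatorClass(BucketGroupingComparator.class).

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Composite key (bucket j, random float r); compareTo gives the (j, r) sort order.
public class BucketKey implements WritableComparable<BucketKey> {
    int bucket;   // j in {1,..,B}
    float r;      // random value for the secondary sort

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(bucket);
        out.writeFloat(r);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        bucket = in.readInt();
        r = in.readFloat();
    }

    @Override
    public int compareTo(BucketKey other) {
        int cmp = Integer.compare(bucket, other.bucket);
        return cmp != 0 ? cmp : Float.compare(r, other.r);
    }
}

// Partition on j only, so all records of a bucket land on the same reducer.
class BucketPartitioner extends Partitioner<BucketKey, Text> {
    @Override
    public int getPartition(BucketKey key, Text value, int numPartitions) {
        return (key.bucket & Integer.MAX_VALUE) % numPartitions;
    }
}

// Group on j only, so one reduce() call receives the whole bucket.
class BucketGroupingComparator extends WritableComparator {
    protected BucketGroupingComparator() {
        super(BucketKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return Integer.compare(((BucketKey) a).bucket, ((BucketKey) b).bucket);
    }
}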

Reducer: Consider bucket j, and assume the reducer knows how many elements fall in buckets before j and how many fall in bucket j itself (by aggregating the information received from the mappers). If the number of elements in buckets up to and including j is at most K, output all elements in bucket j. If the number of elements in buckets strictly before j is t < K (but bucket j would overflow the sample), run reservoir sampling to pick K-t random elements from the bucket. Otherwise, i.e. when the number of elements in buckets before j is already at least K, don't output anything.
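
A sketch of that reducer logic, using the BucketKey sketched above and assuming the per-bucket counts have somehow been made available to it (e.g. through a side file or a preliminary counting pass); here that lookup is hidden behind a hypothetical elementsBeforeBucket() placeholder, and the partial bucket is handled with plain Algorithm R reservoir sampling.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class BucketSampleReducer
        extends Reducer<BucketKey, Text, Text, NullWritable> {

    private static final int K = 1000;             // total sample size (assumed)
    private final Random random = new Random();

    @Override
    protected void reduce(BucketKey key, Iterable<Text> rows, Context context)
            throws IOException, InterruptedException {
        long before = elementsBeforeBucket(key.bucket);   // t = rows in buckets before j
        if (before >= K) {
            return;                                 // earlier buckets already fill the sample
        }
        int needed = (int) (K - before);            // up to K - t rows from this bucket

        // Algorithm R reservoir sampling of size `needed` over the bucket's rows.
        // If the whole bucket has at most `needed` rows, the reservoir simply keeps
        // all of them, which is exactly the "output everything in bucket j" case.
        List<String> reservoir = new ArrayList<>();
        long seen = 0;
        for (Text row : rows) {
            if (reservoir.size() < needed) {
                reservoir.add(row.toString());
            } else {
                long idx = (long) (random.nextDouble() * (seen + 1));
                if (idx < needed) {
                    reservoir.set((int) idx, row.toString());
                }
            }
            seen++;
        }
        for (String row : reservoir) {
            context.write(new Text(row), NullWritable.get());
        }
    }

    // Hypothetical lookup: how many matching rows landed in buckets 1..j-1,
    // aggregated from the mapper-side counts outside of this reducer.
    private long elementsBeforeBucket(int j) {
        return 0;                                   // placeholder
    }
}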

I'm not aware of a simpler solution for this problem, but it would be nice if there was one.

You can find more details here on my blog.
