MapReduce - how do I calculate relative values (average, top k and so)?
I'm looking for a way to calculate "global" or "relative" values during a MapReduce process - an average, sum, top k and so on. Say I have a list of workers, with their IDs associated with their salaries (and a bunch of other stuff). At some stage of the processing, I'd like to know which workers earn the top 10% of salaries. For that I need some "global" view of the values, which I can't figure out how to get.
If I have all the values sent to a single reducer, it has that global view, but then I lose concurrency, and it seems awkward. Is there a better way?
(The framework I'd like to use is Google's, but I'm trying to figure out the technique - no framework specific tricks please)
My first thought is to do something like this:
MAP: Use some dummy value as the key, maybe the empty string for efficiency, and create a class that holds both a salary and an employee ID. In each mapper, create an array that holds 10 elements. Fill it with the first ten salaries you see, sorted (so location 0 is the highest salary and location 9 the 10th highest). For every salary after that, check whether it belongs in the top ten; if it does, insert it at the correct location and shift the lower salaries down as appropriate.
Combiner/Reducer: merge-sort the lists. Do essentially the same thing as in the mapper: create a ten-element array, then loop over all the arrays that share the key, merging them in with the same compare/insert/shift-down sequence.
If you run this with one reducer, it should ensure that the top 10 salaries are output.
I don't see a way to do this while using more than one reducer. If you use a combiner, then the reducer should only have to merge a ten-element array for each node that ran mappers (which should be manageable unless you're running on thousands of nodes).
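Here is a minimal sketch of that top-10 approach in Hadoop's Java API, assuming tab-separated "employeeId<TAB>salary" input lines; the class names are mine, I use a NullWritable instead of an empty-string dummy key, and a TreeMap stands in for the ten-element array (equal salaries overwrite each other in this simplified version, which the array described above would handle properly):

```java
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Each mapper keeps only its local top 10 and emits it in cleanup().
public class TopSalariesMapper
        extends Mapper<LongWritable, Text, NullWritable, Text> {

    // salary -> full record; the TreeMap keeps entries sorted by salary.
    private final TreeMap<Long, String> topTen = new TreeMap<>();

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        String[] fields = value.toString().split("\t");
        long salary = Long.parseLong(fields[1]);
        topTen.put(salary, value.toString());
        if (topTen.size() > 10) {
            topTen.remove(topTen.firstKey());   // drop the lowest salary
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        for (String record : topTen.values()) {
            context.write(NullWritable.get(), new Text(record));
        }
    }
}

// The single reducer merges the local top-10 lists into the global top 10.
class TopSalariesReducer
        extends Reducer<NullWritable, Text, NullWritable, Text> {

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        TreeMap<Long, String> topTen = new TreeMap<>();
        for (Text value : values) {
            String[] fields = value.toString().split("\t");
            topTen.put(Long.parseLong(fields[1]), value.toString());
            if (topTen.size() > 10) {
                topTen.remove(topTen.firstKey());
            }
        }
        for (String record : topTen.descendingMap().values()) {
            context.write(NullWritable.get(), new Text(record));   // highest salary first
        }
    }
}
```

Since the merge is identical and the output types match the mapper's, the same reducer class can also be registered as the combiner.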
[Edit: I misunderstood. Update for the Top 10%] For anything that relates to the "total" there is no way around determining the total first and then doing the calculations.
So the "Top 10% salaries" could be done roughly as follows:
Determine total:
- MAP: Identity
- REDUCE: Aggregate the info of all records that go through and create a new "special" record with the "total". Note that you would like this step to scale as well.
This can also be done by letting the MAP output 2 records (data, total) and the reducer then only touches the "total" records by aggregating.
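A rough sketch of this first job, assuming the identity mapper sends every record to a single dummy key and that the "total" we need is the overall record count (so we know how many workers make up 10%); the class name is illustrative:

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Counts all records and writes one "special" record carrying the global total.
public class TotalCountReducer
        extends Reducer<NullWritable, Text, Text, LongWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (Text ignored : values) {
            count++;                      // one record per worker
        }
        context.write(new Text("TOTAL"), new LongWritable(count));
    }
}
```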
Use total:
- MAP: Prepare for SecondarySort
- SHUFFLE/SORT: Sort the records so that the records with the "total" arrive at the reducer first.
- REDUCE: Depending on your implementation, the reducer may get a bunch of these "total" records (aggregate them); for all subsequent records it can then determine where they stand in relation to everything else.
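A rough sketch of the second job's reducer only, assuming the composite key, partitioner and sort comparator of the secondary sort have already been set up so that the "TOTAL" record(s) reach the reducer before the salary records and the salaries arrive in descending order; all names are illustrative:

```java
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reads the "TOTAL" record(s) first, then emits records until the top 10% is reached.
public class TopPercentReducer extends Reducer<Text, Text, Text, NullWritable> {

    private long total = 0;     // global record count from the first job
    private long emitted = 0;   // how many top records have been written

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        if (key.toString().startsWith("TOTAL")) {
            for (Text value : values) {
                total += Long.parseLong(value.toString());   // aggregate the special records
            }
        } else {
            for (Text value : values) {
                if (emitted < total / 10) {                  // still inside the top 10%
                    context.write(value, NullWritable.get());
                    emitted++;
                }
            }
        }
    }
}
```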
The biggest question about this kind of processing is: will it scale?
Remember that you are breaking the biggest "must have" for scaling out: independent chunks of information. This approach makes the chunks dependent on the "total" value. I expect that a technically different way of making the "total" value available to the second step will be essential to making this work on "big data".
The "Hadoop - The definitive Guide" book by Tom White has a very good chapter on Secondary Sort.
I would do something like this:
The mapper will use a UUID, created in its setup() method, as part of the key. It emits the UUID appended with either 0 or the salary as the key, and it accumulates the count and total as it goes.
In the map() method, the mapper emits the UUID appended with the salary as the key and the salary as the value. In the cleanup() method, it emits the UUID appended with 0 as the key and the count and total as the value.
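A minimal sketch of that mapper, assuming "employeeId<TAB>salary" input and a plain-text composite key; the zero-padding of the salary is my own addition so that the default string ordering of the keys matches the numeric order of the salaries:

```java
import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UuidSalaryMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String uuid;
    private long count = 0;
    private long total = 0;

    @Override
    protected void setup(Context context) {
        uuid = UUID.randomUUID().toString();   // one UUID per mapper instance
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        long salary = Long.parseLong(value.toString().split("\t")[1]);
        count++;
        total += salary;
        // UUID appended with the (zero-padded) salary as the key, salary as the value.
        context.write(new Text(uuid + ":" + String.format("%019d", salary)),
                      new Text(String.valueOf(salary)));
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // UUID appended with 0 sorts before every salary key and carries count and total.
        context.write(new Text(uuid + ":" + String.format("%019d", 0)),
                      new Text(count + "\t" + total));
    }
}
```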
Since the keys are sorted, the first call to the combiner will have the count and total as the value. The combiner can store them as class members. We can also compute what 10% of the total count is and save that as a class member as well (call it top). We initialize a list and save it as a class member too.
Subsequent calls to the combiner will contain the salaries as values, arriving in sorted order. We add each value to the list and increment a counter. When the counter reaches top, we stop storing values and ignore the values in the rest of the combiner calls.
In the combiner's cleanup(), we do the emit: only the UUID as the key, with a value containing the count and total followed by the top 10% of the values. So the combiner output holds partial results, based on the subset of the data that passed through that mapper.
The reducer will be called as many times as the number of mappers in this case, because each mapper/combiner emits only one key.
The reducer will accumulate the counts, totals and the top-10% values in the reduce() method. In the cleanup() method, the average is calculated. The top 10% is also calculated in cleanup(), from the aggregation of the top-10% lists arriving in each call to the reducer; this is basically a merge sort.
The reducer's cleanup() method can do multiple emits, so that the average is in the first row, followed by the top 10% of salaries in the subsequent rows.
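A rough sketch of that reducer, assuming each incoming value has the layout "count<TAB>total<TAB>s1,s2,..." as produced by the combiner's cleanup(); the value format and class name are assumptions for illustration:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Accumulates the partial results across all UUID keys, then emits the
// average and the global top 10% in cleanup().
public class GlobalStatsReducer extends Reducer<Text, Text, Text, NullWritable> {

    private long count = 0;
    private long total = 0;
    private final List<Long> topCandidates = new ArrayList<>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) {
        for (Text value : values) {
            String[] fields = value.toString().split("\t");
            count += Long.parseLong(fields[0]);
            total += Long.parseLong(fields[1]);
            if (fields.length > 2 && !fields[2].isEmpty()) {
                for (String s : fields[2].split(",")) {
                    topCandidates.add(Long.parseLong(s));   // partial top-10% list
                }
            }
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // First row: the average salary.
        context.write(new Text("average\t" + ((double) total / count)), NullWritable.get());
        // Merge the partial lists and keep only the global top 10%.
        Collections.sort(topCandidates, Collections.reverseOrder());
        long limit = count / 10;
        for (int i = 0; i < limit && i < topCandidates.size(); i++) {
            context.write(new Text(String.valueOf(topCandidates.get(i))), NullWritable.get());
        }
    }
}
```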
Finally, to ensure that final aggregate statistics are global, you have to set the number of reducers to one.
Since there is data accumulation and sorting in the reducer, albeit on a partial data set, there may be memory issues.