Hadoop MapReduce with a recursive Map
I need to write a MapReduce application in Java that needs to be auto-recursive, meaning that for each line of the input file it processes, it must check all the lines of the input/map entries for a condition, verified by a function. In other words, the Reducer should read/scan the whole map for each (key, value) pair it receives.
What would be the best way of implement this on Hadoop framework?
I can do this programmatically by reading the input n times, or by loading the input into a hashmap, but I think it should be possible to do it all within the MapReduce paradigm. Thanks for any help/tips!

EDIT: More details. I have (as a result of other jobs) a list of partitions of the problem space as (index, count) pairs, and I want to produce (index, sumOfNearestNeighborsCounts) as output. So for each index, I want to access the map again and, for each nearest-neighbour index, sum the counts of occurrences. (See also Costi Ciudatu's comment.)

For each index key, you need to emit ALL the possible neighbour indices (which you should be able to produce mathematically).
So, let's take a simple (linear) example. You have a 1-dimensional space with {I1, I2, I3, I4}. "Neighbour" will simply mean "previous or next element": I1 is a neighbour of I2 but not of I3.
For every index coming to the mapper, emit one key for each possible neighbour of that index, including itself! (We'll define every index as a possible neighbour of itself, but with a special, absurd negative value for count; I'll explain why below. A mapper sketch follows the example.)
<I1, count(I1)> -> <I0, count(I1)>
                -> <I1, -1>
                -> <I2, count(I1)>

<I2, count(I2)> -> <I1, count(I2)>
                -> <I2, -1>
                -> <I3, count(I2)>
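A minimal mapper sketch for this 1-D case, assuming the (index, count) pairs arrive as IntWritables (e.g. from a SequenceFile); the class name NeighborMapper is mine, not from the post:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: forward the count to both neighbours and emit a sentinel -1
// under the index's own key to mark that this index exists.
public class NeighborMapper
        extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {

    private static final IntWritable EXISTS = new IntWritable(-1);

    @Override
    protected void map(IntWritable index, IntWritable count, Context context)
            throws IOException, InterruptedException {
        int i = index.get();
        context.write(new IntWritable(i - 1), count); // previous neighbour
        context.write(index, EXISTS);                 // "I exist" marker
        context.write(new IntWritable(i + 1), count); // next neighbour
    }
}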
Now in the reducer you will get the following values for each key:
I0: [ count(I1) ]
I1: [ count(I2), -1 ]
I2: [ count(I1), -1, count(I3) ]
...
In your reducer, iterate over all the neighbours' values like this:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class NeighborSumReducer
        extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        boolean doesExist = false;
        int sum = 0;
        for (IntWritable value : values) {
            int count = value.get();
            if (count < 0) {
                doesExist = true; // the sentinel -1: this index exists in the input
            } else {
                sum += count;     // accumulate the neighbours' counts
            }
        }
        if (doesExist) {          // skip indices that never appeared in the input
            context.write(key, new IntWritable(sum));
        }
    }
}
This way you will exclude (in the above example) I0 and I4, which do not exist and therefore have no negative value in their lists.
Now, to get closer to your use case: if you also need the actual index values during the iteration (and not only the counts of all the neighbours), you can do the following:
Instead of emitting plain numbers from the mapper, output wrapper beans containing both the index and its count (a sketch of such a bean follows the example below). This way you'll be able to exclude some neighbours based on business constraints or whatever, but you'll always work with only the list of (possible) neighbours for each given index, never with the whole input set:
<I1, count(I1)> -> <I0, {I1, count(I1)}>
                -> <I1, {I1, count(I1)}>
                -> <I2, {I1, count(I1)}>
... and so on
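Such a wrapper bean can be a custom Writable. A minimal sketch (the class name IndexCount and its fields are illustrative):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Carries both the originating index and its count between map and reduce.
public class IndexCount implements Writable {
    private int index;
    private int count;

    public IndexCount() { }  // no-arg constructor required by Hadoop

    public IndexCount(int index, int count) {
        this.index = index;
        this.count = count;
    }

    public int getIndex() { return index; }
    public int getCount() { return count; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(index);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        index = in.readInt();
        count = in.readInt();
    }
}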
Now, in the reducer you will get:
I0: [ {I1, count(I1)} ]
I1: [ {I1, count(I1)}, {I2, count(I2)} ]
I2: [ {I1, count(I1)}, {I2, count(I2)}, {I3, count(I3)} ]
As you can notice, you don't need the artificial -1 count any more: for the doesExist test you can now check whether any wrapper bean in the values list has the same index as the key.
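With that bean, the reduce loop might look like this (again just a sketch; like the -1 scheme above, it excludes the key's own count from the sum):

// The key exists iff one of the incoming beans carries the key's own index.
boolean doesExist = false;
int sum = 0;
for (IndexCount bean : values) {
    if (bean.getIndex() == key.get()) {
        doesExist = true;        // the index emitted itself, so it exists
    } else {
        sum += bean.getCount();  // sum only the neighbours' counts
    }
}
if (doesExist) {
    context.write(key, new IntWritable(sum));
}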
Even though the number of possible neighbours grows exponentially with the number of dimensions (as you already mentioned), this approach should still perform far better than reading the whole input for every key/value pair, and it is much better suited to the map/reduce paradigm.
In your map phase, output a key for each neighbor, and then sum in the reduce. Pseudocode:
function map(index, count):
    for neighbor in neighbors(index):
        emit(neighbor, count)

function reduce(index, counts):
    total = sum(counts)
    emit(index, total)
It isn't "recursive", but it should solve your specific problem if I understand it correctly.
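For completeness, wiring a mapper and reducer like the ones sketched above into a job could look roughly like this (the driver class, input format, and paths are assumptions on my part):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical driver for the -1 sentinel variant; NeighborMapper and
// NeighborSumReducer are the sketches from above, not real library classes.
public class NeighborSumJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "neighbour-sum");
        job.setJarByClass(NeighborSumJob.class);
        job.setMapperClass(NeighborMapper.class);
        job.setReducerClass(NeighborSumReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}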