Is this architecture possible in Hadoop MR?
Is the following architecture possible in Hadoop MapReduce?
A distributed key-value store (HBase) is used, so each value has a timestamp associated with it. Map and Reduce tasks are executed iteratively. In each iteration, Map should take in the values that were added to the store in the previous iteration (perhaps the ones with the latest timestamp?). Reduce should take in Map's output as well as the pairs from the store whose key(s) match the key(s) that Reduce has to process in the current iteration. The output of Reduce goes to the store.
If this is possible, which classes (e.g. InputFormat, run() of Reduce) should be extended so that the above operation takes place instead of the regular one? If this is not possible, are there any alternatives that achieve the same result?
So your "store" in iteration n-1 could be this:
key (timestamp | value)
a 1|x, b 2|x, c 3|x, d 4|x
In iteration n these pairs were added: ... b 5|x, d 6|x
The mapper would find these 2 records, because their timestamp is > 4, and put them into the intermediate results.
The reducer would now find that for these two records there are another two matching records in the n-1 store: b 2|x, d 4|x
So the output of the reduce phase would be (regardless of the order): b 5|x, d 6|x, b 2|x, d 4|x
Is that what you want?
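To make that example concrete, here is a minimal sketch in plain Java that simulates one iteration over the six records above. It does not use Hadoop or HBase; the class and method names (`IterationExample`, `Entry`, `mapNew`, `reduce`) are made up for illustration, and the cutoff timestamp 4 is taken from the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class IterationExample {
    // One store entry: key, timestamp, value (hypothetical type, not an HBase class).
    static final class Entry {
        final String key; final long ts; final String value;
        Entry(String key, long ts, String value) { this.key = key; this.ts = ts; this.value = value; }
        @Override public String toString() { return key + " " + ts + "|" + value; }
    }

    // "Map" step: keep only the entries added after the previous iteration's last timestamp.
    static List<Entry> mapNew(List<Entry> store, long lastTs) {
        List<Entry> out = new ArrayList<>();
        for (Entry e : store) if (e.ts > lastTs) out.add(e);
        return out;
    }

    // "Reduce" step: emit each new entry followed by the older store entries with the same key.
    static List<String> reduce(List<Entry> fresh, List<Entry> store, long lastTs) {
        List<String> out = new ArrayList<>();
        for (Entry n : fresh) {
            out.add(n.toString());
            for (Entry o : store) if (o.key.equals(n.key) && o.ts <= lastTs) out.add(o.toString());
        }
        return out;
    }

    // Store after iteration n: the four old pairs plus the two new ones.
    static List<String> run() {
        List<Entry> store = Arrays.asList(
            new Entry("a", 1, "x"), new Entry("b", 2, "x"), new Entry("c", 3, "x"),
            new Entry("d", 4, "x"), new Entry("b", 5, "x"), new Entry("d", 6, "x"));
        return reduce(mapNew(store, 4), store, 4);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [b 5|x, b 2|x, d 6|x, d 4|x]
    }
}
```

The output contains the same four records as listed above, only in a different order.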
If I understood you correctly, I would design it as follows: use an IdentityMapper; no special logic is needed there.
I would combine the key and the timestamp into the key of the pair, leaving your value as the value of the pair:
- HadoopKey = {key|timestamp}
- HadoopValue = {value}
You now have to implement your own comparator, so that pairs with the same original key but a different timestamp are recognized as having the same key and therefore go to the same reduce call. (GroupingComparator)
It is also important that the pairs arriving at the reducer are ordered by timestamp, descending. (KeyComparator)
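The two comparators can be sketched in plain Java like this. It is only a simulation of the idea, not Hadoop code: `CompositeKey`, `SORT`, `GROUP` and `shuffle` are hypothetical names, and in a real job the comparators would be RawComparator implementations registered on the JobConf rather than `java.util.Comparator` instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SecondarySortSketch {
    // Composite key {key|timestamp}; a stand-in for a custom Hadoop key type.
    static final class CompositeKey {
        final String key; final long timestamp;
        CompositeKey(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
        @Override public String toString() { return key + "|" + timestamp; }
    }

    // Role of the key comparator: original key ascending, then timestamp descending.
    static final Comparator<CompositeKey> SORT = (a, b) -> {
        int c = a.key.compareTo(b.key);
        return c != 0 ? c : Long.compare(b.timestamp, a.timestamp);
    };

    // Role of the grouping comparator: only the original key counts, the timestamp is ignored.
    static final Comparator<CompositeKey> GROUP = (a, b) -> a.key.compareTo(b.key);

    // Simulated shuffle: sort with SORT, then start a new group whenever GROUP sees a new key.
    static List<List<CompositeKey>> shuffle(List<CompositeKey> keys) {
        List<CompositeKey> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<List<CompositeKey>> groups = new ArrayList<>();
        for (CompositeKey k : sorted) {
            if (groups.isEmpty() || GROUP.compare(groups.get(groups.size() - 1).get(0), k) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(k);
        }
        return groups;
    }

    // The six keys from the example store.
    static List<CompositeKey> sample() {
        return Arrays.asList(
            new CompositeKey("a", 1), new CompositeKey("b", 2), new CompositeKey("c", 3),
            new CompositeKey("d", 4), new CompositeKey("b", 5), new CompositeKey("d", 6));
    }

    public static void main(String[] args) {
        System.out.println(shuffle(sample())); // prints [[a|1], [b|5, b|2], [c|3], [d|6, d|4]]
    }
}
```

Each inner list corresponds to one reduce call. With more than one reducer, a real job would probably also need a partitioner that looks only at the original key, since the default hash partitioning would use the whole composite key.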
Have a look at the
- RawComparator class,
- JobConf's setOutputValueGroupingComparator() &
- setOutputKeyComparatorClass() methods
- and "Hadoop: The Definitive Guide", Chapter 4, page 100
- or just ask, if you need assistance ;-)
The Reducer will receive all pairs with the same original key, and by now they will have been sorted by timestamp, descending. If the first (and therefore youngest) timestamp is eligible for this iteration, then all key-value pairs for this reduce call are emitted. If that timestamp disqualifies, then no pairs with this key are emitted.
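The reducer rule could look roughly like this, again as plain Java rather than a Hadoop Reducer. `ReducerSketch`, `Pair` and `reduceGroup` are hypothetical names, and "eligible" is assumed to mean "newer than the last timestamp of the previous iteration", as in the example with timestamp > 4.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReducerSketch {
    // One pair as seen by the reducer (hypothetical type).
    static final class Pair {
        final String key; final long timestamp; final String value;
        Pair(String key, long timestamp, String value) {
            this.key = key; this.timestamp = timestamp; this.value = value;
        }
        @Override public String toString() { return key + " " + timestamp + "|" + value; }
    }

    // 'group' holds all pairs of one original key, already sorted by timestamp descending.
    // Assumption: a group is eligible when its newest timestamp is greater than lastIterationTs.
    static List<Pair> reduceGroup(List<Pair> group, long lastIterationTs) {
        if (group.isEmpty() || group.get(0).timestamp <= lastIterationTs) {
            return Collections.emptyList(); // newest pair is old: emit nothing for this key
        }
        return new ArrayList<>(group);      // newest pair is new: emit the whole group
    }

    public static void main(String[] args) {
        List<Pair> b = Arrays.asList(new Pair("b", 5, "x"), new Pair("b", 2, "x"));
        List<Pair> a = Arrays.asList(new Pair("a", 1, "x"));
        System.out.println(reduceGroup(b, 4)); // prints [b 5|x, b 2|x]
        System.out.println(reduceGroup(a, 4)); // prints []
    }
}
```

Only the first element of each group has to be inspected, which is why the descending sort order matters.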
I think this should do it.