开发者

How can I get an integer index for a key in hadoop?

Intuitively, hadoop is doing something like this to distribute keys to mappers, using python-esque pseudocode.

# data is a dict with many key-value pairs
keys = data.keys()
key_set_size = len(keys) / num_mappers
index = 0
mapper_keys = []
for i in range(num_mappers):
  end_index = index + key_set_size
  send_to_mapper(keys[int(index):int(end_index)], i)
  index = end_index
# And something vaguely similar for the reducer (but not exactly).

It seems like somewhere hadoop knows 开发者_Go百科the index of each key it is passing around, since it distributes them evenly among the mappers (or reducers). My question is: how can I access this index? I'm looking for a range of integers [0, n) mapping to all my n keys; this is what I mean by an "index".

I'm interested in the ability to get the index from within either the mapper or reducer.


After doing more research on this question, I don't believe it is possible to do exactly what I want. Hadoop does not seem to have such an index that is user-visible after all, although it does try to distribute work evenly among the mappers (so such an index is theoretically possible).


Actually, your reducer (each individual one) gets an array of items back that correspond to the reduce key. So do you want the offset of items within the reduce key in your reducer, or do you want the overall offset of the particular item in the global array of all lines being processed? To get an indeex in your mapper, you can simply prepend a line number to each line of the file before the file gets to the mapper. This will tell you the "global index". However keep in mind that with 1 000 000 items, item 662 345 could be processed before item 10 000.


If you are using the new MR API then the org.apache.hadoop.mapreduce.lib.partition.HashPartitioner is the default partitioner or else org.apache.hadoop.mapred.lib.HashPartitioner is the default partitioner. You can call the getPartition() on either of the HashPartitioner to get the partition number for the key (which you mentioned as index).

Note that the HashPartitioner class is only used to distribute the keys to the Reducer. When it comes to a mapper, each input split is processed by a map task and the keys are not distributed.

Here is the code from HashPartitioner for the getPartition(). You can write a simple Java program for the same.

public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

Edit: Including another way to get the index.

The following code from should also work. To be included in the map or the reduce function.

public void configure(JobConf job) {
partition = job.getInt( "mapred.task.partition", 0);
}

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜