How would you suggest performing "Join" with Hadoop streaming?
I have two files, in the following formats:
field1, field2, field3
field4, field1, field5
A different field number indicates a different meaning.
I want to join the two files using Hadoop Streaming based on the mutual field (field1
in the above example) so the output will be field1, field2, field3, field4, field5
(other orderings are ok as long as they have all the fields).
Hadoop has a library class called KeyFieldBasedPartitioner: http://hadoop.apache.org/mapreduce/docs/r0.21.0/api/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.html
If you pass it as the partitioner when you launch your streaming job, you can split your mapper output into key/value pairs and have all records whose keys share the chosen fields hashed to the same reducer, while sorting still uses the full key: http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#More+Usage+Examples
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D mapreduce.map.output.key.field.separator=. \
-D mapreduce.partition.keypartitioner.options=-k1,2 \
-D mapreduce.job.reduces=12 \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
Here, -D stream.map.output.field.separator=. and -D stream.num.map.output.key.fields=4 are explained at http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Customizing+How+Lines+are+Split+into+Key%2FValue+Pairs. Basically, they describe how your mapper writes its output fields, so that streaming can split each output line into a key/value pair.
The map output keys of the above MapReduce job normally have four fields separated by ".". However, the MapReduce framework will partition the map outputs by the first two fields of the keys using the -D mapreduce.partition.keypartitioner.options=-k1,2 option. Here, -D mapreduce.map.output.key.field.separator=. specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.
This is effectively equivalent to specifying the first two fields as the primary key and the next two fields as the secondary. The primary key is used for partitioning, and the combination of the primary and secondary keys is used for sorting.
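To make the primary/secondary key idea concrete, here is a small local simulation in Python. It is only a sketch: the function names partition and shuffle are made up for illustration, and zlib.crc32 stands in for whatever hash Hadoop really uses, so the actual reducer numbers will differ on a cluster. What it shows is the behaviour described above: the first two fields decide the reducer, and the whole key decides the order.

```python
import zlib

def partition(key, num_reducers, sep=".", num_fields=2):
    """Pick a reducer from the first `num_fields` fields of the key.

    zlib.crc32 is a stand-in hash (an assumption for this sketch),
    not the hash function Hadoop itself applies.
    """
    prefix = sep.join(key.split(sep)[:num_fields])
    return zlib.crc32(prefix.encode("utf-8")) % num_reducers

def shuffle(keys, num_reducers):
    """Group keys by reducer, then sort each group on the full key."""
    buckets = {}
    for key in keys:
        buckets.setdefault(partition(key, num_reducers), []).append(key)
    return {reducer: sorted(ks) for reducer, ks in buckets.items()}

keys = ["11.12.1.2", "11.14.2.3", "11.11.4.1", "11.12.1.1", "11.14.2.2"]
result = shuffle(keys, 3)
for reducer, ks in sorted(result.items()):
    print(reducer, ks)
```

Keys such as 11.12.1.2 and 11.12.1.1 always end up in the same bucket because they share the first two fields, and within the bucket they come out in full-key order.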
To do a join, output the fields from your mapper with the join field as the key, and set the launch options that say which fields make up the key. The reducer will then see all values for a given key grouped together, ready to be combined into one joined record. If you want to take data from multiple sources, just keep adding more -input options on the command line. If the inputs have different layouts (for example, different numbers of fields), your mapper can recognize that and emit a standard output format.
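For the two file layouts in the question, the mapper and reducer logic could look roughly like the Python sketch below. Everything in it is an assumption made for illustration: the names map_line and reduce_lines, the "A"/"B" source tags, and the tab-separated intermediate format. In a real streaming job each function would sit in its own script reading sys.stdin and printing to stdout, and the mapper would have to work out which file a line came from (I believe streaming exposes the input file name in an environment variable such as mapreduce_map_input_file, but check that for your version). Here the shuffle is imitated with a plain sorted() call, and the result is an inner join.

```python
from itertools import groupby

def map_line(line, source):
    """Emit 'key<TAB>source<TAB>other fields' with field1 as the key.

    source "A" means the layout field1, field2, field3;
    source "B" means the layout field4, field1, field5.
    """
    fields = [f.strip() for f in line.split(",")]
    if source == "A":
        key, rest = fields[0], fields[1:]
    else:
        key, rest = fields[1], [fields[0], fields[2]]
    return "\t".join([key, source] + rest)

def reduce_lines(sorted_lines):
    """Join the A and B records that share a key (inner join).

    Expects lines sorted by key, which is what the shuffle provides.
    """
    rows = (line.split("\t") for line in sorted_lines)
    for key, group in groupby(rows, key=lambda row: row[0]):
        left, right = [], []
        for _, tag, *rest in group:
            (left if tag == "A" else right).append(rest)
        for a in left:
            for b in right:
                yield ", ".join([key] + a + b)

file_a = ["k1, x2, x3", "k2, y2, y3"]   # field1, field2, field3
file_b = ["x4, k1, x5", "z4, k3, z5"]   # field4, field1, field5

mapped = [map_line(l, "A") for l in file_a] + [map_line(l, "B") for l in file_b]
joined = list(reduce_lines(sorted(mapped)))  # sorted() imitates the shuffle
for record in joined:
    print(record)
```

Only k1 appears in both sample files, so it is the only key that produces a joined record. Keys present in just one file are dropped; emitting them with empty fields instead would turn this into an outer join.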