开发者

Configure Map Side join for multiple mappers in Hadoop Map/Reduce

I have a question about configuring Map/Side inner join for multiple mappers in Hadoop. Suppose I have two very large data sets A and B, I use the same partition and sort algorithm to split them into smaller parts. For A, assume I have a(1) to a(10), and for B I have b(1) to b(10). It is assured that a(1) and b(1) contain the same keys, a(2) and b(2) have the same keys, and so on. I would like to setup 10 mappers, specifically, mapper(1) to mapper(10). To my understanding, Map/Side join is a pre-processing task prior to the mapper, therefore, I would like to join a(1) and b(1) for mapper(1), to join a(2) and b(2) for mapper(2), and so on.

After reading some reference materials, it is still not clear to me how to configure these ten mappers. I understand that using CompositeInputFormat I would be able to join two files, but it seems only configuring one mapper and joining the 20 files pair after pair (in 10 sequential tasks). How to configure al开发者_开发技巧l these ten mappers and join ten pairs at the same time in a genuine Map/Reduce (10 tasks in parallel)? To my understanding, ten mappers would require ten CompositeInputFormat settings because the files to join are all different. I strongly believe this is practical and doable, but I couldn't figure out what exact commands I should use.

Any hint and suggestion is highly welcome and appreciated.

Shi


Thanks a lot for the replies, David and Thomas!

I appreciate your emphasis about the pre-requirements on Map-side Join. Yes, I am aware about the sort, API, etc. After reading your comments, I think my actual problem is what is the correct expression for joining multiple splits of two files in CompositeInputFormat. For example, I have dataA and dataB sorted and reduced in 2 files respectively:

/A/dataA-r-00000

/A/dataA-r-00001

/B/dataB-r-00000

/B/dataB-r-00001

The expression command I am using now is:

inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/A/dataA-r-00000"),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/B/dataB-r-00000"))

It works but as you mentioned, it only starts two mappers (because the inner join prevents from splitting) and could be very inefficient if the files are big. If I want to use more mappers (say another 2 mappers to join dataA-r-00001 and dataB-r-00001), how should I construct the expression, is it something like:

String joinexpression = "inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/dataA-r-00000'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/dataB-r-00000'), tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/dataA-r-00001'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/dataB-r-00001'))" ;

But I think that could be mistaken, because the command above actually perform inner join of four files (which will result in nothing in my case because file *r-00000 and *r-00001 have non-overlapping keys).

Or I could just use the two dirs as inputs, like:

String joinexpression = "inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/'))" ;

The inner join will match the pairs automatically according to the file endings, say "00000" to "00000", "00001" to "00001"? I am stuck at this point because I need to construct the expression and pass it to

conf.set("mapred.join.expr", joinexpression);

So in one word, how should I build the proper expression if I want to use more mappers to join multiple pairs of files simultaneously?


There are map- and reduce side joins. You proposed to use a map side join, which is executed inside a mapper and not before it. Both sides must have the same key and value types. So you can't join a LongWritable and a Text, although they might have the same value.

There are subtle more things to note:

  • input files have to be sorted, so it has likely to be a reducer output
  • You can control the number of mappers in your join-map-phase by setting the number of reducers in the job that should've sorted the datasets

The whole procedure basically works like this: You have dataset A and dataset B, both share the same key, let's say LongWritable.

  1. Run two jobs that sort the two datasetsby their keys, both jobs HAVE TO set the number of reducers to an equal number, say 2.
  2. this will result in 2 sorted files for each dataset
  3. now you setup your job that joins the datasets, this job will spawn with 2 mappers. It could be more if you're setting the reduce numbers higher in the previous job.
  4. do whatever you like in the reduce step.

If the number of the files to be joined is not equal it will result in an exception during job setup.

Setting up a join is kind of painful, mainly because you have to use the old API for mapper and reducer if your version is less than 0.21.x.

This document describes very well how it works. Scroll all the way to the bottom, sadly this documentation is somehow missing in the latest Hadoop docs.

Another good reference is "Hadoop the Definitive Guide", which explains all of this in more detail and with examples.


I think you're missing the point. You don't control the number of mappers. It's the number of reducers that you have control over. Simply emit the correct keys from your mapper. Then run 10 reducers.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜