
Computing set intersection and set difference of the records of two files with Hadoop

Sorry for cross-posting this on the Hadoop user mailing list and here, but this is becoming an urgent matter for me.

My problem is as follows: I have two input files, and I want to determine

  • a) The number of lines which only occur in file 1
  • b) The number of lines which only occur in file 2
  • c) The number of lines common to both (i.e. with regard to string equality)

Example:

File 1:
a
b
c

File 2:
a
d

Desired output for each case:

lines_only_in_1: 2         (b, c)
lines_only_in_2: 1         (d)
lines_in_both:   1         (a)
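
To pin down the expected numbers independently of Hadoop, here is a small in-memory sketch of the three counts. The class and method names are made up for illustration, and it assumes that duplicate lines within one file count only once (each file is treated as a set of lines).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Hadoop: computes the three counts in memory.
class LineSetCounts {

    /** Returns {onlyInFirst, onlyInSecond, inBoth}, counting distinct lines. */
    static long[] count(List<String> file1, List<String> file2) {
        Set<String> first = new HashSet<>(file1);
        Set<String> second = new HashSet<>(file2);

        // Set intersection: lines present in both files.
        Set<String> both = new HashSet<>(first);
        both.retainAll(second);

        // Set differences follow from the sizes.
        long onlyInFirst = first.size() - both.size();
        long onlyInSecond = second.size() - both.size();
        return new long[] { onlyInFirst, onlyInSecond, both.size() };
    }

    public static void main(String[] args) {
        long[] c = count(Arrays.asList("a", "b", "c"), Arrays.asList("a", "d"));
        System.out.println("lines_only_in_1: " + c[0]);
        System.out.println("lines_only_in_2: " + c[1]);
        System.out.println("lines_in_both:   " + c[2]);
    }
}
```

This only works while both files fit in memory, which is exactly the limitation the MapReduce version is meant to remove.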

Basically, my approach is as follows: I wrote my own LineRecordReader so that the mapper receives a pair consisting of the line (text) and a byte indicating the source file (either 0 or 1). The mapper simply emits the pair unchanged, so it does no real work. The side effect, however, is that the combiner receives a

Map<Line, Iterable<SourceId>>

(where SourceId is either 0 or 1).

Now, for each line I can get the set of sources it appears in. Therefore, I could write a combiner that counts the number of lines for each case (a, b, c); see Listing 1.

The combiner then outputs a 'summary' only on cleanup (is that safe?). The summary looks like this:

lines_only_in_1   2531
lines_only_in_2   3190
lines_in_both      901

In the reducer I then only sum up the values of these summaries, so the output of the reducer looks just like that of the combiner.

However, the main problem is that I need to treat both source files as a single virtual file which yields records of the form (line, sourceId), where sourceId is either 0 or 1.

I am not sure how to achieve that. So the question is whether I can avoid preprocessing and merging the files beforehand, and instead do it on the fly with something like a virtually-merged-file reader and a custom record reader. Any code example is much appreciated.

Best regards, Claus

Listing 1:

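// Nested inside the job class; the imports belong at the top of that file.
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class SourceCombiner
    extends Reducer<Text, ByteWritable, Text, LongWritable> {

    // countA and countB include the common lines, so
    // lines_only_in_1 = countA - countC and lines_only_in_2 = countB - countC.
    private long countA = 0;
    private long countB = 0;
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct source ids (0 or 1) seen for this line.
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            fileIds.add(val.get());
        }

        if (fileIds.contains((byte) 0)) { ++countA; }
        if (fileIds.contains((byte) 1)) { ++countB; }
        if (fileIds.size() >= 2) { ++countC; }
    }

    // Emit the summary once, after all keys of this task have been processed.
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
    }
}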


Okay, I must admit that I didn't really catch the gist of what you've tried so far, but I have a simple approach that may do what you need.

Have a look at the FileMapper below. It gets the name of the input file and emits it together with each line of the input.

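    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Name of the file this map task reads; set once in setup().
        private Text fileName;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit (line, fileName) so that equal lines meet in the same reduce call.
            context.write(value, fileName);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            // Assumes a file-based input format (e.g. TextInputFormat), so the split is a FileSplit.
            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            fileName = new Text(name);
        }
    }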

Now we have a bunch of key/value pairs that look like this (with regard to your example):

    a File 1
    b File 1
    c File 1

    a File 2
    d File 2

After the shuffle, the reducer receives them grouped by line, like this:

    a File 1,File 2
    b File 1
    c File 1
    d File 2
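
If it helps to see the grouping step in isolation, here is a tiny in-memory stand-in for the shuffle. It is not Hadoop code and the class name is made up. The values are collected into a sorted set only to make the printed result deterministic, whereas Hadoop hands the reducer a plain Iterable that may contain duplicates.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// In-memory stand-in for the shuffle phase; not Hadoop code.
class ShuffleSketch {

    /** Groups (line, fileName) pairs by line, similar to what the reducer sees. */
    static Map<String, TreeSet<String>> group(List<String[]> mapOutput) {
        Map<String, TreeSet<String>> grouped = new TreeMap<>();
        for (String[] pair : mapOutput) {
            // pair[0] is the line (key), pair[1] is the file name (value).
            grouped.computeIfAbsent(pair[0], k -> new TreeSet<>()).add(pair[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = Arrays.asList(
                new String[] {"a", "File 1"},
                new String[] {"b", "File 1"},
                new String[] {"c", "File 1"},
                new String[] {"a", "File 2"},
                new String[] {"d", "File 2"});

        // Prints one line per key, with its file names joined by commas.
        group(mapOutput).forEach((line, files) ->
                System.out.println(line + " " + String.join(",", files)));
    }
}
```

Run on the example pairs, this prints the same four grouped lines as shown above.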

What you need to do in your reducer could look like this:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    enum Counter {
        LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Distinct file names in which this line occurs.
        Set<String> set = new HashSet<String>();
        for (Text t : values) {
            set.add(t.toString());
        }

        // With only two input files, two distinct names in the set mean
        // the line is contained in both files.
        if (set.size() == 2) {
            context.getCounter(Counter.LINES_IN_COMMON).increment(1);
        } else {
            // sorry this is a bit dirty...
            String name = set.iterator().next();
            // determine which file it was by checking for the name:
            if (name.equals("YOUR_FIRST_FILE_NAME")) {
                context.getCounter(Counter.LINES_IN_FIRST).increment(1);
            } else {
                context.getCounter(Counter.LINES_IN_SECOND).increment(1);
            }
        }
    }

}

You have to replace the string inside the if statement with the name of your first file.
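
If you would rather not hard-code the name, the decision itself can be pulled out into a small pure function that takes the first file's name as a parameter. This is only a sketch: `LineClassifier`, `classify` and the `firstFileName` parameter are made-up names, and how the name reaches the reducer (for example through the job's Configuration) is left open here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper mirroring the reducer's decision; not part of Hadoop.
class LineClassifier {

    enum Category { LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND }

    /**
     * Classifies one line given the distinct file names it occurred in.
     * Assumes exactly two input files, as in the reducer above.
     */
    static Category classify(Set<String> fileNames, String firstFileName) {
        if (fileNames.isEmpty()) {
            throw new IllegalArgumentException("a line must occur in at least one file");
        }
        if (fileNames.size() >= 2) {
            return Category.LINES_IN_COMMON;
        }
        return fileNames.contains(firstFileName)
                ? Category.LINES_IN_FIRST
                : Category.LINES_IN_SECOND;
    }

    public static void main(String[] args) {
        Set<String> both = new HashSet<>(Arrays.asList("File 1", "File 2"));
        Set<String> onlyFirst = new HashSet<>(Arrays.asList("File 1"));
        Set<String> onlySecond = new HashSet<>(Arrays.asList("File 2"));

        System.out.println(classify(both, "File 1"));
        System.out.println(classify(onlyFirst, "File 1"));
        System.out.println(classify(onlySecond, "File 1"));
    }
}
```

Keeping the branching in a function like this also makes it easy to check without running a job.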

I think that using job counters is a bit clearer than keeping your own primitives and writing them to the context in cleanup. You can retrieve the counters for a job after it has completed:

Job job = new Job(new Configuration());
// job setup omitted
job.waitForCompletion(true);
// repeat this line for the other enum values
long linesInCommon = job.getCounters().findCounter(FileReducer.Counter.LINES_IN_COMMON).getValue();

Nevertheless, if you need the number of common lines etc. in HDFS, then go for your solution.
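
One caveat if the counting is done in a combiner rather than in a reducer: a combiner only sees the output of a single map task, and Hadoop may run it zero or more times. A line that occurs in both files is therefore usually not seen with both source ids at that stage. The sketch below simulates this in memory, assuming each file is read by its own map task; it is not Hadoop code and the names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory illustration only; not Hadoop code.
class CombinerScopeSketch {

    /** Counts lines that appear with at least two distinct source ids in the given records. */
    static long countCommon(List<String[]> records) {
        Map<String, Set<String>> sources = new HashMap<>();
        for (String[] record : records) {
            // record[0] is the line, record[1] is the source id ("0" or "1").
            sources.computeIfAbsent(record[0], k -> new HashSet<>()).add(record[1]);
        }
        long common = 0;
        for (Set<String> ids : sources.values()) {
            if (ids.size() >= 2) {
                common++;
            }
        }
        return common;
    }

    public static void main(String[] args) {
        // Assumption: each file is processed by its own map task.
        List<String[]> task0 = Arrays.asList(
                new String[] {"a", "0"}, new String[] {"b", "0"}, new String[] {"c", "0"});
        List<String[]> task1 = Arrays.asList(
                new String[] {"a", "1"}, new String[] {"d", "1"});

        // What per-task combiners would see: each task's records in isolation.
        long perTask = countCommon(task0) + countCommon(task1);

        // What a reducer sees: all records for a key together.
        List<String[]> all = new ArrayList<>(task0);
        all.addAll(task1);
        long afterShuffle = countCommon(all);

        System.out.println("common (per map task):  " + perTask);
        System.out.println("common (after shuffle): " + afterShuffle);
    }
}
```

In this simulation the per-task view finds no common line, while the grouped view finds the one shared line `a`, which is why the per-line classification is safer in the reducer.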

Hope that helped you.
