Computing set intersection and set difference of the records of two files with Hadoop
Sorry for cross-posting this on the hadoop user mailing list and here, but this is becoming an urgent matter for me.
My problem is as follows: I have two input files, and I want to determine
- a) The number of lines which only occur in file 1
- b) The number of lines which only occur in file 2
- c) The number of lines common to both (i.e. with regard to string equality)
Example:
File 1:
a
b
c
File 2:
a
d
Desired output for each case:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
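(To pin down the semantics: on in-memory sets, the three counts are just two set differences and an intersection. A plain-Java sketch, independent of Hadoop, with illustrative names:)

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SetCounts {
    // Compute |A \ B|, |B \ A| and |A ∩ B| over distinct lines.
    public static long[] counts(Set<String> file1, Set<String> file2) {
        Set<String> onlyIn1 = new HashSet<String>(file1);
        onlyIn1.removeAll(file2);                // lines only in file 1
        Set<String> onlyIn2 = new HashSet<String>(file2);
        onlyIn2.removeAll(file1);                // lines only in file 2
        Set<String> common = new HashSet<String>(file1);
        common.retainAll(file2);                 // lines in both
        return new long[] { onlyIn1.size(), onlyIn2.size(), common.size() };
    }

    public static void main(String[] args) {
        Set<String> f1 = new HashSet<String>(Arrays.asList("a", "b", "c"));
        Set<String> f2 = new HashSet<String>(Arrays.asList("a", "d"));
        long[] c = counts(f1, f2);
        System.out.println(c[0] + " " + c[1] + " " + c[2]); // 2 1 1
    }
}
```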
Basically my approach is as follows: I wrote my own LineRecordReader, so that the mapper receives a pair consisting of the line (text) and a byte indicating the source file (either 0 or 1). The mapper just returns the pair again, so actually it does nothing. However, the side effect is that the combiner receives a
Map<Line, Iterable<SourceId>>
(where SourceId is either 0 or 1).
Now, for each line I can get the set of sources it appears in. Therefore, I could write a combiner that counts, for each case (a, b, c), the number of lines (Listing 1).
The combiner then outputs a 'summary' only on cleanup (is that safe?). So this summary looks like:
lines_only_in_1 2531
lines_only_in_2 3190
lines_in_both 901
In the reducer I then only sum up the values for these summaries. (So the output of the reducer looks just like that of the combiner.)
However, the main problem is that I need to treat both source files as a single virtual file which yields records of the form (line, sourceId) // sourceId either 0 or 1
And I am not sure how to achieve that. So the question is whether I can avoid preprocessing and merging the files beforehand, and instead do it on the fly with something like a virtually-merged-file reader and a custom record reader. Any code example is much appreciated.
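(For the record: one way to avoid merging the files beforehand might be Hadoop's MultipleInputs, which assigns a separate mapper and input format to each input path, so each mapper can tag its lines with a fixed source id. A rough driver sketch; the paths and mapper class names are made up for illustration:)

```java
// Sketch only: MultipleInputs gives each input path its own mapper,
// so each mapper can tag lines with a fixed source id (0 or 1).
// Paths and mapper class names here are illustrative.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "line-set-compare");
MultipleInputs.addInputPath(job, new Path("/input/file1"),
        TextInputFormat.class, Source0Mapper.class); // emits (line, 0)
MultipleInputs.addInputPath(job, new Path("/input/file2"),
        TextInputFormat.class, Source1Mapper.class); // emits (line, 1)
```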
Best regards, Claus
Listing 1:
public static class SourceCombiner
        extends Reducer<Text, ByteWritable, Text, LongWritable> {

    private long countA = 0; // lines only in source 0 (case a)
    private long countB = 0; // lines only in source 1 (case b)
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context)
            throws IOException, InterruptedException {
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            fileIds.add(val.get());
        }
        boolean inFirst = fileIds.contains((byte) 0);
        boolean inSecond = fileIds.contains((byte) 1);
        if (inFirst && !inSecond) { ++countA; } // only in file 1
        if (!inFirst && inSecond) { ++countB; } // only in file 2
        if (inFirst && inSecond)  { ++countC; } // common to both
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
    }
}
Okay, I must admit that I didn't really catch the gist of what you've tried so far, but here is a simple approach that should do what you need.
Have a look at the FileMapper below. It gets the file name and emits it with each line of the input.
public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

    static Text fileName;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key the output by the line itself, tagged with the source file name
        context.write(value, fileName);
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        String name = ((FileSplit) context.getInputSplit()).getPath().getName();
        fileName = new Text(name);
    }
}
Now we have a bunch of key/value pairs that look like this (with regard to your example):
a File 1
b File 1
c File 1
a File 2
d File 2
Obviously, reducing them will give you an input like this:
a File 1,File 2
b File 1
c File 1
d File 2
What you need to do in your reducer could look like this:
public class FileReducer extends Reducer<Text, Text, Text, Text> {

    enum Counter {
        LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        HashSet<String> set = new HashSet<String>();
        for (Text t : values) {
            set.add(t.toString());
        }
        // if we have only two files and there are two records in our hashset,
        // the line is contained in both files
        if (set.size() == 2) {
            context.getCounter(Counter.LINES_IN_COMMON).increment(1);
        } else {
            // sorry, this is a bit dirty...
            String t = set.iterator().next();
            // determine which file it was by checking the name:
            if (t.equals("YOUR_FIRST_FILE_NAME")) {
                context.getCounter(Counter.LINES_IN_FIRST).increment(1);
            } else {
                context.getCounter(Counter.LINES_IN_SECOND).increment(1);
            }
        }
    }
}
You have to replace the string inside the if statement with your actual first file name.
I think that using job counters is a bit clearer than using your own primitives and writing them to the context in cleanup. You can retrieve the counters for a job after completion like this:
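Distilled to plain Java, that classification logic can be checked in isolation (the file names here are illustrative, not real paths):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class Classify {
    // Classify one reduce group by the set of file names the line occurred in.
    // Assumes exactly two input files, as in the job above.
    public static String classify(Set<String> sources, String firstFileName) {
        if (sources.size() == 2) {
            return "LINES_IN_COMMON"; // present in both inputs
        }
        // a singleton set means the line is unique to one file
        return sources.contains(firstFileName)
                ? "LINES_IN_FIRST"
                : "LINES_IN_SECOND";
    }

    public static void main(String[] args) {
        Set<String> both = new HashSet<String>(Arrays.asList("File 1", "File 2"));
        System.out.println(classify(both, "File 1")); // LINES_IN_COMMON
    }
}
```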
Job job = Job.getInstance(new Configuration());
// setup stuff etc. omitted...
job.waitForCompletion(true);
// do the same line with the other enums
long linesInCommon = job.getCounters().findCounter(Counter.LINES_IN_COMMON).getValue();
Nevertheless, if you need the numbers of lines in common etc. stored in your HDFS, then go for your solution.
Hope that helped you.