Hadoop reducer string manipulation doesn't work
Hi Text manipulation in Reduce phase seems not working correctly. I suspect problem could be in my code rather then hadoop itself but you never know... If you can spot any gotchas let me know. I wasted a day trying to figure out what’s wrong with this code.
my sample input file called simple.psv
12345 abc@bbc.com|m|1975
12346 bbc@cde.com|m|1981
my Mapper and reducer code
package simplemapreduce;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
/**
*
* @author
*/
public class Main {
public static class SimpleMap extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text inputs,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
String inputString = inputs.toString();
//System.out.println("CRM Map record:"+inputString);
StringTokenizer tokenizer = new StringTokenizer(inputString);
Text userid = new Text();
if (tokenizer.hasMoreTokens()) {
userid.set(tokenizer.nextToken());
Text data = new Text();
if (tokenizer.hasMoreTokens()) {
data.set(tokenizer.nextToken());
} else {
data.set("");
}
output.collect(userid, data);
}
}
}
/**
* A reducer class that just emits its input.
*/
public static class SimpleReduce extends MapReduceBase implements
Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
while (values.hasNext()) {
Text txt = values.next();
开发者_JAVA技巧 String inputString = "<start>" + txt.toString() + "<end>";
Text out = new Text();
out.set(inputString);
//System.out.println(inputString);
output.collect(key, out);
}
}
}
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: SimpleMapReduce <input path> <output path>");
System.exit(1);
}
JobConf conf = new JobConf(Main.class);
conf.setJobName("Simple Map reducer");
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(SimpleMap.class);
conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setNumReduceTasks(1);
JobClient.runJob(conf);
}
}
my sample launch script called simple.sh
#!/bin/bash
hadoop jar SimpleMapReduce.jar \
/full/path/to/input/simple.tsv /user/joebloggs/output
expected output
12345 <start>abc@bbc.com|m|1975<end>
12346 <start>bbc@cde.com|m|1981<end>
actual output
12345 <start><start>abc@bbc.com|m|1975<end><end>
12346 <start><start>bbc@cde.com|m|1981<end><end>
I tested this on Amazon s3 as well on Linux if you could spot the problem and let me know what it is... that will really save some hair on my head!
The basic flow of data through the system is:
Input -> Map -> Reduce -> output.
As a performance optimization the combiner has been added to allow a computer (one of the many in the hadoop cluster) to do a partial aggregation of the data before it is transmitted to the system where the actual reducer is run.
In the word count example it is fine to start with these values :
1 1 1 1 1 1 1 1 1 1
combine them into
3 4 2 1
and the reduce them into the final result
10
So the combiner is essentially a performance optimization. If you do not specify a combiner it will not change the information going through (i.e. it's an "identity reducer"). So you can only use the SAME class as both the combiner and reducer if the dataset remains valid that way. In your case: that is not true --> your data is now invalid.
You do:
conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);
So this makes the output of your mapper go through your reducer twice. The first one adds: "start" & "end" The second one adds "start" & "end" again.
Simple solution:
// conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);
HTH
I had a problem wherein the reducer wont get all the data sent by the mapper. The reducer would only get upto the specific portion output.collect will emit. For Eg. for the Input Data:
12345 abc@bbc.com|m|1975
12346 bbc@cde.com|m|1981
if I say
output.collect(key,mail_id);
Then it will not get the next two fields - sex and year of birth.
// conf.setCombinerClass(SimpleReduce.class);
Solved the problem.
精彩评论