
Hadoop - How to get the month with the maximum flu index across the Google Flu Trends dataset?

I am trying to write a simple MapReduce program using Hadoop that will give me the month most prone to flu. I am using the Google Flu Trends dataset, which can be found here: http://www.google.org/flutrends/data.txt.

I have written both the Mapper and the Reducer, as shown below.

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxFluPerMonthMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    private static final Log LOG =
            LogFactory.getLog(MaxFluPerMonthMapper.class);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String row = value.toString();
        LOG.debug("Received row " + row);
        List<String> columns = Arrays.asList(row.split(","));
        // First column is the date; the remaining columns are flu indexes.
        String date = columns.get(0);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        int month = 0;
        try {
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(sdf.parse(date));
            month = calendar.get(Calendar.MONTH); // zero-based: January == 0
        } catch (ParseException e) {
            e.printStackTrace();
        }
        for (int i = 1; i < columns.size(); i++) {
            String fluIndex = columns.get(i);
            if (StringUtils.isNotBlank(fluIndex) && StringUtils.isNumeric(fluIndex)) {
                LOG.info("Writing key " + month + " and value " + fluIndex);
                context.write(new IntWritable(month), new IntWritable(Integer.valueOf(fluIndex)));
            }
        }
    }
}

Reducer

import java.io.IOException;
import java.text.DateFormatSymbols;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxFluPerMonthReducer extends Reducer<IntWritable, IntWritable, Text, IntWritable> {

    private static final Log LOG =
            LogFactory.getLog(MaxFluPerMonthReducer.class);

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        LOG.info("Received key " + key.get());
        int sum = 0;
        for (IntWritable intWritable : values) {
            sum += intWritable.get();
        }
        int month = key.get();
        // Convert the zero-based month index back to its name.
        String monthString = new DateFormatSymbols().getMonths()[month];
        context.write(new Text(monthString), new IntWritable(sum));
    }
}

With the Mapper and Reducer shown above, I am getting the following output:

January 545419
February 528022
March 436348
April 336759
May 346482
June 309795
July 312966
August 307346
September 322359
October 428346
November 461195
December 480078

What I want is just a single output line giving me January 545419. How can I achieve this? By storing state in the reducer, or is there another solution? Or are my Mapper and Reducer wrong for the question I am asking of this dataset?


The problem is that the Reducer has no idea about the other keys (by design). It would be possible to set up another Reducer to find the maximum value across all the data produced by your current Reducer. However, this is overkill, since you know you will only have 12 records to process, and setting up another Reducer would have more overhead than just running a serial script.

I would suggest writing some other script to process your text output.
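For example, a minimal standalone post-processor might look like the sketch below. Everything here is illustrative, not from the original post: the class name is mine, and it assumes the reducer output has been copied to a local file whose lines are "Month<TAB>sum".

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

// Hypothetical post-processing step: scan the job's text output and
// print the month with the largest summed flu index.
public class MaxMonthFinder {

    public static void main(String[] args) throws IOException {
        String maxMonth = null;
        long maxSum = Long.MIN_VALUE;
        try (BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Each line is expected to be "Month<TAB>sum".
                String[] parts = line.split("\t");
                long sum = Long.parseLong(parts[1].trim());
                if (sum > maxSum) {
                    maxSum = sum;
                    maxMonth = parts[0];
                }
            }
        }
        System.out.println(maxMonth + " " + maxSum);
    }
}

You would run it against the part-r-00000 file produced by your job after copying it out of HDFS.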


You can add one more MapReduce step, with a Mapper something like this:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The key is the byte offset of the line, so 0 means the first row.
        // Emit only that first row.
        if (key.get() == 0) {
            String row = value.toString();
            String[] values = row.split("\t");
            context.write(new Text(values[0]), new Text(values[1]));
        }
    }
}

The Reducer must emit all of its input (which will be only one record) directly to the output. The number of mappers and reducers should be set to one. If your MapReduce job uses more than one reducer, you will need one more intermediate MapReduce step to combine the results into one file after your job. But this approach does not seem very efficient.
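For completeness, a minimal sketch of the pass-through Reducer described above could look like this (the class name is mine, not from the original answer):

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PassThroughReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Forward every incoming record unchanged; with the mapper above
        // this will be the single record taken from the first row.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}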

