merge output files after reduce phase

In MapReduce, each reduce task writes its output to a file named part-r-nnnnn, where nnnnn is a partition ID associated with the reduce task. Does map/reduce merge these files? If yes, how?


Instead of doing the file merging on your own, you can delegate the entire merging of the reduce output files by calling:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

Note: this combines the HDFS files into a single file on the local file system. Make sure you have enough local disk space before running it.


No, these files are not merged by Hadoop. The number of files you get is the same as the number of reduce tasks.
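To make the one-file-per-reducer point concrete, here is a small Java sketch that lists the file names one would expect for a given number of reduce tasks. It assumes the default part-r- prefix and five-digit zero padding described in the question; the class and method names (PartFileNames, nameFor, expectedFiles) are hypothetical and this is an illustration, not Hadoop's own code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class PartFileNames {
    // Builds the expected output file name for one reduce partition,
    // assuming the default "part-r-" prefix and five-digit zero padding.
    static String nameFor(int partition) {
        return String.format(Locale.ROOT, "part-r-%05d", partition);
    }

    // One file per reduce task: partitions are numbered 0 .. numReduceTasks - 1.
    static List<String> expectedFiles(int numReduceTasks) {
        List<String> names = new ArrayList<>();
        for (int partition = 0; partition < numReduceTasks; partition++) {
            names.add(nameFor(partition));
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [part-r-00000, part-r-00001, part-r-00002]
        System.out.println(expectedFiles(3));
    }
}
```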

If you need the output as input for the next job, then don't worry about having separate files. Simply specify the entire directory as the input for the next job.

If you do need the data outside of the cluster then I usually merge them at the receiving end when pulling the data off the cluster.

I.e. something like this:

hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt
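If the part files have already been copied to a local directory (for example with hadoop fs -get), the same concatenation can be done in plain Java. The following is a minimal sketch under that assumption; the class name LocalPartMerger and its merge method are hypothetical, and it uses only the JDK, not the Hadoop API.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class LocalPartMerger {
    // Concatenates every part-r-* file in localDir into target, in file-name order.
    // Returns the number of part files that were merged.
    static int merge(Path localDir, Path target) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(localDir, "part-r-*")) {
            for (Path p : stream) {
                parts.add(p);
            }
        }
        Collections.sort(parts); // part-r-00000, part-r-00001, ...

        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path part : parts) {
                // Streams the bytes without loading the whole file into memory.
                Files.copy(part, out);
            }
        }
        return parts.size();
    }

    public static void main(String[] args) throws IOException {
        // Usage: java LocalPartMerger <local-dir-with-part-files> <combined-output-file>
        int merged = merge(Paths.get(args[0]), Paths.get(args[1]));
        System.out.println("Merged " + merged + " part files");
    }
}
```

Files such as _SUCCESS are skipped because they do not match the part-r-* pattern.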


This is a function you can use to merge files in HDFS:

public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException {
    FileSystem fs = FileSystem.get(config);
    Path srcPath = new Path(src);
    Path dstPath = new Path(dest);

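    // Both paths must already exist, otherwise nothing is merged
    if (!fs.exists(srcPath)) {
        logger.info("Path " + src + " does not exist!");
        return false;
    }

    if (!fs.exists(dstPath)) {
        logger.info("Path " + dest + " does not exist!");
        return false;
    }
    // deleteSource = false keeps the original files; null means no separator string is added.
    // Note: FileUtil.copyMerge is available in Hadoop 2.x; it was removed in Hadoop 3.0.
    return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null);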
}


For text files only and HDFS as both the source and destination, use the below command:

hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file

This will concatenate all the files in input_hdfs_dir and write the output back to HDFS at output_hdfs_file. Keep in mind that all the data is brought to the local system and then uploaded to HDFS again, although no temporary files are created; it happens on the fly through a UNIX pipe.

Also, this won't work with non-text files such as Avro, ORC etc.

For binary files, you could do something like this (if you have Hive tables mapped on the directories):

insert overwrite table tbl select * from tbl

Depending on your configuration, this could also create more than one file. To create a single file, either set the number of reducers to 1 explicitly using mapreduce.job.reduces=1, or set the Hive property hive.merge.mapredfiles=true.


The part-r-nnnnn files are generated after the reduce phase, which is what the 'r' in the name stands for. With one reducer you get a single output file, part-r-00000. With two reducers you get part-r-00000 and part-r-00001, and so on. The number of files depends only on the number of reduce tasks, not on the size of the output or the memory of the machine. The number of reducers is configurable, either in mapred-site.xml or per job. As for your question: you can either use getmerge or set the number of reducers to 1 by adding the following statement to the driver code:

job.setNumReduceTasks(1);

Hope this answers your question.


You can run an additional map/reduce task, where map and reduce don't change the data, and partitioner assigns all data to a single reducer.


Besides my previous answer, I have one more approach that I was trying a few minutes ago. You can use a custom OutputFormat like the code given below:

public class VictorOutputFormat extends FileOutputFormat<StudentKey,PassValue> {

    @Override
    public RecordWriter<StudentKey,PassValue> getRecordWriter(
            TaskAttemptContext tac) throws IOException, InterruptedException {
        //step 1: GET THE CURRENT PATH
        Path currPath=FileOutputFormat.getOutputPath(tac);

        //Create the full path
        Path fullPath=new Path(currPath,"Aniruddha.txt");

        //create the file in the file system
        FileSystem fs=currPath.getFileSystem(tac.getConfiguration());
        FSDataOutputStream fileOut=fs.create(fullPath,tac);
        return new VictorRecordWriter(fileOut);
    }

}

Look at the line that builds fullPath. I used my own name as the output file name and tested the program with 15 reducers; the file name stays the same. So getting a single output file instead of two or more is possible. Keep in mind, though, that every reduce task then creates the same path, so tasks can overwrite each other's output unless only one of them actually writes records. Thanks!


Why not use a pig script like this one for merging partition files:

stuff = load '/path/to/dir/*';

store stuff into '/path/to/mergedir';


If the files have a header, you can get rid of it by doing this:

hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv

Then add the header manually to output.csv.
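As an alternative to filtering with grep and re-adding the header by hand, the sketch below merges local copies of the part files and keeps the header only once. It is a different technique from the grep command: it assumes every non-empty part file starts with exactly one header line and drops only that first line, instead of every line that contains the word "header". The class name HeaderAwareMerger is hypothetical, and the code uses only the JDK.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

class HeaderAwareMerger {
    // Merges the given part files into target, writing the header line
    // (assumed to be the first line of each file) only once.
    static void merge(List<Path> parts, Path target) throws IOException {
        try (BufferedWriter out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            boolean headerWritten = false;
            for (Path part : parts) {
                try (BufferedReader in = Files.newBufferedReader(part, StandardCharsets.UTF_8)) {
                    String header = in.readLine();
                    if (header == null) {
                        continue; // empty part file, nothing to copy
                    }
                    if (!headerWritten) {
                        out.write(header);
                        out.write('\n');
                        headerWritten = true;
                    }
                    // Copy the remaining data lines unchanged.
                    String line;
                    while ((line = in.readLine()) != null) {
                        out.write(line);
                        out.write('\n');
                    }
                }
            }
        }
    }
}
```

The caller decides the order of the part files by the order of the list, for example sorted by file name.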


Does map/reduce merge these files?

No. It does not merge.

You can use IdentityReducer to achieve your goal.

Performs no reduction, writing all input values directly to the output.

public void reduce(K key,
                   Iterator<V> values,
                   OutputCollector<K,V> output,
                   Reporter reporter)
            throws IOException

Writes all keys and values directly to output.

Have a look at related SE posts:

hadoop: difference between 0 reducer and identity reducer?
