开发者

Hadoop: intermediate merge failed

I'm running into a strange issue. When I run my Hadoop job over a large dataset (>1TB compressed text files), several of the reduce tasks fail, with stacktraces like these:

java.io.IOException: Task: attempt_201104061411_0002_r_000044_0 - The red开发者_如何学Gouce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
    at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
    at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
    at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:139)
    at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103)
    at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:335)
    at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:350)
    at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:156)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2698)
    ... 1 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:375)
    at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:241)
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:125)
    ... 8 more
java.io.IOException: Task: attempt_201104061411_0002_r_000056_0 - The reduce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
    at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
    at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
    at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:123)
    at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:50)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:447)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:381)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:107)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:93)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2689)
    ... 1 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at org.apache.hadoop.io.Text.readString(Text.java:402)
    at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:240)
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:122)
    ... 9 more

Not all of my reducers fail. Several often succeed before I see failures with others. As you can see, the stacktraces always seem to originate from IPAndIPCookieCount.readFields() and always in the in-memory merge stage, but not always from the same part of readFields.

This job succeeds when running over smaller datasets (about 1/30th the size). There are nearly as many outputs as inputs to the job, but each output record is shorter. This job is essentially an implementation of a secondary sort.

We are using the CDH3 Hadoop distribution.

Here is my custom WritableComparable implementation:

public static class IpAndIpCookieCount implements WritableComparable<IpAndIpCookieCount> {

        private String ip;
        private int ipCookieCount;

        public IpAndIpCookieCount() {
            // empty constructor for hadoop
        }

        public IpAndIpCookieCount(String ip, int ipCookieCount) {
            this.ip = ip;
            this.ipCookieCount = ipCookieCount;
        }

        public String getIp() {
            return ip;
        }

        public int getIpCookieCount() {
            return ipCookieCount;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            ip = Text.readString(in);
            ipCookieCount = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, ip);
            out.writeInt(ipCookieCount);
        }

        @Override
        public int compareTo(IpAndIpCookieCount other) {
            int firstComparison = ip.compareTo(other.getIp());
            if (firstComparison == 0) {
                int otherIpCookieCount = other.getIpCookieCount();
                if (ipCookieCount == otherIpCookieCount) {
                    return 0;
                } else {
                    return ipCookieCount < otherIpCookieCount ? 1 : -1;
                }
            } else {
                return firstComparison;
            }
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof IpAndIpCookieCount) {
                IpAndIpCookieCount other = (IpAndIpCookieCount) o;
                return ip.equals(other.getIp()) && ipCookieCount == other.getIpCookieCount();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return ip.hashCode() ^ ipCookieCount;
        }

    }

The readFields method is very simple, and I can't see any problems in this class. Additionally, I have seen other people getting essentially the same stack trace:

  • http://lucene.472066.n3.nabble.com/Reduce-Copier-Failed-td2120228.html
  • https://groups.google.com/a/cloudera.org/group/cdh-user/browse_thread/thread/3544da912bf66506
  • http://www.listware.net/201010/hadoop-common-user/70382-merging-of-the-local-fs-files-threw-an-exception-javaioioexception-javalangruntimeexception-javaioeofexception.html
  • http://mail-archives.apache.org/mod_mbox/hadoop-mapreduce-user/201101.mbox/%3CSNT135-w58DBCAAC6970BB35B50B9AB7FD0@phx.gbl%3E
  • http://web.archiveorange.com/archive/v/5nvvZTgeqwCRQ3F9vEzI

None seemed to have actually figured out the issue behind this. The last two seem to suggest that this could be a memory issue (although these stacktraces aren't OutOfMemoryExceptions). Like the second to last post in that list of links, I have tried setting the number of reducers higher (up to 999), but I still get failures. I have not (yet) tried to allocate more memory to reduce tasks, as that would require us to reconfigure our cluster.

Is this a bug in Hadoop? Or am I doing something wrong?

EDIT: My data is partitioned by day. If I run the job 7 times, once for each day, all 7 complete. If I run one job over all 7 days, it fails. The large report over all 7 days will see exactly the same keys as the smaller ones do (in aggregate), but obviously not in the same order, at the same reducers, etc.


I think this is an artifact of Cloudera's backport of MAPREDUCE-947 to CDH3. This patch results in the formation of a _SUCCESS file for successful job.

Also a _SUCCESS file is created in the output folder for successful jobs. A configuration parameter mapreduce.fileoutputcommitter.marksuccessfuljobs can be set to false to disable creation of _SUCCESS file, or to true to enable creation of the _SUCCESS file.

Looking at your error,

Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)

and comparing it to errors I've seen for this issue before,

Exception in thread "main" java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at java.io.DataInputStream.readFully(DataInputStream.java:152)
    at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1465)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1437)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1424)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1419)
    at org.apache.hadoop.mapred.SequenceFileOutputFormat.getReaders(SequenceFileOutputFormat.java:89)
    at org.apache.nutch.crawl.CrawlDbReader.processStatJob(CrawlDbReader.java:323)
    at org.apache.nutch.crawl.CrawlDbReader.main(CrawlDbReader.java:511)

and on the Mahout mailing list

Exception in thread "main" java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at java.io.DataInputStream.readFully(DataInputStream.java:152)
    at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1457)
    at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1435)
    at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1424)
    at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1419)
    at
org.apache.mahout.df.mapreduce.partial.Step0Job.parseOutput(Step0Job.java:145)
    at
org.apache.mahout.df.mapreduce.partial.Step0Job.run(Step0Job.java:119)
    at
org.apache.mahout.df.mapreduce.partial.PartialBuilder.parseOutput(PartialBuilder.java:115)
    at org.apache.mahout.df.mapreduce.Builder.build(Builder.java:338)
    at
org.apache.mahout.df.mapreduce.BuildForest.buildForest(BuildForest.java:195)

it seems that DataInputStream.readFully is choked by this file.

I would suggest setting mapreduce.fileoutputcommitter.marksuccessfuljobs to false and retrying your job - it should work.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜