开发者

PipedInputStream - How to avoid "java.io.IOException: Pipe broken"

I have two threads. One of them writes to PipedOutputStream, another one reads from corresponding PipedInputStream. Background is that one thread is downloading some data from remote server and multiplexes it to several other threads through piped streams.

The problem is that sometimes (especially when downloading large (>50Mb开发者_如何学C) files) I get java.io.IOException: Pipe broken when trying to read from PipedInputStream.

Javadoc says that A pipe is said to be broken if a thread that was providing data bytes to the connected piped output stream is no longer alive.

It is true, my writing thread really dies after writing all his data to PipedOutputStream.

Any solutions? How can I prevent PipedInputStream from throwing this exception? I want to be able to read all data that was written to PipedOutputStream even if writing thread finished his work. (If anybody knows how to keep writing thread alive until all data will be read, this solution is also acceptable).


Use a java.util.concurrent.CountDownLatch, and do not end the first thread before the second one has signaled that is has finished reading from the pipe.

Update: quick and dirty code to illustrate my comment below

    final PipedInputStream pin = getInputStream();
    final PipedOutputStream pout = getOutputStream();

    final CountDownLatch latch = new CountDownLatch(1);

    InputStream in = new InputStream() {

        @Override
        public int read() throws IOException {
            return pin.read();
        }

        @Override
        public void close() throws IOException {
            super.close();
            latch.countDown();
        }
    };


    OutputStream out = new OutputStream(){

        @Override
        public void write(int b) throws IOException {
            pout.write(b);
        }

        @Override
        public void close() throws IOException {
            while(latch.getCount()!=0) {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    //too bad
                }
            }
            super.close();
        }
    };

    //give the streams to your threads, they don't know a latch ever existed
    threadOne.feed(in);
    threadTwo.feed(out);


Are you closing your PipedOutputStream when the thread that's using it ends? You need to do this so the bytes in it will get flushed to the corresponding PipedInputStream.


PipedInputStream and PipedOutputStream are broken (with regards to threading). They assume each instance is bound to a particular thread. This is bizarre. I suggest using your own (or at least a different) implementation.


You might encounter problems with these classes when you use more than one reader or writer thread (see JDK-4028322).

However most users likely only use one reader and one writer thread. Since this is the case for you as well, the reason why you are encountering a broken pipe is most likely that you did not close the PipedOutputStream once you are done writing.

PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);

// You can of course also use an Executor or similar for this
new Thread(() -> {
    // Your method writing the data
    writeDataTo(out);
    // Close PipedOutputStream so that reader knows that the end of the data 
    // has been reached
    try {
        out.close();
    }
    catch (IOException e) {
        // Handle exception or simply ignore it; current implementation will NEVER 
        // throw an IOException: https://github.com/openjdk/jdk/blob/0e14d5a98453407488057e6714f90f04d7dfa383/src/java.base/share/classes/java/io/PipedOutputStream.java#L174
    }
}).start();

// Your method reading the data
readDataFrom(in);
// Close PipedInputStream so that writer fails instead of blocking infinitely in case 
// it tries to write again (for whatever reason)
in.close();

There is also no need to manually call PipedOutputStream.flush(), it only notifies waiting readers, but no data is lost if you directly call close().

Sadly the documentation is at the moment not very clear about all of this. In general it is not a good idea to rely on the implementation, but in this case it might be the only sensible solution:

  • https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/io/PipedInputStream.java
  • https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/io/PipedOutputStream.java
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜