开发者

Mixing NIO with IO

Usually you have a single bound tcp port and several connections on these. At least there are usually more connections as bound ports. My case is different: I want to bind a lot of ports and usually have no (or at least very few) connections.

So I want to use NIO to accept the incoming connections.

However, I need to pass the accepted connections to the existing jsch ssh library. That requires IO sockets instead of NIO sockets, it spawns one (or two) thread(s) per connection. But that's fine for me.

Now, I thought that the following lines would deliver the very same result:

Socket a = serverSocketChannel.accept().socket();
Socket b = serverSocketChannel.socket().accept();
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking( true );
Socket c = channel.socket();
Socket d = serverSocket.accept();

However the getInputStream() and getOut开发者_开发百科putStream() functions of the returned sockets seem to work different. Only if the socket was accepted using the last call, jsch can work with it. In the first three cases, it fails (and I am sorry: I don't know why).

So is there a way to convert such a socket?


The input and output streams returned by sockets obtained from SocketChannels are internally synchronized on the channel at some points. So you can't use them for full-duplex protocols like SSH, because the system will lock up. Same applies to streams converted from channels via the Channels class (which is what the first case amounts to).


I had to do this because we use JSch and had 1000's of threads listening on ports. I'll leave it to you do handle accepting the connections, but this works with JSch from a thread launched to handle the connection:

/**
 * Handler that allows for input and output streams of an NIO socket to be
 * read as blocking I/O
 * <p>
 * Reading and writing from the streams <b>must</b> be done on the same thread
 * @author Eric
 *
 */
public class NioBlockingSocketStreamer
{

    private static long NIO_SELECT_TIMEOUT = 0;
    private static long NIO_WRITE_TIMEOUT_SECONDS = 30;
    /** poison pill result that signals a disconnected channel */
    private static int NIO_READ_IOEXCEPTION = -2;

    private final SocketChannel socketChannel;

    private Selector selector;

    private final InputStream inputStream = new MyInputStream(this);
    private final OutputStream outputStream = new MyOutputStream(this);

    private final LinkedBlockingQueue<ByteBuffer> pendingWrites;
    private final MyReadQueue<ReadRequest> pendingRead;

    private volatile boolean disconnected = false;

    public NioBlockingSocketStreamer(SocketChannel socketChannel,
                                     int writeQueueSize)
    {
        pendingRead = new MyReadQueue<ReadRequest>( );
        pendingWrites = new LinkedBlockingQueue<ByteBuffer>(writeQueueSize);
        this.socketChannel = socketChannel;
    }

    public NioBlockingSocketStreamer start( )
            throws IOException
    {
        this.selector = Selector.open( );
        this.socketChannel.configureBlocking(false);

        final NioBlockingSocketStreamer streamer = this;
        Thread t = new Thread(new Runnable( )
        {
            @Override
            public void run( )
            {
                try
                {
                    streamer.nioServiceLoop( );
                }
                catch (ClosedChannelException e)
                {
                    //shutting down
                }
                catch (ClosedSelectorException e)
                {
                    //shutting down
                }
                catch (IOException e)
                {
                    e.printStackTrace( );
                }
            }
        });
        t.start( );
        return this;
    }

    public void nioServiceLoop( )
            throws IOException
    {
        ByteBuffer writeBuffer = null;
        ReadRequest readRequest = null;

        try
        {
            for (;;)
            {
                if (readRequest == null)
                    readRequest = pendingRead.poll( );
                int interestOps = readRequest != null
                        ? SelectionKey.OP_READ
                        : 0;
                if (writeBuffer == null && pendingWrites.isEmpty( ))
                    interestOps &= ~SelectionKey.OP_WRITE;
                else
                    interestOps |= SelectionKey.OP_WRITE;
                socketChannel.register(selector, interestOps);

                int n = selector.select(NIO_SELECT_TIMEOUT);
                if (n > 0)
                {
                    Set<SelectionKey> selectedKeys = selector.selectedKeys( );
                    for (SelectionKey selectedKey : selectedKeys)
                    {
                        if (selectedKey.isValid( ))
                        {
                            if (selectedKey.isReadable( ))
                            {
                                int cbRead;
                                try
                                {
                                    cbRead = socketChannel.read(readRequest.buffer);
                                }
                                catch (IOException e)
                                {
                                    //"java.io.IOException: An existing connection was forcibly closed by the remote host"
                                    //is the typical disconnection
                                    return;
                                }
                                interestOps &= ~SelectionKey.OP_READ;
                                readRequest.submitResult(cbRead);
                                readRequest = null;
                            }

                            if (selectedKey.isWritable( ))
                            {
                                if (writeBuffer == null)
                                    writeBuffer = pendingWrites.poll( );

                                if (writeBuffer != null)
                                {
                                    try
                                    {
                                        socketChannel.write(writeBuffer);
                                    }
                                    catch (IOException e)
                                    {
                                        //"java.io.IOException: An existing connection was forcibly closed by the remote host"
                                        //is the typical disconnection
                                        return;
                                    }
                                    if (!writeBuffer.hasRemaining( ))
                                        writeBuffer = null;
                                }
                            }
                        }
                    }
                    selectedKeys.clear( );
                }
            }
        }
        finally
        {
            disconnected = true;

            //submit poison pill result for outstanding read request
            if (readRequest != null)
            {
                readRequest.submitResult(NIO_READ_IOEXCEPTION);
            }

            //since the read request queue became empty before we disconnected, check for the next
            //read request that may have queued in the meantime
            readRequest = pendingRead.poll( );
            if (readRequest != null)
            {
                readRequest.submitResult(NIO_READ_IOEXCEPTION);
            }
        }
    }

    private ReadRequest readRequest = new ReadRequest( );

    public synchronized int read(ByteBuffer buffer)
            throws IOException
    {
        if (disconnected)
            throw new IOException("disconnected");

        readRequest.buffer = buffer;
        int result;
        try
        {
            pendingRead.offer(readRequest);
            selector.wakeup( );
            result = readRequest.getResult( );
        }
        catch (InterruptedException e)
        {
            throw new IOException(e);
        }

        if (result == NIO_READ_IOEXCEPTION)
            throw new IOException("disconnected");

        return result;
    }

    public void write(ByteBuffer buffer)
            throws IOException
    {
        if (disconnected)
            throw new IOException("disconnected");

        boolean queued = false;
        if (pendingWrites.remainingCapacity( ) > 0)
        {
            try
            {
                queued = pendingWrites.add(buffer);
            }
            catch (IllegalStateException e)
            {
                throw new IOException("write queue capacity exceeded");
            }
        }
        else
        {
            try
            {
                queued = pendingWrites.offer(buffer, NIO_WRITE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
            catch (InterruptedException e)
            {
                throw new IOException(e);
            }
        }

        if (!queued)
            throw new IOException("unable to queue write");

        if (selector != null) //selector should never be null
        {
            selector.wakeup( );
        }
    }

    public InputStream getInputStream( )
    {
        return inputStream;
    }

    public OutputStream getOutputStream( )
    {
        return outputStream;
    }

    /**
     * Implements a simple blocking mechanism for getting the results of a read
     * <p>
     * <b>Note</b> This class must avoid autoboxing due to Retroweaver
     */
    static class ReadRequest
    {
        ByteBuffer buffer;
        LinkedBlockingQueue<Integer> result = new LinkedBlockingQueue<Integer>(1);

        void submitResult(int cb)
        {
            result.add(new Integer(cb));
        }

        int getResult( )
                throws InterruptedException
        {
            return (int) result.take( );
        }
    }

    static class MyInputStream
        extends InputStream
    {

        private final NioBlockingSocketStreamer streamer;

        MyInputStream(NioBlockingSocketStreamer streamer)
        {
            this.streamer = streamer;
        }

        @Override
        public int read(byte[] b,
                        int off,
                        int len)
                throws IOException
        {
            ByteBuffer buffer = ByteBuffer.allocate(len);
            int n = streamer.read(buffer);
            if (n > 0)
                System.arraycopy(buffer.array( ), 0, b, off, n);
            return n;
        }

        @Override
        public int read(byte[] b)
                throws IOException
        {
            return read(b, 0, b.length);
        }

        @Override
        public int read( )
                throws IOException
        {
            return read(new byte[1], 0, 1);
        }

        @Override
        public void close( )
                throws IOException
        {
            // TODO Auto-generated method stub
        }

    }

    static class MyOutputStream
        extends OutputStream
    {

        private final NioBlockingSocketStreamer streamer;

        MyOutputStream(NioBlockingSocketStreamer streamer)
        {
            this.streamer = streamer;
        }

        @Override
        public void write(byte[] b,
                          int off,
                          int len)
                throws IOException
        {
            ByteBuffer buffer = ByteBuffer.allocate(len);
            buffer.put(b, off, len);
            buffer.flip( );
            streamer.write(buffer);
        }

        @Override
        public void write(byte[] b)
                throws IOException
        {
            write(b, 0, b.length);
        }

        @Override
        public void write(int b)
                throws IOException
        {
            write(new byte[1], 0, 1);
        }

        @Override
        public void close( )
                throws IOException
        {
            // TODO Auto-generated method stub
        }

        @Override
        public void flush( )
                throws IOException
        {
            // TODO Auto-generated method stub
        }
    }

    static class MyReadQueue<T>
    {
        //FIXME using LinkedBlockingQueue is overkill
        private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(1);

        public T poll( )
        {
            return queue.poll( );
        }

        public void offer(T request)
                throws InterruptedException
        {
            queue.offer(request, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
    }

}


This sequence works in our production environment:

final SocketAddress serverAddr =
    new InetSocketAddress(
        bind_address,
        server_port
    );

final ServerSocketChannel serverChannel = ServerSocketChannel.open( );

serverChannel.socket( ).bind(
    serverAddr,
    backlog
);

final SocketChannel socketChannel = serverChannel.accept( );

final Socket socket = socketChannel.socket( );

final OutputStream out = socket.getOutputStream( );

final InputStream in = socket.getInputStream( );

All channels will be in blocking mode by default.

Beware, that I did not show any cleanup/exception handling code.

Also, there are some bugs related to NIO and socket streams

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜