
JBoss Netty - How to serve 3 connections using 2 worker threads

As a simple example, let's say I want to handle 3 simultaneous TCP client connections using only 2 worker threads in Netty. How would I do it?

Questions:

A) With the code below, my third connection doesn't get any data from the server; the connection just sits there. Notice that my worker executor size and worker count are both 2. So if I have 2 worker threads and 3 connections, shouldn't all three connections be served by the 2 threads?

B) Does Netty use the CompletionService from java.util.concurrent? It doesn't seem to. I also didn't see any source code that calls executor.submit or future.get, which adds to my confusion about how it serves data to more connections than it has worker threads.

C) I'm lost on how Netty handles 10000+ simultaneous TCP connections. Will it create 10000 threads? Thread per connection is not a scalable solution, so I'm confused, because my test code doesn't work as expected.

    import java.net.InetSocketAddress;
    import java.nio.channels.ClosedChannelException;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    import org.jboss.netty.handler.codec.string.StringEncoder;

    public class SRNGServer {

      public static void main(String[] args) throws Exception {
          // Configure the server.
          ServerBootstrap bootstrap = new ServerBootstrap(
                  new NioServerSocketChannelFactory(
                          // Boss executor: accepts incoming connections.
                          Executors.newCachedThreadPool(),
                          // Worker executor and worker count: 2 I/O threads.
                          Executors.newFixedThreadPool(2), 2
                          ));

          // Configure the pipeline factory.
          bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP());

          // Bind and start to accept incoming connections.
          bootstrap.bind(new InetSocketAddress(8080));
      }



      private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler {

        private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName());


        @Override
        public void channelConnected(
                ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

          // Send greeting for a new connection.
          Channel ch=e.getChannel();

          System.out.printf("channelConnected with channel=[%s]%n", ch);

          ChannelFuture writeFuture=e.getChannel().write("It is " + new Date() + " now.\r\n");

          SRNGChannelFutureListener srngcfl=new SRNGChannelFutureListener();

          System.out.printf("Registered listener=[%s] for future=[%s]%n", srngcfl, writeFuture);

          writeFuture.addListener(srngcfl);      

        }

        @Override
        public void exceptionCaught(
                ChannelHandlerContext ctx, ExceptionEvent e) {

            logger.log(
                    Level.WARNING,
                    "Unexpected exception from downstream.",
                    e.getCause());
            if(e.getCause() instanceof ClosedChannelException){
              logger.log(Level.INFO, "****** Connection closed by client - Closing Channel");
            }
            e.getChannel().close();
        }
      }



      private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory {

        public ChannelPipeline getPipeline() throws Exception {

            // Create a default pipeline implementation.
            ChannelPipeline pipeline = Channels.pipeline();

            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new SRNGServerHandlerP());

            return pipeline;
        }
      }


      private static class SRNGChannelFutureListener implements ChannelFutureListener{

        public void operationComplete(ChannelFuture future) throws InterruptedException{
          Thread.sleep(1000*5);
          Channel ch=future.getChannel();
          if(ch!=null && ch.isConnected()){
              ChannelFuture writeFuture=ch.write("It is " + new Date() + " now.\r\n");
              //-- Add this instance as listener itself.
              writeFuture.addListener(this);
          }

        }

      }
    }


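I haven't analyzed your source code in detail, so I can't say for certain why it doesn't work. But this line in SRNGChannelFutureListener looks suspicious:

    Thread.sleep(1000*5);

This blocks the thread that executes it for 5 seconds; the thread cannot do any other processing during that time. If the listener is invoked on one of Netty's I/O worker threads, which is the usual case for a write completed by a worker, then with three connections and only two workers both workers can end up sleeping most of the time, and a connection that has to share a worker with a sleeping one does not get served.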

Question C: No, it will not create 10,000 threads. The whole point of Netty is to avoid that, because a thread per connection does not scale. Instead, it uses a small number of threads from a thread pool, generates events whenever something happens, and runs event handlers on the pool's threads. Threads and connections are thus decoupled: there is no dedicated thread per connection.

For this mechanism to work, your event handlers must return as quickly as possible so that the threads they run on are free to run the next handler. If you make a thread sleep for 5 seconds, you keep it occupied, and it is not available for handling other events.
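To illustrate the "return quickly" rule without depending on Netty, here is a minimal JDK-only sketch. It assumes that the periodic write can be expressed as a short task that re-schedules itself instead of sleeping. The class `NonBlockingTicks`, its `run` method, and the counter that stands in for `ch.write(...)` are all hypothetical names made up for this illustration. In a real Netty 3 server you would more likely reach for something like `org.jboss.netty.util.HashedWheelTimer` or move slow handlers off the I/O threads, but I have not tested that against your code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NonBlockingTicks {

    // Simulates `connections` clients that each receive `ticks` messages,
    // served by only `threads` pool threads. Returns messages sent per id.
    static Map<Integer, Integer> run(int connections, int threads,
                                     int ticks, long periodMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(threads);
        Map<Integer, AtomicInteger> counts = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(connections);
        try {
            for (int id = 0; id < connections; id++) {
                counts.put(id, new AtomicInteger());
                scheduleNext(pool, id, counts, done, ticks, periodMillis);
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        Map<Integer, Integer> result = new HashMap<>();
        counts.forEach((id, n) -> result.put(id, n.get()));
        return result;
    }

    // Each task does a tiny amount of work and returns immediately; the
    // delay lives in the scheduler, not in a sleeping thread.
    private static void scheduleNext(ScheduledExecutorService pool, int id,
                                     Map<Integer, AtomicInteger> counts,
                                     CountDownLatch done, int ticks,
                                     long periodMillis) {
        pool.schedule(() -> {
            int sent = counts.get(id).incrementAndGet(); // stand-in for ch.write(...)
            if (sent < ticks) {
                scheduleNext(pool, id, counts, done, ticks, periodMillis);
            } else {
                done.countDown();
            }
        }, periodMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // 3 simulated connections, 2 threads, 3 messages each, 20 ms apart.
        System.out.println(run(3, 2, 3, 20));
    }
}
```

Because no task ever sleeps, two threads are enough for all three simulated connections; with `Thread.sleep` inside the task, the third would have to wait for a thread to wake up.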

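Question B: As far as I know, Netty does not use CompletionService. It uses selectors and other java.nio classes for asynchronous I/O, with worker threads looping over a selector instead of submitting one task per connection. If you really want the details, get the Netty source code and look.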
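As a rough picture of what "uses selectors" means, the following JDK-only sketch serves several connections from a single thread with `java.nio.channels.Selector`. It is not Netty's actual code, just the underlying idea under simplified assumptions: the class `SelectorDemo`, the `serve` method, and the `GREETING` text are hypothetical, and the clients are plain blocking sockets opened in the same thread only to exercise the server side.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SelectorDemo {

    static final String GREETING = "hello\r\n";

    // Accepts `clients` connections on ONE thread, sends each a greeting,
    // closes it, and returns what each client received.
    static List<String> serve(int clients) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Blocking client sockets; they connect via the listen backlog.
            List<SocketChannel> sockets = new ArrayList<>();
            for (int i = 0; i < clients; i++) {
                sockets.add(SocketChannel.open(server.getLocalAddress()));
            }

            int greeted = 0;
            while (greeted < clients) {
                selector.select(); // wait until some channel is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel ch = server.accept();
                        if (ch == null) continue;
                        ch.configureBlocking(false);
                        // Attach the pending greeting to the new channel.
                        ch.register(selector, SelectionKey.OP_WRITE, ByteBuffer.wrap(
                                GREETING.getBytes(StandardCharsets.US_ASCII)));
                    } else if (key.isWritable()) {
                        ByteBuffer buf = (ByteBuffer) key.attachment();
                        ((SocketChannel) key.channel()).write(buf);
                        if (!buf.hasRemaining()) {
                            key.channel().close(); // also cancels the key
                            greeted++;
                        }
                    }
                }
            }

            List<String> received = new ArrayList<>();
            for (SocketChannel s : sockets) {
                ByteBuffer in = ByteBuffer.allocate(64);
                while (in.hasRemaining() && s.read(in) != -1) {
                    // keep reading until the server closes the connection
                }
                in.flip();
                received.add(StandardCharsets.US_ASCII.decode(in).toString());
                s.close();
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serve(3).size() + " clients greeted by one thread");
    }
}
```

The number of connections is bounded by the selector and the operating system, not by the thread count, which is why no `executor.submit` per connection shows up in this style of code.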
