开发者

Netty如何自定义编码解码器

目录
  • Netty 自定义编码解码器
    • 自定义一个解码器
      • 代码示例
    • 自定义一个编码器
      • 代码示例
    • 后续 -> io.netty.handler.codec.ReplayingDecoder
    • 总结

      Netty 自定义编码解码器

      入栈:InboundHandler ,出栈:OutboundHandler。Netty 构建 chain 来处理业务。

      自定义一个解码器

      解码器主要是对客户端发送的消息进行解码处理,所以他是一个入栈的处理器,因此他会有一个入栈的标识ibound=true;

      解码器一般我都们都会基层 Netty 提供给的一个实现类来实现自己的解码逻辑 -> io.netty.handler.codec.ByteToMessageDecoder 这就是解码的抽象类,默认我们要实现一个抽象方法

      com.netty.codec.custom.InboundAndOutboundHandler#decode 

      这个方法有大概三个参数;

      • ChannelHandlerContext channelHandlerContext 这个是上下文信息,可以获取通道管道等信息。
      • ByteBuf byteBuf 客户端发送的消息就是存在这个参数的对象里面我们要通过这个对象的read***方法读取我们需要的数据js类型,可以是 LongByte 等类型的数据然后我们可以就可以转换成为我们需要的格式。
      • List<Object> list 集合,将解码后的数据传递给下一个inboundHandler 处理类。

      代码示例

      @Override
      protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
          // Long =www.devze.com> 8 byte
          if (byteBuf.readableBytes() >= 8) {
              list.add(byteBuf.readLong());
          } else log.warn("字节数异常 => {}", byteBuf.readableBytes());
      }

      这样我们就实现了一个简单的解码器。

      自定义一个编码器

      解码器主要是对服务端将要发送给客户端的消息进行编码处理,所以他是一个出栈的处理器,因此他会有一个入栈的标识outbound=true;

      使用 Netty 提供的抽象类 => io.netty.handler.codec.MessageToByteEncoder<T> 泛型 T 表示你要发送的消息的类型,实现抽象方法 => com.netty.codec.custom.OutboundHandler#encode 方法的参数有三个:

      • ChannelHandlerContext channelHandlerContext 这个是上下文信息,可以获取通道管道等信息。
      • Long msg 服务端要发送给客户端的消息
      • ByteBuf byteBuf

      代码示例

      @Override
      protected void encode(ChannelHandlerContext channelHandlerContext, Long msg, ByteBuf byteBuf) {
          byteBufjs.writeLong(msg);
          log.info("发送消息成功");
      }

      后续 -> io.netty.handler.codec.ReplayingDecoder

      ByteToMessage 其实在使用过程中会遇到一些问题,例如:

      当我们的解码器中想要将字节转换为一个 Integer ,我们知道 Integer 是四个字节的,但是如果在读取的时候不够四个字节,这个时候我们就需要做一些判断逻辑 => if (byteBuf.readableBytes() >= 4) 当这个返回值为 true 的时候我么就可以继续执行解码的逻辑。

      那我们怎么可以跳过这一步不判断直接进行我们的转换逻辑呢?

      这个时候就可以使用 Netty 的

      io.netty.handler.codec.ReplayingDecoder

      可以不用判断可读字节数的原理:

      ReplayingDecoderByteToMessage 的子类,源码如下:

      public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
      android    ...
      }

      ReplayingDecoder 的秘密就是对 ByteToMessageCallDecode(...) 方法的重写,观摩一下具体实现:

      @Override
      protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
          replayable.setCumulation(in);
          try {
              while (in.isReadable()) {
                  int oldReaderIndex = checkpoint = in.readerIndex();
                  int outSize = out.size();
      
                  if (outSize > 0) {
                      fireChannelRead(ctx, out, outSize);
                      out.clear();
      
                      // Check if this handler was removed before continuing with decoding.
                      // If it was removed, it is not safe to continue to operate on the buffer.
                      //
                      // See:
                      // - https://github.com/netty/netty/issues/4635
                      if (ctx.isRemoved()) {
                          break;
                      }
                      outSize = 0;
                  }
      
                  S oldState = state;
                  int oldInputLength = in.readableBytes();
                  try {
                      decodeRemovalReentryProtection(ctx, replayable, out);
      
                      // Check if this handler was removed before continuing the loop.
                      // If it was removed, it is not safe to continue to operate on the buffer.
                      //
                      // See https://github.com/netty/netty/issues/1664
                      if (ctx.isRemoved()) {
                          break;
                      }
      
                      if (outSize == out.size()) {
                          if (oldInputLength == in.readableBytes() && oldState == state) {
                              throw new DecoderException(
                                      StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
                                      "data or change its state if it did not decode anything.");
                          } else {
                              // Previous data has been discarded or caused state transition.
                              // Probably it is reading on.
                              continue;
                          }
                      }
                  } catch (Signal replay) {
                      replay.expect(REPLAY);
      
                      // Check if this handler was removed before continuing the loop.
                      // If it was removed, it is not safe to continue to operate on the buffer.
                      //
                      // See https://github.com/netty/netty/issues/1664
                      if (ctx.isRemoved()) {
                          break;
                      }
      
                      // Return to the checkpoint (or oldPosition) and retry.
                      int checkpoint = this.checkpoint;
                      if (checkpoint >= 0) {
                          in.readerIndex(checkpoint);
                      } else {
                          // Called by cleanup() - no need to maintain the readerIndex
                          // anymore because the buffer has been released already.
                      }
                      break;
                  }
      
                  if (oldReaderIndex == in.readerIndex() && oldState == state) {
                      throw new DecoderException(
                             StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
                             "or change its state if it decoded something.");
                  }
                  if (isSingleDecode()) {
                      break;
                  }
              }
          } catch (DecoderException e) {
              throw e;
          } catch (Exception cause) {
              throw new DecoderException(cause);
          }
      }

      实现不需要判断的逻辑就是因为

      int oldReaderIndex = checkpoint = in.readerIndex(); 

      如果在执行过程中出现异常就会在代码中重置 index

      总结

      虽然 ReplayingDecoder节约了判断的逻辑,但是从他的代码实现http://www.devze.com逻辑看到是通过抛出异常来不断的重试,所以在某些特殊的情况下会造成性能的下降。所以还是再选择的时候要根据自己的实际需求来判断是使用 ByteToMessage 还是使用 ReplayingDecoder

      以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜