深入理解Netty编解码、粘包拆包、心跳机制( 五 )


@Overrideprotected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());ch.pipeline().addLast(new ProtobufEncoder());ch.pipeline().addLast(new TcpClientHandler());}接收方加上解码器 ProtobufVarint32FrameDecoder
@Overrideprotected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance())); //给pipeline管道设置处理器 ch.pipeline().addLast(new TcpServerHandler());}然后再启动服务端和客户端 , 我们可以看到正常了~
ProtobufVarint32LengthFieldPrepender 编码器的工作如下:
* BEFORE ENCODE (300 bytes)AFTER ENCODE (302 bytes) * +---------------++--------+---------------+ * | Protobuf Data |-------------->| Length | Protobuf Data | * |(300 bytes)|| 0xAC02 |(300 bytes)| * +---------------++--------+---------------+@Sharablepublic class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {int bodyLen = msg.readableBytes();int headerLen = computeRawVarint32Size(bodyLen);//写入请求头 , 消息长度out.ensureWritable(headerLen + bodyLen);writeRawVarint32(out, bodyLen);//写入数据out.writeBytes(msg, msg.readerIndex(), bodyLen);}}ProtobufVarint32FrameDecoder 解码器的工作如下:
* BEFORE DECODE (302 bytes)AFTER DECODE (300 bytes) * +--------+---------------++---------------+ * | Length | Protobuf Data |----->| Protobuf Data | * | 0xAC02 |(300 bytes)||(300 bytes)| * +--------+---------------++---------------+public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {//标记读取的下标位置in.markReaderIndex();//获取读取的下标位置int preIndex = in.readerIndex();//解码 , 获取消息的长度,并且移动读取的下标位置int length = readRawVarint32(in);//比较解码前和解码后的下标位置 , 如果相等 。 表示字节数不够读取 , 跳到下一轮if (preIndex == in.readerIndex()) {return;}//如果消息的长度小于0 , 抛出异常if (length < 0) {throw new CorruptedFrameException("negative length: " + length);}//如果不够读取一个完整的数据 , reset还原下标位置 。if (in.readableBytes() < length) {in.resetReaderIndex();} else {//否则 , 把数据写入到out , 接收端就拿到了完整的数据了out.add(in.readRetainedSlice(length));} }总结:
  • 发送端通过编码器在发送的时候在消息体前面加上一个描述数据长度的数据块 。
  • 接收方通过解码器先获取描述数据长度的数据块 , 知道完整数据的长度 , 然后根据数据长度获取一条完整的数据 。
Netty心跳检测机制何为心跳所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.
注:心跳包还有另一个作用 , 经常被忽略 , 即:一个连接如果长时间不用 , 防火墙或者路由器就会断开该连接 。
在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 看下它的构造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);}三个参数的含义如下:
  • readerIdleTimeSeconds: 读超时 。 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件 。
  • writerIdleTimeSeconds: 写超时 。即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件 。
  • allIdleTimeSeconds: 读/写超时 。即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件 。
注:这三个参数默认的时间单位是秒 。 若需要指定其他时间单位 , 可以使用另一个构造方法:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));Netty心跳源码分析初步地看下IdleStateHandler源码 , 先看下IdleStateHandler中的channelRead方法:
深入理解Netty编解码、粘包拆包、心跳机制文章插图
红框代码其实表示该方法只是进行了透传 , 不做任何业务逻辑处理 , 让channelPipe中的下一个handler处理channelRead方法;