深入理解Netty编解码、粘包拆包、心跳机制( 五 )
@Overrideprotected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());ch.pipeline().addLast(new ProtobufEncoder());ch.pipeline().addLast(new TcpClientHandler());}
接收方加上解码器 ProtobufVarint32FrameDecoder
@Overrideprotected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance())); //给pipeline管道设置处理器 ch.pipeline().addLast(new TcpServerHandler());}
然后再启动服务端和客户端 , 我们可以看到正常了~
ProtobufVarint32LengthFieldPrepender 编码器的工作如下:
* BEFORE ENCODE (300 bytes)AFTER ENCODE (302 bytes) * +---------------++--------+---------------+ * | Protobuf Data |-------------->| Length | Protobuf Data | * |(300 bytes)|| 0xAC02 |(300 bytes)| * +---------------++--------+---------------+@Sharablepublic class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {int bodyLen = msg.readableBytes();int headerLen = computeRawVarint32Size(bodyLen);//写入请求头 , 消息长度out.ensureWritable(headerLen + bodyLen);writeRawVarint32(out, bodyLen);//写入数据out.writeBytes(msg, msg.readerIndex(), bodyLen);}}
ProtobufVarint32FrameDecoder 解码器的工作如下:
* BEFORE DECODE (302 bytes)AFTER DECODE (300 bytes) * +--------+---------------++---------------+ * | Length | Protobuf Data |----->| Protobuf Data | * | 0xAC02 |(300 bytes)||(300 bytes)| * +--------+---------------++---------------+public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List
总结:
- 发送端通过编码器在发送的时候在消息体前面加上一个描述数据长度的数据块 。
- 接收方通过解码器先获取描述数据长度的数据块 , 知道完整数据的长度 , 然后根据数据长度获取一条完整的数据 。
注:心跳包还有另一个作用 , 经常被忽略 , 即:一个连接如果长时间不用 , 防火墙或者路由器就会断开该连接 。
在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 看下它的构造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);}
三个参数的含义如下:- readerIdleTimeSeconds: 读超时 。 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件 。
- writerIdleTimeSeconds: 写超时 。即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件 。
- allIdleTimeSeconds: 读/写超时 。即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件 。
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}
要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
Netty心跳源码分析初步地看下IdleStateHandler源码 , 先看下IdleStateHandler中的channelRead方法:文章插图
红框代码其实表示该方法只是进行了透传 , 不做任何业务逻辑处理 , 让channelPipe中的下一个handler处理channelRead方法;
- 启动|拼多多深入布局母婴产业带 补贴+直播启动“母婴产品溯源”行动
- RFID在冷链物流中的作用-RFID冷链资产管理解决方案
- 《深入理解Java虚拟机》:对象创建、布局和访问全过程
- 都说编程要逻辑好,如何理解这个逻辑
- 关于Netty ByteBuf 的零拷贝
- 成长思维:我大哥对“道法术器”的理解,80%的人不懂
- Linux信号透彻分析理解与各种实例讲解
- 从底层理解this是什么
- 常用的NIO框架-Netty
- 《深入理解Java虚拟机》:锁优化