深入理解Netty编解码、粘包拆包、心跳机制( 六 )


我们再看看channelActive方法:
深入理解Netty编解码、粘包拆包、心跳机制文章插图
这里有个initialize的方法 , 这是IdleStateHandler的精髓 , 接着探究:
深入理解Netty编解码、粘包拆包、心跳机制文章插图
这边会触发一个Task , ReaderIdleTimeoutTask , 这个task里的run方法源码是这样的:
深入理解Netty编解码、粘包拆包、心跳机制文章插图
第一个红框代码是用当前时间减去最后一次channelRead方法调用的时间 , 假如这个结果是6s , 说明最后一次调用channelRead已经是6s 之前的事情了 , 你设置的是5s , 那么nextDelay则为-1 , 说明超时了 , 那么第二个红框代码则会触发下一个handler的 userEventTriggered方法:
深入理解Netty编解码、粘包拆包、心跳机制文章插图
如果没有超时则不触发userEventTriggered方法 。
Netty心跳检测代码示例服务端package com.niuh.netty.heartbeat;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;public class HeartBeatServer {public static void main(String[] args) throws Exception {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());//IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接 ,//会触发IdleStateEvent事件并且交给下一个handler处理 , 下一个handler必须//实现userEventTriggered方法处理对应事件pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HeartBeatServerHandler());}});System.out.println("netty server start 。。 ");ChannelFuture future = bootstrap.bind(9000).sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {worker.shutdownGracefully();boss.shutdownGracefully();}}}服务端回调处理类
package com.niuh.netty.heartbeat;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.timeout.IdleStateEvent;public class HeartBeatServerHandler extends SimpleChannelInboundHandler {int readIdleTimes = 0;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {System.out.println(" ====== > [server] message received : " + s);if ("Heartbeat Packet".equals(s)) {ctx.channel().writeAndFlush("ok");} else {System.out.println(" 其他信息处理 ... ");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;String eventType = null;switch (event.state()) {case READER_IDLE:eventType = "读空闲";readIdleTimes++; // 读空闲的计数加1break;case WRITER_IDLE:eventType = "写空闲";// 不处理break;case ALL_IDLE:eventType = "读写空闲";// 不处理break;}System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);if (readIdleTimes > 3) {System.out.println(" [server]读空闲超过3次 , 关闭连接 , 释放更多资源");ctx.channel().writeAndFlush("idle close");ctx.channel().close();}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");}}客户端package com.niuh.netty.heartbeat;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import java.util.Random;public class HeartBeatClient {public static void main(String[] args) throws Exception {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new HeartBeatClientHandler());}});System.out.println("netty client start 。。 ");Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();String text = "Heartbeat Packet";Random random = new Random();while (channel.isActive()) {int num = random.nextInt(10);Thread.sleep(2 * 1000);channel.writeAndFlush(text);}} catch (Exception e) {e.printStackTrace();} finally {eventLoopGroup.shutdownGracefully();}}static class HeartBeatClientHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(" client received :" + msg);if (msg != nullctx.channel().closeFuture();}}}}