Netty-11-channelHandler的生命周期

我们之前在使用channelHanndler的时候,只重写了channelRead()channelActive()这两个方法,它里面还有一些其他的方法, 这些方法的回调顺序就是channelHandler的生命周期

案例

新建一个handler,名称是LifeCyCleTestHandler,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
public class LifeCycleTestHandler extends ChannelInboundHandlerAdapter {

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("逻辑处理器被添加....");
super.handlerAdded(ctx);
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("channel 绑定到线程(NioEventLoop): channelRegistered()");
super.channelRegistered(ctx);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("channel准备就绪:channelActive()");
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("channel有数据可读:channelRead()");
super.channelRead(ctx, msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("某次数据读取完毕.....channelReadComplete()");
super.channelReadComplete(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("channel被关闭:channelInactive()");
super.channelInactive(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.info("channel取消线程(NioEventLoop)的绑定:channelUnregistered()");
super.channelUnregistered(ctx);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("逻辑处理器被移除:handlerRemoved()");
super.handlerRemoved(ctx);
}

}

重写了一部分方法,然后每个方法被调用的时候都打印了一段信息,然后把这个事件继续往下传播,最后把这个handler添加到服务端的第一个位置ch.pipeline().addLast(new LifeCycleTestHandler()),运行客户端,服务端的输出如下:
image

可以看到这里的ChannelHandler的顺序如下:
handlerAdded() –> channelRegistered() –> channelActive() –> channelRead() –> channelReadComplete()

每个回调方法的具体含义如下:

  • handlerAdded(): 当检测到新的连接之后,调用ch.pipeline().addLast(new LifeCycleTestHandler())之后的回调,表示当前的channel中已经成功添加了一个逻辑处理器
  • channelRegistered(): 表示当前的 channel 的所有的逻辑处理已经和某个 NIO 线程建立了绑定关系
  • channelActive(): 当 channel 的 pipeline 中已经添加完所有的 handler 以及绑定好一个NIO线程之后,这条连接算是真正激活了,接下来就会回调这个方法
  • channelRead(): 客户端向服务端每次发来数据之后,都会回调这个方法,表示有数据可读
  • channelReadComplete(): 服务端每次读完一次完整的数据之后,都会回调这个方法,表示数据读取完毕

然后客户端的连接关闭(channel被关闭),之后再看服务端的控制台输出:
image

执行顺序如下:
channelInactive() –> channelUnregistered() –> handlerRemoved()

这里跟新建的时候其实是相反的

  • channelInactive(): 表示这条连接被关闭了
  • channelUnregistered(): 表明与这条连接对应的 NIO 线程移除掉对这条连接的处理
  • handlerRemoved(): 最后把这条连接上的所有逻辑处理器全部移除掉

channelHandler生命周期图

image

ChannelInitializer实现原理

服务端启动的部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline()
.addLast(new LifeCycleTestHandler())
.addLast(new Spliter())
.addLast(new PacketDecoder())
.addLast(new LoginRequestHandler())
.addLast(new MessageRequestHandler())
.addLast(new PacketEncoder());
}
});

这里是通过.childHandler()方法,给连接设置一个handler(ChannelInitializer),然后在ChannelInitializerinitChannel()方法里,拿到channel对应的pipeline,往里面添加我们的handler

看一下ChannelInitializer的源码,关键部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected abstract void initChannel(C ch) throws Exception;

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// ...
initChannel(ctx);
// ...
}

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// ...
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
// ...
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
initChannel((C) ctx.channel());
// ...
return true;
}
return false;
}

这里定义了一个抽象方法initChannel(),这个方法由我们自己去实现,服务端启动流程实现逻辑是往Pipeline里面添加Handler处理器链

handlerAdded()channelRegistered()方法都尝试调用initChannel()方法,initChannel()使用putIfAbsent()防止initChannel()调用多次.

handlerAdded() 与 handlerRemoved()

通常可以用在一些资源的申请和释放

channelActive() 与 channelInActive()

这两个方法表明的含义是TCP连接的建立与释放,通常可以用于统计单机的连接数, 在channelActive()方法里面还可以过滤客户端连接IP黑白名单.

channelRead()

用于服务端根据自定义协议来进行拆包等.

channelReadComplete()

之前写数据的时候都是用writeAndFlush()这个方法,这个方法效率不是很高,可以把这个方法换成write(),然后在channelReadComplete()里面调用ctx.channel().flush()方法进行批量刷新.