述
我们之前在使用channelHanndler的时候,只重写了channelRead()
和channelActive()
这两个方法,它里面还有一些其他的方法, 这些方法的回调顺序就是channelHandler
的生命周期
案例
新建一个handler
,名称是LifeCyCleTestHandler
,具体代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52@Slf4j
public class LifeCycleTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("逻辑处理器被添加....");
super.handlerAdded(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("channel 绑定到线程(NioEventLoop): channelRegistered()");
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("channel准备就绪:channelActive()");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("channel有数据可读:channelRead()");
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("某次数据读取完毕.....channelReadComplete()");
super.channelReadComplete(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("channel被关闭:channelInactive()");
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.info("channel取消线程(NioEventLoop)的绑定:channelUnregistered()");
super.channelUnregistered(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("逻辑处理器被移除:handlerRemoved()");
super.handlerRemoved(ctx);
}
}
重写了一部分方法,然后每个方法被调用的时候都打印了一段信息,然后把这个事件继续往下传播,最后把这个handler添加到服务端的第一个位置ch.pipeline().addLast(new LifeCycleTestHandler())
,运行客户端,服务端的输出如下:
可以看到这里的ChannelHandler
的顺序如下:handlerAdded()
–> channelRegistered()
–> channelActive()
–> channelRead()
–> channelReadComplete()
每个回调方法的具体含义如下:
handlerAdded()
: 当检测到新的连接之后,调用ch.pipeline().addLast(new LifeCycleTestHandler())
之后的回调,表示当前的channel中已经成功添加了一个逻辑处理器channelRegistered()
: 表示当前的 channel 的所有的逻辑处理已经和某个 NIO 线程建立了绑定关系channelActive()
: 当 channel 的 pipeline 中已经添加完所有的 handler 以及绑定好一个NIO线程之后,这条连接算是真正激活了,接下来就会回调这个方法channelRead()
: 客户端向服务端每次发来数据之后,都会回调这个方法,表示有数据可读channelReadComplete()
: 服务端每次读完一次完整的数据之后,都会回调这个方法,表示数据读取完毕
然后客户端的连接关闭(channel被关闭),之后再看服务端的控制台输出:
执行顺序如下:channelInactive()
–> channelUnregistered()
–> handlerRemoved()
这里跟新建的时候其实是相反的
channelInactive()
: 表示这条连接被关闭了channelUnregistered()
: 表明与这条连接对应的 NIO 线程移除掉对这条连接的处理handlerRemoved()
: 最后把这条连接上的所有逻辑处理器全部移除掉
channelHandler生命周期图
ChannelInitializer实现原理
服务端启动的部分代码如下:1
2
3
4
5
6
7
8
9
10
11
12.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline()
.addLast(new LifeCycleTestHandler())
.addLast(new Spliter())
.addLast(new PacketDecoder())
.addLast(new LoginRequestHandler())
.addLast(new MessageRequestHandler())
.addLast(new PacketEncoder());
}
});
这里是通过.childHandler()
方法,给连接设置一个handler(ChannelInitializer
),然后在ChannelInitializer
的initChannel()
方法里,拿到channel对应的pipeline,往里面添加我们的handler
看一下ChannelInitializer
的源码,关键部分如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24protected abstract void initChannel(C ch) throws Exception;
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// ...
initChannel(ctx);
// ...
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// ...
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
// ...
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
initChannel((C) ctx.channel());
// ...
return true;
}
return false;
}
这里定义了一个抽象方法initChannel()
,这个方法由我们自己去实现,服务端启动流程实现逻辑是往Pipeline里面添加Handler处理器链
handlerAdded()
和channelRegistered()
方法都尝试调用initChannel()
方法,initChannel()
使用putIfAbsent()
防止initChannel()
调用多次.
handlerAdded() 与 handlerRemoved()
通常可以用在一些资源的申请和释放
channelActive() 与 channelInActive()
这两个方法表明的含义是TCP连接的建立与释放,通常可以用于统计单机的连接数, 在channelActive()方法里面还可以过滤客户端连接IP黑白名单.
channelRead()
用于服务端根据自定义协议来进行拆包等.
channelReadComplete()
之前写数据的时候都是用writeAndFlush()
这个方法,这个方法效率不是很高,可以把这个方法换成write()
,然后在channelReadComplete()
里面调用ctx.channel().flush()
方法进行批量刷新.