述
我们在实现完一系列的功能之后,会有很多的handler通过pipeline加到channel中去, 下面看一下如何去优化这些handler
优化第一步:共享handler
首先来看下服务端的代码:
指令相关的handler很多,netty在这里的逻辑是:每次有新连接到来的时候,都会调用ChannelInitializer
的 initChannel()
方法,然后这里指令相关的 handler 都会被 new 一次
这里的每一个指令handler的内部都没有成员变量的,也就是无状态的,这里就可以使用单例模式,不需要每次都new对象,提高效率,下面看一下具体如何实现
以LoginRequestHandler为例,看一下如何修改,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14@Slf4j
@ChannelHandler.Sharable
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
public static final LoginRequestHandler INSTANCE = new LoginRequestHandler();
protected LoginRequestHandler() {
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginRequestPacket loginRequestPacket) throws Exception {
// ... 省略具体逻辑
}
}
这里首先需要加一个`@ChannelHandler.Sharable`注解,表示这个handler是要被多个channel共享的,不加这个会报错
然后就是把构造堵死,用INSTANCE变量获取对象
最后添加到channel用.addLast(LoginRequestHandler.INSTANCE)
这里有一个叫Spliter
的handler,他内部实现是与每个 channel 有关,每个 Spliter 需要维持每个 channel 当前读到的数据,也就是说他是有状态的,所以这个类是没法去改的
优化第二步: 压缩 handler
下面看一下服务端编解码用的这两个handler
- 解码: PacketDecoder 继承了 ByteToMessageDecoder
- 编码: PacketEncoder 继承了 MessageToByteEncoder
这两个类其实可以合并到一起,继承MessageToMessageCodec
就ok了,新建一个类,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> {
public static final PacketCodecHandler INSTANCE = new PacketCodecHandler();
protected PacketCodecHandler(){}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Packet packet, List<Object> list) throws Exception {
ByteBuf byteBuf = channelHandlerContext.alloc().ioBuffer();
PacketCodeC.INSTANCE.encode(byteBuf, packet);
list.add(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
list.add(PacketCodeC.INSTANCE.decode(byteBuf));
}
}
这个编解码的类也可以做成单例模式,这里用了我们之前的PacketCodeC
这个类. 然后把这个编解码的类添加到服务端就好了1
2
3
4
5
6
7
8
9
10ch.pipeline()
.addLast(new Spliter())
.addLast(PacketCodecHandler.INSTANCE)
.addLast(LoginRequestHandler.INSTANCE)
.addLast(AuthHandler.INSTANCE)
.addLast(MessageRequestHandler.INSTANCE)
.addLast(CreateGroupRequestHandler.INSTANCE)
.addLast(LogoutRequestHandler.INSTANCE);
}
});
优化第三步:缩短事件传播路径
这里有一个问题, pipeline 链中,绝大部分是指令相关的handler,按现在的写法的话,等指令越来越多,我们要一个个通过addLast()
加进去,然后 handler 链越来越长,在事件传播过程中性能损耗会被逐渐放大, 下面看一下如何解决这个问题
合并平行 handler
在我们的应用中,数据过来然后解码出来之后,只会在一个handler上执行处理,我们可以把这么多的指令 handler 压缩为一个 handler, 代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20@ChannelHandler.Sharable
public class IMHandler extends SimpleChannelInboundHandler<Packet> {
public static final IMHandler INSTANCE = new IMHandler();
private Map<Byte, SimpleChannelInboundHandler<? extends Packet>>handlerMap;
private IMHandler(){
handlerMap = new HashMap<>();
handlerMap.put(Command.MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
handlerMap.put(Command.CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);
handlerMap.put(Command.LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Packet packet) throws Exception {
handlerMap.get(packet.getCommand()).channelRead(channelHandlerContext,packet);
}
}
这里就是建一个总的 handler ,然后这个handler也是无状态的,也可以做成单例模式,然后写一个map,把所有的指令处理器都加到这个map里面,然后再channelRead0()
这个方法里面通过指令找到对应的handler去做具体的处理
最后把这个总的handler加到服务端就好了,代码如下:1
2
3
4
5
6ch.pipeline()
.addLast(new Spliter())
.addLast(PacketCodecHandler.INSTANCE)
.addLast(LoginRequestHandler.INSTANCE)
.addLast(AuthHandler.INSTANCE)
.addLast(IMHandler.INSTANCE);
更改事件传播源
如果OutBound类型的handler用的比较多的话,写数据可以使用ctx.writeAndFlush()
来减短事件传播路径
ctx.writeAndFlush()
ctx.writeAndFlush()
是从pipeline链的当前节点开始往前找到第一个 outBound 类型的 handler ,把对象往前进行传播,如果这个对象确认不需要经过其他 outBound 类型的 handler 处理,就使用这个方法
上图中的这个InBound在处理完逻辑之后,调用ctx.writeAndFlush()
就可以直接一口气把对象送到 codec 中编码,然后写出去
ctx.channel().writeAndFlush()
再来看一下ctx.channel().writeAndFlush()
,它是从 pipeline 链中的最后一个 outBound 类型的 handler 开始,把对象往前进行传播,如果你确认当前创建的对象需要经过后面的 outBound 类型的 handler,那么就调用此方法
传播路径如图,ctx.channel().writeAndFlush()
会从最后一个 outBound 类型的handler往前逐个传递,路径是比ctx.channel().writeAndFlush()
长的
所以在我们的程序中,没有改造编码器之前,必须调用ctx.channel().writeAndFlush()
,但是经过改造之后,这个编码器既属于inBound类型又属于outBound类型,所以可以放到pipeline链的前面了
优化第四步: 减少阻塞主线程的操作
我们的程序中通常会涉及到数据库或者网络等一些很耗时的操作,这些操作是不能放到handler里面的,只要有一个 channel 的一个 handler 中的 channelRead0() 方法阻塞了 NIO 线程,最终都会拖慢绑定在该 NIO 线程上的其他所有的 channel
所以,对于耗时的一些逻辑,都丢到业务线程池去处理
统计程序处理时长
我们通常统计程序的处理时长就是方法的开始前后输出一个时间,然后相减,但是如果writeAndFlush()
方法在nio线程中执行的话,他是一个异步的操作,调用之后会立即返回,但是其实他是没有执行完的, 如果要判断writeAndFlush()
是否执行完毕的话,应该怎么做呢?
writeAndFlush()
方法会返回一个ChannelFuture
,可以给他添加一个监听器,Listener, 去监听writeAndFlush()
的执行结果,然后统计耗时即可
Netty 里面很多方法都是异步的操作,在业务线程中如果要统计这部分操作的时间,都需要使用监听器回调的方式来统计耗时,如果在 NIO 线程中调用,就不需要这么干
完整代码已上传github,传送门