Netty-8-pipeline 和 channelHandler

上面两个案例中,我们接收到数据包之后,都是先转成Packet类型,然后再去判断是哪种类型,最后做出相应的处理,这里就会有一个问题,随着业务的增加,channelRead()中需要判断的类型越来越多,就需要写很多的if-else来判断,这个方法的代码就会越来越多

另外,每次发指令数据包都要手动调用编码器编码成ByteBuf,对于这类场景的编码优化,我们能想到的办法自然是模块化处理,不同的逻辑放置到单独的类来处理,最后将这些逻辑串联起来,形成一个完整的逻辑处理链.

Netty 中的 pipeline 和 channelHandler 正是用来解决这个问题的:它通过责任链设计模式来组织代码逻辑,并且能够支持逻辑的动态添加和删除,Netty 能够支持各类协议的扩展,比如 HTTP,Websocket,Redis,靠的就是 pipeline 和 channelHandler

pipeline 和 channelHandler

Pipeline与ChannelHandler的构成

netty整个框架中,一个连接对应一个Channel,这条Channel所有的处理逻辑都在一个ChannelPipeline对象里面,ChannelPipeline是一个双向链表结构,和Channel是一对一的关系

ChannelPipeline里面每个节点都是一个ChannelHandlerContext对象,这个对象能够拿到和 Channel 相关的所有的上下文信息,然后这个对象包着一个重要的对象,那就是逻辑处理器 ChannelHandler

channelHandler

先来看一下channelHandler的类图
image

可以看到channelHandler有两大子接口,分别是ChannelInboundHandlerChannelOutBoundHandler

  • ChannelInboundHandler: 这个接口主要是用来处理读数据的逻辑,比如,我们在一端读到一段数据,首先要解析这段数据,然后对这些数据做一系列逻辑处理,最终把响应写到对端,在开始组装响应之前的所有的逻辑,都可以放置在 ChannelInboundHandler 里处理,它的一个最重要的方法就是 channelRead().

  • ChannelOutBoundHandler: 这个接口是用来处理写数据的逻辑,它是定义我们一端在组装完响应之后,把数据写到对端的逻辑,比如,我们封装好一个 response 对象,接下来我们有可能对这个 response 做一些其他的特殊逻辑,然后,再编码成 ByteBuf,最终写到对端,它里面最核心的一个方法就是 write().

这两个接口分别有对应的默认实现,ChannelInboundHandlerAdapterChanneloutBoundHandlerAdapter它们分别实现了两大接口的所有功能,默认情况下会把读写事件传播到下一个 handler

ChannelInboundHandler 的事件传播

下面通过一个案例看一下ChannelInboundHandler的事件传播

新建三个ChannelInboundHandler,分别是InBoundHandlerA,InBoundHandlerB,InBoundHandlerC,代码分别如下:

1
2
3
4
5
6
7
8
9
10
@Slf4j
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("InBoundHandlerA: {}", msg);
super.channelRead(ctx, msg);
}

}

1
2
3
4
5
6
7
8
9
10
@Slf4j
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("InBoundHandlerB: {}", msg);
super.channelRead(ctx, msg);
}

}
1
2
3
4
5
6
7
8
9
10
@Slf4j
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("InBoundHandlerC: {}", msg);
super.channelRead(ctx, msg);
}

}

建好都重写一下channelRead()方法,打印当前的handler信息,然后调用父类的channelRead()方法.

然后是NettyServer中把这三个类加进去,NettyServer.java中部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
log.info("取出childAttr属性:{}", ch.attr(CommonConfig.CLIENT_KEY).get());

// ch.pipeline().addLast(new ServerHandler());
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());

}
});

这里就是通过addLast()方法去添加inBoundHandler,然后顺序是A->B->C,最后启动服务端和客户端,看一下服务端的控制台输出,如下:
image
这里输出的顺序和我们addLast()的顺序是一样的

在每个inBoundHandler中的channelRead()方法最后都调用了父类的channelRead(),而父类的channelRead()会自动调用下一个inBoundHandlerchannelRead()方法,并且会把当前inBoundHandler里处理完毕的对象传递到下一个inBoundHandler

结论

inBoundHandler的执行顺序与我们通过addLast()方法添加的顺序保持一致

ChannelOutboundHandler 的事件传播

和上面的例子一样,这次我们先新建三个outBoundHandler,分别是OutBoundHandlerA,OutBoundHandlerB,OutBoundHandlerC,然后都继承ChannelOutboundHandlerAdapter,重写write()方法,write()方法中输出以下当前的信息,然后调用父类的write()方法,具体代码如下:

1
2
3
4
5
6
7
8
9
10
@Slf4j
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("OutBoundHandlerA: {}", msg);
super.write(ctx, msg, promise);
}

}

1
2
3
4
5
6
7
8
9
@Slf4j
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("OutBoundHandlerB: {}", msg);
super.write(ctx, msg, promise);
}
}
1
2
3
4
5
6
7
8
9
10
@Slf4j
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("OutBoundHandlerC: {}", msg);
super.write(ctx, msg, promise);
}

}

然后再NettyServer中把这三个加进去,部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
log.info("取出childAttr属性:{}", ch.attr(CommonConfig.CLIENT_KEY).get());

// ch.pipeline().addLast(new ServerHandler());
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());

ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});

最后,要想触发outBoundHandler中的write()方法,就必须向客户端写数据, 所以我们修改一下InBoundHandlerC中的channelRead(),最后想客户端写数据,代码如下:

1
2
3
4
5
6
7
8
9
10
@Slf4j
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("InBoundHandlerC: {}", msg);
ctx.channel().writeAndFlush(msg);
}

}

然后启动服务端,客户端看一下控制台输出,如下:
image
我们通过addLast()方法添加的顺序是A->B->C,然后最后控制台输出的顺序是相反的

结论

outBoundHandler的执行顺序与我们通过addLast()方法添加的顺序相反

传递事件其实说白了就是把本 handler 的处理结果传递到下一个 handler 继续处理

pipeline 的结构及执行顺序

结构图如下:
image
不管我们定义的是哪种类型的 handler, 最终它们都是以双向链表的方式连接,这里实际链表的节点是 ChannelHandlerContext

执行顺序图如下:
image
虽然两种类型的 handler 在一个双向链表里,但是这两类 handler 的分工是不一样的,inBoundHandler 的事件通常只会传播到下一个 inBoundHandler,outBoundHandler 的事件通常只会传播到下一个 outBoundHandler,两者相互不受干扰.

完整代码已上传到github, 传送门