Netty-9-案例-构建客户端与服务端pipeline

上文中,对pipelinechannelHandler和有了一个基本的了解,以及两种channelHandler的执行顺序.

在之前的案例中我们都是用if-else去判断数据包的类型, 那本文将通过使用channelHandler的方式,重构之前的代码

Netty 内置了很多开箱即用的 ChannelHandler.下面,我们通过一些 Netty 内置的 ChannelHandler 来逐步构建我们的 pipeline

ChannelInboundHandlerAdapter 与 ChannelOutboundHandlerAdapter

这两个适配器是比较常用的分别用于实现ChannelInboundHandlerChannelOutboundHandler接口的,这样我们继承他们之后只需要重写我们关心的方法

这两个类我们一般重写的就是ChannelInboundHandlerchannelRead()方法和ChannelOutboundHandlerwrite()方法.

分别来看一下他们的源码

ChannelInboundHandlerchannelRead()方法代码如下:
image
这个方法的作用就是接收上一个的handler的输出,这里的msg就是上个handler的输出,默认情况下会通过fireChannelRead()这个方法直接把上一个handler的输出结果传递到下一个handler

再来看一下ChannelOutboundHandlerwrite()的源码:

image

默认情况下也会把对象传递给下一个outBound节点,只不过传播顺序和上面的inboundHandler是相反的

ByteToMessageDecoder

我们往pipeline添加的第一个handler中的channelRead()方法中,msg对象其实就是ByteBuf,服务端在接收到数据后需要先把msg强转成ByteBuf,然后再解码,转成java对象

每一次收到msg,都需要解码,这就需要一大堆重复的代码,所以netty提供了一个父类ByteToMessageDecoder,来专门做解码这个事儿,下面看一下具体的用法

新建类PacketDecoder,代码如下:

1
2
3
4
5
6
7
8
public class PacketDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
list.add(PacketCodeC.INSTANCE.decode(byteBuf));
}

}

这里继承ByteToMessageDecoder然后重写decode方法,这里的参数传过来直接就是ByteBuf对象了,所以我们不需要强转了,只要把解码后的对象塞到这个list里面就好了,就可以自动实现结果往下一个 handler 进行传递,这样,我们就实现了解码的逻辑 handler

我们用的netty版本是4.1.6.Final,默认情况下ByteBuf是用的堆外内存,之前有说过这个内存需要去手动释放的,但是我们之前的操作都没有去释放这个内存,随着程序的运行,就可能造成内存泄露, 但是我们在使用ByteToMessageDecoder的情况下,就完全不用担心这个问题了,Netty会自动进行内存的释放

SimpleChannelInboundHandler

我们之前处理数据包的时候,需要写一大堆的if-else去判断是哪种类型的数据包,从而做出相应的处理

然后我们又说了可以通过给pipeline添加多个inBoundHandler来解决这个问题,大致逻辑如下:

1
2
3
4
5
if (packet instanceof XXXPacket) {
// ...处理
} else {
ctx.fireChannelRead(packet);
}

这里就又有一个问题, 每个handler中都要去写一个if-else,然后还要手动去传递无法处理的对象(上面else中的代码)这也是一大堆的重复代码, 所以netty抽象出了一个SimpleChannelInboundHandler对象,类型判断和对象传递的活都自动帮我们实现了,而我们可以专注于处理我们所关心的指令即可,下面来看一下具体如何使用

新建类LoginRequestHandler,代码如下:

1
2
3
4
5
6
7
8
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginRequestPacket loginRequestPacket) throws Exception {
// 具体登录的逻辑
}

}

我们只需要去继承SimpleChannelInboundHandler,然后给定一个泛型,处理那种数据包就给定哪种泛型就好,然后重写channelRead0,直接在这个里面去写业务逻辑就可以了,也不用强转,也不用手动去给下一个handler去传递对象了

MessageToByteEncoder

在我们的请求处理完之后,一般都会给客户端一个响应,我们之前写响应的时候,需要把响应的数据包先编码成ByteBuf然后再通过writeAndFlush()对象去写给客户端, 比如下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginRequestPacket loginRequestPacket) throws Exception {
// ...具体登录的逻辑
LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
loginResponsePacket.setXXX();
// ...
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);
ctx.channel().writeAndFlush(responseByteBuf);
}

}

每种数据包给响应都需要先编码,然后writeAndFlush(),这也是一段重复的代码,而且在PacketCodeC.INSTANCE.encode()这个方法里面我们还得手动创建一个ByteBuf对象

Netty 提供了一个特殊的channelHandler来专门处理编码逻辑,我们不需要每一次将响应写到对端的时候调用一次编码逻辑进行编码,也不需要自行创建ByteBuf,这个类叫做MessageToByteEncoder,从字面意思也可以看出,它的功能就是将对象转换到二进制数据,下面来看一下如何使用

新建类PacketEncoder,代码如下:

1
2
3
4
5
6
7
8
public class PacketEncoder extends MessageToByteEncoder<Packet> {

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Packet packet, ByteBuf byteBuf) throws Exception {
PacketCodeC.INSTANCE.encode(byteBuf, packet);
}

}

这里只需要继承MessageToByteEncoder然后给定一个泛型,表示给哪种类型的数据编码,然后重写encode()方法,里面再调用我们PacketCodeC类的编码方法,然后这个方法里面的代码需要修改一下,修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void encode(ByteBuf byteBuf, Packet packet) {

// 序列化java对象
byte[] bytes = Serializer.DEFAULT.serialize(packet);

// 实际编码过程
byteBuf.writeInt(MAGIC_NUMBER);
byteBuf.writeByte(packet.getVersion());
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
byteBuf.writeByte(packet.getCommand());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
}

和之前的区别就是不需要我们去手动创建ByteBuf对象了,当我们向 pipeline 中添加了这个编码器之后,我们在指令处理完毕之后就只需要 writeAndFlush java 对象即可,像这样

1
2
3
4
5
6
7
8
9
10
@Slf4j
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginRequestPacket loginRequestPacket) throws Exception {
// ..省略具体登录逻辑
channelHandlerContext.channel().writeAndFlush(loginResponsePacket);
}

}

总结

上面这是一个客户端的大概逻辑,然后客户端跟服务端都差不多,结构图如下
image

对应的代码

服务端:

1
2
3
4
5
6
7
8
9
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});

客户端:

1
2
3
4
5
6
7
8
9
10
bootstrap
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginResponseHandler());
ch.pipeline().addLast(new MessageResponseHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});

完整代码已上传gitHub,传送门