Netty-3-案例-客户端服务端通信

前面已经了解了netty服务端和客户端的启动流程,本文将通过一个小案例,来了解服务端和客户端是如何通信的

案例功能是,客户端连接成功后,向服务端写一段数据,服务端收到数据之后打印,并向客户端回一段数据,具体实现流程如下:

客户端发送数据到服务端

上文中有说,客户端相关的数据读写是通过Bootstraphandler()方法指定,handler()中创建了一个匿名类ChannelInitializer,重写了initChannel()方法,然后我们现在在这个方法里面加一个逻辑处理器,这个处理器的作用就是负责向服务端写数据,部分代码如下:

1
2
3
4
5
6
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FirstClientHandler());
}
});

ch.pipeline()返回的是和这条连接相关的逻辑处理链,采用了责任链模式,然后调用addLast()方法添加逻辑处理器FirstClientHandler.

FirstClientHandler的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
public class FirstClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(new Date() + "客户端写出数据");

// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx);

// 2. 写数据
ctx.channel().writeAndFlush(buffer);
}


private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 获取二进制抽象
ByteBuf buffer = ctx.alloc().buffer();

// 准备数据,并指定编码格式
byte[] bytes = "hello,world".getBytes(Charset.forName("utf-8"));

// 填充数据到ByteBuf
buffer.writeBytes(bytes);

return buffer;
}
}

这个逻辑处理器继承自ChannelInboundHandlerAdapter,然后覆盖了channelActive()方法,这个方法会在客户端连接建立成功之后被调用

客户端连接建立成功之后,调用到channelActive()方法,在这个方法里面,我们编写向服务端写数据的逻辑

写数据的逻辑分为两步,首先我们需要获取一个 netty 对二进制数据的抽象ByteBuf,上面代码中,ctx.alloc() 获取到一个ByteBuf的内存管理器,这个 内存管理器的作用就是分配一个ByteBuf,然后我们把字符串的二进制数据填充到ByteBuf,这样我们就获取到了 Netty 需要的一个数据格式,最后我们调用ctx.channel().writeAndFlush()把数据写到服务端

Netty里面的数据是以ByteBuf为单位的,所有需要写出/读取的数据都得塞到一个ByteBuf,下面在来看看服务端读取数据的流程

服务端读取客户端数据

服务端相关数据处理逻辑在ServerBootstrapchildHandler()方法中,该方法中也声明了一个匿名类ChannelInitializer,然后重写了initChannel()方法,同样的也是在这个方法里面加一个逻辑处理器,负责读取客户端传过来的数据,部分代码如下:

1
2
3
4
5
6
7
8
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
log.info("取出childAttr属性:{}", ch.attr(CommonConfig.CLIENT_KEY).get());

ch.pipeline().addLast(new FirstServerHandler());
}
});

这里方法里面的逻辑是和客户端一样的,先获取服务端关于这条连接的逻辑处理链pipeline,然后添加一个逻辑处理器FirstServerHandler,用来读取客户端发过来的数据

FirstServerHandler的代码如下:

1
2
3
4
5
6
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;

log.info(new Date() + "服务端读取数据:{}", byteBuf.toString(Charset.forName("utf-8")));
}

这个逻辑处理器跟客户端的不同,这里是重写的channelRead()方法,这个方法在接收到客户端发来的数据之后会被回调

这里的msg就是 Netty 里面数据读写的载体,这里需要进行一次强转,转成ByteBuf类型,然后用toString()方法就能获取到发送过来的数据

到现在,客户端发送消息和服务端接收消息都完成了,最后就剩下的是服务端给客户端发送消息了

这里跟客户端发送消息的逻辑是一样的,先创建一个ByteBuf,然后填充二进制数据,最后调用writeAndFlush()方法写出去.

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class FirstServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;

log.info(new Date() + "服务端读取数据:{}", byteBuf.toString(Charset.forName("utf-8")));

// 给客户端返回消息
log.info(new Date() + "服务端发送数据.....");

ByteBuf outBuf = getByteBuf(ctx);
ctx.channel().writeAndFlush(outBuf);
}


private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 获取二进制抽象
ByteBuf buffer = ctx.alloc().buffer();

// 准备数据,并指定编码格式
byte[] bytes = "hello,world from server".getBytes(Charset.forName("utf-8"));

// 填充数据到ByteBuf
buffer.writeBytes(bytes);

return buffer;
}

}

就是服务端在收到客户端的消息之后,给客户端再发一条数据就ok,发送消息的流程是和客户端的是一样的

测试

上面的都搞定之后,先后运行服务端和客户端,然后看一下控制台输出

客户端控制台输出如下:

1
2
10:39:04.141 [nioEventLoopGroup-2-1] INFO com.example.netty.netty.NettyClient - 连接成功
10:39:04.147 [nioEventLoopGroup-2-1] INFO com.example.netty.handler.FirstClientHandler - Sat Jun 01 10:39:04 CST 2019客户端写出数据

然后服务端控制台输出如下:

1
2
3
4
5
6
7
8
9
10
11
10:38:53.238 [nioEventLoopGroup-2-1] INFO com.example.netty.netty.NettyServer - 服务端启动中...............
10:38:53.243 [nioEventLoopGroup-2-1] INFO com.example.netty.netty.NettyServer - 端口:8000>绑定成功
10:39:04.155 [nioEventLoopGroup-3-1] INFO com.example.netty.netty.NettyServer - 取出childAttr属性:clientValue
10:39:04.219 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
10:39:04.220 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
10:39:04.227 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
10:39:04.228 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
10:39:04.252 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
10:39:04.256 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@6e9133c7
10:39:04.270 [nioEventLoopGroup-3-1] INFO com.example.netty.handler.FirstServerHandler - Sat Jun 01 10:39:04 CST 2019服务端读取数据:hello,world
10:39:04.270 [nioEventLoopGroup-3-1] INFO com.example.netty.handler.FirstServerHandler - Sat Jun 01 10:39:04 CST 2019服务端发送数据.....

总结

  • 客户端服务端的逻辑处理都是在启动的时候,通过逻辑处理链pipeline添加逻辑处理器,去处理逻辑
  • 客户端连接成功后,会调用逻辑处理器的channelActive()方法
  • 不管是服务端还是客户端,收到数据都会调用逻辑处理器的channelRead()方法
  • 写数据通过调用channel.writeAndFlush()方法,需要把发送的数据塞到ByteBuf对象然后传进去

总体逻辑如下图:
image

图片来源,侵删

本文完整代码已上传gitHub,传送门