述
前面已经了解了netty服务端和客户端的启动流程,本文将通过一个小案例,来了解服务端和客户端是如何通信的
案例功能是,客户端连接成功后,向服务端写一段数据,服务端收到数据之后打印,并向客户端回一段数据,具体实现流程如下:
客户端发送数据到服务端
上文中有说,客户端相关的数据读写是通过Bootstrap
的handler()
方法指定,handler()
中创建了一个匿名类ChannelInitializer
,重写了initChannel()
方法,然后我们现在在这个方法里面加一个逻辑处理器,这个处理器的作用就是负责向服务端写数据,部分代码如下:1
2
3
4
5
6.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FirstClientHandler());
}
});
ch.pipeline()
返回的是和这条连接相关的逻辑处理链,采用了责任链模式,然后调用addLast()
方法添加逻辑处理器FirstClientHandler
.
FirstClientHandler
的代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28@Slf4j
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(new Date() + "客户端写出数据");
// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx);
// 2. 写数据
ctx.channel().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 获取二进制抽象
ByteBuf buffer = ctx.alloc().buffer();
// 准备数据,并指定编码格式
byte[] bytes = "hello,world".getBytes(Charset.forName("utf-8"));
// 填充数据到ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
}
这个逻辑处理器继承自ChannelInboundHandlerAdapter
,然后覆盖了channelActive()
方法,这个方法会在客户端连接建立成功之后被调用
客户端连接建立成功之后,调用到channelActive()
方法,在这个方法里面,我们编写向服务端写数据的逻辑
写数据的逻辑分为两步,首先我们需要获取一个 netty 对二进制数据的抽象ByteBuf
,上面代码中,ctx.alloc()
获取到一个ByteBuf
的内存管理器,这个 内存管理器的作用就是分配一个ByteBuf
,然后我们把字符串的二进制数据填充到ByteBuf
,这样我们就获取到了 Netty 需要的一个数据格式,最后我们调用ctx.channel().writeAndFlush()
把数据写到服务端
Netty里面的数据是以ByteBuf为单位的,所有需要写出/读取的数据都得塞到一个ByteBuf
,下面在来看看服务端读取数据的流程
服务端读取客户端数据
服务端相关数据处理逻辑在ServerBootstrap
的childHandler()
方法中,该方法中也声明了一个匿名类ChannelInitializer
,然后重写了initChannel()
方法,同样的也是在这个方法里面加一个逻辑处理器,负责读取客户端传过来的数据,部分代码如下:1
2
3
4
5
6
7
8.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
log.info("取出childAttr属性:{}", ch.attr(CommonConfig.CLIENT_KEY).get());
ch.pipeline().addLast(new FirstServerHandler());
}
});
这里方法里面的逻辑是和客户端一样的,先获取服务端关于这条连接的逻辑处理链pipeline
,然后添加一个逻辑处理器FirstServerHandler
,用来读取客户端发过来的数据
FirstServerHandler
的代码如下:1
2
3
4
5
6@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info(new Date() + "服务端读取数据:{}", byteBuf.toString(Charset.forName("utf-8")));
}
这个逻辑处理器跟客户端的不同,这里是重写的channelRead()
方法,这个方法在接收到客户端发来的数据之后会被回调
这里的msg
就是 Netty 里面数据读写的载体,这里需要进行一次强转,转成ByteBuf
类型,然后用toString()
方法就能获取到发送过来的数据
到现在,客户端发送消息和服务端接收消息都完成了,最后就剩下的是服务端给客户端发送消息了
这里跟客户端发送消息的逻辑是一样的,先创建一个ByteBuf
,然后填充二进制数据,最后调用writeAndFlush()
方法写出去.
完整代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31@Slf4j
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info(new Date() + "服务端读取数据:{}", byteBuf.toString(Charset.forName("utf-8")));
// 给客户端返回消息
log.info(new Date() + "服务端发送数据.....");
ByteBuf outBuf = getByteBuf(ctx);
ctx.channel().writeAndFlush(outBuf);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 获取二进制抽象
ByteBuf buffer = ctx.alloc().buffer();
// 准备数据,并指定编码格式
byte[] bytes = "hello,world from server".getBytes(Charset.forName("utf-8"));
// 填充数据到ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
}
就是服务端在收到客户端的消息之后,给客户端再发一条数据就ok,发送消息的流程是和客户端的是一样的
测试
上面的都搞定之后,先后运行服务端和客户端,然后看一下控制台输出
客户端控制台输出如下:1
210:39:04.141 [nioEventLoopGroup-2-1] INFO com.example.netty.netty.NettyClient - 连接成功
10:39:04.147 [nioEventLoopGroup-2-1] INFO com.example.netty.handler.FirstClientHandler - Sat Jun 01 10:39:04 CST 2019客户端写出数据
然后服务端控制台输出如下:1
2
3
4
5
6
7
8
9
10
1110:38:53.238 [nioEventLoopGroup-2-1] INFO com.example.netty.netty.NettyServer - 服务端启动中...............
10:38:53.243 [nioEventLoopGroup-2-1] INFO com.example.netty.netty.NettyServer - 端口:8000>绑定成功
10:39:04.155 [nioEventLoopGroup-3-1] INFO com.example.netty.netty.NettyServer - 取出childAttr属性:clientValue
10:39:04.219 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
10:39:04.220 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
10:39:04.227 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
10:39:04.228 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
10:39:04.252 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
10:39:04.256 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@6e9133c7
10:39:04.270 [nioEventLoopGroup-3-1] INFO com.example.netty.handler.FirstServerHandler - Sat Jun 01 10:39:04 CST 2019服务端读取数据:hello,world
10:39:04.270 [nioEventLoopGroup-3-1] INFO com.example.netty.handler.FirstServerHandler - Sat Jun 01 10:39:04 CST 2019服务端发送数据.....
总结
- 客户端服务端的逻辑处理都是在启动的时候,通过逻辑处理链
pipeline
添加逻辑处理器,去处理逻辑 - 客户端连接成功后,会调用逻辑处理器的
channelActive()
方法 - 不管是服务端还是客户端,收到数据都会调用逻辑处理器的
channelRead()
方法 - 写数据通过调用
channel.writeAndFlush()
方法,需要把发送的数据塞到ByteBuf
对象然后传进去
总体逻辑如下图:
图片来源,侵删
本文完整代码已上传gitHub,传送门