Netty-7-案例-客户端与服务端收发消息

上文中实现了客户端登录的案例,主要就是创建数据包,编码,写到服务端,然后解码,验证,再编码发送到客户端,客户端最后再解码

本文再来实现一个客户端收发消息的案例,具体功能如下:
在控制台输入一串文字,然后按回车,客户端校验登录状态,然后把这个消息传给服务端,服务端收到之后打印出来,并向客户端发送一条消息,客户端收到之后打印

其实跟上个登录的案例的流程是差不多的,下面来看一下具体的实现

收发消息数据包

上面我们在登录的时候是创建了一个数据包LoginRequestPacket,每次请求就把数据包编码发送过去,然后服务端给个响应的LoginResponsePacket数据包

这里我们收发消息也是一样的,需要两个数据包,一个客户端发送消息的数据包,一个服务端响应的数据包,代码如下:

客户端发送消息的数据包:

1
2
3
4
5
6
7
8
9
10
11
@Data
public class MessageRequestPacket extends Packet {

private String message;

@Override
public Byte getCommand() {
return Command.MESSAGE_REQUEST;
}

}

服务端响应的数据包:

1
2
3
4
5
6
7
8
9
10
11
@Data
public class MessageResponsePacket extends Packet {

private String message;

@Override
public Byte getCommand() {
return Command.MESSAGE_RESPONSE;
}

}

然后就是Command类里面的命令标识,添加以下两个:

1
2
3
4
5
6
7
8
9
/**
* 客户端发送消息命令
*/
Byte MESSAGE_REQUEST = 3 ;

/**
* 服务端响应发送消息命令
*/
Byte MESSAGE_RESPONSE = 4;

最后记得在PacketCodeC类里面的packetTypeMap这个map,把新建的这两个数据包put进去:

1
2
packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class);
packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);

登录状态记录

客户端每次发送消息的时候,都需要去判断客户端是否已经登录了,这里就需要一个登录状态的标识,这个标识可以绑定在channel里面,通过channel.attr(xxx).set(xx)的方式,然后用的时候就可以取出来判断有没有登录

所以这里先要定义一个登录成功的标识,新建Attributes接口,代码如下:

1
2
3
4
5
6
7
8
public interface Attributes {

/**
* 客户端是否登录的标识
*/
AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");

}

我们之前客户端登录成功后,服务端会给是否登录成功的响应,然后在客户端登录成功的时候,去把这个标记存到channel里面,ClientHandler中,修改的部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;

// 收到服务端发来的消息后,解码
Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);

// 判断是不是登录请求数据包
if (packet instanceof LoginResponsePacket) {
LoginResponsePacket loginResponsePacket = (LoginResponsePacket) packet;

if (loginResponsePacket.getSuccess()) {
log.info("登录成功....");

// 记录登录成功的标识
LoginUtil.markAsLogin(ctx.channel());
} else {
log.info("登录失败,原因:{}", loginResponsePacket.getReason());
}

} else if (packet instanceof MessageResponsePacket) {
MessageResponsePacket messageResponsePacket = (MessageResponsePacket) packet;
log.info("收到了服务端发来的消息:{}", messageResponsePacket.getMessage());

}
}

这里是抽出来一个工具类LoginUtil,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class LoginUtil {

/**
* 标识已经登录
* @param channel
*/
public static void markAsLogin(Channel channel) {
channel.attr(Attributes.LOGIN).set(true);
}

/**
* 判断当前是否有登录的标识(只要标识存在,不管标识的值是什么)
* @param channel
* @return 是否已经登录
*/
public static boolean hasLogin(Channel channel) {
return channel.attr(Attributes.LOGIN).get() != null;
}

}

一个是存登录标识的方法,一个是判断是否已经登录的方法

客户端发送消息给服务端

在控制台输入一串文字,然后按回车,客户端校验登录状态,然后把这个消息传给服务端,需求是这样的.

所以我们在客户端连接成功之后,需要开启一个接受用户输入数据,发送到服务端的线程, 在NettyClient中,部分修改的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
log.info("连接成功");

// 启动控制台线程
Channel channel = ((ChannelFuture) future).channel();
startConsoleThread(channel);

}

// ... 省略其他代码
});
}

public static void startConsoleThread(Channel channel) {
new Thread(() -> {
// 线程没有被中断就继续循环
while (!Thread.interrupted()) {
// 判断是否已经登录
if (LoginUtil.hasLogin(channel)) {
log.info("当前客户端已经登录,请输入要发送的消息...");
// 然后接收用户输入的数据
Scanner sc = new Scanner(System.in);
String line = sc.nextLine();

// 封装到数据包里,然后转码发送给服务端
MessageRequestPacket messageRequestPacket = new MessageRequestPacket();
messageRequestPacket.setMessage(line);

// 编码
ByteBuf buffer = PacketCodeC.INSTANCE.encode(channel.alloc(), messageRequestPacket);
// 发送
channel.writeAndFlush(buffer);
}
}
}).start();
}

这里就是在客户端连接成功之后,开启一个线程然后是判断只要是登录状态,就可以在控制台输入消息,然后从控制台获取到用户输入的内容之后,转成数据包对象,再进行编码,发送

服务端处理消息

服务端收到客户端传过来的消息之后打印出来,并向客户端发送一条消息

服务端接收消息还是和之前的一样,在ServerHandlerchannelRead()方法里面,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

// 收到消息后解码
ByteBuf byteBuf = (ByteBuf) msg;

Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);

// 判断是不是登录请求数据包
if (packet instanceof LoginRequestPacket) {
// ...省略登录的代码
} else if (packet instanceof MessageRequestPacket){
// 处理客户端发过来的消息
MessageRequestPacket messageRequestPacket = (MessageRequestPacket) packet;
log.info("{}:收到客户端发过来的消息:{}", new Date(), messageRequestPacket.getMessage());

// 然后服务端发消息给客户端
MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
messageResponsePacket.setMessage("服务端收到了[" + messageRequestPacket.getMessage() + "]这条消息");
ByteBuf responseBuffer = PacketCodeC.INSTANCE.encode(ctx.alloc(), messageResponsePacket);
ctx.channel().writeAndFlush(responseBuffer);
}
}

这里我们客户端发过来的数据包是一个MessageRequestPacket类型的,所以只需要每次判断是哪种类型的数据包,根据类型做出相应的处理就ok了,最后是把响应的数据包MessageResponsePacket传给客户端

客户端接收消息

客户端需要接收服务端发过来的消息,也是在ClientHandlerchannelRead()方法里面,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;

// 收到服务端发来的消息后,解码
Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);

// 判断是不是登录请求数据包
if (packet instanceof LoginResponsePacket) {
// ...省略登录处理的代码
} else if (packet instanceof MessageResponsePacket) {
MessageResponsePacket messageResponsePacket = (MessageResponsePacket) packet;
log.info("收到了服务端发来的消息:{}", messageResponsePacket.getMessage());

}
}

这里和服务端接收消息是一毛一样的,都是先转成Packet,然后判断是哪种类型,做出相应的处理即可

测试

最后先后运行NettyServerNettyClient,然后在NettyClient的控制台输入消息,回车,看两个控制台的输出,如下:

客户端:
image

服务端:
image

总结

此案例流程图如下:
image
图片来源,侵删

完整代码已上传到github, 传送门