Spring Cloud-23-Spring Cloud Stream整合RabbitMQ(下)

上文中我们简单的使用了一下Spring Cloud Stream,我们的消息是通过rabbitmq的web管理页面去发送的,如果我们想通过代码去发送消息要怎么实现呢? 下面就来看一些Spring Cloud Stream的使用细节

自定义消息通道

上文中我们用到了Sink接口, Sink和Source这两个接口,分别定义了输入通道和输出通道, 而Processor通过继承Source和Sink,同时具有输入通道和输出通道.这里我们就模仿Sink和Source,来定义一个自己的消息通道.

MySink

在上文的项目基础上,新建一个接口,叫MySink,代码如下:

1
2
3
4
5
6
7
8
public interface MySink {

String INPUT = "mychannel";

@Input(INPUT)
SubscribableChannel input();

}

这里定义了一个消息通道,名称是mychannel,@Input注解的参数表示了消息通道的名称

同时这里定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者.

MySource

定义一个MySource接口,代码如下:

1
2
3
4
5
6
public interface MySource {

@Output(MySink.INPUT)
MessageChannel output();

}

@Output注解中描述了消息通道的名称,还是mychannel

这里也定义了一个方法,返回一个MessageChannel对象,该对象中有一个向消息通道发送消息的方法

消息接收类

最后,定义一个消息接收类来接收消息,代码如下:

1
2
3
4
5
6
7
8
9
10
@EnableBinding(value = {MySink.class})
@Slf4j
public class MySinkReceiver {

@StreamListener(MySink.INPUT)
public void receive(Object playload) {
log.info("MySinkReceiver收到消息:{}", playload);

}
}

这里就是绑定消息通道,然后监听我们自定义的消息通道

测试

上面都操作完成之后, 写个单元测试来测试一下,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableBinding(MySource.class)
@WebAppConfiguration
public class AppTests {

@Autowired
private MySource mySource;

@Test
public void contextLoads() {
mySource.output().send(MessageBuilder.withPayload("hello").build());
}

@Test
public void sendMsg() {
User user = new User(1L, "张三", "男");
mySource.output().send(MessageBuilder.withPayload(user).build());
}
}

第一个方法中是发送了一个文本的消息,第二个方法中是发送了一个对象 ,然后分别运行两个方法,控制台输出如下:
image

image

回执消息

如果我们想在消息接收成功之后给一个回执,也是可以的,修改刚才的MySinkReceiver类,代码如下:

1
2
3
4
5
6
@StreamListener(MySink.INPUT)
@SendTo(Source.OUTPUT)
public String receive(Object playload) {
log.info("MySinkReceiver收到消息:{}", playload);
return "MySinkReceiver回执消息:" + playload;
}

@SendTo(Source.OUTPUT)这个注解是定义回执发送的消息通道

方法的返回值就是回执消息,回执消息在系统默认的output通道中,所以,如果我们想接收回执消息的话,就必须要监听这个通道

创建一个ReceiptReceiver类,用来监听回执消息,代码如下:

1
2
3
4
5
6
7
8
9
10
@EnableBinding(value = {Source.class})
@Slf4j
public class ReceiptReceiver {

@StreamListener(Source.OUTPUT)
public void receive2(String msg) {
log.info("receive2 收到回执消息:{}", msg);
}

}

最后在测试类的@EnableBinding中加上Source类就可以测试了,@EnableBinding({MySource.class,Source.class}),修改好之后随便执行一个方法,然后查看控制台输出,如下:
image

消费组

我们的服务可能会有多个实例在运行,如果不做任何设置的话,发送一条消息可能被所有的实例接收到,但是有的时候我们只希望被一个实例接收,这个就可以通过消息分组来解决, 只要给项目配置消息组和主题即可,如下配置:

1
2
3
4
5
6
7
spring:
cloud:
stream:
bindings:
mychannel:
group: g1
destination: dest1

这里我们设置该工程都属于g1消费组,输入通道的主题名是dest1,以上就是消费者的配置

消息生产者中可以做出以下配置:

1
2
3
4
5
6
spring:
cloud:
stream:
bindings:
mychannel:
destination: dest1

生产者中配置的消息主题也是dest1(如果发送和接收就在同一个应用中,则这里可以不配置)

这时,我们启动项目两个实例(端口不同),再发送消息,则会被两个实例中的一个接收到,另外一个是接收不到的,但是两个实例时哪一个接收,这就不能确定了

消息分区

有的时候,我们可能需要相同特征的消息能够总是被发送到同一个消费者上去处理,如果我们只是单纯的使用消费组则无法实现功能,此时我们需要借助于消息分区,消息分区之后,具有相同特征的消息就可以总是被同一个消费者处理了,配置方式如下(这里的配置都是在消费组的配置基础上完成的):

消费者配置:

1
2
3
4
5
6
7
8
9
spring: 
cloud:
stream:
bindings:
mychannel:
consumer:
partitioned: true
instance-count: 2
instance-index: 0

  • partitioned 表示开启消息分区
  • instance-count: 表示当前消费者的总的实例个数
  • instance-index: 表示当前实例的索引, 我们启动多个实例的时候,需要启动时在命令行配置索引

然后是生产者的配置:

1
2
3
4
5
6
7
8
spring: 
cloud:
stream:
bindings:
mychannel:
producer:
partitionKeyExpression: payload
partitionCount: 2

  • partitionKeyExpression:配置分区键的表达式规则
  • partitionCount: 设置消息分区数量

此时我们再启动多个消费者实例,然后重复发送多条消息,这些消息都会被同一个消费者处理掉