Spring Cloud-22-Spring Cloud Stream整合RabbitMQ(上)

Spring Cloud Stream是一个构建消息驱动的微服务框架.它构建在Spring Boot之上,用以创建工业级的应用程序,并且通过Spring Integration提供了和消息代理的连接.

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现(目前仅支持RabbitMQ和Kafka),同时引入了发布订阅,消费组和分区的语义概念.

下面来通过一个简单的消息收发,看一下Spring Cloud Stream的基本用法.

整合RabbitMQ

新建工程

新建一个工程,名称是stream-rabbitmq

引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

spring-cloud-starter-stream-rabbit是Spring Cloud Stream对RabbitMQ的封装,这里包含了对RabbitMQ的默认配置,比如连接的ip默认是localhost,默认端口是5672,默认用户是guest,如果我们需要修改的话,还是和上文一样,在配置文件中配置就可以了

修改配置

如果是用的默认配置的话,就不用修改,否则,就改成自己的配置, 直接把上篇文章中的配置复制过来就好,具体如下:

1
2
3
4
5
6
7
8
9
10
spring:
application:
name: stream-rabbitmq
rabbitmq:
host: 172.16.12.3
port: 5672
username: admin
password: admin
server:
port: 2010

创建消息消费者

创建一个消费者用来接收消息,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @author 周泽
* @date Create in 16:34 2019/4/8
* @Description 消息消费者
*/
@EnableBinding(Sink.class)
@Slf4j
public class SinkReceiver {

@StreamListener(Sink.INPUT)
public void receive(Object playload) {
log.info("收到消息:{}", playload);
}
}

@EnableBinding: 这个注解用来实现对消息通道的绑定,这个注解中还传了一个参数Sink.class,Sink是一个接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义.

然后是receive()方法上面的@StreamListener(Sink.INPUT)注解,这个注解表示该方法是消息中间件上数据流的监听器,Sink.INPUT,表示这是input消息通道上的监听处理器.

测试

上面的东西都搞好之后,启动项目,在日志中可以看到以下内容:
image

表示我们的工程通过admin用户创建了一个指向rabbitmq的连接,这时打开mq的web管理页面,查看队列,就可以看到他创建的这个队列了

image

然后点击这个队列,找到Publish message选项,推送一条消息.

image

推送完成之后,查看控制台输出
image

这里已经收到了消息,只是消息没有序列化而已