述
Spring Cloud Stream是一个构建消息驱动的微服务框架.它构建在Spring Boot之上,用以创建工业级的应用程序,并且通过Spring Integration提供了和消息代理的连接.
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现(目前仅支持RabbitMQ和Kafka),同时引入了发布订阅,消费组和分区的语义概念.
下面来通过一个简单的消息收发,看一下Spring Cloud Stream的基本用法.
整合RabbitMQ
新建工程
新建一个工程,名称是stream-rabbitmq
引入依赖
1 | <dependency> |
spring-cloud-starter-stream-rabbit
是Spring Cloud Stream对RabbitMQ的封装,这里包含了对RabbitMQ的默认配置,比如连接的ip默认是localhost,默认端口是5672,默认用户是guest,如果我们需要修改的话,还是和上文一样,在配置文件中配置就可以了
修改配置
如果是用的默认配置的话,就不用修改,否则,就改成自己的配置, 直接把上篇文章中的配置复制过来就好,具体如下:1
2
3
4
5
6
7
8
9
10spring:
application:
name: stream-rabbitmq
rabbitmq:
host: 172.16.12.3
port: 5672
username: admin
password: admin
server:
port: 2010
创建消息消费者
创建一个消费者用来接收消息,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* @author 周泽
* @date Create in 16:34 2019/4/8
* @Description 消息消费者
*/
@EnableBinding(Sink.class)
@Slf4j
public class SinkReceiver {
@StreamListener(Sink.INPUT)
public void receive(Object playload) {
log.info("收到消息:{}", playload);
}
}
@EnableBinding
: 这个注解用来实现对消息通道的绑定,这个注解中还传了一个参数Sink.class
,Sink是一个接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义.
然后是receive()方法上面的@StreamListener(Sink.INPUT)
注解,这个注解表示该方法是消息中间件上数据流的监听器,Sink.INPUT
,表示这是input消息通道上的监听处理器.
测试
上面的东西都搞好之后,启动项目,在日志中可以看到以下内容:
表示我们的工程通过admin用户创建了一个指向rabbitmq的连接,这时打开mq的web管理页面,查看队列,就可以看到他创建的这个队列了
然后点击这个队列,找到Publish message选项,推送一条消息.
推送完成之后,查看控制台输出
这里已经收到了消息,只是消息没有序列化而已