述
Spring Cloud Bus也是微服务架构中的必备组件,Spring Cloud Bus可以将分布式系统的节点与轻量级消息代理链接,然后可以实现广播状态更改(例如配置更改)或广播其他管理指令
Spring Cloud Bus就像一个分布式执行器,用于扩展的Spring Boot的应用程序,也可以作为应用程序之间的通信通道,这里就涉及到了消息代理,Spring Cloud Bus支持RabbitMQ和Kafka,下面先来看一下整合RabbitMQ
RabbitMQ的安装
首先需要一个rabbitmq的服务,可以去手动安装,或者使用docker去安装mq
整合RabbitMQ
新建工程
新建一个工程,名称是bus-rabbitmq
引入依赖
引入amqp的依赖1
2
3
4<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
修改配置
修改application.yml文件,配置如下:1
2
3
4
5
6
7
8
9
10spring:
application:
name: bus-rabbitmq
rabbitmq:
host: 172.16.12.3
port: 5672
username: admin
password: admin
server:
port: 2009
主要就是配置了rabbitmq的ip,端口,还有用户名密码
消息生产者
发送消息可以使用spring封装好的AmqpTemplate,具体生产者类如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/**
* @author 周泽
* @date Create in 15:42 2019/4/8
* @Description 消息生产者
*/
@Component
@Slf4j
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String msg = "test msg" + System.currentTimeMillis();
log.info("发送消息:{}", msg);
amqpTemplate.convertAndSend("test.queue", msg);
}
}
这里就是直接注入AmqpTemplate,然后通过convertAndSend方法,向名称为test.queue的队列中发送一个消息.
消息消费者
代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16/**
* @author 周泽
* @date Create in 15:46 2019/4/8
* @Description 消息消费者
*/
@Component
@Slf4j
@RabbitListener(queues = "test.queue")
public class Consumer {
@RabbitHandler
public void process(String msg) {
log.info("消费者收到消息:{}", msg);
}
}
这里有两个注解
@RabbitListener(queues = "test.queue")
:表示要监听的队列名@RabbitHandler
:表示该方法用来处理接收到的消息
配置消息队列
上面我们用到了名称是test.queue的这个队列,需要创建一下,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* @author 周泽
* @date Create in 15:48 2019/4/8
* @Description 消息队列的配置
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue testQueue() {
return new Queue("test.queue");
}
}
测试
上面的内容都完成后, 用单元测试来测试一下,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16@RunWith(SpringRunner.class)
@SpringBootTest
public class AppTests {
@Test
public void contextLoads() {
}
@Autowired
private Sender sender;
@Test
public void sendTest(){
sender.send();
}
}
运行以后,观察日志,输出:
可以看到,这里接收发送都没有问题,再看一下mq的管理页面
我们刚刚创建的队列也创建好了