Spring Cloud-21-Spring Cloud Bus整合RabbitMQ

Spring Cloud Bus也是微服务架构中的必备组件,Spring Cloud Bus可以将分布式系统的节点与轻量级消息代理链接,然后可以实现广播状态更改(例如配置更改)或广播其他管理指令

Spring Cloud Bus就像一个分布式执行器,用于扩展的Spring Boot的应用程序,也可以作为应用程序之间的通信通道,这里就涉及到了消息代理,Spring Cloud Bus支持RabbitMQ和Kafka,下面先来看一下整合RabbitMQ

RabbitMQ的安装

首先需要一个rabbitmq的服务,可以去手动安装,或者使用docker去安装mq

整合RabbitMQ

新建工程

新建一个工程,名称是bus-rabbitmq

引入依赖

引入amqp的依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

修改配置

修改application.yml文件,配置如下:

1
2
3
4
5
6
7
8
9
10
spring:
application:
name: bus-rabbitmq
rabbitmq:
host: 172.16.12.3
port: 5672
username: admin
password: admin
server:
port: 2009

主要就是配置了rabbitmq的ip,端口,还有用户名密码

消息生产者

发送消息可以使用spring封装好的AmqpTemplate,具体生产者类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* @author 周泽
* @date Create in 15:42 2019/4/8
* @Description 消息生产者
*/
@Component
@Slf4j
public class Sender {

@Autowired
private AmqpTemplate amqpTemplate;

public void send(){
String msg = "test msg" + System.currentTimeMillis();

log.info("发送消息:{}", msg);

amqpTemplate.convertAndSend("test.queue", msg);

}
}

这里就是直接注入AmqpTemplate,然后通过convertAndSend方法,向名称为test.queue的队列中发送一个消息.

消息消费者

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* @author 周泽
* @date Create in 15:46 2019/4/8
* @Description 消息消费者
*/
@Component
@Slf4j
@RabbitListener(queues = "test.queue")
public class Consumer {

@RabbitHandler
public void process(String msg) {
log.info("消费者收到消息:{}", msg);

}
}

这里有两个注解

  • @RabbitListener(queues = "test.queue"):表示要监听的队列名
  • @RabbitHandler:表示该方法用来处理接收到的消息

配置消息队列

上面我们用到了名称是test.queue的这个队列,需要创建一下,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @author 周泽
* @date Create in 15:48 2019/4/8
* @Description 消息队列的配置
*/
@Configuration
public class RabbitConfig {

@Bean
public Queue testQueue() {
return new Queue("test.queue");
}

}

测试

上面的内容都完成后, 用单元测试来测试一下,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RunWith(SpringRunner.class)
@SpringBootTest
public class AppTests {

@Test
public void contextLoads() {
}

@Autowired
private Sender sender;

@Test
public void sendTest(){
sender.send();
}
}

运行以后,观察日志,输出:
image

可以看到,这里接收发送都没有问题,再看一下mq的管理页面
image

我们刚刚创建的队列也创建好了