Spring Cloud-10-Hystrix请求合并

在微服务的架构中,各个模块是通过互相请求来进行通信的,在高并发的情况下,通信次数的增加会导致总的通信时间增加,同时线程池的资源也是有限的,高并发的环境下,会有大量的线程处于等待状态,进而导致了响应延迟,要解决这些问题就可以使用Hystrix的请求合并.

具体原理是怎样的呢? Hystrix中的请求合并,就是利用一个合并处理器,把对同一个服务连续发起的几个请求,合并成一个请求去发送(连续请求的时间窗默认是10ms).

那么接下来就来看一下,如何实现请求合并?

第一种方式

服务提供者接口修改

服务提供者的controller中,添加两个方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@GetMapping("/dc/{id}")
public String dc(@PathVariable Long id) throws InterruptedException {
String services = "Services: " + discoveryClient.getServices();

log.info(services);
log.info("-------------/dc/{}", id);

if (id.equals(1L)){
return "id是1的返回值";
} else if (id.equals(2L)) {
return "id是2的返回值";
}

return services;
}

@GetMapping("/test")
public List<String> test(String ids) throws InterruptedException {

log.info("-------------/test,参数:{}", ids);

List<String> list = new ArrayList<>(3);
list.add("Java");
list.add("Spring Cloud");
list.add("dubbo");

return list;
}

一个是根据id获取数据的接口,一个是批处理的接口, 批处理的接口中的参数ids,格式是用逗号隔开的id字符串,比如”1,2,3,4”这样的, 实际情况中,我们要根据传过来的id,找到对应的数据,组成集合,然后返回去,这里为了方便,就直接返回固定的数据了.

服务消费者修改

在消费者的HelloService中,添加以下两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 单个请求测试
* @param id id
*/
public String getByid(Long id){
return restTemplate.getForObject("http://EUREKA-CLIENT/dc/{1}", String.class, id);
}

/**
* 批处理获取集合
* @param ids ids
*/
public List<String> getList(List<Long> ids){
log.info("getList------{},Thread.currentThread().getName():{}", ids, Thread.currentThread().getName());
// RestTemplate获取的返回值是个集合的话,需要用数组接收,然后再转成集合
String[] strings = restTemplate.getForObject("http://EUREKA-CLIENT/test?ids={1}", String[].class, StringUtils.join(ids, ","));
return Arrays.asList(strings);
}

getByid()用来调用单个id的接口,getList用来调用批处理的接口, 在getList中,打印了该方法执行时,所处线程的名称,为了方便观察

在服务提供者中,批处理接口返回的信息是一个list集合,用RestTemplate请求的话,需要用一个数组去接收,然后再转一下.

修改完HelloService后,新建一个类,名称是StringBatchCommand,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @author 周泽
* @date Create in 14:49 2019/3/12
* @Description 批处理命令
*/
public class StringBatchCommand extends HystrixCommand<List<String>> {

private List<Long> ids;

private HelloService helloService;

public StringBatchCommand(List<Long> ids, HelloService helloService) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollapsingGroup")).andCommandKey(HystrixCommandKey.Factory.asKey("CollapsingKey")));
this.ids = ids;
this.helloService = helloService;
}

@Override
protected List<String> run() throws Exception {
return helloService.getList(ids);
}
}

这个类和之前的StringCommand类是差不多的,都是继承自HystrixCommand类,用来处理合并之后的请求,在run方法中调用HelloService的getList方法.

完了之后呢,还需要一个合并请求的类,名称是StringCollapseCommand,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* @author 周泽
* @date Create in 14:58 2019/3/12
* @Description 请求合并实现类
*/
@Slf4j
public class StringCollapseCommand extends HystrixCollapser<List<String>, String, Long> {

private Long id;

private HelloService helloService;

public StringCollapseCommand(HelloService helloService, Long id){
// 设置了请求时间窗为1000ms,即请求时间间隔在100ms之内的请求会被合并为一个请求。
super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("stringCollapseCommand")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(1000)));
this.helloService = helloService;
this.id = id;
}

@Override
public Long getRequestArgument() {
return id;
}

/**
* 合并请求的方法
*/
@Override
protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, Long>> collection) {
List<Long> ids = new ArrayList<>(collection.size());
ids.addAll(collection.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
StringBatchCommand stringBatchCommand = new StringBatchCommand(ids, helloService);
return stringBatchCommand;
}

/**
* 为每个请求设置请求结果
*/
@Override
protected void mapResponseToRequests(List<String> strings, Collection<CollapsedRequest<String, Long>> collection) {
log.info("mapResponseToRequests....");
int count = 0;
for (CollapsedRequest<String, Long> collapsedRequest : collection) {
String str = strings.get(count++);
collapsedRequest.setResponse(str);
}
}
}

  • 构造方法: 中设置了请求的时间窗是1000ms,意思就是在1000ms之内的请求都会被合并成一个
  • createCommand()方法: 用来合并请求,在这里获取到单个请求的各个id,然后放到一个集合中去,然后创建出一个StringBatchCommand对象,用该对象去发起一个批量请求.
  • mapResponseToRequests()方法: 主要用来为每个请求设置请求结果,该方法的第一个参数就是批处理的请求结果.第二个参数collapsedRequests代表了每一个被合并的请求. 然后通过遍历请求结果,为collapsedRequests 来设置请求结果.

总结一下就是,构造设置请求时间窗,然后合并请求,用批处理的对象去调用服务,然后遍历返回值为每个请求设置返回值.
以上都搞完之后,就可以去测试了.

测试

在消费者的controller中添加以下接口,用来测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@GetMapping("/test3")
public void test3() throws ExecutionException, InterruptedException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();

StringCollapseCommand sc1 = new StringCollapseCommand(helloService, 1L);
StringCollapseCommand sc2 = new StringCollapseCommand(helloService, 2L);
StringCollapseCommand sc3 = new StringCollapseCommand(helloService, 3L);
StringCollapseCommand sc4 = new StringCollapseCommand(helloService, 4L);

Future<String> q1 = sc1.queue();
Future<String> q2 = sc2.queue();
Future<String> q3 = sc3.queue();

String str1 = q1.get();
String str2 = q2.get();
String str3 = q3.get();

Thread.sleep(3000);

Future<String> q4 = sc4.queue();
String str4 = q4.get();

log.info("str1:{}", str1);
log.info("str2:{}", str2);
log.info("str3:{}", str3);
log.info("str4:{}", str4);

context.close();
}

来看一下这个接口:
第一步,需要实例化HystrixRequestContext.
第二步,创建StringCollapseCommand类的示例,来发起请求,我们创建了四个请求,然后先是发送了前三个,然后线程休眠了3秒钟, 而我们请求合并的时间窗是1秒,所以在发送第四个请求的时候,是不会和前三个请求合并的,是单独的一个线程去处理的.

启动服务,访问http://localhost:9000/test3,然后看下控制台打印.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2019-03-20 11:42:41.014  INFO 17304 --- [llapsingGroup-1] c.eureka.consumer.service.HelloService   : getList------[1, 2, 3],Thread.currentThread().getName():hystrix-CollapsingGroup-1
2019-03-20 11:42:41.055 INFO 17304 --- [llapsingGroup-1] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@2a7d0649: startup date [Wed Mar 20 11:42:41 CST 2019]; parent: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@55120f99
2019-03-20 11:42:41.128 INFO 17304 --- [llapsingGroup-1] f.a.AutowiredAnnotationBeanPostProcessor : JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
2019-03-20 11:42:41.422 INFO 17304 --- [llapsingGroup-1] c.netflix.config.ChainedDynamicProperty : Flipping property: EUREKA-CLIENT.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2019-03-20 11:42:41.473 INFO 17304 --- [llapsingGroup-1] c.n.u.concurrent.ShutdownEnabledTimer : Shutdown hook installed for: NFLoadBalancer-PingTimer-EUREKA-CLIENT
2019-03-20 11:42:41.513 INFO 17304 --- [llapsingGroup-1] c.netflix.loadbalancer.BaseLoadBalancer : Client: EUREKA-CLIENT instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=EUREKA-CLIENT,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
2019-03-20 11:42:41.523 INFO 17304 --- [llapsingGroup-1] c.n.l.DynamicServerListLoadBalancer : Using serverListUpdater PollingServerListUpdater
2019-03-20 11:42:41.562 INFO 17304 --- [llapsingGroup-1] c.netflix.config.ChainedDynamicProperty : Flipping property: EUREKA-CLIENT.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2019-03-20 11:42:41.566 INFO 17304 --- [llapsingGroup-1] c.n.l.DynamicServerListLoadBalancer : DynamicServerListLoadBalancer for client EUREKA-CLIENT initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=EUREKA-CLIENT,current list of Servers=[localhost:8081],Load balancer stats=Zone stats: {defaultzone=[Zone:defaultzone; Instance count:1; Active connections count: 0; Circuit breaker tripped count: 0; Active connections per server: 0.0;]
},Server stats: [[Server:localhost:8081; Zone:defaultZone; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 08:00:00 CST 1970; First connection made: Thu Jan 01 08:00:00 CST 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0]
]}ServerList:org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList@6df18049
2019-03-20 11:42:41.844 INFO 17304 --- [llapsingGroup-1] c.e.c.hystrix.StringCollapseCommand : mapResponseToRequests....
2019-03-20 11:42:42.528 INFO 17304 --- [erListUpdater-0] c.netflix.config.ChainedDynamicProperty : Flipping property: EUREKA-CLIENT.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2019-03-20 11:42:44.920 INFO 17304 --- [llapsingGroup-2] c.eureka.consumer.service.HelloService : getList------[4],Thread.currentThread().getName():hystrix-CollapsingGroup-2
2019-03-20 11:42:44.928 INFO 17304 --- [llapsingGroup-2] c.e.c.hystrix.StringCollapseCommand : mapResponseToRequests....
2019-03-20 11:42:44.928 INFO 17304 --- [nio-9000-exec-2] c.e.consumer.controller.DcController : str1:Java
2019-03-20 11:42:44.928 INFO 17304 --- [nio-9000-exec-2] c.e.consumer.controller.DcController : str2:Spring Cloud
2019-03-20 11:42:44.929 INFO 17304 --- [nio-9000-exec-2] c.e.consumer.controller.DcController : str3:dubbo
2019-03-20 11:42:44.929 INFO 17304 --- [nio-9000-exec-2] c.e.consumer.controller.DcController : str4:Java

重点是这几条:

1
2
3
4
5
6
7
8
2019-03-20 11:42:41.014  INFO 17304 --- [llapsingGroup-1] c.eureka.consumer.service.HelloService   : getList------[1, 2, 3],Thread.currentThread().getName():hystrix-CollapsingGroup-1
2019-03-20 11:42:41.844 INFO 17304 --- [llapsingGroup-1] c.e.c.hystrix.StringCollapseCommand : mapResponseToRequests....




2019-03-20 11:42:44.920 INFO 17304 --- [llapsingGroup-2] c.eureka.consumer.service.HelloService : getList------[4],Thread.currentThread().getName():hystrix-CollapsingGroup-2
2019-03-20 11:42:44.928 INFO 17304 --- [llapsingGroup-2] c.e.c.hystrix.StringCollapseCommand : mapResponseToRequests....

上面是第一次请求,id是1,2,3,也就是说,前三个请求被合并成了一个请求,下面的是请求id是4,最后一个请求是单独去发送的.

注解方式实现请求合并

上面这种方式呢,虽然是实现了请求合并,但是写着有点蛋疼,太多了,我们也可以通过注解来实现请求合并,具体实现方式如下:

修改HelloService

添加以下两个方法:

1
2
3
4
5
6
7
8
9
10
11
@HystrixCollapser(batchMethod = "test2", collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds" ,value = "1000")})
public Future<String> test(Long id){
return null;
}

@HystrixCommand
public List<String> test2(List<Long> ids){
log.info("test2------{},Thread.currentThread().getName():{}", ids, Thread.currentThread().getName());
String[] strings = restTemplate.getForObject("http://EUREKA-CLIENT/test?ids={1}", String[].class, StringUtils.join(ids, ","));
return Arrays.asList(strings);
}

test方法上添加注解@HystrixCollapser,用来实现请求合并,用batchMethod属性来指定把请求合并之后的处理方法,collapserProperties属性用来指定其他属性,比如上面设置了时间窗是1000ms.

test2方法就是用来等请求合并完了去发送请求,接收返回值的.

测试

在controller中添加如下方法用来测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@GetMapping("/test4")
public void test4() throws ExecutionException, InterruptedException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();


Future<String> q1 = helloService.test(1L);
Future<String> q2 = helloService.test(2L);
Future<String> q3 = helloService.test(3L);

String str1 = q1.get();
String str2 = q2.get();
String str3 = q3.get();

Thread.sleep(3000);

Future<String> q4 = helloService.test(4L);
String str4 = q4.get();

log.info("str1:{}", str1);
log.info("str2:{}", str2);
log.info("str3:{}", str3);
log.info("str4:{}", str4);

context.close();
}

ok, 添加完之后启动项目,访问http://localhost:9000/test4,测试.

控制台输出:

1
2
3
2019-03-21 16:06:49.312  INFO 8548 --- [-HelloService-1] c.eureka.consumer.service.HelloService   : test2------[1, 2, 3],Thread.currentThread().getName():hystrix-HelloService-1

2019-03-21 16:06:53.239 INFO 8548 --- [-HelloService-2] c.eureka.consumer.service.HelloService : test2------[4],Thread.currentThread().getName():hystrix-HelloService-2

效果和上面是一样的, 但是这样写就很舒服.

总结

请求合并呢,就是将多个请求合并成一个请求去一次性处理,可以有效节省网络带宽和线程池资源,但是同时也会引发其他的问题,比如说一个请求用5ms就搞定了,但是我们请求时间窗设置的10ms,那么就还得等10ms看看还有没有别的请求,那么这个请求总共耗时就是15ms,不过,如果我们要发起的命令本身就是一个高延迟的命令,那么这个时候就可以使用请求合并了,因为这个时候时间窗的时间消耗就显得微不足道了,另外高并发也是请求合并的一个非常重要的场景.