抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Hystrix的使用

前置工作

导入POM

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>

配置类启动Hystrix

1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
@EnableEurekaClient
//开启Hystrix断路器功能
@EnableHystrix
public class OrderApplication {

public static void main(String[] args) {
SpringApplication.run(OrderApplication.class);
}
}

增加查询接口

在订单的OrderService 增加查询接口

OrderService
1
2
@GetMapping("/query")
public String query();
OrderServiceImpl

使用@HystrixCommand 加入Hystrix功能

1
2
3
4
5
@HystrixCommand
public String query() {
System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId());
return "查询接口";
}
测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {OrderApplication.class})
public class HystrixTest {
//线程池 默认100个线程
private ExecutorService executorService = Executors.newFixedThreadPool(100);
// 并发量通知执行多少个请求
private int concurrentNumber = 15;
//使用CountDownLatch进行并发请求
private CountDownLatch countDownLatch = new CountDownLatch(concurrentNumber);


@Autowired
private OrderService orderService;

@Test
public void test() {
for (int i = 0; i < concurrentNumber; i++) {
executorService.execute(() -> {
countDownLatch.countDown();
String value = orderService.query();
System.out.println("开始查询接口,调用线程号:" + Thread.currentThread().getId() + ",返回结果:" + value);
});
}
//结束并等待所以子线程结束任务否则会到这里直接退出不会等待子线程打印结果
executorService.shutdown();
while (!executorService.isTerminated()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

服务隔离策略

线程池隔离

THREAD 线程池隔离策略 独立线程接收请求,默认采用的就是线程池隔离

线程是的默认核心心线程数设置为10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@HystrixCommand(fallbackMethod = "fallback", commandKey = "queryKeys", groupKey = "groupKey")
public String query() {
System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId());
return "调用查询接口...";
}

/**
* 快速失败方法
*
* @return
*/
public String fallback() {
System.out.println("调用快速失败方法,线程号:"+Thread.currentThread().getId());
return "调用快速失败方法...";
}

@HystrixCommand的写法可以写成如下形式,效果一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@HystrixCommand(fallbackMethod = "fallback", commandKey = "queryKeys", groupKey = "groupKey",
commandProperties = {
//线程池隔离策略 默认值 可以省略
@HystrixProperty(name = "execution.isolation.strategy", value = "THREAD")
},
threadPoolKey = "queryOrderHystrixPool",
threadPoolProperties = {
//核心线程数设置为10 默认值
@HystrixProperty(name = "coreSize", value = "10")
})
public String query() {
System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId());
return "调用查询接口...";
}
测试结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
调用快速失败方法,线程号:20
调用快速失败方法,线程号:21
调用快速失败方法,线程号:33
调用快速失败方法,线程号:24
调用快速失败方法,线程号:23
开始查询接口,调用线程号:21,返回结果:调用快速失败方法...
开始查询接口,调用线程号:24,返回结果:调用快速失败方法...
开始查询接口,调用线程号:33,返回结果:调用快速失败方法...
开始查询接口,调用线程号:20,返回结果:调用快速失败方法...
开始查询接口,调用线程号:23,返回结果:调用快速失败方法...
调用查询接口,线程号:50
调用查询接口,线程号:45
调用查询接口,线程号:46
调用查询接口,线程号:48
调用查询接口,线程号:52
调用查询接口,线程号:51
调用查询接口,线程号:49
调用查询接口,线程号:47
调用查询接口,线程号:53
调用查询接口,线程号:44
开始查询接口,调用线程号:31,返回结果:调用查询接口...
开始查询接口,调用线程号:19,返回结果:调用查询接口...
开始查询接口,调用线程号:27,返回结果:调用查询接口...
开始查询接口,调用线程号:32,返回结果:调用查询接口...
开始查询接口,调用线程号:29,返回结果:调用查询接口...
开始查询接口,调用线程号:28,返回结果:调用查询接口...
开始查询接口,调用线程号:26,返回结果:调用查询接口...
开始查询接口,调用线程号:25,返回结果:调用查询接口...
开始查询接口,调用线程号:30,返回结果:调用查询接口...
开始查询接口,调用线程号:22,返回结果:调用查询接口...

可以看到,用户线程和业务类中的线程是不一样的 ,线程池方式又重新开辟线程来执行子任务。

信号量隔离

SEMAPHORE:信号量隔离是采用一个全局变量来控制并发量,一个请求过来全局变量加 1,单加到跟配置 中的大小相等是就不再接受用户请求了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@HystrixCommand(fallbackMethod = "fallback", commandKey = "queryKeys", groupKey = "groupKey",
commandProperties = {
//线程池隔离策略 默认值 可以省略
@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"),
//这参数是用来控制信号量隔离级别的并发大小的
@HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "10")
})
public String query() {
System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId());
return "调用查询接口...";
}

/**
* 快速失败方法
*
* @return
*/
public String fallback() {
System.out.println("调用快速失败方法,线程号:"+Thread.currentThread().getId());
return "调用快速失败方法...";
}

测试结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
调用快速失败方法,线程号:27
调用快速失败方法,线程号:22
调用快速失败方法,线程号:33
调用快速失败方法,线程号:19
调用快速失败方法,线程号:31
开始查询接口,调用线程号:19,返回结果:调用快速失败方法...
开始查询接口,调用线程号:33,返回结果:调用快速失败方法...
开始查询接口,调用线程号:31,返回结果:调用快速失败方法...
开始查询接口,调用线程号:27,返回结果:调用快速失败方法...
开始查询接口,调用线程号:22,返回结果:调用快速失败方法...
调用查询接口,线程号:29
调用查询接口,线程号:26
调用查询接口,线程号:30
调用查询接口,线程号:23
调用查询接口,线程号:28
调用查询接口,线程号:20
开始查询接口,调用线程号:20,返回结果:调用查询接口...
调用查询接口,线程号:21
开始查询接口,调用线程号:23,返回结果:调用查询接口...
调用查询接口,线程号:25
开始查询接口,调用线程号:21,返回结果:调用查询接口...
调用查询接口,线程号:24
开始查询接口,调用线程号:25,返回结果:调用查询接口...
开始查询接口,调用线程号:29,返回结果:调用查询接口...
开始查询接口,调用线程号:28,返回结果:调用查询接口...
开始查询接口,调用线程号:26,返回结果:调用查询接口...
调用查询接口,线程号:32
开始查询接口,调用线程号:24,返回结果:调用查询接口...
开始查询接口,调用线程号:30,返回结果:调用查询接口...
开始查询接口,调用线程号:32,返回结果:调用查询接口...

可以看到,单元测试中的线程和业务类中的线程是一样的,没有单独开启线程。

服务降级

服务降级是对服务调用过程的出现的异常的友好封装,当出现异常时,我们不希望直接把异常原样返回,所以当出现异常时我们需要对异常信息进行包装,抛一友好的信息给前端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@HystrixCommand(fallbackMethod = "fallback", commandKey = "queryKeys", groupKey = "groupKey",
commandProperties = {
//线程池隔离策略 默认值 可以省略
@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"),
//这参数是用来控制信号量隔离级别的并发大小的
@HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "10")
})
public String query() {
System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId());
if (Thread.currentThread().getId() % 2 == 0) {
System.out.println("出现了异常,线程号:" + Thread.currentThread().getId());
int k = 1 / 0;
}
return "调用查询接口...";
}

/**
* 降级方法
*
* @return
*/
public String fallback() {
System.out.println("调用降级方法,线程号:" + Thread.currentThread().getId());
return "调用降级方法...";
}

​ 这里fallback是作为降级方法来用的,我们将测试用例的concurrentNumber设置为10,我们让调用方法随机报错,就会调用降级方法

测试结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
调用查询接口,线程号:28
调用查询接口,线程号:22
调用查询接口,线程号:26
出现了异常,线程号:26
调用查询接口,线程号:20
出现了异常,线程号:20
调用查询接口,线程号:25
调用查询接口,线程号:23
调用查询接口,线程号:27
调用查询接口,线程号:24
出现了异常,线程号:24
调用查询接口,线程号:21
调用查询接口,线程号:19
出现了异常,线程号:22
出现了异常,线程号:28
开始查询接口,调用线程号:25,返回结果:调用查询接口...
开始查询接口,调用线程号:21,返回结果:调用查询接口...
开始查询接口,调用线程号:27,返回结果:调用查询接口...
开始查询接口,调用线程号:23,返回结果:调用查询接口...
开始查询接口,调用线程号:19,返回结果:调用查询接口...
调用降级方法,线程号:26
调用降级方法,线程号:20
调用降级方法,线程号:24
调用降级方法,线程号:22
调用降级方法,线程号:28
开始查询接口,调用线程号:24,返回结果:调用降级方法...
开始查询接口,调用线程号:28,返回结果:调用降级方法...
开始查询接口,调用线程号:22,返回结果:调用降级方法...
开始查询接口,调用线程号:26,返回结果:调用降级方法...
开始查询接口,调用线程号:20,返回结果:调用降级方法...

我们发现如果出现了异常,就会调用降级方法。

数据监控

​ 在微服务架构中为例保证程序的可用性,防止程序出错导致网络阻塞,出现了断路器模型。断路器的状况反应了一个程序的可用性和健壮性,它是一个重要指标。Hystrix Dashboard是作为断路器状态的一个组件,提供了数据监控和友好的图形化界面

​ 通过监控该面板,可以很直观的看到每一个服务请求在短时间内(10s)的请求量,以及成功率,失败率,耗时等信息,可以给后期的系统优化提供依据.

​ Hystrix 进行服务熔断时会对调用结果进行统计,比如超时数、bad 请求数、降级数、异常数等等都会有统计,那么统计的数据就需要有一个界面来展示,hystrix-dashboard 就是这么一个展示 hystrix 统计结果的服务。

Dashboard 工程搭建

新建一个hystrix-dashboard的项目

导入POM
1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
配置文件配置

配置application.properties

1
server.port=9999
启动类
1
2
3
4
5
6
7
8
@SpringBootApplication
//开启Hystrix仪表盘监控注解 监控地址 http://localhost:9999/hystrix
@EnableHystrixDashboard
public class HystrixDashboardApplication {
public static void main(String[] args) {
SpringApplication.run(HystrixDashboardApplication.class);
}
}
访问测试

访问 http://localhost:9999/hystrix

客户端配置

修改order-server 项目

导入POM文件

加入健康检查依赖

1
2
3
4
5
<!--健康检查依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件配置

开启 health以及hystrix.stream端点的监控,主要是开放hystrix.stream访问端点,否则会报404.

1
2
# 开启 health以及hystrix.stream端点的监控
management.endpoints.web.exposure.include=health,hystrix.stream
启动访问测试

启动项目 访问 http://{ip}:{port}/actuator/hystrix.stream

方便复制 地址粘贴出来:http://localhost:8083/actuator/hystrix.stream

出现如下界面说明客户端已经配置好了

这里有一个细节需要注意,要访问/hystrix.stream接口,首先访问消费者(springcloud-service-consumer)工程中任意的一个带有熔断器的接口,否则直接访问/hystrix.stream接口时,会输出一连串ping: ping: ….

Dashboard 使用

访问Dashboard 的页面,在里面输入刚才客户端的hystrix.stream的端点,点击监控即可

点击监控将出现如下界面

Dashboard 参数说明

监控图表中的具体含义如下

实心圆 共有两种含义。它通过颜色的变化代表了实例的健康程度,它的健康程度从 绿色 > 黄色 > 橙色 > 红色 递减,该实心圆除了颜色的变化之外,它的大小也会根据实例的请求流量发生变化,流量越大实心圆就越大,所以通过该实心圆的展示,就可以在大量实例中快速的发现故障实例和高压力实例

详细参数说明

集群报告状态

更多参数说明

使用postman测试

使用postman对hystrix进行处理的接口进行测试

测试时图表会进行变化

Hystrix熔断

熔断就像家里的保险丝一样,家里的保险丝一旦断了,家里就没点了,家里用电器功率高了就会导致保险丝端掉。在我们springcloud领域也可以这样理解,如果并发高了就可能触发hystrix的熔断。

熔断发生的三个必要条件

  1. 有一个统计的时间周期,滚动窗口

    相应的配置属性

    metrics.rollingStats.timeInMilliseconds

    默认10000毫秒(10秒)

  2. 请求次数必须达到一定数量

    相应的配置属性

    circuitBreaker.requestVolumeThreshold

    默认20次

  3. 失败率达到默认失败率

    相应的配置属性

    circuitBreaker.errorThresholdPercentage

    默认50%的失败率

上述3个条件缺一不可,必须全部满足才能开启hystrix的熔断功能。

Jmeter压测

当我们的对一个线程池大小是100的方法压测时看看hystrix的熔断效果:

可以看到失败率超过50%时,circuit的状态是open的。

熔断器的三个状态

关闭状态

​ 关闭状态时用户请求是可以到达服务提供方的

开启状态

​ 开启状态时用户请求是不能到达服务提供方的,直接会走降级方法

半开状态

​ 当hystrix熔断器开启时,过一段时间后,熔断器就会由开启状态变成半开状态。

​ 半开状态的熔断器是可以接受用户请求并把请求传递给服务提供方的,这时候如果远程调用返回成功,那么熔断器就会有半开状态变成关闭状态,反之,如果调用失败,熔断器就会有半开状态变成开启状态。

注意:Hystrix功能建议在并发比较高的方法上使用,并不是所有方法都得使用的。

总结

一般容易将服务降级服务熔断混在一起,为什么我会有这样的误解呢?
针对下面的情形,如图所示:

​ 当Service A调用Service B,失败多次达到一定阀值,Service A不会再去调Service B,而会去执行本地的降级方法!
对于这么一套机制:在Spring cloud中结合Hystrix,将其称为熔断降级!

​ 所以我当时就以为是一回事了,毕竟熔断和降级是一起发生的,而且这二者的概念太相近了!后面接触了多了,发现自己理解的还是太狭隘了,因此本文中带着点我自己的见解,大家如果有不同意见,请轻喷!毕竟还有很多人认为两者是一致的!

服务雪崩

​ OK,我们从服务雪崩开始讲起!假设存在如下调用链:

​ 而此时,Service A的流量波动很大,流量经常会突然性增加!那么在这种情况下,就算Service A能扛得住请求,Service BService C未必能扛得住这突发的请求。
​ 此时,如果Service C因为抗不住请求,变得不可用。那么Service B的请求也会阻塞,慢慢耗尽Service B的线程资源,Service B就会变得不可用。紧接着,Service A也会不可用,这一过程如下图所示

​ 如上图所示,一个服务失败,导致整条链路的服务都失败的情形,我们称之为服务雪崩。

​ 那么,服务熔断服务降级就可以视为解决服务雪崩的手段之一。

服务熔断

服务熔断:当下游的服务因为某种原因突然变得不可用或响应过慢,上游服务为了保证自己整体服务的可用性,不再继续调用目标服务,直接返回,快速释放资源。如果目标服务情况好转则恢复调用。Hystrix默认的超时时间为1s,请求时间超过1s,熔断次数加1。

熔断器机制
  • closed:请求正常时,不使用熔断器;
  • open:统计请求的失败比例,达到阀值时,打开熔断器,请求被降级处理;延时一段时候后(默认休眠时间是5S)会进入halfopen状态;默认失败比例阀值是50%,请求次数最少不低于20次,例如:一段时间内请求30次,有20失败,熔断器将会打开,默认5s内的请求都会失败,直接返回。
  • halfopen:在进入该状态后会放入部分请求;判断请求是否成功,不成功,进入open状态,重新计时,进入halfopen状态;成功,进入closed状态,

服务降级

​ 服务降级是从整个系统的负荷情况出发和考虑的,对某些负荷会比较高的情况,为了预防某些功能(业务场景)出现负荷过载或者响应慢的情况,在其内部暂时舍弃对一些非核心的接口和数据的请求,而直接返回一个提前准备好的fallback(退路)错误处理信息。这样,虽然提供的是一个有损的服务,但却保证了整个系统的稳定性和可用性。

熔断VS降级

相同点
  • 目标一致:都是从可用性和可靠性出发,为了防止系统崩溃;
  • 用户体验类似:最终都让用户体验到的是某些功能暂时不可用;
不同点
  • 触发原因不同:服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑;

实现ThreadLocal上下文的传递

​ Hystrix使用命令模式将所有对外部服务的调用包装在HystrixCommandHystrixObservableCommand对象中,并将该对象放在单独的线程中执行。因为调用在单独的线程中执行,因此原线程的ThreadLocal设置就失效了。

​ 因此,为了将当前线程的ThreadLocal数值传递至Hystrix的线程中,可以参考下文。

封装Callable任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final class DelegatingUserContextCallable<V> implements Callable<V> {
private final Callable<V> delegate;
// 用户信息上下文(根据项目实际情况定义ThreadLocal上下文)
private UserContext originalUserContext;

public DelegatingUserContextCallable(Callable<V> delegate,
UserContext userContext) {
this.delegate = delegate;
this.originalUserContext = userContext;
}

public V call() throws Exception {
// 将当前的用户上下文设置进Hystrix线程的TreadLocal中
UserContextHolder.setContext(originalUserContext);
try {
return delegate.call();
}
finally {
// 执行完毕,记得清理ThreadLocal资源
this.originalUserContext = null;
}
}

public static <V> Callable<V> create(Callable<V> delegate,
UserContext userContext) {
return new DelegatingUserContextCallable<V>(delegate, userContext);
}
}

实现Hystrix的并发策略类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy {

// 最简单的方式就是引入现有的并发策略,进行功能扩展
private final HystrixConcurrencyStrategy existingConcurrencyStrategy;

public ThreadLocalAwareStrategy(
HystrixConcurrencyStrategy existingConcurrencyStrategy) {
this.existingConcurrencyStrategy = existingConcurrencyStrategy;
}

@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
: super.getBlockingQueue(maxQueueSize);
}

@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getRequestVariable(rv)
: super.getRequestVariable(rv);
}

@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue)
: super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}

@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy
.wrapCallable(new DelegatingUserContextCallable<>(callable, UserContextHolder.getContext()))
: super.wrapCallable(new DelegatingUserContextCallable<T>(callable, UserContextHolder.getContext()));
}
}

注入并发策略并进行刷新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@ConditionalOnClass(HystrixConcurrencyStrategy.class)
@Configuration
public class HystrixThreadLocalConfiguration {

@Autowired(required = false)
private HystrixConcurrencyStrategy existingConcurrencyStrategy;

@PostConstruct
public void init() {
// Keeps references of existing Hystrix plugins.
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance()
.getCommandExecutionHook();

HystrixPlugins.reset();

HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalAwareStrategy(existingConcurrencyStrategy));
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
}
}

评论