Hystrix的使用
前置工作
导入POM
1 2 3 4
| <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
|
配置类启动Hystrix
1 2 3 4 5 6 7 8 9 10 11
| @SpringBootApplication @EnableEurekaClient
@EnableHystrix public class OrderApplication {
public static void main(String[] args) { SpringApplication.run(OrderApplication.class); } }
|
增加查询接口
在订单的OrderService 增加查询接口
OrderService
1 2
| @GetMapping("/query") public String query();
|
OrderServiceImpl
使用@HystrixCommand 加入Hystrix功能
1 2 3 4 5
| @HystrixCommand public String query() { System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId()); return "查询接口"; }
|
测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @RunWith(SpringRunner.class) @SpringBootTest(classes = {OrderApplication.class}) public class HystrixTest { private ExecutorService executorService = Executors.newFixedThreadPool(100); private int concurrentNumber = 15; private CountDownLatch countDownLatch = new CountDownLatch(concurrentNumber);
@Autowired private OrderService orderService;
@Test public void test() { for (int i = 0; i < concurrentNumber; i++) { executorService.execute(() -> { countDownLatch.countDown(); String value = orderService.query(); System.out.println("开始查询接口,调用线程号:" + Thread.currentThread().getId() + ",返回结果:" + value); }); } executorService.shutdown(); while (!executorService.isTerminated()) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
服务隔离策略
线程池隔离
THREAD 线程池隔离策略 独立线程接收请求,默认采用的就是线程池隔离
线程是的默认核心心线程数设置为10
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @HystrixCommand(fallbackMethod = "fallback", commandKey = "queryKeys", groupKey = "groupKey") public String query() { System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId()); return "调用查询接口..."; }
public String fallback() { System.out.println("调用快速失败方法,线程号:"+Thread.currentThread().getId()); return "调用快速失败方法..."; }
|
@HystrixCommand的写法可以写成如下形式,效果一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @HystrixCommand(fallbackMethod = "fallback", commandKey = "queryKeys", groupKey = "groupKey", commandProperties = { //线程池隔离策略 默认值 可以省略 @HystrixProperty(name = "execution.isolation.strategy", value = "THREAD") }, threadPoolKey = "queryOrderHystrixPool", threadPoolProperties = { //核心线程数设置为10 默认值 @HystrixProperty(name = "coreSize", value = "10") }) public String query() { System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId()); return "调用查询接口..."; }
|
测试结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| 调用快速失败方法,线程号:20 调用快速失败方法,线程号:21 调用快速失败方法,线程号:33 调用快速失败方法,线程号:24 调用快速失败方法,线程号:23 开始查询接口,调用线程号:21,返回结果:调用快速失败方法... 开始查询接口,调用线程号:24,返回结果:调用快速失败方法... 开始查询接口,调用线程号:33,返回结果:调用快速失败方法... 开始查询接口,调用线程号:20,返回结果:调用快速失败方法... 开始查询接口,调用线程号:23,返回结果:调用快速失败方法... 调用查询接口,线程号:50 调用查询接口,线程号:45 调用查询接口,线程号:46 调用查询接口,线程号:48 调用查询接口,线程号:52 调用查询接口,线程号:51 调用查询接口,线程号:49 调用查询接口,线程号:47 调用查询接口,线程号:53 调用查询接口,线程号:44 开始查询接口,调用线程号:31,返回结果:调用查询接口... 开始查询接口,调用线程号:19,返回结果:调用查询接口... 开始查询接口,调用线程号:27,返回结果:调用查询接口... 开始查询接口,调用线程号:32,返回结果:调用查询接口... 开始查询接口,调用线程号:29,返回结果:调用查询接口... 开始查询接口,调用线程号:28,返回结果:调用查询接口... 开始查询接口,调用线程号:26,返回结果:调用查询接口... 开始查询接口,调用线程号:25,返回结果:调用查询接口... 开始查询接口,调用线程号:30,返回结果:调用查询接口... 开始查询接口,调用线程号:22,返回结果:调用查询接口...
|
可以看到,用户线程和业务类中的线程是不一样的 ,线程池方式又重新开辟线程来执行子任务。
信号量隔离
SEMAPHORE:信号量隔离是采用一个全局变量来控制并发量,一个请求过来全局变量加 1,单加到跟配置 中的大小相等是就不再接受用户请求了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @HystrixCommand(fallbackMethod = "fallback", commandKey = "queryKeys", groupKey = "groupKey", commandProperties = { //线程池隔离策略 默认值 可以省略 @HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"), //这参数是用来控制信号量隔离级别的并发大小的 @HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "10") }) public String query() { System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId()); return "调用查询接口..."; }
public String fallback() { System.out.println("调用快速失败方法,线程号:"+Thread.currentThread().getId()); return "调用快速失败方法..."; }
|
测试结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| 调用快速失败方法,线程号:27 调用快速失败方法,线程号:22 调用快速失败方法,线程号:33 调用快速失败方法,线程号:19 调用快速失败方法,线程号:31 开始查询接口,调用线程号:19,返回结果:调用快速失败方法... 开始查询接口,调用线程号:33,返回结果:调用快速失败方法... 开始查询接口,调用线程号:31,返回结果:调用快速失败方法... 开始查询接口,调用线程号:27,返回结果:调用快速失败方法... 开始查询接口,调用线程号:22,返回结果:调用快速失败方法... 调用查询接口,线程号:29 调用查询接口,线程号:26 调用查询接口,线程号:30 调用查询接口,线程号:23 调用查询接口,线程号:28 调用查询接口,线程号:20 开始查询接口,调用线程号:20,返回结果:调用查询接口... 调用查询接口,线程号:21 开始查询接口,调用线程号:23,返回结果:调用查询接口... 调用查询接口,线程号:25 开始查询接口,调用线程号:21,返回结果:调用查询接口... 调用查询接口,线程号:24 开始查询接口,调用线程号:25,返回结果:调用查询接口... 开始查询接口,调用线程号:29,返回结果:调用查询接口... 开始查询接口,调用线程号:28,返回结果:调用查询接口... 开始查询接口,调用线程号:26,返回结果:调用查询接口... 调用查询接口,线程号:32 开始查询接口,调用线程号:24,返回结果:调用查询接口... 开始查询接口,调用线程号:30,返回结果:调用查询接口... 开始查询接口,调用线程号:32,返回结果:调用查询接口...
|
可以看到,单元测试中的线程和业务类中的线程是一样的,没有单独开启线程。
服务降级
服务降级是对服务调用过程的出现的异常的友好封装,当出现异常时,我们不希望直接把异常原样返回,所以当出现异常时我们需要对异常信息进行包装,抛一友好的信息给前端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @HystrixCommand(fallbackMethod = "fallback", commandKey = "queryKeys", groupKey = "groupKey", commandProperties = { //线程池隔离策略 默认值 可以省略 @HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"), //这参数是用来控制信号量隔离级别的并发大小的 @HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "10") }) public String query() { System.out.println("调用查询接口,线程号:" + Thread.currentThread().getId()); if (Thread.currentThread().getId() % 2 == 0) { System.out.println("出现了异常,线程号:" + Thread.currentThread().getId()); int k = 1 / 0; } return "调用查询接口..."; }
public String fallback() { System.out.println("调用降级方法,线程号:" + Thread.currentThread().getId()); return "调用降级方法..."; }
|
这里fallback是作为降级方法来用的,我们将测试用例的concurrentNumber设置为10,我们让调用方法随机报错,就会调用降级方法
测试结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| 调用查询接口,线程号:28 调用查询接口,线程号:22 调用查询接口,线程号:26 出现了异常,线程号:26 调用查询接口,线程号:20 出现了异常,线程号:20 调用查询接口,线程号:25 调用查询接口,线程号:23 调用查询接口,线程号:27 调用查询接口,线程号:24 出现了异常,线程号:24 调用查询接口,线程号:21 调用查询接口,线程号:19 出现了异常,线程号:22 出现了异常,线程号:28 开始查询接口,调用线程号:25,返回结果:调用查询接口... 开始查询接口,调用线程号:21,返回结果:调用查询接口... 开始查询接口,调用线程号:27,返回结果:调用查询接口... 开始查询接口,调用线程号:23,返回结果:调用查询接口... 开始查询接口,调用线程号:19,返回结果:调用查询接口... 调用降级方法,线程号:26 调用降级方法,线程号:20 调用降级方法,线程号:24 调用降级方法,线程号:22 调用降级方法,线程号:28 开始查询接口,调用线程号:24,返回结果:调用降级方法... 开始查询接口,调用线程号:28,返回结果:调用降级方法... 开始查询接口,调用线程号:22,返回结果:调用降级方法... 开始查询接口,调用线程号:26,返回结果:调用降级方法... 开始查询接口,调用线程号:20,返回结果:调用降级方法...
|
我们发现如果出现了异常,就会调用降级方法。
数据监控
在微服务架构中为例保证程序的可用性,防止程序出错导致网络阻塞,出现了断路器模型。断路器的状况反应了一个程序的可用性和健壮性,它是一个重要指标。Hystrix Dashboard是作为断路器状态的一个组件,提供了数据监控和友好的图形化界面
通过监控该面板,可以很直观的看到每一个服务请求在短时间内(10s)的请求量,以及成功率,失败率,耗时等信息,可以给后期的系统优化提供依据.
Hystrix 进行服务熔断时会对调用结果进行统计,比如超时数、bad 请求数、降级数、异常数等等都会有统计,那么统计的数据就需要有一个界面来展示,hystrix-dashboard 就是这么一个展示 hystrix 统计结果的服务。
Dashboard 工程搭建
新建一个hystrix-dashboard的项目
导入POM
1 2 3 4 5 6 7 8
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId> </dependency>
|
配置文件配置
配置application.properties
启动类
1 2 3 4 5 6 7 8
| @SpringBootApplication
@EnableHystrixDashboard public class HystrixDashboardApplication { public static void main(String[] args) { SpringApplication.run(HystrixDashboardApplication.class); } }
|
访问测试
访问 http://localhost:9999/hystrix
客户端配置
修改order-server 项目
导入POM文件
加入健康检查依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
|
配置文件配置
开启 health以及hystrix.stream端点的监控,主要是开放hystrix.stream访问端点,否则会报404.
1 2
| management.endpoints.web.exposure.include=health,hystrix.stream
|
启动访问测试
启动项目 访问 http://{ip}:{port}/actuator/hystrix.stream
方便复制 地址粘贴出来:http://localhost:8083/actuator/hystrix.stream
出现如下界面说明客户端已经配置好了
这里有一个细节需要注意,要访问/hystrix.stream接口,首先访问消费者(springcloud-service-consumer)工程中任意的一个带有熔断器的接口,否则直接访问/hystrix.stream接口时,会输出一连串ping: ping: ….
Dashboard 使用
访问Dashboard 的页面,在里面输入刚才客户端的hystrix.stream的端点,点击监控即可
点击监控将出现如下界面
Dashboard 参数说明
监控图表中的具体含义如下
实心圆 共有两种含义。它通过颜色的变化代表了实例的健康程度,它的健康程度从 绿色 > 黄色 > 橙色 > 红色 递减,该实心圆除了颜色的变化之外,它的大小也会根据实例的请求流量发生变化,流量越大实心圆就越大,所以通过该实心圆的展示,就可以在大量实例中快速的发现故障实例和高压力实例。
详细参数说明
集群报告状态
更多参数说明
使用postman测试
使用postman对hystrix进行处理的接口进行测试
测试时图表会进行变化
Hystrix熔断
熔断就像家里的保险丝一样,家里的保险丝一旦断了,家里就没点了,家里用电器功率高了就会导致保险丝端掉。在我们springcloud领域也可以这样理解,如果并发高了就可能触发hystrix的熔断。
熔断发生的三个必要条件
有一个统计的时间周期,滚动窗口
相应的配置属性
metrics.rollingStats.timeInMilliseconds
默认10000毫秒(10秒)
请求次数必须达到一定数量
相应的配置属性
circuitBreaker.requestVolumeThreshold
默认20次
失败率达到默认失败率
相应的配置属性
circuitBreaker.errorThresholdPercentage
默认50%的失败率
上述3个条件缺一不可,必须全部满足才能开启hystrix的熔断功能。
Jmeter压测
当我们的对一个线程池大小是100的方法压测时看看hystrix的熔断效果:
可以看到失败率超过50%时,circuit的状态是open的。
熔断器的三个状态
关闭状态
关闭状态时用户请求是可以到达服务提供方的
开启状态
开启状态时用户请求是不能到达服务提供方的,直接会走降级方法
半开状态
当hystrix熔断器开启时,过一段时间后,熔断器就会由开启状态变成半开状态。
半开状态的熔断器是可以接受用户请求并把请求传递给服务提供方的,这时候如果远程调用返回成功,那么熔断器就会有半开状态变成关闭状态,反之,如果调用失败,熔断器就会有半开状态变成开启状态。
注意:Hystrix功能建议在并发比较高的方法上使用,并不是所有方法都得使用的。
总结
一般容易将服务降级和服务熔断混在一起,为什么我会有这样的误解呢?
针对下面的情形,如图所示:
当Service A
调用Service B
,失败多次达到一定阀值,Service A
不会再去调Service B
,而会去执行本地的降级方法!
对于这么一套机制:在Spring cloud中结合Hystrix,将其称为熔断降级!
所以我当时就以为是一回事了,毕竟熔断和降级是一起发生的,而且这二者的概念太相近了!后面接触了多了,发现自己理解的还是太狭隘了,因此本文中带着点我自己的见解,大家如果有不同意见,请轻喷!毕竟还有很多人认为两者是一致的!
服务雪崩
OK,我们从服务雪崩开始讲起!假设存在如下调用链:
而此时,Service A
的流量波动很大,流量经常会突然性增加!那么在这种情况下,就算Service A
能扛得住请求,Service B
和Service C
未必能扛得住这突发的请求。
此时,如果Service C
因为抗不住请求,变得不可用。那么Service B
的请求也会阻塞,慢慢耗尽Service B
的线程资源,Service B
就会变得不可用。紧接着,Service A
也会不可用,这一过程如下图所示
如上图所示,一个服务失败,导致整条链路的服务都失败的情形,我们称之为服务雪崩。
那么,服务熔断和服务降级就可以视为解决服务雪崩的手段之一。
服务熔断
服务熔断:当下游的服务因为某种原因突然变得不可用或响应过慢,上游服务为了保证自己整体服务的可用性,不再继续调用目标服务,直接返回,快速释放资源。如果目标服务情况好转则恢复调用。Hystrix默认的超时时间为1s,请求时间超过1s,熔断次数加1。
熔断器机制
- closed:请求正常时,不使用熔断器;
- open:统计请求的失败比例,达到阀值时,打开熔断器,请求被降级处理;延时一段时候后(默认休眠时间是5S)会进入halfopen状态;默认失败比例阀值是50%,请求次数最少不低于20次,例如:一段时间内请求30次,有20失败,熔断器将会打开,默认5s内的请求都会失败,直接返回。
- halfopen:在进入该状态后会放入部分请求;判断请求是否成功,不成功,进入open状态,重新计时,进入halfopen状态;成功,进入closed状态,
服务降级
服务降级是从整个系统的负荷情况出发和考虑的,对某些负荷会比较高的情况,为了预防某些功能(业务场景)出现负荷过载或者响应慢的情况,在其内部暂时舍弃对一些非核心的接口和数据的请求,而直接返回一个提前准备好的fallback(退路)错误处理信息。这样,虽然提供的是一个有损的服务,但却保证了整个系统的稳定性和可用性。
熔断VS降级
相同点
- 目标一致:都是从可用性和可靠性出发,为了防止系统崩溃;
- 用户体验类似:最终都让用户体验到的是某些功能暂时不可用;
不同点
- 触发原因不同:服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑;
实现ThreadLocal上下文的传递
Hystrix使用命令模式将所有对外部服务的调用包装在HystrixCommand
或HystrixObservableCommand
对象中,并将该对象放在单独的线程中执行。因为调用在单独的线程中执行,因此原线程的ThreadLocal设置就失效了。
因此,为了将当前线程的ThreadLocal数值传递至Hystrix的线程中,可以参考下文。
封装Callable任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public final class DelegatingUserContextCallable<V> implements Callable<V> { private final Callable<V> delegate; private UserContext originalUserContext;
public DelegatingUserContextCallable(Callable<V> delegate, UserContext userContext) { this.delegate = delegate; this.originalUserContext = userContext; }
public V call() throws Exception { UserContextHolder.setContext(originalUserContext); try { return delegate.call(); } finally { this.originalUserContext = null; } }
public static <V> Callable<V> create(Callable<V> delegate, UserContext userContext) { return new DelegatingUserContextCallable<V>(delegate, userContext); } }
|
实现Hystrix的并发策略类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy { private final HystrixConcurrencyStrategy existingConcurrencyStrategy;
public ThreadLocalAwareStrategy( HystrixConcurrencyStrategy existingConcurrencyStrategy) { this.existingConcurrencyStrategy = existingConcurrencyStrategy; }
@Override public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize) : super.getBlockingQueue(maxQueueSize); }
@Override public <T> HystrixRequestVariable<T> getRequestVariable( HystrixRequestVariableLifecycle<T> rv) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getRequestVariable(rv) : super.getRequestVariable(rv); }
@Override public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue) : super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); }
@Override public <T> Callable<T> wrapCallable(Callable<T> callable) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy .wrapCallable(new DelegatingUserContextCallable<>(callable, UserContextHolder.getContext())) : super.wrapCallable(new DelegatingUserContextCallable<T>(callable, UserContextHolder.getContext())); } }
|
注入并发策略并进行刷新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @ConditionalOnClass(HystrixConcurrencyStrategy.class) @Configuration public class HystrixThreadLocalConfiguration {
@Autowired(required = false) private HystrixConcurrencyStrategy existingConcurrencyStrategy;
@PostConstruct public void init() { HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance() .getEventNotifier(); HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance() .getMetricsPublisher(); HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance() .getPropertiesStrategy(); HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance() .getCommandExecutionHook();
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalAwareStrategy(existingConcurrencyStrategy)); HystrixPlugins.getInstance().registerEventNotifier(eventNotifier); HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher); HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy); HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook); } }
|