抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

提高吞吐量架构设计

抢购打车券

先从打车优惠券说起,优惠券大奖都抢过,他的特点是抢购后直接返回给你是否抢购成功

img

整体业务流程

整体业务流程相对简单,流程图如下

image-20210120174859911

业务分析

整体业务相对来说不比较简单,但是要求请求响应必须是同步的,不能异步的获取结果

  1. 用户请求过来首先进行预扣库存
  2. 如果预扣库存成功则进行真实扣库存并且提交订单
  3. 然后对优惠券进行下单
涉及到的组件

整体业务相对来说还是比较简单的,主要涉及以下第三方组件

  • redis 服务进行预扣库存
  • mysql 真实扣库存
  • mysql 将订单插入数据库

性能目标

压测性能

我们对现在的方案进行了优化发现吞吐量在131左右,吞吐量很低我们需要进行优化

image-20210122164944638

优化目标

我们抢购用户大概有1500人,因为抢购是瞬时的,部署的微服务有4台,平均到每1台大概的并发数大概有375,我们需要每一台的并发数在400左右才能承担起来瞬时并发数,我们需要达到的平均响应时间最高400ms,经过计算 我们的吞吐量至少要达到1000以上。

经过计算我们需要优化的目标如下

  • 吞吐量>=1000
  • 平均响应时间<=400ms

这里涉及到了吞吐量的概念什么是吞吐量呢下来我们来介绍下

什么是吞吐量

吞吐量定义

吞吐量是指对网络、设备、端口、虚电路或其他设施,单位时间内成功地传送数据的数量(以比特、字节、分组等测量)。

​ 以上的定义比较宽泛,定义到网站或者接口的吞吐量是这样的:吞吐量是指系统在单位时间内处理请求的数量。这里有一个注意点就是 单位时间内 ,对于网站的吞吐量这个单位时间一般定义为1秒,也就是说网站在一秒之内能处理多少http(https/tcp)请求。与吞吐量对应的衡量网站性能的还有响应时间、并发数、QPS每秒查询率。

吞吐量要素

一个系统的吞度量(承压能力)与request对CPU的消耗、外部接口、IO等等紧密关联。单个reqeust 对CPU消耗越高,外部系统接口、IO影响速度越慢,系统吞吐能力越低,反之越高,系统吞吐量几个重要参数:

  • QPS(TPS)
  • 并发数
  • 响应时间
QPS(TPS)

每秒钟 请求/事务 数量

​ 对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准,在因特网上,作为域名系统服务器的机器的性能经常用每秒请求率来衡量。对应fetches/sec,即每秒的响应请求数,也即是最大吞吐能力。

并发数

系统同时处理的 请求/事务数

​ 并发数是指系统同时能处理的请求数量,这个也是反应了系统的负载能力。

响应时间

一般取平均响应时间

​ 响应时间是一个系统最重要的指标之一,它的数值大小直接反应了系统的快慢。响应时间是指执行一个请求从开始到最后收到响应数据所花费的总体时间。

QPS计算

理解了上面三个要素的意义之后,就能推算出它们之间的关系:

QPS(TPS)= 并发数/平均响应时间 或者 并发数 = QPS*平均响应时间

举个例子:

银行窗口业务,早上8点上班,窗口数量为10个窗口,平均每个人办理业务的时候为5分钟。可以用下面的方法计算。

并发数=10个窗口

平均响应时间为 = 5*60 秒

QPS = 10/(5*60) 事务/秒

​ 我们以高速收费站为例子也许更直观一些,吞吐量就是一天之内通过的车辆数,响应时间就是车速,并发数就是高速上同时奔跑的汽车数。由此可见其实以上几个指标是有内在联系的。比如:响应时间缩短,在一定程度上可以提高吞吐量。

其实以上几个指标主要反映了两个概念:

  • 系统在单位时间之内能做多少事情
  • 系统做一件事情需要的时间

影响吞吐量因素

服务器(进程)级别

​ 服务器级别增加网站吞吐量也是诸多措施中最容易并且是效果最好的,如果一个网站能通过增加少量的服务器来提高吞吐量,菜菜觉得是应该优先采用的。毕竟一台服务器的费用相比较一个程序员费用来说要低的多。但是有一个前提, 就是你的服务器是系统的瓶颈,网站系统之后的其他系统并非瓶颈 。如果你的系统的瓶颈在DB或者其他服务,盲目的增加服务器并不能解决你的问题。

​ 通过增加服务器来解决你的网站瓶颈,意味着你的网站需要做负载均衡,如果没有运维相关人员,你可能还得需要研究负载均衡的方案,比如LVS,Nginx,F5等。我曾经面试过很多入道不久的同学,就提高吞吐量问题,如果没有回答上用负载均衡方案的基本都pass了,不要说别的,这个方案就是一个基础,就好比学习一个语言,你连最基本的语法都不会,我凭什么让你通过。

​ 其实现在很多静态文件采用CDN,本质上也可以认为是增加服务器的策略

线程级别

​ 当一个请求到达服务器并且正确的被服务器接收之后,最终执行这个请求的载体是一个线程。当一个线程被cpu载入执行其指令的时候,在同步的状态下,当前线程会阻塞在那里等待cpu结果,如果cpu执行的是比较慢的IO操作,线程会一直被阻塞闲置很长时间。

​ 当一个新的请求到来的时候,如果没有新的线程去领取这个任务并执行,要么会发生异常,要么创建新的线程。线程是一种很稀缺的资源,不可能无限制的创建。这种情况下我们就要把线程这种资源充分利用起来,不要让线程停下来。这也是程序推荐采用异步的原因,试想,一个线程不停的在工作,遇到比较慢的IO不会去等待结果,而是接着处理下一个请求,当IO的结果返回来得到通知的时候,线程再去取IO结果,岂不是能在相同时间内处理更多的请求。

​ 程序异步化(非阻塞)会明显提高系统的吞吐量,但是响应时间可能会稍微变大。

​ 还有一点,尽量减少线程上线文在cpu的切换,因为线程上线文切换的成本也是比较大的,在线程切换的时候,cpu需要把当前线程的上下文信息记录下来用以下次调用的时候使用,然后把新线程的上下文信息载入然后执行。这个过程相对于cpu的执行速度而言,要慢很多。

网络传输级别

​ 至于网络传输级别,由于协议大部分是Tcp/ip,所以在协议传输方面优化的手段比较少,但是应用程序级别协议可以选择压缩率更好的,比如采用grpc会比单纯的http协议要好很多,http2 要比http 1.1要好很多。另外一方面网卡尽量加大传输速率,比如千兆网卡要比百兆网卡速度更快。由于网络传输比较偏底层,所以人工干预的切入点会少很多。

如何提高系统QPS

由前面的公式:QPS(TPS)= 并发数/平均响应时间 可以看出,要提高qps,我们必须做2个方面努力

增加并发数
  1. 比如增加tomcat并发的线程数,开喝服务器性能匹配的线程数,可以更多满足服务请求。

  2. 增加数据库的连接数,预建立合适数量的TCP连接数

  3. 后端服务尽量无状态话,可以更好支持横向扩容,满足更大流量要求

  4. 调用链路上的各个系统和服务尽量不要单点,要从头到尾都是能力对等的,不能让其中某一点成为瓶颈。

  5. RPC调用的尽量使用线程池,预先建立合适的连接数。

减少平均响应时间
  1. 请求尽量越前结束,越好,这样压力就不要穿透到后面的系统上,可以在各个层上加上缓存
  2. 流量消峰。放行适当的流量,处理不了的请求直接返回错误或者其他提示。和水坝道理很类似
  3. 减少调用链
  4. 优化程序
  5. 减少网络开销,适当使用长连接
  6. 优化数据库,建立索引

优化策略

上面讲解了我们的整体业务流程,优化的话可以从以下几方面入手

提高并发数

提高并发数就是增加同一时刻能够处理的数据量,有以下几种方案

异步Servlet

​ 从3.0开始,Tomcat的Servlet支持异步请求,或者说是Tomcat提供了异步Servlet,从而可以将一些耗时的操作放到独立的线程中,在操作完成后再返回数据,不阻塞请求的执行和返回,甚至可以基于此实现服务器推的功能,因为不需要阻塞了所以可以很大程度上提高并发数。

请求响应队列

img

​ 使用RabbitMQ可以提高系统的吞吐量,因为有了RabbitMQ的缓冲可以很大程度提高服务的并发数以及系统的承载能力,请求响应队列是RabbitMQ的一个特有功能可以像是请求响应一样,发到MQ一条消息处理完成后会随着响应队列将结果返回。

减少响应时间

多线程请求合并

多线程合并请求就是将多个请求比如几十个请求合并成一个请求进行处理

image-20210125104245387

​ 主要借助本地队列将线程请求缓存,每隔一段时间比如 200ms,将这200ms合并到队列的请求统一当成一个请求处理,如果每一个请求耗时比较长,这样可以节省大量时间,并且可以减少网络IO的开销

Redis批量与扣减

合并请求后,本来redis需要扣减100次 但是现在是需要扣减一次即可,可以大量减少redis的网络IO的开销

数据库批量扣减

对于数据库也是一样的,本来需要操作100次,现在只需要操作一次

使用本地缓存

使用本地缓存可以很大程度上的提高系统的吞吐量

请求响应模式

下面我们先从请求响应开始讲结

什么是请求响应模式

​ 如果你正在进行web服务编程,那么最常用的模式是请求-响应模式。这种模式总是由客户端发起,然后等待服务器端的响应。如果客户端想发送一些信息给服务器,或者客户端按照某些标准请求一些信息,那么这种模式非常适合。如果服务器想自己发送信息给客户端,那么这种模式就非常不适合。这时我们就必须依赖像长轮询或者web挂钩这样的对HTTP进行某种程度扩展的技巧了。

img

消息系统请求响应

​ 而对消息系统来说,最常用的模式是发送-接收模式。生产者节点发布一条信息,接下来这条信息会被传送给消费节点。这儿没有纯粹的客户端或者服务器的概念;节点可以是生产者,也可以是消费者,或者二者兼有。当一个节点想发送一些信息给另一个节点或者相反,这种模式都运行的非常好,不过,如果一个节点想按照某些标准向另一个节点请求信息,那么这种模式就不是很适合。

img

​ RabbitMQ 是基于 AMQP 协议实现的一个消息队列(Message Queue),Message Queue 是一个典型的生产者/消费者模式。生产者发布消息,消费者消费消息,生产者和消费者之间是解耦的,互相不知道对方的存在。

​ 然而,这一切并不是完全做不到。我们可以模仿请求-应答模式:让客户端创建一个应答队列,这个队列存储客户端发送给服务器的查询消息的应答。客户端可以设置请求消息的reply_to属性字段为应答队列名。服务器检查reply_to子段,然后通过默认的整理中心把应答消息发布给应答队列,接着这个消息就由客户端接收。

img

客户端和服务端即是生产者也是消费者。客户端发布请求,消费响应;服务端消费请求,发布响应。

请求响应方案

请求端的实现很简单;它看起来就像标准的发送-接收模式。而对于应答端,我们可以有多个选择。如果你通过谷歌搜寻”RabbitMQ RPC”或者”RabbitMQ request response”,你就发现有关应答队列的性质方面有几个的意见:

  • 每个请求是不是应该对应一个应答队列,或者客户端对多个请求是不是应该只维护一个应答队列?

  • 应答队列是不是应该是独享的,即只可用于一个通道,或者应该不是独享的?注意:当通道关闭的时候,独享队列应该删除,无论是有意,还是网络中断或者转发网关失效,都能引起连接丢失。

每个请求独享一个应答队列

​ 这种情况下,每个请求创建一个应答队列。好处是实现起来简单。把响应与请求关联起来也没有问题,因为每个请求都有它自己对应的响应消费者。如果客户端与代理之间的连接在响应接收之前就断开了,那么代理就会清除掉剩余的应答队列,这时就会丢失响应的消息。

​ 实现这个的主要问题是:倘若由于服务器问题引起服务器无法发布响应,那么我们必须清除所有的应答队列。

​ 这种方式有很大的性能开销,因为它对每个请求都要创建一个新的队列和消费者。

每个客户端独享一个应答队列

​ 这种情况下,每个客户端连接维护着由许多请求共享的应答队列。这减少了对每个请求都要创建队列和消费者所造成的性能开销,不过它增加了客户端需要跟踪应答队列并且把应答与各自对应的请求匹配方面的开销。处理这个的标准办法是使用关联id,服务器可以从响应所对应的请求里拷贝这个id。

​ 在客户端断开的时候,删除应答队列没有任何问题,因为转发网关会自动删除队列的。然而,这确实意味着断开的那个时刻的仍在传输的任何响应都将丢失。

永久应答队列

​ 上面两种情形都存在这样的问题:如果客户端和转发网关之间的连接断开,并且响应还处在运行状态,那么这些响应就会丢失。这是因为它们使用的是独享型的队列,也就是说,当拥有这个队列的连接关闭的时候,转发网关必须删除这个队列。

​ 针对这个问题的常见的解决办法就是使用非独享型的应答队列。不过这会引起一些管理开销。你需要采用某种方式命名应答队列,并把它与特定的客户端关联起来。问题是:客户端很难知道一个应答队列是属于自己的,还是属于另一个客户端的。不过随意地创建一个可把响应发送给不对应的客户端这样的环境却非常容易。你可能最终要手工创建和命名响应队列,这么做就没有了第一种情况下可根据消息选择代理的好处了。

默认交换器

默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。

​ 它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

image-20210121114427204

提高系统吞吐量

我们可以使用请求响应队列提高系统的吞吐量,根据上面的讲解提高吞吐量的方式有以下两种

  • 提高并发数
  • 减少平均响应时间

提高并发数

提高并发数主要有以下方案

  • 异步Servlet
  • 请求响应队列

使用异步Servlet

结合上面讲的,提高并发量一个一个手段是使用异步Servlet,servlet从3.0开始支持异步

什么是Servlet

Servlet 是基于 JavaWeb 组件,由容器进行管理,来生成动态内容。像其他基于 Java 的组件技术一样,servlet 也是基于平台无关的 Java 类格式,被编译为平台无关的字节码,可以被基于 Java 技术的 Web 服务器动态加载并运行。

容器Container),有时候也叫做servlet引擎,是Web服务器为支持 servlet 功能扩展的部分。客户端通过 servlet 容器实现的 request/response paradigm(请求/应答模式) 与 Servlet 进行交互。

什么是Servlet规范

Servlet版本发布都会对应一个Servlet版本的规范,Servlet2.5、Servlet3.0、Servlet3.1.

​ 规范中描述了Java Servlet API 的标准,定义了 Java Servlet API 中类、接口、方法签名的完整规范且附带的Javadoc 文档供开发人员查阅,目的主要是为Java Servlet 给出一个完整和清晰的解释。

​ 看出Servlet规范版本和tomcat支持的版本的对应关系。比如Servlet3是从tomcat7以后开始支持的。

img

同步,异步,阻塞,非阻塞

同步异步是数据通信的方式,阻塞和非阻塞是一种状态。比如同步这种数据通讯方式里面可以有阻塞状态也可以有非阻塞状态。

同步请求.png

异步请求.png

同步与异步

同步和异步关注的是消息通信机制

​ 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回,但是一旦调用返回,就得到返回值了。

​ 而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

举个通俗的例子:

​ 你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下”,然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。

​ 而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。

阻塞与非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

​ 阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

​ 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

还是上面的例子

​ 你打电话问书店老板有没有《分布式系统》这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。
​ 在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关

Servlet3的异步流程

img

​ 接收到request请求之后,tomcat工作线程HttpServletRequest中获得一个异步上下文AsyncContext对象,然后由tomcat工作线程把AsyncContext对象传递给业务处理线程,同时tomcat工作线程归还到工作线程池。

​ 在业务处理线程中完成业务逻辑的处理,生成response返回给客户端。在Servlet3.0中虽然处理请求可以实现异步,但是InputStreamOutputStream的IO操作还是阻塞的,当数据量大的request body 或者 response body的时候,就会导致不必要的等待。从Servlet3.1以后增加了非阻塞IO,需要tomcat8.x支持。

Servlet3的异步使用步骤

我们使用的大致步骤如下:

  1. 声明Servlet,增加asyncSupported属性,开启异步支持。

    1
    @WebServlet(asyncSupported = true)
  2. 通过request获取异步上下文AsyncContext

    1
    AsyncContext asyncCtx = request.startAsync();
  3. 开启业务逻辑处理线程,并将AsyncContext 传递给业务线程。

    1
    executor.execute(new AsyncRequestProcessor(asyncCtx, secs));
  4. 在异步业务逻辑处理线程中,通过asyncContext获取request和response,处理对应的业务。

  5. 业务逻辑处理线程处理完成逻辑之后,调用AsyncContext 的complete方法从而结束该次异步线程处理。

    1
    asyncContext.complete();

SpringMVC对Servlet3异步请求的支持

SpringMVC对Servlet3异步请求的支持有两种方式,分别是通过处理器方法返回Callable和DeferredResult。

​ 按照Servlet3的规范,支持异步请求时需要配置对应的Servlet和Filter支持异步请求,为了使SpringMVC支持异步请求的处理,需要在定义DispatcherServlet时配置其支持异步请求,在DispatcherServlet之前定义的Filter也需要配置支持异步请求。

返回Callable

​ 当处理器的返回方法是Callable类型时会默认发起异步请求,并使用一个TaskExecutor来调用返回的Callable,之后的处理就跟正常的SpringMVC请求是一样的。

​ Callable的返回结果也跟正常请求SpringMVC的一样,可以返回Model、ModelAndView、String、Object等,也可以结合@ResponseBody使用,具体可以参考CallableMethodReturnValueHandler的handleReturnValue()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RequestMapping("/callableTest")
public CallbackResult<ResponseVO> callableTest() {
//创建CallbackResult对象
CallbackResult resultCallable = new CallableResult<ResponseVO>(0);
//异步线程处理
ThreadPoolUtils.execute(ResponseVO.success(null), x -> {
try {
//休眠10S
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
resultCallable.setCallResult(x);
});
return resultCallable;
}

​ 如果需要针对于单个Callable请求指定超时时间,我们可以把Callable用一个WebAsyncTask包裹起来。然后还可以指定超时回调和正常处理完成的回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RequestMapping("/callable/timeout")
 public WebAsyncTask<String> forCallableWithTimeout(Model model) throws Exception {
   long timeout = 5 * 1000L;
   WebAsyncTask<String> asyncTask = new WebAsyncTask<>(timeout, () -> {
     TimeUnit.MILLISECONDS.sleep(timeout + 10);
     model.addAttribute("a", "aaaaaaa");
     return "async_request_callable";
   });
   asyncTask.onTimeout(() -> {
     System.out.println("响应超时回调");
     return "async_request_callable_timeout";
   });
   asyncTask.onCompletion(() -> {
     System.out.println("响应callable调用完成的回调");
   });
   return asyncTask;
 }
返回DeferredResult

一旦启用了异步请求处理功能 ,控制器就可以将返回值包装在DeferredResult,控制器可以从不同的线程异步产生返回值。优点就是可以实现两个完全不相干的线程间的通信。

​ 使用DeferredResult的返回结果的编程通常是在处理器方法中创建一个DeferredResult实例,把它保存起来后再进行返回,比如保存到一个队列中,然后在另外的一个线程中会从这个队列中拿到相应的DeferredResult对象进行相应的业务处理后会往DeferredResult中设置对应的返回值。

​ 返回了DeferredResult后SpringMVC将创建一个DeferredResultHandler用于监听DeferredResult,一旦DeferredResult中设置了返回值后,DeferredResultHandler就将对返回值进行处理。DeferredResult的处理过程见DeferredResultMethodReturnValueHandler的handleReturnValue()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RequestMapping("/deferredResultTest")
public CallbackResult<ResponseVO> deferredResultTest() {
CallbackResult<ResponseVO> deferredResult = new CallbackDeferredResult<>(0);
ThreadPoolUtils.execute(ResponseVO.success(null), x -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
deferredResult.setCallResult(x);
});

return deferredResult;
}

​ 对于DeferredResult也是可以单独指定超时时间和超时后的回调的,它的超时时间可以直接通过构造函数传递,单位是毫秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RequestMapping("/deferredresult/timeout")
public DeferredResult<String> forDeferredResultWithTimeout() throws Exception {
DeferredResult<String> result = new DeferredResult<>(10 * 1000);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(31);
} catch (InterruptedException e) {
e.printStackTrace();
}
result.setResult("async_request_deferredresult");
}).start();

result.onTimeout(() -> {
System.out.println("响应超时回调函数");
});

result.onCompletion(() -> {
System.out.println("响应完成的回调函数");
});

return result;
}
两者的的区别

DeferredResult需要自己用线程来处理结果setResult,而Callable的话不需要我们来维护一个结果处理线程。

​ 总体来说,Callable的话更为简单,同样的也是因为简单,灵活性不够;

​ 相对地,DeferredResult更为复杂一些,但是又极大的灵活性,所以能实现非常多个性化的、复杂的功能,可以设计高级应用。

​ 有些较常见的场景,Callable也并不能解决,比如说:我们访问A接口,A接口调用三方的服务,服务回调(注意此处指的回调,不是返回值)B接口,这种情况就没办法使用Callable了,这个时候可以使用DeferredResult

​ 使用原则:基本上在可以用Callable的时候,直接用Callable;而遇到Callable没法解决的场景的时候,可以尝试使用DeferredResult

​ 站在一定高度来看这问题,Callable和Deferredresult做的是同样的事情——释放容器线程,在另一个线程上异步运行长时间的任务。不同的是谁管理执行任务的线程。

请求响应队列提高并发数

上面我们讲解了异步Servlet方式提上吞吐量,但是微服务调用需要花费时间,我们可以使用异步servlet+请求响应队列提高吞吐量

image-20210121164424038

请求流程
  1. 首先用户请求的时候通过tomcat线程进行请求端的处理
  2. 请求生成一个消息ID,并生成以及保存回调对象到本地缓存
  3. 将消息ID设置到消息队列中,然后投递消息
  4. 请求线程完成并继续处理其他请求
回调流程
  1. 业务处理完成后将处理结果放进响应队列
  2. 会掉到消息处理端,从消息体中拿到消息ID
  3. 从本地缓存中拿到回调对象
  4. 响应结果唤醒响应线程返回响应

提高响应时间方案

提高响应时间主要有以下方案

  • 批量合并请求
  • 使用本地缓存

批量合并请求

项目中一般会请求第三方的接口,也会对外提供接口,可能是RPC,也可能是HTTP等方式。在对外提供接口时,有必要提供相应的批量接口,好的批量实现能够提升性能。

​ 高并发场景中,调用批量接口相比调用非批量接口有更大的性能优势。但有时候,请求更多的是单个接口,不能够直接调用批量接口,如果这个接口是高频接口,对其做请求合并就很有必要了。比如电影网站的获取电影详情接口,APP的一次请求是单个接口调用,用户量少的时候请求也不多,完全没问题;但同一时刻往往有大量用户访问电影详情,是个高并发的高频接口,如果都是单次查询,后台就不一定能hold住了。为了优化这个接口,后台可以将相同的请求进行合并,然后调用批量的查询接口。如下图所示

img

模拟耗时调用

用下面的延时操作来模拟一下比较耗时的操作

1
2
3
4
5
6
7
8
9
10
11
12
public static long purchase(long num) {
logger.info("开始进行下单操作,数量:{}", num);
try {
//模拟接口调用耗时
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
long result = atomicLong.addAndGet(-num);
logger.info("完成进行下单操作,剩余数量:{}", result);
return result;
}
无合并请求

​ 合并请求前,我们一般是调用服务层的单次创建方法。看起来都比较简单,且易于理解。

1
2
3
4
5
6
7
8
public class SingleRequestInvok {
//直接进行调用
public String invok() {
long result = RequestHandel.purchase(1);

return result >= 0 ? "下单成功..." : "下单失败...";
}
}
请求合并

​ 请求合并的好处前面有提到,那不能每次写接口就做请求合并吧?我们要明白,技术无好坏,要在特定的业务场景下衡量利弊,采用与否需要深思熟虑。合并请求会令代码变得复杂,也会增加一定的接口延迟,其中还可能存在各种未知的风险。

​ 合并请求是针对高并发场景的一种手段,我们实现请求合并之前,要结合业务场景思考一番,是否值得承受的合并带来的访问延迟?用户体验是否会打折扣?自身的技术是否足够hold住请求合并带来的未知风险?

​ 思路:收到前端的请求时,先存起来,隔段时间批量请求第三方服务批量接口,然后分别通知存起来的请求,并且响应前端。

创建回调对象

主要是该类实现了Callable 接口,并且使用CountDownLatch hold住响应线程,需要其他线程调用setResult设置值后 才能唤醒响应线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 回调对象 实现callable接口
*
* @param <T>
*/
public class CallbackTask<T> implements Callable<T> {
//结果对象
private T result;
//创建countDownLatch 在前台调用call的时候 hold住线程
private CountDownLatch countDownLatch = new CountDownLatch(1);

/**
* 给其他回调线程执行的call
* @return
* @throws Exception
*/
@Override
public T call() throws Exception {
//hold 住线程 其他 响应线程会在这里阻塞
countDownLatch.await();
//返回结果
return result;
}

/**
* 设置结果并唤醒回调线程
* @param result
*/
public void setResult(T result) {
//设置结果
this.result = result;
//唤醒响应线程
countDownLatch.countDown();
}
}
缓存请求

将请求缓存到阻塞队列中,并返回callable,交给异步servlet进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 浏览器调用请求
*
* @return
*/
public CallbackTask requestInvok() {
//创建回调对象
CallbackTask<String> callbackTask = new CallbackTask<String>();
try {
//加入队列缓冲
blockingQueue.put(callbackTask);
} catch (InterruptedException e) {
e.printStackTrace();
}
//以callable方式返回
return callbackTask;
}
异步线程处理数据

这里主要是从阻塞队列中取出来callable,然后缓存到本地list缓冲区中

如果符合一下条件则进行批量执行

  • 缓冲区数量< 最小 批次数 并且没有达到延时时间需要合并请求

  • 缓冲区数量 >=最大批次数进行批量执行

  • 等待合并时间>=200ms 进行批量提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 异步线程处理数据
*/
public void asyncHandel() {
//设置CallBack缓冲区
List<CallbackTask<String>> cacheCallbackList = new LinkedList<>();
//当前时间
long currentTime = System.currentTimeMillis();
while (!Thread.interrupted()) {
try {
//获取一个回调对象
CallbackTask<String> callbackTask = blockingQueue.poll(delayTime, TimeUnit.MILLISECONDS);
if (null != callbackTask) {
//添加到缓存
cacheCallbackList.add(callbackTask);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
//list为空 继续循环
if (cacheCallbackList.isEmpty()) {
continue;
}
//获取当前调用耗时
long duration = (System.currentTimeMillis() - currentTime);

//如果缓冲区lsit 小于最小批次数 并且没有 没有达到延时时间 继续循环
if (cacheCallbackList.size() < minBatchSize && duration < delayTime) {
continue;
}
//如果 list缓存数量>最大皮次数 或者 耗时超过200Ms 进行一次批量提交
if (cacheCallbackList.size() >= maxBatchSize || duration >= delayTime) {
//进行批量扣减
long result = RequestHandel.purchase(cacheCallbackList.size());
//设置回调值 并唤醒 响应线程
setBatchResult(cacheCallbackList, result);
//清空list缓存
cacheCallbackList.clear();
//重置基准时间
currentTime = System.currentTimeMillis();
}
}
}
唤醒响应线程

这里线程会调用setBatchResult 根据结果集 会调用CallbackTask的setResult 设置值并唤醒异步响应线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 批量设置结果值 并唤醒响应线程
* @param callbackTaskList
* @param result
*/
public void setBatchResult(List<CallbackTask<String>> callbackTaskList, long result) {
//计算扣减成功的数量
long successSize = result >= 0 ? callbackTaskList.size() : callbackTaskList.size() + result;
for (int i = 0; i < callbackTaskList.size(); i++) {
CallbackTask<String> callbackTask = callbackTaskList.get(i);
if (i <= successSize) {
callbackTask.setResult("下单成功...");
} else {
callbackTask.setResult("下单失败...");
}
}
}

使用本地缓存

对于访问非常频繁的数据,可以使用本地缓存优化响应速度

image-20210125105951015

​ 这里我们用本地缓存判断扣减状态,如果状态扣减为0 则将扣减状态设置为 fasle 进行快速失败,而不是继续调用业务服务。

image-20210122164658326

提高服务承载能力

经过上面优化后系统的并发量已经很高了 但是如果如果调用服务慢 可能造成大量数据压入本地队列 把服务器撑爆,这种情况下可以使用 rabbitMQ的请求响应队列提高整体服务的承载能力。

image-20210124182555944

压测验证

Main启动tomcat

手动启动Tomcat 然后创建一个servlet进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class TomcateServer {
public static void main(String[] args) {
Tomcat tomcat = new Tomcat();
// 设置主机名称
tomcat.setHostname("localhost");
//设置访问端口号
tomcat.setPort(8080);
//设置路径
tomcat.setBaseDir("D:/tmp/embedTomcat");
String contextPath = "";

StandardContext context = new StandardContext();
// 设置资源路径
context.setPath(contextPath);
// 设置应用路径
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());
// 将context加入tomcat
tomcat.getHost().addChild(context);
// 在context中创建表示servlet的Wrapper并返回
tomcat.addServlet(contextPath, "request", new MergeRequestAsyncServlet());
context.addServletMappingDecoded("/request", "request");

try {
// 启动tomcat
tomcat.start();
} catch (LifecycleException e) {
e.printStackTrace();
}
// 等待请求
tomcat.getServer().await();
}
}

同步servlet压测

使用同步Servlet进行压测效果如下

image-20210122164944638

吞吐量在131 左右

异步servlet压测

使用异步Servlet进行压测

image-20210122165137461

我们发现使用异步Servlet后整体吞吐量提高了一些

合并请求后压测

使用异步Servlet以及合并请求后进行压测

image-20210122165258271

我们发现通过异步Servlet+合并请求压测后 整体吞吐量提升了十倍左右

评论