背景
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。缓存、降级和限流是保护微服务系统运行稳定性的三大利器。
- 缓存:提升系统访问速度和增大系统能处理的容量
- 降级:当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉
- 限流:解决服务雪崩,级联服务发生阻塞时,及时熔断,防止请求堆积消耗占用系统的线程、IO等资源,造成其他级联服务所在服务器的崩溃
这里我们主要说一下限流,限流的目的应当是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率就可以拒绝服务、等待、降级。 首先,我们需要去了解最基本的两种限流算法。
限流算法
- 漏桶算法
- 令牌桶算法
- 计算器算法
这里主要是提一下,详细了解限流算法请参考下面链接 https://www.cnblogs.com/hopeiscoming/p/12297528.html
限流框架
下面说一下现有流行的限流工具
guava
Google的Guava工具包中就提供了一个限流工具类——RateLimiter。RateLimiter是基于“令牌通算法”来实现限流的。
hystrix
hystrix主要是通过资源池以及信号量来限流,暂时能支持简单的限流
sentinel
限流比较主流的三种算法:漏桶,令牌桶,滑动窗口。而Sentinel采用的是最后一种,滑动窗口来实现限流的。当然sentinel不仅仅局限于限流,它是一个面向分布式服务架构的高可用流量防护组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。
限流实战
有很多应用都是可以直接在调用端、代理、网关等中间层进行限流,下面简单介绍下集中中间件限流方式
nginx限流
nginx限流方式有三种
- limit_conn_zone
- limit_req_zone
- ngx_http_upstream_module 详细了解限流可以参考下面链接: https://www.cnblogs.com/biglittleant/p/8979915.html
但是nginx限流不够灵活,不好动态配置。
zuul限流
除了zuul引入限流相关依赖
<dependency>
<groupid>com.marcosbarbero.cloud</groupid>
<artifactid>spring-cloud-zuul-ratelimit</artifactid>
<version>2.0.0.RELEASE</version>
</dependency>
相关配置如下:
zuul:
ratelimit:
key-prefix: your-prefix #对应用来标识请求的key的前缀
enabled: true
repository: REDIS #对应存储类型(用来存储统计信息)默认是IN_MEMORY
behind-proxy: true #代理之后
default-policy: #可选 - 针对所有的路由配置的策略,除非特别配置了policies
limit: 10 #可选 - 每个刷新时间窗口对应的请求数量限制
quota: 1000 #可选- 每个刷新时间窗口对应的请求时间限制(秒)
refresh-interval: 60 # 刷新时间窗口的时间,默认值 (秒)
type: #可选 限流方式
- user
- origin
- url
policies:
myServiceId: #特定的路由
limit: 10 #可选- 每个刷新时间窗口对应的请求数量限制
quota: 1000 #可选- 每个刷新时间窗口对应的请求时间限制(秒)
refresh-interval: 60 # 刷新时间窗口的时间,默认值 (秒)
type: #可选 限流方式
- user
- origin
- url
注意这里的仓库如果是针对全局限流,那么可以考虑存到redis中,这里的zuul.ratelimit.repository可以设置为redis,但是如果扩容后则需要动态调整,不过灵活,所以这里我建议还是选择本地内存(INM_MOMERY)或者不设置,这样伸缩容后可以自动扩展,不用变更配置,
如果需要动态更新,可以集成apollo配置进行动态更新,
public class ZuulPropertiesRefresher implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Autowired
private RouteLocator routeLocator;
@ApolloConfigChangeListener(interestedKeyPrefixes = "zuul.",value="zuul.yml")
public void onChange(ConfigChangeEvent changeEvent) {
refreshZuulProperties(changeEvent);
}
private void refreshZuulProperties(ConfigChangeEvent changeEvent) {
log.info("Refreshing zuul properties!");
/**
* rebind configuration beans, e.g. ZuulProperties
* @see org.springframework.cloud.context.properties.ConfigurationPropertiesRebinder#onApplicationEvent
*/
this.applicationContext.publishEvent(new EnvironmentChangeEvent(changeEvent.changedKeys()));
/**
* refresh routes
* @see org.springframework.cloud.netflix.zuul.ZuulServerAutoConfiguration.ZuulRefreshListener#onApplicationEvent
*/
this.applicationContext.publishEvent(new RoutesRefreshedEvent(routeLocator));
log.info("Zuul properties refreshed!");
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
springcloud gateway限流
在Spring Cloud Gateway中,有Filter过滤器,因此可以在“pre”类型的Filter中自行实现上述三种过滤器。但是限流作为网关最基本的功能,Spring Cloud Gateway官方就提供了RequestRateLimiterGatewayFilterFactory这个类,适用Redis和lua脚本实现了令牌桶的方式。具体实现逻辑在RequestRateLimiterGatewayFilterFactory类中,lua脚本在如下图所示的文件夹中:
在这里插入图片描述
具体源码不打算在这里讲述,读者可以自行查看,代码量较少,先以案例的形式来讲解如何在Spring Cloud Gateway中使用内置的限流过滤器工厂来实现限流。 首先在工程的pom文件中引入gateway的起步依赖和redis的reactive依赖,代码如下:
<dependency>
<groupid>org.springframework.cloud</groupid>
<artifactid>spring-cloud-starter-gateway</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifatid>spring-boot-starter-data-redis-reactive
</artifatid></dependency>
复制代码在配置文件中做以下的配置:
spring:
redis:
host: 127.0.0.1
port: 6379
cloud:
gateway:
routes:
- id: limit_route
uri: http://httpbin.org:80/get
predicates:
- After=2017-01-20T17:42:47.789-07:00[America/Denver]
filters:
- name: RequestRateLimiter
args:
key-resolver: '#{@hostAddrKeyResolver}'
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 3
配置了 redis的信息,并配置了RequestRateLimiter的限流过滤器,该过滤器需要配置三个参数:
- burstCapacity,令牌桶总容量。
- replenishRate,令牌桶每秒填充平均速率。
- key-resolver,用于限流的键的解析器的 Bean 对象的名字。它使用 SpEL 表达式根据#{@beanName}从 Spring 容器中获取 Bean 对象。
可以通过KeyResolver来指定限流的Key,比如我们需要根据用户来做限流,IP来做限流等等。
- IP限流
@Bean
public KeyResolver ipKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
}
- 用户限流
@Bean
KeyResolver userKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("userId"));
}
- 接口限流
@Bean
KeyResolver apiKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getPath().value());
}
这里只是针对单节点限流,如果需要可以自定义全局限流
sentinel 限流
sentinel限流这里不做详细描述,大家想了解可以参考下面文档 https://juejin.cn/post/6844903758405566477 https://www.cnblogs.com/wuzhenzhao/p/11453649.html
应用限流
这里springboot应用服务需要限流的话,这里给的方案是集成google的guava类库,大家在网上能搜索到很多demo,我这里不做详细描述,主要是下面api的使用:
RateLimiter.create(callerRate);
现在容器比较火,现在如果部署在容器或者虚拟机上,我们需要动态调整资源数后,那么限流也会跟着变化,这里说一下如何实现动态限流。第一步肯定是集成配置中心实现配置动态更新,至于说生效方式有几种 方案一: 增加监听器,当配置变动时重新创建限流对象
方案二: 限流对象定时创建,这里引入了应用缓存框架,下面给个demo
import com.ctrip.framework.apollo.Config;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.concurrent.TimeUnit;
@Slf4j
public class RateLimitInterceptor implements HandlerInterceptor {
private Config config;
private static final String RATE_TYPE_GLOBAL = "global";
private static final String RATE_TYPE_URL = "url";
//全局限流
public RateLimitInterceptor(Config config) {
this.config = config;
}
Cache<object, ratelimiter> rateLimiterCache = Caffeine.newBuilder()
.initialCapacity20
.expireAfterWrite(2, TimeUnit.MINUTES)
.maximumSize100
.softValues()
.recordStats()
.build();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
if (StringUtils.isBlank(request.getRequestURI()) || request.getRequestURI().startsWith("/actuator/")
|| request.getRequestURI().startsWith("/srch-recommend/fault-tolerant/health")||request.getRequestURI().startsWith("/health")) {
return true;
}
try {
boolean rateLimitEnabled=config.getBooleanProperty("ratelimit.enabled", false);
if(!rateLimitEnabled){
return true;
}
if (!do(RATE_TYPE_GLOBAL, StringUtils.EMPTY, "ratelimit.global")) {
return false;
}
String url = request.getRequestURI();
if (StringUtils.isNotBlank(url)) {
return do(RATE_TYPE_URL, url, "ratelimit.url.");
}
return true;
} catch (Exception e) {
log.warn("RateLimitInterceptor error message:{}", e.getMessage(), e);
return true;
}
}
private boolean doRateLimiter(String rateType, String key, String configPrefix) {
String cacheKey = rateType + "-" + key;
RateLimiter rateLimiter = rateLimiterCache.getIfPresent(cacheKey);
if (rateLimiter == null) {
int callerRate = config.getIntProperty(configPrefix + uniqueKey, 0);
if (callerRate > 0) {
rateLimiter = RateLimiter.create(callerRate);
rateLimiterCache.put(cacheKey, rateLimiter);
}
}
return rateLimiter == null || rateLimiter.tryAcquire();
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
ModelAndView modelAndView) {
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
}
}
当然这里如果有业务相关的限流可以根据参考上面的demo自己来实现限流