1 背景
业务背景
在大数据量高并发访问时, 经常会出现服务或接口面对暴涨的请求而不可用的情况, 甚至引发连锁反映导致整个系统崩溃. 此时需要使用的技术手段之一就是限流: 当请求达到一定的并发数或速率, 就进行等待, 排队, 降级, 拒绝服务等. 在限流时, 常见的算法是计数器算法和令牌桶算法.
技术背景
- SpringBoot2.X
- JDK1.8
- guava 23.6-jre
- aop
算法简介
令牌桶算法
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌, 而如果请求需要被处理, 则需要先从桶里获取一个令牌, 当桶里没有令牌可取时, 则拒绝服务. 当桶满时, 新添加的令牌被丢弃或拒绝.
计数器算法
计数器限流算法主要用来限制总并发数, 比如数据库连接池大小, 线程池大小, 程序访问并发数等都是使用计数器算法.
2 技术实现
为达到复用, 简便, 代码零污染等目的, 使用 AOP + 自定义注解技术进行实现.
创建一个 SpringBoot Starter 工程
具体步骤可参考使用 STS 创建 Spring Boot 项目. 然后将 pom.xml 文件清理成下面这个样子.
- <groupId>cn.com.yd.commons</groupId>
- <artifactId>currentlimiter</artifactId>
- <version>1.0.0</version>
- <packaging>jar</packaging>
- <name>currentlimiter</name>
- <description > 基于 spring aop 限流器 </description>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <java.version>1.8</java.version>
- </properties>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-dependencies</artifactId>
- <version>2.0.2.RELEASE</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-aop</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-log4j2</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-configuration-processor</artifactId>
- </dependency>
- <!-- 项目单独需要的 jar -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>23.6-jre</version>
- </dependency>
- </dependencies>
自定义注解
令牌桶算法注解
- import java.lang.annotation.*;
- @Documented
- @Target(ElementType.METHOD)
- @Retention(RetentionPolicy.RUNTIME)
- /**
- *
- * 令牌桶限流注解, 默认流量 1000
- * @author 李庆海
- *
- */
- public @interface TbLimiter {
- int value() default 1000;
- }
计数器算法注解
- import java.lang.annotation.*;
- @Documented
- @Target(ElementType.METHOD)
- @Retention(RetentionPolicy.RUNTIME)
- /**
- *
- * 限流注解, 默认流量 1000
- * @author 李庆海
- *
- */
- public @interface ShLimiter {
- int value() default 1000;
- }
基于 AOP 技术实现的拦截器
令牌桶算法拦截器
- import cn.com.yd.commons.currentlimiter.annotations.TbLimiter;
- import com.google.common.util.concurrent.RateLimiter;
- import org.aspectj.lang.JoinPoint;
- import org.aspectj.lang.annotation.Around;
- import org.aspectj.lang.annotation.Aspect;
- import org.aspectj.lang.annotation.Pointcut;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.cglib.core.ReflectUtils;
- import org.springframework.stereotype.Component;
- import java.lang.reflect.Method;
- import java.util.Arrays;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * 令牌桶算法限流拦截器
- *
- * @author 李庆海, 张蕴
- */
- @Aspect
- @Component
- public class TokenBucketLimiterInterceptor {
- private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<String, RateLimiter>();
- private final static Logger LOG = LoggerFactory.getLogger(TokenBucketLimiterInterceptor.class);
- @Pointcut("@annotation(cn.com.yd.commons.currentlimiter.annotations.TbLimiter)")
- public void aspect() {
- }
- @Around(value = "aspect()")
- public void around(JoinPoint point) throws Throwable {
- LOG.debug("进入限流器");
- // 返回目标对象
- Object target = point.getTarget();
- String targetName = target.getClass().getName();
- // 返回当前连接点签名
- String methodName = point.getSignature().getName();
- // 获得参数列表
- Object[] arguments = point.getArgs();
- Class<?> targetClass = Class.forName(targetName);
- // 获取参数类型数组
- Class<?>[] argTypes = ReflectUtils.getClasses(arguments);
- // 获取目标 method, 考虑方法的重载等问题
- Method method = targetClass.getDeclaredMethod(methodName, argTypes);
- // 获取目标 method 上的限流注解 @Limiter
- TbLimiter limiter = method.getAnnotation(TbLimiter.class);
- RateLimiter rateLimiter = null;
- if (null != limiter) {
- // 以 class + method + parameters 为 key, 避免重载, 重写带来的混乱
- String key = targetName + "." + methodName + Arrays.toString(argTypes);
- rateLimiter = rateLimiters.get(key);
- if (null == rateLimiter) {
- // 获取限定的流量
- // 为了防止并发
- rateLimiters.putIfAbsent(key, RateLimiter.create(limiter.value()));
- rateLimiter = rateLimiters.get(key);
- }
- // 消耗一个令牌
- rateLimiter.acquire();
- point.proceed();
- } else {
- point.proceed();
- }
- LOG.debug("退出限流器");
- }
- }
计数器算法拦截器
- import cn.com.yd.commons.currentlimiter.annotations.ShLimiter;
- import org.aspectj.lang.ProceedingJoinPoint;
- import org.aspectj.lang.annotation.Around;
- import org.aspectj.lang.annotation.Aspect;
- import org.aspectj.lang.annotation.Pointcut;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.cglib.core.ReflectUtils;
- import org.springframework.stereotype.Component;
- import java.lang.reflect.Method;
- import java.util.Arrays;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.Semaphore;
- /**
- * 计数器算法限流拦截器, 基于 Semaphore 技术实现
- *
- * @author 李庆海, 张蕴
- */
- @Aspect
- @Component
- public class SemaphoreLimiterInterceptor {
- private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
- private final static Logger LOG = LoggerFactory.getLogger(SemaphoreLimiterInterceptor.class);
- @Pointcut("@annotation(cn.com.yd.commons.currentlimiter.annotations.ShLimiter)")
- public void aspect() {
- }
- @Around(value = "aspect()")
- public void around(ProceedingJoinPoint point) throws Throwable {
- LOG.debug("进入" 限流拦截器 "");
- // 返回目标对象
- Object target = point.getTarget();
- String targetName = target.getClass().getName();
- // 返回当前连接点签名
- String methodName = point.getSignature().getName();
- // 获得参数列表
- Object[] arguments = point.getArgs();
- Class<?> targetClass = Class.forName(targetName);
- // 获取参数类型数组
- Class<?>[] argTypes = ReflectUtils.getClasses(arguments);
- // 获取目标 method, 考虑方法的重载等问题
- Method method = targetClass.getDeclaredMethod(methodName, argTypes);
- // 获取目标 method 上的限流注解 @Limiter
- ShLimiter limiter = method.getAnnotation(ShLimiter.class);
- if (null != limiter) {
- // 以 class + method + parameters 为 key, 避免重载, 重写带来的混乱
- String key = targetName + "." + methodName + Arrays.toString(argTypes);
- // 获取限定的流量
- Semaphore semaphore = semaphores.get(key);
- if (null == semaphore) {
- // 为了预防并发
- semaphores.putIfAbsent(key, new Semaphore(limiter.value()));
- semaphore = semaphores.get(key);
- }
- try {
- // 消耗一个令牌
- semaphore.acquire();
- // 调用被代理方法
- point.proceed();
- } finally {
- // 释放令牌
- if (null != semaphore) {
- semaphore.release();
- }
- }
- } else {
- point.proceed();
- }
- LOG.debug("退出" 限流拦截器 "");
- }
- }
让自定义拦截器具备自动注入的能力
在 resources 目录下新建名为 META-INF 的文件夹, 然后新建一个名为 spring.factories 的文件, 在文件中增加下面的内容:
- org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
- cn.com.yd.commons.currentlimiter.interceptors.SemaphoreLimiterInterceptor,\
- cn.com.yd.commons.currentlimiter.interceptors.TokenBucketLimiterInterceptor
至此, 本工程中的代码已经具备了开箱即用的能力. 使用 Maven 将本工程打包, 即可在其他项目中引入使用.
3 在项目中使用
需要 Maven 依赖
- <dependency>
- <groupId>cn.com.yd.commons</groupId>
- <artifactId>currentlimiter</artifactId>
- <version>1.0.0</version>
- </dependency>
在需要限流的方法, 接口上添加限流注解
@TbLimiter 和 @ShLimiter 两个注解都可以达到限流的目的, 任选其中一个添加到需要限流的方法, 接口上即可. 注解的参数 value 默认值为 1000. 使用那个注解启用那个相关的拦截器.
- /**
- * 登录, 1 秒钟的时间内只允许 1000 个登录请求
- * @param loginName 登录帐号
- * @param password 登录密码
- * @return
- * @throws Exception
- */
- @TbLimiter
- public Object login(String loginName,String password)throws Exception;
- /**
- * 登录, 1 秒钟的时间内只允许 100 个退出请求
- * @param loginName 登录帐号
- * @return
- * @throws Exception
- */
- @TbLimiter(100)
- public Object logout(String loginName)throws Exception;
来源: http://www.jianshu.com/p/15f4821f03ca