分布式锁是在分布式环境下 (多个 JVM 进程) 控制多个客户端对某一资源的同步访问的一种实现, 与之相对应的是线程锁, 线程锁控制的是同一个 JVM 进程内多个线程之间的同步. 分布式锁的一般实现方法是在应用服务器之外通过一个共享的存储服务器存储锁资源, 同一时刻只有一个客户端能占有锁资源来完成. 通常有基于 Zookeeper,Redis, 或数据库三种实现形式. 本文介绍基于 Redis 的实现方案.
要求
基于 Redis 实现分布式锁需要满足如下几点要求:
在分布式集群中, 被分布式锁控制的方法或代码段同一时刻只能被一个客户端上面的一个线程执行, 也就是互斥
锁信息需要设置过期时间, 避免一个线程长期占有 (比如在做解锁操作前异常退出) 而导致死锁
加锁与解锁必须一致, 谁加的锁, 就由谁来解(或过期超时), 一个客户端不能解开另一个客户端加的锁
加锁与解锁的过程必须保证原子性
实现
1. 加锁实现
基于 Redis 的分布式锁加锁操作一般使用 SETNX 命令, 其含义是 "将 key 的值设为 value , 当且仅当 key 不存在. 若给定的 key 已经存在, 则 SETNX 不做任何动作".
在 Spring Boot 中, 可以使用 StringRedisTemplate 来实现, 如下, 一行代码即可实现加锁过程.(下列代码给出两种调用形式 -- 立即返回加锁结果与给定超时时间获取加锁结果)
- /**
- * 尝试获取锁(立即返回)
- * @param key 锁的 Redis key
- * @param value 锁的 value
- * @param expire 过期时间 / 秒
- * @return 是否获取成功
- */
- public boolean lock(String key, String value, long expire) {
- return stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS);
- }
- /**
- * 尝试获取锁, 并至多等待 timeout 时长
- *
- * @param key 锁的 Redis key
- * @param value 锁的 value
- * @param expire 过期时间 / 秒
- * @param timeout 超时时长
- * @param unit 时间单位
- * @return 是否获取成功
- */
- public boolean lock(String key, String value, long expire, long timeout, TimeUnit unit) {
- long waitMillis = unit.toMillis(timeout);
- long waitAlready = 0;
- while (!stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS) && waitAlready <waitMillis) {
- try {
- Thread.sleep(waitMillisPer);
- } catch (InterruptedException e) {
- log.error("Interrupted when trying to get a lock. key: {}", key, e);
- }
- waitAlready += waitMillisPer;
- }
- if (waitAlready < waitMillis) {
- return true;
- }
- log.warn("<====== lock {} failed after waiting for {} ms", key, waitAlready);
- return false;
- }
上述实现如何满足前面提到的几点要求:
客户端互斥: 可以将 expire 过期时间设置为大于同步代码的执行时间, 比如同步代码块执行时间为 1s, 则可将 expire 设置为 3s 或 5s. 避免同步代码执行过程中 expire 时间到, 其它客户端又可以获取锁执行同步代码块.
通过设置过期时间 expire 来避免某个客户端长期占有锁.
通过 value 来控制谁加的锁, 由谁解的逻辑, 比如可以使用 requestId 作为 value,requestId 唯一标记一次请求.
setIfAbsent 方法 底层通过调用 Redis 的 SETNX 命令, 操作具备原子性.
错误示例:
网上有如下实现,
- public boolean lock(String key, String value, long expire) {
- boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
- if(result) {
- stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS);
- }
- return result;
- }
该实现的问题是如果在 result 为 true, 但还没成功设置 expire 时, 程序异常退出了, 将导致该锁一直被占用而导致死锁, 不满足第二点要求.
2. 解锁实现
解锁也需要满足前面所述的四个要求, 实现代码如下:
- private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- private static final Long RELEASE_LOCK_SUCCESS_RESULT = 1L;
- /**
- * 释放锁
- * @param key 锁的 Redis key
- * @param value 锁的 value
- */
- public boolean unLock(String key, String value) {
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class);
- long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
- return Objects.equals(result, RELEASE_LOCK_SUCCESS_RESULT);
- }
这段实现使用一个 Lua 脚本来实现解锁操作, 保证操作的原子性. 传入的 value 值需与该线程加锁时的 value 一致, 可以使用 requestId(具体实现下面给出).
错误示例:
- public boolean unLock(String key, String value) {
- String oldValue = stringRedisTemplate.opsForValue().get(key);
- if(value.equals(oldValue)) {
- stringRedisTemplate.delete(key);
- }
- }
该实现先获取锁的当前值, 判断两值相等则删除. 考虑一种极端情况, 如果在判断为 true 时, 刚好该锁过期时间到, 另一个客户端加锁成功, 则接下来的 delete 将不管三七二十一将别人加的锁直接删掉了, 不满足第三点要求. 该示例主要是因为没有保证解锁操作的原子性导致.
3. 注解支持
为了方便使用, 添加一个注解, 可以放于方法上控制方法在分布式环境中的同步执行.
- /**
- * 标注在方法上的分布式锁注解
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target(ElementType.METHOD)
- public @interface DistributedLockable {
- String key();
- String prefix() default "disLock:";
- long expire() default 10L; // 默认 10s 过期
- }
添加一个切面来解析注解的处理,
- /**
- * 分布式锁注解处理切面
- */
- @Aspect
- @Slf4j
- public class DistributedLockAspect {
- private DistributedLock lock;
- public DistributedLockAspect(DistributedLock lock) {
- this.lock = lock;
- }
- /**
- * 在方法上执行同步锁
- */
- @Around(value = "@annotation(lockable)")
- public Object distLock(ProceedingJoinPoint point, DistributedLockable lockable) throws Throwable {
- boolean locked = false;
- String key = lockable.prefix() + lockable.key();
- try {
- locked = lock.lock(key, webUtil.getRequestId(), lockable.expire());
- if(locked) {
- return point.proceed();
- } else {
- log.info("Did not get a lock for key {}", key);
- return null;
- }
- } catch (Exception e) {
- throw e;
- } finally {
- if(locked) {
- if(!lock.unLock(key, WebUtil.getRequestId())){
- log.warn("Unlock {} failed, maybe locked by another client already.", lockable.key());
- }
- }
- }
- }
- }
RequestId 的实现如下, 通过注册一个 Filter, 在请求开始时生成一个 uuid 存于 ThreadLocal 中, 在请求返回时清除.
- public class WebUtil {
- public static final String REQ_ID_HEADER = "Req-Id";
- private static final ThreadLocal<String> reqIdThreadLocal = new ThreadLocal<>();
- public static void setRequestId(String requestId) {
- reqIdThreadLocal.set(requestId);
- }
- public static String getRequestId(){
- String requestId = reqIdThreadLocal.get();
- if(requestId == null) {
- requestId = ObjectId.next();
- reqIdThreadLocal.set(requestId);
- }
- return requestId;
- }
- public static void removeRequestId() {
- reqIdThreadLocal.remove();
- }
- }
- public class RequestIdFilter implements Filter {
- @Override
- public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
- HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
- String reqId = httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER);
- // 没有则生成一个
- if (StringUtils.isEmpty(reqId)) {
- reqId = ObjectId.next();
- }
- WebUtil.setRequestId(reqId);
- try {
- filterChain.doFilter(servletRequest, servletResponse);
- } finally {
- WebUtil.removeRequestId();
- }
- }
- }
- // 在配置类中注册 Filter
- /**
- * 添加 RequestId
- * @return
- */
- @Bean
- public FilterRegistrationBean requestIdFilter() {
- RequestIdFilter reqestIdFilter = new RequestIdFilter();
- FilterRegistrationBean registrationBean = new FilterRegistrationBean();
- registrationBean.setFilter(reqestIdFilter);
- List<String> urlPatterns = Collections.singletonList("/*");
- registrationBean.setUrlPatterns(urlPatterns);
- registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE + 1);
- return registrationBean;
- }
4. 使用注解
- @DistributedLockable(key = "test", expire = 10)
- public void test(){
- System.out.println("线程 -"+Thread.currentThread().getName()+"开始执行..." + LocalDateTime.now());
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("线程 -"+Thread.currentThread().getName()+"结束执行..." + LocalDateTime.now());
- }
总结
来源: https://www.cnblogs.com/spec-dog/p/13320930.html