前提, 应用服务是分布式或多服务, 而这些"多"有共同的"redis";
GitHub: https://github.com/vergilyn/SpringBootDemo
代码结构:
(具体实现思路参考: 分布式锁的实现、如何用消息系统避免分布式事务?)
1、基于数据库
可以用数据库的行锁
, 或专门新建一张
- for update
来实现. 过于依赖数据库, 且健壮性也不是特别好, 完全可以把此种方案舍弃. (话说都涉及到分布式或多服务器,基本主要还是用redis、memcached或其他缓存服务实现并发锁) 2、基于ZooKeeper实现分布式锁 并未去研究, 参考上面的博客链接. 3、基于redis实现 redis实现的复杂度不算高, 只是需要注意一些实现细节. 健壮性貌似只比zookeeper差点, 但完全可接受. (ps: 感觉redis官方jar中完全可以提供实现, 我们直接用就可以, 是我没找到吗?)
- 锁控制表
1、主要的redis核心命令: 利用redis是单线程的特性, 用setnx、getset、time来实现.
2、思路: redis的key-value就代表一个对象锁, 当此key存在说明锁已被获取, 其余相同对象操作则需要等待获取锁.
3、需要注意的细节:
1) 锁的释放, 要特别避免死锁出现, 主要是特殊情况下如何释放锁.
2) 等待获取锁的线程, 最好有超时机制.
3) 注意多服务器之间的时间是否同步.
4) 注意获取锁操作别占用或创建太多的连接(即使及时关闭了连接), 很影响系统的性能.
- /**
- * 锁的策略参考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基于redis分布式锁实现“秒杀”</a>
- * FIXME 此方式加锁策略存在一定缺陷: 在setIfAbsent()之后expire()执行之前程序异常 锁不会被释放. 虽然出现几率极低
- *
- * @param timeout timeout的时间范围内轮询锁, 单位: 秒
- * @param expire 设置锁超时时间
- * @return true, 获取锁成功; false, 获取锁失败.
- */
- public boolean lock(long timeout, long expire, final TimeUnit unit) {
- long beginTime = System.nanoTime(); // 用nanos、mills具体看需求.
- timeout = TimeUnit.SECONDS.toNanos(timeout);
- try {
- // 在timeout的时间范围内不断轮询锁
- while (System.nanoTime() - beginTime < timeout) {
- // 锁不存在的话,设置锁并设置锁过期时间,即加锁
- if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) {
- this.redisClient.expire(key, expire, unit); //设置锁失效时间, 防止永久阻塞
- this.lock = true;
- return true;
- }
- // 短暂休眠后轮询,避免可能的活锁
- System.out.println("get lock waiting...");
- Thread.sleep(30, RANDOM.nextInt(30));
- }
- } catch(Exception e) {
- throw new RuntimeException("locking error", e);
- }
- return false;
- }
以上锁策略已经很完美, 1) 指定了获取锁的超时时间; 2) 设置了锁的失效, 防止永久阻塞;
但可能有极端情况, 即
成功,
- setIfAbsent()
执行之前, 如果出现异常情况, 导致
- expire()
没有执行, 所以此时会出现永久阻塞. (道理是很难遇到这情况)
- expire()
- /**
- * 特别注意: 如果多服务器之间存在时间差, 并不建议用System.nanoTime()、System.currentTimeMillis().
- * 更好的是统一用redis-server的时间, 但只能获取到milliseconds.
- * 锁的策略参考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis实现分布式锁</a>
- *
- * @param timeout 获取锁超时, 单位: 毫秒
- * @param expire 锁失效时常, 单位: 毫秒
- * @return true, 获取锁成功; false, 获取锁失败.
- */
- public boolean lockB(long timeout, long expire) {
- long bt = System.currentTimeMillis();
- long lockVal;
- String lockExpireTime;
- try {
- while (!this.lock) {
- if (System.currentTimeMillis() - bt > timeout) {
- throw new RedisLockException("get lock timeout!");
- }
- // 锁的键值: {当前时间} + {失效时常} = {锁失效时间}
- lockVal = getRedisTime() + expire;
- // 1. 尝试获取锁
- boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + "");
- if (ifAbsent) { // 设置成功, 表示获得锁
- // 这种策略下, 是否设置key失效不太重要. 因为, 正常流程中最后会释放锁(del-key); 如果是异常情况下未释放锁, 后面的代码也会判断锁是否失效.
- // 设置的好处: 能减少redis的内存消耗, 及时清理无效的key(暂时只想到这)
- // this.redisClient.expire(key, timeout, TimeUnit.SECONDS);
- this.lock = true;
- return true;
- }
- lockExpireTime = this.redisClient.opsForValue().get(this.key);
- long curTime = getRedisTime();
- // curTime > expireVal: 表示此锁已无效
- /* 在锁无效的前提下, 尝试获取锁: (一定要用)getAndSet()
- *
- * 假设锁已失效, 且未正常expire. 此时C1、C2同时执行到此, C2先执行getAndSet(key, time-02), C2获取到锁
- * 此时C1.getAndSet(key, time-01)返回的是time-02, 显然curTime > time-02: false.
- * 所以, C1并未获取到锁. 但C1修改了key的值为: time-01.
- * 但因为C1、C2是同时执行到此, 所以time-01、time-02的值近视相等.
- * (若多服务器存在时间差, 那这个差值有问题, 所以服务器时间如果不同步则不能用System.nanoTime()、System.currentTimeMillis(), 该用redis-server time.)
- */
- if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
- // getset必须在{curTime > expireVal} 判断之后; 否则, 可能出现死循环
- lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + "");
- if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
- // this.redisClient.expire(key, timeout, TimeUnit.SECONDS); // 是否设置失效不重要, 理由同上.
- this.lock = true;
- return true;
- }
- }
- // 锁被占用, 短暂休眠等待轮询
- System.out.println(this + ": get lock waiting...");
- Thread.sleep(40);
- }
- } catch(Exception e) {
- e.printStackTrace();
- throw new RedisLockException("locking error", e);
- }
- System.out.println(this + ": get lock error.");
- return false;
- }
此种锁策略特别要注意:
1) 如果多服务器之间时间不同步, 那么可以用redis-server的时间.
2) getset的调用必须在
的前提下, 否则会出现死循环. 3) 并发时getset产生的误差, 完全可忽略. 4) 特别要注意redis连接的释放, 否则很容易占用过多的redis连接数.
- curTime > lockExpireTime
- public class RedisLock {
- private String key;
- private boolean lock = false;
- private final StringRedisTemplate redisClient;
- private final RedisConnection redisConnection;
- /**
- * @param purpose 锁前缀
- * @param key 锁定的ID等东西
- */
- public RedisLock(String purpose, String key, StringRedisTemplate redisClient) {
- if (redisClient == null) {
- throw new IllegalArgumentException("redisClient 不能为null!");
- }
- this.key = purpose + "_" + key + "_redis_lock";
- this.redisClient = redisClient;
- this.redisConnection = redisClient.getConnectionFactory().getConnection();
- }
- /**
- * 锁的策略参考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基于redis分布式锁实现“秒杀”</a>
- * FIXME 此方式加锁策略存在一定缺陷: 在setIfAbsent()之后expire()执行之前程序异常 锁不会被释放. 虽然出现几率极低
- *
- * @param timeout timeout的时间范围内轮询锁, 单位: 秒
- * @param expire 设置锁超时时间
- * @return true, 获取锁成功; false, 获取锁失败.
- */
- public boolean lockA(long timeout, long expire, final TimeUnit unit) {
- long beginTime = System.nanoTime(); // 用nanos、mills具体看需求.
- timeout = unit.toNanos(timeout);
- try {
- // 在timeout的时间范围内不断轮询锁
- while (System.nanoTime() - beginTime < timeout) {
- // 锁不存在的话,设置锁并设置锁过期时间,即加锁
- if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) {
- this.redisClient.expire(key, expire, unit); //设置锁失效时间, 防止永久阻塞
- this.lock = true;
- return true;
- }
- // 短暂休眠后轮询,避免可能的活锁
- System.out.println("get lock waiting...");
- Thread.sleep(30);
- }
- } catch(Exception e) {
- throw new RedisLockException("locking error", e);
- }
- return false;
- }
- /**
- * 特别注意: 如果多服务器之间存在时间差, 并不建议用System.nanoTime()、System.currentTimeMillis().
- * 更好的是统一用redis-server的时间, 但只能获取到milliseconds.
- * 锁的策略参考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis实现分布式锁</a>
- *
- * @param timeout 获取锁超时, 单位: 毫秒
- * @param expire 锁失效时常, 单位: 毫秒
- * @return true, 获取锁成功; false, 获取锁失败.
- */
- public boolean lockB(long timeout, long expire) {
- long bt = System.currentTimeMillis();
- long lockVal;
- String lockExpireTime;
- try {
- while (!this.lock) {
- if (System.currentTimeMillis() - bt > timeout) {
- throw new RedisLockException("get lock timeout!");
- }
- // 锁的键值: {当前时间} + {失效时常} = {锁失效时间}
- lockVal = getRedisTime() + expire;
- // 1. 尝试获取锁
- boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + "");
- if (ifAbsent) { // 设置成功, 表示获得锁
- this.lock = true;
- return true;
- }
- lockExpireTime = this.redisClient.opsForValue().get(this.key);
- long curTime = getRedisTime();
- if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
- lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + "");
- if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
- this.lock = true;
- return true;
- }
- }
- // 锁被占用, 短暂休眠等待轮询
- System.out.println(this + ": get lock waiting...");
- Thread.sleep(40);
- }
- } catch(Exception e) {
- e.printStackTrace();
- throw new RedisLockException("locking error", e);
- }
- System.out.println(this + ": get lock error.");
- return false;
- }
- /**
- * @return current redis-server time in milliseconds.
- */
- private long getRedisTime() {
- return this.redisConnection.time();
- }
- private void closeConnection() {
- if (!this.redisConnection.isClosed()) {
- this.redisConnection.close();
- }
- }
- /** 释放锁 */
- public void unlock() {
- if (this.lock) {
- redisClient.delete(key);
- }
- }
- public boolean isLock() {
- return lock;
- }
- }
- @Target(ElementType.PARAMETER)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface RedisLockedKey {
- /**
- * 复杂对象中需要加锁的成员变量
- */
- String field() default "";
- }
- @Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documented public@interface RedisDistributedLock {
- /** 锁key的前缀 */
- String lockedPrefix()
- default "";
- /** 轮询锁的时间超时时常, 单位: ms */
- long timeout()
- default 2000;
- /** redis-key失效时常, 单位: ms */
- int expireTime()
- default 1000;
- }
- @Component@Aspect public class RedisDistributedLockAop {@Autowired private StringRedisTemplate redisTemplate;
- /**
- * 定义缓存逻辑
- */
- @Around("@annotation(com.vergilyn.demo.springboot.distributed.lock.annotation.RedisDistributedLock)") public void cache(ProceedingJoinPoint pjp) {
- Method method = getMethod(pjp);
- RedisDistributedLock cacheLock = method.getAnnotation(RedisDistributedLock.class);
- String key = getRedisKey(method.getParameterAnnotations(), pjp.getArgs());
- RedisLock redisLock = new RedisLock(cacheLock.lockedPrefix(), key, redisTemplate);
- // boolean isLock = redisLock.lockB(cacheLock.timeout(), cacheLock.expireTime());
- boolean isLock = redisLock.lockA(cacheLock.timeout(), cacheLock.expireTime(), TimeUnit.MILLISECONDS);
- if (isLock) {
- try {
- pjp.proceed();
- return;
- } catch(Throwable e) {
- e.printStackTrace();
- } finally {
- redisLock.unlock();
- }
- }
- System.out.println("执行方法失败");
- }
- /**
- * 获取被拦截的方法对象
- */
- private Method getMethod(ProceedingJoinPoint pjp) {
- Object[] args = pjp.getArgs();
- Class[] argTypes = new Class[pjp.getArgs().length];
- for (int i = 0; i < args.length; i++) {
- argTypes[i] = args[i].getClass();
- }
- Method method = null;
- try {
- method = pjp.getTarget().getClass().getMethod(pjp.getSignature().getName(), argTypes);
- } catch(NoSuchMethodException | SecurityException e) {
- e.printStackTrace();
- }
- return method;
- }
- private String getRedisKey(Annotation[][] annotations, Object[] args) {
- if (null == args || args.length == 0) {
- throw new RedisLockException("方法参数为空,没有被锁定的对象");
- }
- if (null == annotations || annotations.length == 0) {
- throw new RedisLockException("没有被注解的参数");
- }
- // 只支持第一个注解为RedisLockedKey的参数
- for (int i = 0; i < annotations.length; i++) {
- for (int j = 0; j < annotations[i].length; j++) {
- if (annotations[i][j] instanceof RedisLockedKey) { //注解为LockedComplexObject
- RedisLockedKey redisLockedKey = (RedisLockedKey) annotations[i][j];
- String field = redisLockedKey.field();
- try {
- // field存在, 表示取参数对象的相应field;
- if (StringUtils.isBlank(field)) {
- return args[i].toString();
- } else {
- return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString();
- }
- } catch(NoSuchFieldException | SecurityException e) {
- e.printStackTrace();
- throw new RedisLockException("注解对象中不存在属性: " + redisLockedKey.field());
- }
- }
- }
- }
- throw new RedisLockException("未找到注解对象!");
- }
- }
- public class RedisLockException extends RuntimeException {
- public RedisLockException(String msg, Throwable throwable) {
- super(msg, throwable);
- }
- public RedisLockException(String msg) {
- super(msg);
- }
- }
- #### 视情况调整
- # 部分redis配置
- spring.redis.database=0
- spring.redis.host=127.0.0.1
- # spring.redis.password=
- spring.redis.port=6379
- # 连接池最大连接数(使用负值表示没有限制)
- spring.redis.pool.max-active=1
- spring.redis.pool.max-wait=-1
- # 连接池中的最大空闲连接
- spring.redis.pool.max-idle=4
- # 连接池中的最小空闲连接
- spring.redis.pool.min-idle=0
- spring.redis.timeout=2000
- @SpringBootApplication
- @EnableCaching
- public class DistributedLockApplication implements CommandLineRunner{
- @Autowired
- StringRedisTemplate redisTemplate;
- @Autowired
- LockService lockService;
- @Autowired
- ThreadPoolTaskExecutor executor;
- @Bean
- public ThreadPoolTaskExecutor myExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- // 核心线程数
- executor.setCorePoolSize(8);
- // 最大线程数
- executor.setMaxPoolSize(12);
- // 运行线程满时,等待队列的大小
- executor.setQueueCapacity(1000);
- executor.setThreadNamePrefix("vl-thread-");
- // 池和队列满的策略
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- // 空闲线程清除时间
- executor.setKeepAliveSeconds(60);
- // 是否允许释放核心线程
- executor.setAllowCoreThreadTimeOut(true);
- executor.initialize();
- return executor;
- }
- public static void main(String[] args) {
- SpringApplication application = new SpringApplication(DistributedLockApplication.class);
- application.setAdditionalProfiles("redis");
- application.run(args);
- }
- @Override
- public void run(String... args) throws Exception {
- System.out.println("run....");
- for (int i = 0; i < 2; i++) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- // lockService.lockMethod(new LockBean(1L));
- lockService.lockMethod("arg1", 1L);
- }
- });
- }
- System.out.println(executor.getThreadPoolExecutor().getTaskCount());
- }
- }
public interface LockService { public void lockMethod(String arg1, Long arg2); public void lockMethod(LockBean lockBean); }
@Service public class LockServiceImpl implements LockService { public static Map<Long, Integer> goods; static{ goods = new HashMap<>(); goods.put(1L, 100); goods.put(2L, 200); } @Override @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void lockMethod(String arg1, @RedisLockedKey Long arg2) { //最简单的秒杀,这里仅作为demo示例 System.out.println("lockMethod, goods: " + reduceInventory(arg2)); } @Override @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void lockMethod(@RedisLockedKey(field = "idic")LockBean lockBean) { System.out.println("lockMethod bean, goods: " + reduceInventory(lockBean.getIdic())); } // 模拟秒杀操作,姑且认为一个秒杀就是将库存减一 private Integer reduceInventory(Long commodityId){ goods.put(commodityId, goods.get(commodityId) - 1); return goods.get(commodityId); } }
public class LockBean { private Long idic; public LockBean() {} public LockBean(long idic) { this.idic = idic; } public Long getIdic() { return idic; } public void setIdic(Long idic) { this.idic = idic; } }
以上只是简单实现代码, 如果用于实际项目中, 以上代码存在很多性能问题, 具体性能问题:
1) 太频繁的获取redis连接、关闭连接.
lockA: 每次while必定有一次
setIfAbsent, 可能会有
expire, 然后释放锁有
delete. 所以一次正常的流程就需要3个连接. 如果是并发同时竞争等待获取锁, 那么性能影响也蛮大的.
lockB: 这种策略要用到的连接更多, 并且如果是
this.redisClient.getConnectionFactory().getConnection().time()还要注意要手动释放这个连接. 针对此问题, (个人)想到的可能的代码改进方案, 每个
RedisLock中用一个
redisConnection, 把所有的
StringRedisTemplate命令换成更底层的
redisConnection命令:
public class RedisLock { private String key; private boolean lock = false; private final RedisConnection redisConnection; public RedisLock(String purpose, String key, RedisConnection redisConnection) { if (redisConnection == null) { throw new IllegalArgumentException("redisConnection 不能为null!"); } this.key = purpose + "_" + key + "_redis_lock"; this.redisConnection = redisConnection; } public boolean lockAc(long timeout, long expire, final TimeUnit unit) { long beginTime = System.nanoTime(); timeout = unit.toNanos(timeout); try { while (System.nanoTime() - beginTime < timeout) { if (this.redisConnection.setNX(this.key.getBytes(), "1".getBytes())) { this.redisConnection.expire(key.getBytes(), unit.toSeconds(expire)); this.lock = true; return true; } System.out.println("lockAc get lock waiting..."); Thread.sleep(30); } } catch(Exception e) { throw new RedisLockException("locking error", e); } return false; } private long getRedisTime() { return this.redisConnection.time(); } private void closeConnection() { if (!this.redisConnection.isClosed()) { this.redisConnection.close(); } } public void unlock() { if (this.lock) { this.redisConnection.delete(key); } closeConnection(); // 用完一定要关闭, 这个位置不一定好, 可能在Aop调用unlock的finally处更好 } public boolean isLock() { return lock; } }
以上改进代码依然可能存在的问题:
1) 连接很可能没有正常关闭.
2) 连接依然过多, 假设并发有1000个, 那一样会产生1000个连接, 且这些连接只会在竞争获取锁完后才会释放.(且产生了1000个RedisLock对象)
3) 是否可以缓存注解对象
针对问题
2), 主要想达到怎么尽可能减少redis连接? 比如: 有1000个并发, 其中200个是兑换商品A, 其中300个是兑换商品B, 其中500个是兑换商品C.
1、是否可以用
单例模式来实现
RedisLock? 对
单例、
多线程还是很混乱, 不好说. 但如果可行, 会否太影响获取锁的性能? 比如兑换商品A的200个并发共用一个redisConnection, 感觉还是合理的, 毕竟互相之间是竞争关系. 但商品A、商品B、商品C如果也共用一个redisConnection, 是不是完全不合理? 他们之间根本是"并行"的, 相互之间没有一点联系. 2、所以, 是否更进一步的实现是: 同一个锁竞争用相同的
RedisLock对象和
RedisConnection连接. 即竞争商品A的200个并发用同一个"redisConnection_A"、"redisLock_A", 商品B的300个并发用同一个"redisConnection_B"、"redisLock_B"?
针对问题
3), 在代码
RedisDistributedLockAop中, 每次都会: 1)
getMethod(pjp): 获取拦截方法. 2) 通过拦截方法解析出
getRedisKey.
method, 且因为同一个方法, 所能取
field也是一样的. 比如, 有一下几个方法都需要用到分布式并发锁:
@RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void a(String arg1, @RedisLockedKey Long arg2) { // ... } @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void b(@RedisLockedKey(field = "idic")LockBean lockBean) { // ... } @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void c(@RedisLockedKey(field = "xx")LockBean lockBean) { // ... }
// key: 完整方法名, 要唯一正确找到; value: 缓存的method Map < String, Method > methodCache = new HashMap < >; methodCache.put("com.service.aa.a()", method); methodCache.put("com.service.aa.b()", method); methodCache.put("com.service.bb.b()", method); // 然后, 同一个方法的注解内容相同, 所以完全可以直接调用, 省略RedisLockedKey的逻辑判断 if (StringUtils.isBlank(field)) { return args[i].toString(); } else { return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString(); }
以上只是自己的构想, 这些构想的可行性, 代码的具体实现还很难说...
但现在觉得, 基于redis的分布式并发锁完全可以有位大神写个开源的实现啊, 我们这些小白直接拿到系统中用就好了. 我也不知道怎么去找, 不知道github有没有.
来源: http://www.bubuko.com/infodetail-2416546.html