因为在项目实际过程中所采用的是微服务架构, 考虑到承载量基本每个相同业务的服务都是多节点部署, 所以针对某些资源的访问就不得不用到用到分布式锁了.
这里列举一个最简单的场景, 假如有一个智能售货机, 由于机器本身的原因不能同一台机器不能同时出两个商品, 这就要求在在出货流程前针对同一台机器在同一时刻出现并发
创建订单时只能有一笔订单创建成功, 但是订单服务是多节点部署的, 所以就不得不用到分布式锁了.
以上只是一种简单的业务场景, 在各种大型互联网实际应用中, 需要分布式锁的业务场景会更多, 综合比较了业界基于各种中间件来实现的分布式锁方案, 然后结合实际业务最终
决定采用 consul 来实现, 因为我们的项目中采用了 consul 做注册中心, 并且 consul 天生可以保证一致性 (这点类似 zk), 当然 zk 也能实现分布式锁, 但是这里不对这点做过多讨论.
Redis 虽然也能实现分布式锁, 但是可能因为场景比较复杂, 如果 Redis 采用 cluster 部署的话, 如果某一主节点出现故障的话, 有一定几率会出现脑裂现象, 这样就可能会让竞争者在
并发时同时获得到锁, 这样可能会破坏掉后面的业务, 当然出现这种情况的概率很低, 但是也不能完全排除, 因为 Redis 的根本不能保证强一致性导致的.
好了, 这里说的最简单的分布式锁的意思是, 多个竞争者同一时间并发去获得锁时, 获取失败的就直接返回了, 获取成功的继续后续的流程, 然后在合适的时间释放锁, 并且为锁
加了超时时间, 防止获得到锁的进程或线程在未来得及释放锁时自己挂掉了, 导致资源处于一直被锁定的状态无法得到释放. 主要的实现逻辑就是这样, 如果有人想实现获得锁失
败的竞争者一直继续尝试获得, 可以基于该示例进行修改, 加上自旋逻辑就 OK.
以下是锁实现代码:
- package com.lyb.consullock;
- import com.ecwid.consul.v1.ConsulClient;
- import com.ecwid.consul.v1.agent.model.NewCheck;
- import com.ecwid.consul.v1.kv.model.PutParams;
- import com.ecwid.consul.v1.session.model.NewSession;
- import com.ecwid.consul.v1.session.model.Session;
- import lombok.Data;
- import java.time.LocalDateTime;
- import java.util.ArrayList;
- import java.util.List;
- public class DistributedLock{
- private ConsulClient consulClient;
- /**
- * 构造函数
- * @param consulHost 注册 consul 的 client 或服务端的 Ip 或主机名, 或域名
- * @param consulPort 端口号
- */
- public DistributedLock(String consulHost,int consulPort){
- consulClient = new ConsulClient(consulHost,consulPort);
- }
- /**
- * 获得锁的方法
- * @param lockName 竞争的资源名
- * @param ttlSeconds 锁的超时时间, 超过该时间自动释放
- * @return
- */
- public LockContext getLock(String lockName,int ttlSeconds){
- LockContext lockContext = new LockContext();
- if(ttlSeconds<10 || ttlSeconds> 86400) ttlSeconds = 60;
- String sessionId = createSession(lockName,ttlSeconds);
- boolean success = lock(lockName,sessionId);
- if(success == false){
- consulClient.sessionDestroy(sessionId,null);
- lockContext.setGetLock(false);
- return lockContext;
- }
- lockContext.setSession(sessionId);
- lockContext.setGetLock(true);
- return lockContext;
- }
- /**
- * 释放锁
- * @param sessionID
- */
- public void releaseLock(String sessionID){
- consulClient.sessionDestroy(sessionID,null);
- }
- private String createSession(String lockName,int ttlSeconds){
- NewCheck check = new NewCheck();
- check.setId("check"+lockName);
- check.setName(check.getId());
- check.setTtl(ttlSeconds+"s"); // 该值和 session ttl 共同决定决定锁定时长
- check.setTimeout("10s");
- consulClient.agentCheckRegister(check);
- consulClient.agentCheckPass(check.getId());
- NewSession session = new NewSession();
- session.setBehavior(Session.Behavior.RELEASE);
- session.setName("session"+lockName);
- session.setLockDelay(1);
- session.setTtl(ttlSeconds + "s"); // 和 check ttl 共同决定锁时长
- List<String> checks = new ArrayList<>();
- checks.add(check.getId());
- session.setChecks(checks);
- String sessionId = consulClient.sessionCreate(session,null).getValue();
- return sessionId;
- }
- private boolean lock(String lockName,String sessionId){
- PutParams putParams = new PutParams();
- putParams.setAcquireSession(sessionId);
- boolean isSuccess = consulClient.setKVValue(lockName,"lock:"+ LocalDateTime.now(),putParams).getValue();
- return isSuccess;
- }
- /**
- * 竞争锁时返回的对象
- */
- @Data
- public class LockContext{
- /**
- * 获得锁成功返回该值, 比便后面用该值来释放锁
- */
- private String session;
- /**
- * 是否获得到锁
- */
- private boolean isGetLock;
- }
- }
pom 文件
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.1.6.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.lyb</groupId>
- <artifactId>consul-lock</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>consul-lock</name>
- <description>Demo project for Spring Boot</description>
- <properties>
- <java.version>1.8</java.version>
- <spring-cloud.version>Greenwich.SR2</spring-cloud.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-consul-discovery</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.8</version>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>${spring-cloud.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
测试代码:
- package com.lyb.consullock;
- import org.junit.Assert;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class ConsulLockApplicationTests {
- @Autowired
- private ServiceConfig serviceConfig;
- @Test
- public void lockSameResourer() {
- // 针对相同资源在同一时刻只有一个线程会获得锁
- ExecutorService threadPool = Executors.newFixedThreadPool(10);
- for (int a=0;a<20;a++){
- threadPool.submit(
- () -> {
- for (int i = 0;i <100; i++) {
- DistributedLock lock = new DistributedLock(
- serviceConfig.getConsulRegisterHost(),
- serviceConfig.getConsulRegisterPort());
- DistributedLock.LockContext lockContext = lock.getLock("test lock", 10);
- if (lockContext.isGetLock()) {
- System.out.println(Thread.currentThread().getName() + "获得了锁");
- try {
- TimeUnit.SECONDS.sleep(1);
- lock.releaseLock(lockContext.getSession());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }else {
- //System.out.println(Thread.currentThread().getName() + "没有获得锁");
- }
- }
- });
- }
- try {
- TimeUnit.MINUTES.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- @Test
- public void lockDiffResource(){
- // 针对不通的资源所有线程都应该能获得锁
- ExecutorService threadPool = Executors.newFixedThreadPool(10);
- for (int a=0;a<20;a++){
- threadPool.submit(
- () -> {
- for (int i = 0;i < 100; i++) {
- DistributedLock lock = new DistributedLock(
- serviceConfig.getConsulRegisterHost(),
- serviceConfig.getConsulRegisterPort());
- DistributedLock.LockContext lockContext = lock.getLock("test lock"+Thread.currentThread().getName(), 10);
- if (lockContext.isGetLock()) {
- System.out.println(Thread.currentThread().getName() + "获得了锁");
- try {
- TimeUnit.SECONDS.sleep(1);
- lock.releaseLock(lockContext.getSession());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }else {
- //System.out.println(Thread.currentThread().getName() + "没有获得锁");
- Assert.assertTrue(lockContext.isGetLock());
- }
- }
- });
- }
- try {
- TimeUnit.MINUTES.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
希望对大家有所帮助
项目路径:
https://github.com/wenwuxianren/consul-lock
来源: https://www.cnblogs.com/wenwuxianren/p/11181786.html