一, 简介
锁的概念, 在 Java 日常开发和面试中, 都是个很重要的知识点. 锁能很好的控制生产数据的安全性, 比如商品的数量超卖问题等. 传统的做法中, 可以直接利用数据库锁 (行锁或者表锁) 来进行数据访问控制. 随着请求量逐步变多的情况下, 将压力怼到数据库上会对其性能产生极大影响. 这时候, 单体应用中可以利用 JVM 锁, 在程序层面进行访问的控制, 将压力前移, 对数据库友好. 当请求量再进一步变多, 这时候一般会考虑集群分布式去处理, 不断的加机器来抗压. 这时候, JVM 锁就不能很好的控制压力了, 同一时刻还是会有大量请求怼到数据库上, 这时就需要提升为分布式锁去控制了, 将压力继续停留在程序层面.
Java 的面向接口编程, 可以很好很快的去切换实现而不需要动业务代码部分. 下面, 基于 Lock 接口去使用锁.
二, JVM 锁
基于 ReentrantLock 实现锁控制, 业务控制层 service 部分代码如下, 用 lock 锁去控制并发访问
- package com.cfang.service;
- import java.sql.Time;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Isolation;
- import org.springframework.transaction.annotation.Transactional;
- import com.cfang.dao.ProductDao;
- import lombok.extern.slf4j.Slf4j;
- @Service
- @Slf4j
- @Scope("prototype")
- public class ProductWithLockService {
- private Lock lock = new ReentrantLock();
- @Autowired
- private ProductDao productDao;
- @Transactional
- public boolean buy(String userName, String productname, int number) {
- boolean result = false;
- try {
- lock.lock();
- // TimeUnit.SECONDS.sleep(1);
- log.info("用户 {} 欲购买 {} 个{}", userName, number, productname);
- int stock = productDao.getStock(productname);
- log.info("{} 查询数量{}...", userName, stock);
- if(stock <number) {
- log.warn("库存不足...");
- return false;
- }
- result = productDao.buy(userName, productname, number);
- } catch (Exception e) {
- } finally {
- log.info("{} 释放锁...", userName);
- lock.unlock();
- }
- log.info("{}购买结果,{}",userName, result);
- return result;
- }
- }
在单体应用中, 这样子使用是可以的, 但是当应用部署多套的时候, 那么, 就不能很好的保障并发控制了, 同一时刻的请求可能会大量打到数据库上. 所以, 这就引入下面的分布式锁去控制了.
三, 基于 ZooKeeper 的分布式锁
首先, 锁获取释放的工具类:
- package com.cfang.zkLockUtil;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
- import org.I0Itec.zkclient.exception.ZkNodeExistsException;
- import org.apache.commons.lang3.StringUtils;
- import com.cfang.zkClient.MyZkSerializer;
- import lombok.extern.slf4j.Slf4j;
- @Slf4j
- public class ZkLockUtil implements Lock{
- private String znode;
- private ZkClient zkClient;
- public ZkLockUtil(String znode) {
- if(StringUtils.isBlank(znode)) {
- throw new IllegalArgumentException("锁节点 znode 不能为空字符串");
- }
- this.znode = znode;
- this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183");
- this.zkClient.setZkSerializer(new MyZkSerializer());
- }
- @Override
- public void lock() {
- if(!tryLock()) { // 抢锁失败
- // 阻塞等待锁节点的释放
- waitLock();
- // 递归调用, 重新尝试去抢占锁
- lock();
- }
- }
- private void waitLock() {
- CountDownLatch latch = new CountDownLatch(1);
- // 注册监听 znode 锁节点变化, 当删除的时候, 说明锁被释放
- IZkDataListener listener = new IZkDataListener() {
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- log.info("znode 节点被删除, 锁释放...");
- latch.countDown();
- }
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
- };
- this.zkClient.subscribeDataChanges(this.znode, listener);
- try {
- // 阻塞等待锁 znode 节点的删除释放
- if(this.zkClient.exists(znode)) {
- latch.await();
- }
- } catch (Exception e) {
- }
- // 取消 znode 节点监听
- this.zkClient.unsubscribeDataChanges(this.znode, listener);
- }
- @Override
- public boolean tryLock() {
- boolean result = false;
- try {
- this.zkClient.createEphemeral(znode); // 创建临时节点
- result = true;
- } catch (ZkNodeExistsException e) {
- log.warn("锁节点 znode 已存在, 抢占失败...");
- result = false;
- } catch (Exception e) {
- log.warn("创建锁节点 znode 异常,{}...", e.getMessage());
- }
- return result;
- }
- @Override
- public void unlock() {
- zkClient.delete(znode);
- }
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- // TODO Auto-generated method stub
- return false;
- }
- @Override
- public void lockInterruptibly() throws InterruptedException {
- // TODO Auto-generated method stub
- }
- @Override
- public Condition newCondition() {
- // TODO Auto-generated method stub
- return null;
- }
- }
业务控制 service 中, 就是将基本的 JVM 锁的 service 中, Lock 的实现更换即可:
private Lock lock = new ZkLockUtil("/p1node");
当程序运行中, 所有的请求会去争抢创建 zk 节点, 谁创建成功, 则就获得锁资源, 继续执行业务代码. 其他所有线程基于递归等待, 等待 zk 节点的删除, 然后再去尝试争抢创建. 达到控制并发的目的.
但是, 这种但是有个不好的地方, 也就是, 当一个锁释放后, 所有的线程都会一下子全去争抢, 每次都是轮回这样哄抢的过程, 会有一定的压力, 也不必如此. 所以, 下面基于 zk 永久节点下临时顺序节点做点改善, 每个线程节点, 只需要关注前面一个节点变化即可, 不需要造成哄抢事件.
四, ZooKeeper 的分布式锁提高版
锁获取释放的工具类:
- package com.cfang.zkLockUtil;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
- import org.apache.commons.lang3.StringUtils;
- import com.cfang.zkClient.MyZkSerializer;
- import lombok.extern.slf4j.Slf4j;
- @Slf4j
- public class ZKLockImproveUtil implements Lock{
- private String znode;
- private ZkClient zkClient;
- private ThreadLocal<String> currentNode = new ThreadLocal<String>(); // 当前节点
- private ThreadLocal<String> beforeNode = new ThreadLocal<String>(); // 前一个节点
- public ZKLockImproveUtil(String znode) {
- if(StringUtils.isBlank(znode)) {
- throw new IllegalArgumentException("锁节点 znode 不能为空字符串");
- }
- this.znode = znode;
- this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183");
- this.zkClient.setZkSerializer(new MyZkSerializer());
- try {
- if(!this.zkClient.exists(znode)) {
- this.zkClient.createPersistent(znode, true); // true 是否创建层级目录
- }
- } catch (Exception e) {
- }
- }
- @Override
- public void lock() {
- if(!tryLock()) {
- waitLock();
- lock();
- }
- }
- private void waitLock() {
- CountDownLatch latch = new CountDownLatch(1);
- IZkDataListener listener = new IZkDataListener() {
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- log.info("{}节点删除, 锁释放...", dataPath);
- latch.countDown();
- }
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
- };
- this.zkClient.subscribeDataChanges(beforeNode.get(), listener);
- try {
- if(this.zkClient.exists(beforeNode.get())) {
- latch.await();
- }
- } catch (Exception e) {
- }
- this.zkClient.unsubscribeDataChanges(beforeNode.get(), listener);
- }
- @Override
- public boolean tryLock() {
- boolean result = false;
- // 创建顺序临时节点
- if(null == currentNode.get() || !this.zkClient.exists(currentNode.get())) {
- String enode = this.zkClient.createEphemeralSequential(znode + "/", "zk-locked");
- this.currentNode.set(enode);
- }
- // 获取 znode 节点下的所有子节点
- List<String> list = this.zkClient.getChildren(znode);
- Collections.sort(list);
- /**
- * 如果当前节点是第一个的话, 则是为获取锁, 继续执行
- * 不是头结点的话, 则去查询其前面一个节点, 然后准备监听前一个节点的删除释放操作
- */
- if(currentNode.get().equals(this.znode + "/" + list.get(0))) {
- log.info("{}节点为头结点, 获得锁...", currentNode.get());
- result = true;
- } else {
- int currentIndex = list.indexOf(currentNode.get().substring(this.znode.length() + 1));
- String bnode = this.znode + "/" + list.get(currentIndex - 1);
- this.beforeNode.set(bnode);
- }
- return result;
- }
- @Override
- public void unlock() {
- if(null != this.currentNode) {
- this.zkClient.delete(currentNode.get());
- this.currentNode.set(null);
- }
- }
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- // TODO Auto-generated method stub
- return false;
- }
- @Override
- public void lockInterruptibly() throws InterruptedException {
- // TODO Auto-generated method stub
- }
- @Override
- public Condition newCondition() {
- // TODO Auto-generated method stub
- return null;
- }
- }
service 中更换实现:
private Lock lock = new ZKLockImproveUtil("/pnode");
五, 小结
主要是学习测试使用, 并未考虑到生产实际的问题, 比如 如果业务处理中假死状态, 导致 zk 不释放锁, 那么就会导致死锁问题(可以对锁节点来个有效期处理).
上述为部分代码片段, 整体工程可以在 GitHub 上获取, 地址: https://github.com/qiuhan00/zkLock
来源: https://www.cnblogs.com/eric-fang/p/11837194.html