我们日常工作中 (以及面试中) 经常说到的并发问题, 一般都是指进程内的并发问题, JDK 的并发包也是用以解决 JVM 进程内多线程并发问题的工具. 但是, 进程之间, 以及跨服务器进程之间的并发问题, 要如何应对? 这时, 就需要借助分布式锁来协调多进程 / 服务之间的交互.
分布式锁听起来很高冷, 很高大上, 但它本质上也是锁, 因此, 它也具有锁的基本特征:
原子性
互斥性
除此之外, 分布式的锁有什么不一样呢? 简单来说就是:
独立性
因为分布式锁需要协调其他进程 / 服务的交互, 所以它本身应该是一个独立的, 职责单一的进程 / 服务.
可用性
因为分布式锁是协调多进程 / 服务交互的基础组件, 所以它的可用性直接影响了一组进程 / 服务的可用性, 同时也要避免: 性能, 饥饿, 死锁这些潜在问题.
进程锁和分布式锁的区别:
图示 -- 进程级别的锁:
图示 -- 分布式锁:
分布式锁的业界最佳实践应该非大名鼎鼎的 ZooKeeper 莫属了. 但杀鸡焉用牛刀? 在直接使用 ZooKeeper 实现分布式锁方式之前, 我们先通过 Redis 来演练一下分布式锁算法, 毕竟 Redis 相对来说简单, 轻量很多, 我们可以通过这个实践来详细探讨分布式锁的特性. 这之后再对比地去看 ZooKeeper 的实现方式, 相信会更加容易地理解.
怎么实现分布式锁?
由于 Redis 是高性能的分布式 KV 存储器, 它本身就具备了分布式特性, 所以我们只需要专注于实现锁的基本特征就好了.
首先来看看如何设计锁记录的数据模型:
key | value |
---|---|
lock name | lock owner |
举个例子,"注册表的分布式写锁":
lock name | lock owner |
---|---|
registry_write | 10.10.10.110:25349 |
注意, 为保证锁的互斥性, lock owner 标识必需保证全局唯一, 不会如例子中显示的那样简单.
原子性
因为 Redis 提供的方法可以认为是并发安全的, 所以只要保证加, 解锁操作是原子操作就可以了. 也就是说, 只使用一个 Redis 方法来完成加, 解锁操作的话, 那就能够保证原子性.
加锁操作: set(lockName, lockOwner, ...)
set 是原子的, 所以调用一次 set 也是原子的.
解锁操作: eval(deleteScript, ...)
这里你也许会疑惑, 为什么不直接使用 del(key) 来实现解锁? 因为解锁的时候, 需要先判断你是不是加锁的进程, 不是加锁者是无权解锁的. 如果任何进程都能够解锁, 那锁还有什么意义?
因为 "先判断是不是加锁者, 然后再解锁" 是两步的复合操作, 而 Redis 并没有提供一个可以实现这个复合操作的直接方法, 我们只能通过在 delete script 里面进行复合操作来绕过这个问题: 因为执行一条脚本的 eval 方法是原子的, 所以这个解锁操作的也是原子的.
互斥性
互斥性是说, 一旦有一个进程加锁成功能, 那么在该进程解锁之前, 其他的进程都不能加锁.
在实现互斥性的同时, 注意不能打破锁的原子性.
加锁操作: set(lockName, lockOwner, "NX", ...)
第 3 个参数 NX 的含义: 只有当 lockName(key) 不存在时才会设置该键值.
解锁操作:
- eval(
- eval(
- "if Redis.call('get', KEYS[1]) == ARGV[1] then" + "return Redis.call('del', KEYS[1]) else return 0 end" , List ( lockName ), List ( lockOwner
- )
- )
当解锁者等于锁的持有者时, 才会删除该键值.
超时
解锁权唯一属于锁的持有者, 如果持有者进程异常退出, 就永远无法解锁了. 针对这种情况, 我们可以在加锁时设置一个过期时间, 超过这个时间没有解锁, 锁会自动失效, 这样其他进程就能进行加锁了.
加锁操作: set(lockName, lockOwner, "NX", "PX", expireTime)
"PX" : 过期时间单位:"EX" -- 秒,"PX" -- 毫秒
expireTime : 过期时间
代码片段 1 : 加锁, 解锁
- // 由 Scala 编写
- case class RedisLock(client: JedisClient,
- lockName: String,
- locker: String) {
- private val LOCK_SUCCESS = "OK"
- private val SET_IF_NOT_EXISTS = "NX"
- private val EXPIRE_TIME_UNIT = "PX"
- private val RELEASE_SUCCESS = 1L
- def tryLock(expire: Duration): Boolean = {
- val res = client.con.set(
- lockName, // key
- locker, // value
- SET_IF_NOT_EXISTS, // nxxx
- EXPIRE_TIME_UNIT, // expire time unit
- expire.toMillis // expire time
- )
- val isLock = LOCK_SUCCESS.equals(res)
- println(s"${locker} : ${if (isLock)"lock ok"else"lock fail"}")
- isLock
- }
- def unlock: Boolean = {
- val cmd =
- "if Redis.call('get', KEYS[1]) == ARGV[1] then" +
- "return Redis.call('del', KEYS[1]) else return 0 end"
- val res = client.con.eval(
- cmd,
- List(lockName), // keys
- List(locker) // args
- )
- val isUnlock = RELEASE_SUCCESS.equals(res)
- println(s"${locker} : ${if (isUnlock)"unlock ok"else"unlock fail"}")
- isUnlock
- }
- }
测试加锁:
- object TryLockDemo extends App {
- val client = JedisContext.client
- val lock1 = RedisLock(client, "LOCK", "LOCKER_1")
- // Try lock
- lock1.tryLock(1000.millis)
- Thread.sleep(2000.millis.toMillis)
- // Try lock after expired
- lock1.tryLock(1000.millis)
- // Unlock
- lock1.unlock
- }
测试结果:
- LOCKER_1 : lock ok # 加锁成功, 1 秒后锁失效
- LOCKER_1 : lock ok
- # 2 秒之后, 锁已过期释放, 所以成功加锁
- LOCKER_1 : unlock ok # 解锁成功
阻塞加锁
到目前为止, 我们实现了简单的加解锁功能:
通过 tryLock() 方法尝试加锁, 会立即返回加锁的结果
锁拥有者通过 unlock() 方法解锁
但在实际的加锁场景中, 如果加锁失败了 (锁被占用或网络错误等异常情况), 我们希望锁工具有同步等待(或者说重试) 的能力. 面对这个需求, 一般会想到两种解决方案:
简单暴力轮询
Pub / Sub 订阅通知模式
因为 Redis 本身有极好的读性能, 所以暴力轮询不失为一种简单高效的实现方式, 接下来就让我们来尝试下实现阻塞加锁方法.
先来推演一下算法过程:
设置阻塞加锁的超时时间 timeout
如果已超时, 则返回失败 false
如果未超时, 则通过 tryLock() 方法尝试加锁
如果加锁成功, 返回成功 true
如果加锁失败, 休眠一段时间 frequency 后, 重复第 2 步
代码片段 2 : 阻塞加锁
- def lock(expire: Duration,
- timeout: Duration,
- frequency: Duration = 500.millis): Boolean = {
- var isTimeout = false
- TimeoutUtil.delay(timeout.toMillis).map(_ => isTimeout = true)
- while (!isTimeout) {
- if (tryLock(expire)) {
- return true
- }
- Thread.sleep(frequency.toMillis)
- }
- println(s"${locker} : timeout")
- return false;
- }
代码片段 -- 超时工具类:
- object TimeoutUtil {
- def delay(millis: Long): Future[Unit] = {
- val promise = Promise[Unit]()
- val timer = new Timer
- timer.schedule(new TimerTask {
- override def run(): Unit = {
- promise.success()
- timer.cancel()
- }
- }, millis)
- promise.future
- }
- }
测试阻塞加锁:
- object LockDemo extends App {
- val client = JedisContext.client
- val lock1 = RedisLock(client, "LOCK", "LOCKER_1")
- val lock2 = RedisLock(client, "LOCK", "LOCKER_2")
- // Lock
- lock1.lock(3000.millis, 1000.millis)
- lock2.lock(3000.millis, 1000.millis)
- lock2.lock(3000.millis, 3000.millis)
- // Unlock
- lock1.unlock
- lock2.unlock
- }
测试结果:
- LOCKER_1 : lock ok # LOCKER_1 加锁成功, 3 秒后锁失效
- LOCKER_2 : lock fail # LOCKER_2 尝试加锁失败
- LOCKER_2 : lock fail # LOCKER_2 重试, 尝试加锁失败
- LOCKER_2 : timeout # LOCKER_2 重试超时, 返回失败
- LOCKER_2 : lock fail # LOCKER_2 尝试加锁失败
- LOCKER_2 : lock fail # LOCKER_2 重试, 尝试加锁失败
- LOCKER_2 : lock fail
- LOCKER_2 : lock fail
- LOCKER_2 : lock ok # 3 秒时间到, 锁失效, LOCKER_2 加锁成功
- LOCKER_1 : unlock fail # LOCKER_1 解锁失败, 因为此时锁被 LOCKER_2 占有
- LOCKER_2 : unlock ok # LOCKER_2 解锁成功
更进一步
这个分布式锁的实现, 有一个比较明显的缺陷, 就是等待锁的进程无法实时的知道锁状态的变化, 从而及时的做出响应. 我们不妨思考一下, 通过什么方式可以实时, 高效的获得锁的状态?
作为分布式锁的业界标准, ZooKeeper 以及相关的工具库提供了更加直接, 高效的支持, 那么 ZooKeeper 是怎样的思路? 具体又是如何实现的? 欲知后事如何, 且听下回分解: ZooKeeper 分布式锁实践.
来源: http://www.tuicool.com/articles/6JfyU3v