最近看了有关 redis 的一些东西,了解了 redis 的一下命令,就记录一下:
redis 中的 setnx 命令:
关于 redis 的操作命令,我们一般会使用 set,get 等一系列操作,数据结构也有很多,这里我们使用最简单的 string 来存储锁.
redis 下提供一个 setnx 命令用来将 key 值设为 value,类似于 set 功能,但是它和 set 是有区别的,在于后面的 nx,setnx 是 SET if Not eXists.就是:当且仅当 key 值不存在的时候,将该 key 值设置为 value.
也就是说使用 setnx 加入元素的时候,当 key 值存在的时候,是没有办法加入内容的.
加锁:
下面将使用 python 控制 redis,python 控制 redis 的方式和命令一样,有一个 setnx(key, value) 方法,通过这个方法可以实现 setnx 命令的效果.
首先连接 redis:
文件 connect.py
然后就可以使用 setnx 加锁了,key 对应的 value 中需要填入相应的值,这里设置 Value 为一个 uuid 值.获取到 uuid 之后,将 key 和 value 填入 redis,其他的客户端想要访问并获取到锁,也使用 setnx 方式,key 值只要相同就好.那么代码如下:
import redis
def connect_redis():
return redis.Redis(host='127.0.0.1', port=6379)
文件 operate.py
这样就通过 setnx 将 key 值写入了.但是,这样显然是不合理的,客户端可以等待一会再次获取锁,这样,不至于每次请求都会出现问题.那可以设置 30 秒的时间,让程序在 30 秒内不停的尝试获取锁,知道 30 秒的时间过了或者由其他客户端释放了锁.
# 加锁的过程
def acquire_lock(conn, lockname):
identifier = str(uuid.uuid4())
conn.setnx('lock:' + lockname, identifier):
所以加锁可以变为如下:
这样就可以通过 setnx 加锁了,这个加锁的过程实际上就是在 redis 中存入了一个值,之后当其他的客户端再次想要通过这个 key 加入这个值的时候,发现这个 key 已经存在就不往进写值了,但是在这 30 秒内还会不断尝试的去获取锁,也就是不断的尝试写入这个值,一旦 key 被删除——也就是锁被释放,则该客户端就可以竞争获取锁——进程写入这个值.这种方式就像是操作系统中的自旋锁.
# 加锁的过程
def acquire_lock(conn, lockname, args, acquite_timeout = 30):
identifier = str(uuid.uuid4())
end = time.time() + acquite_timeout
while time.time() < end:
# 这里尝试取得锁 setnx 设置-如果不存在的时候才会set
if conn.setnx('lock:' + lockname, identifier):
# 获得锁之后输出获得锁的'进程'号
print('获得锁:进程'+ str(args))
return identifier
return False
自旋锁
这里先简单介绍一下自旋锁.
和自旋锁对应的还有一种锁,叫做对于互斥锁.
互斥锁:如果资源已经被占用,资源申请者只能进入睡眠状态.
自旋锁:自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,"自旋" 一词就是因此而得名.
释放锁:
既然锁可以添加,那么也就可以释放.释放锁实际上就是将 redis 中的数据删除.这里可以使用 redis 提供的事务流水线去执行.为了保障在执行的时候确确实实释放的所示没有问题的.
代码如下,调用 delete() 方法——del 命令删除 redis 中这个 key 的元素.
这里就遇到了 python3 中的一个坑,获取到的数据为 byte 类型,所以 identifier_real 这个变量实际上是这个字符串之前加了个 b 的,例如 b'xxxx',所以这和传入的 identifier 的类型为 string,这样比较当然会出现 Fasle,所以我们在这里将这个字符串类型转码:
# 释放锁
def release_lock(conn, lockname, identifier):
pipe = conn.pipeline(True)
lockname = 'lock:' + lockname
while True:
try:
pipe.watch(lockname)
identifier_real = pipe.get(lockname)
if identifier_real == identifier:
pipe.multi()
pipe.delete(lockname)
pipe.execute()
return True;
pipe.unwatch()
break
except redis.exceptions.WatchError:
pass
return False
1 pipe.get(lockname).decode()
这样才是整整的字符串类型的字符串,最终的代码如下:
执行:
def release_lock(conn, lockname, identifier):
pipe = conn.pipeline(True)
lockname = 'lock:' + lockname
while True:
try:
pipe.watch(lockname)
identifier_real = pipe.get(lockname).decode()
if identifier_real == identifier:
pipe.multi()
pipe.delete(lockname)
pipe.execute()
return True;
pipe.unwatch()
break
except redis.exceptions.WatchError:
pass
return False
为了验证这个分布式锁的正确性,可以写一个测试的方法进行测试,先去获取锁,如果获取到之后,则 sleep 三秒,等待 3 秒之后,执行释放锁.
模拟过程入下:
文件 operate.py
这样我们就可以使用多进程并发访问的方式进行对锁进行抢占和释放:
# 模拟加锁解锁的过程
def exec_test(conn, lockname, args):
id = acquire_lock(conn, lockname, args)
if id != False:
print(id)
time.sleep(3)
release_lock(conn, lockname, id)
使用 9 个进程进行测试,操作方式如下:
执行结果如下所示:
from connect import connect_redis
from operate import acquire_lock
from multiprocessing import Process
from operate import exec_test
if __name__ == '__main__':
redis = connect_redis()
for i in range(0, 9):
Process(target = exec_test, args = (redis, 'test', i)).start()
注:以上 python 运行环境为 python3.6
来源: https://www.cnblogs.com/Summer7C/p/8277286.html