1,Redis 事物介绍
1. Redis 事物是可以一次执行多个命令, 本质是一组命令的集合.
2. 一个事务中的所有命令都会序列化, 按顺序串行化的执行而不会被其他命令插入
作用: 一个队列中, 一次性, 顺序性, 排他性的执行一系列命令
2,multi 指令基本使用
1. 下面指令演示了一个完整的事物过程, 所有指令在 exec 前不执行, 而是缓存在服务器的一个事物队列中
2. 服务器一旦收到 exec 指令才开始执行事物队列, 执行完毕后一次性返回所有结果
3. 因为 Redis 是单线程的, 所以不必担心自己在执行队列是被打断, 可以保证这样的 "原子性"
注: Redis 事物在遇到指令失败后, 后面的指令会继续执行
- # Multi 命令用于标记一个事务块的开始事务块内的多条命令会按照先后顺序被放进一个队列当中, 最后由 EXEC 命令原子性 ( atomic ) 地执行
- > multi(开始一个 Redis 事物)
- incr books
- incr books
- > exec (执行事物)
- > discard (丢弃事物)
- [[email protected] ~]# Redis-cli
- 127.0.0.1:6379> multi
- OK
- 127.0.0.1:6379> set test 123
- QUEUED
- 127.0.0.1:6379> exec
- 1) OK
- 127.0.0.1:6379> get test
- "123"
- 127.0.0.1:6379> multi
- OK
- 127.0.0.1:6379> set test 456
- QUEUED
- 127.0.0.1:6379> discard
- OK
- 127.0.0.1:6379> get test
- "123"
- 127.0.0.1:6379>
在命令行测试 Redis 事物
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- import Redis
- r = Redis.Redis(host='127.0.0.1')
- pipe = r.pipeline()
- pipe.multi() #开启事务
- pipe.set('key2', 4) #存储子命令
- pipe.execute() #执行事务
- print(r.get('key2'))
使用 python 测试 Redis 事物
注: MySQL 的 rollback 与 Redis 的 discard 的区别
1. MySQL 回滚为 sql 全部成功才执行, 一条 sql 失败则全部失败, 执行 rollback 后所有语句造成的影响消失
2. Redis 的 discard 只是结束本次事务, 正确命令造成的影响仍然还在.
1)Redis 如果在一个事务中的命令出现错误, 那么所有的命令都不会执行;
2)Redis 如果在一个事务中出现运行错误, 那么正确的命令会被执行.
3,watch 指令作用
实质: WATCH 只会在数据被其他客户端抢先修改了的情况下通知执行命令的这个客户端 (通过 WatchError 异常) 但不会阻止其他客户端对数据的修改
1.watch 其实就是 Redis 提供的一种乐观锁, 可以解决并发修改问题
2. watch 会在事物开始前盯住一个或多个关键变量, 当服务器收到 exec 指令要顺序执行缓存中的事物队列时, Redis 会检查关键变量自 watch 后是否被修改
3. WATCH 只会在数据被其他客户端抢先修改了的情况下通知执行命令的这个客户端 (通过 WatchError 异常) 但不会阻止其他客户端对数据的修改
1.2 setnx(Redis 分布式锁)
1, 分布式锁
1. 分布式锁本质是占一个坑, 当别的进程也要来占坑时发现已经被占, 就会放弃或者稍后重试
2. 占坑一般使用 setnx(set if not exists)指令, 只允许一个客户端占坑
3. 先来先占, 用完了在调用 del 指令释放坑
- > setnx lock:codehole true
- .... do something critical ....
- > del lock:codehole
4. 但是这样有一个问题, 如果逻辑执行到中间出现异常, 可能导致 del 指令没有被调用, 这样就会陷入死锁, 锁永远无法释放
5. 为了解决死锁问题, 我们拿到锁时可以加上一个 expire 过期时间, 这样即使出现异常, 当到达过期时间也会自动释放锁
- > setnx lock:codehole true
- > expire lock:codehole 5
- .... do something critical ....
- > del lock:codehole
6. 这样又有一个问题, setnx 和 expire 是两条指令而不是原子指令, 如果两条指令之间进程挂掉依然会出现死锁
7. 为了治理上面乱象, 在 Redis 2.8 中加入了 set 指令的扩展参数, 使 setnx 和 expire 指令可以一起执行
- > set lock:codehole true ex 5 nx
- '''do something'''
- > del lock:codehole
1.3 Redis 解决超卖问题
1, 使用 reids 的 watch + multi 指令实现
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- import Redis
- def sale(rs):
- while True:
- with rs.pipeline() as p:
- try:
- p.watch('apple') # 监听 key 值为 apple 的数据数量改变
- count = int(rs.get('apple'))
- print('拿取到了苹果的数量: %d' % count)
- p.multi() # 事务开始
- if count> 0 : # 如果此时还有库存
- p.set('apple', count - 1)
- p.execute() # 执行事务
- p.unwatch()
- break # 当库存成功减一或没有库存时跳出执行循环
- except Exception as e: # 当出现 watch 监听值出现修改时, WatchError 异常抛出
- print('[Error]: %s' % e)
- continue # 继续尝试执行
- rs = Redis.Redis(host='127.0.0.1', port=6379) # 连接 Redis
- rs.set('apple',1000) # # 首先在 Redis 中设置某商品 apple 对应数量 value 值为 1000
- sale(rs)
watch+multi 解决超卖问题
1)原理
1. 当用户购买时, 通过 WATCH 监听用户库存, 如果库存在 watch 监听后发生改变, 就会捕获异常而放弃对库存减一操作
2. 如果库存没有监听到变化并且数量大于 1, 则库存数量减一, 并执行任务
2)弊端
1. Redis 在尝试完成一个事务的时候, 可能会因为事务的失败而重复尝试重新执行
2. 保证商品的库存量正确是一件很重要的事情, 但是单纯的使用 WATCH 这样的机制对服务器压力过大
2, 使用 reids 的 watch + multi + setnx 指令实现
1)为什么要自己构建锁
1. 虽然有类似的 SETNX 命令可以实现 Redis 中的锁的功能, 但他锁提供的机制并不完整
2. 并且 setnx 也不具备分布式锁的一些高级特性, 还是得通过我们手动构建
2)创建一个 Redis 锁
1. 在 Redis 中, 可以通过使用 SETNX 命令来构建锁: rs.setnx(lock_name, uuid 值)
2. 而锁要做的事情就是将一个随机生成的 128 位 UUID 设置位键的值, 防止该锁被其他进程获取
3)释放锁
1. 锁的删除操作很简单, 只需要将对应锁的 key 值获取到的 uuid 结果进行判断验证
2. 符合条件 (判断 uuid 值) 通过 delete 在 Redis 中删除即可, pipe.delete(lockname)
3. 此外当其他用户持有同名锁时, 由于 uuid 的不同, 经过验证后不会错误释放掉别人的锁
4)解决锁无法释放问题
1. 在之前的锁中, 还出现这样的问题, 比如某个进程持有锁之后突然程序崩溃, 那么会导致锁无法释放
2. 而其他进程无法持有锁继续工作, 为了解决这样的问题, 可以在获取锁的时候加上锁的超时功能
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- import Redis
- import uuid
- import time
- # 1. 初始化连接函数
- def get_conn(host,port=6379):
- rs = Redis.Redis(host=host, port=port)
- return rs
- # 2. 构建 Redis 锁
- def acquire_lock(rs, lock_name, expire_time=10):
- '''
- rs: 连接对象
- lock_name: 锁标识
- acquire_time: 过期超时时间
- return -> False 获锁失败 or True 获锁成功
- '''
- identifier = str(uuid.uuid4())
- end = time.time() + expire_time
- while time.time() <end:
- # 当获取锁的行为超过有效时间, 则退出循环, 本次取锁失败, 返回 False
- if rs.setnx(lock_name, identifier): # 尝试取得锁
- return identifier
- time.sleep(.001)
- return False
- # 3. 释放锁
- def release_lock(rs, lockname, identifier):
- '''
- rs: 连接对象
- lockname: 锁标识
- identifier: 锁的 value 值, 用来校验
- '''
- pipe = rs.pipeline(True)
- try:
- pipe.watch(lockname)
- if rs.get(lockname).decode() == identifier: # 防止其他进程同名锁被误删
- pipe.multi() # 开启事务
- pipe.delete(lockname)
- pipe.execute()
- return True # 删除锁
- pipe.unwatch() # 取消事务
- except Exception as e:
- pass
- return False # 删除失败
- '''在业务函数中使用上面的锁'''
- def sale(rs):
- start = time.time() # 程序启动时间
- with rs.pipeline() as p:
- '''
- 通过管道方式进行连接
- 多条命令执行结束, 一次性获取结果
- '''
- while True:
- lock = acquire_lock(rs, 'lock')
- if not lock: # 持锁失败
- continue
- try:
- count = int(rs.get('apple')) # 取量
- p.set('apple', count-1) # 减量
- p.execute()
- print('当前库存量: %s' % count)
- break
- finally:
- release_lock(rs, 'lock', lock)
- print('[time]: %.2f' % (time.time() - start))
- rs = Redis.Redis(host='127.0.0.1', port=6379) # 连接 Redis
- rs.set('apple',1000) # # 首先在 Redis 中设置某商品 apple 对应数量 value 值为 1000
- sale(rs)
setnx+watch+multi 解决超卖问题
- def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10):
- '''
- rs: 连接对象
- lock_name: 锁标识
- acquire_time: 过期超时时间
- locked_time: 锁的有效时间
- return -> False 获锁失败 or True 获锁成功
- '''
- identifier = str(uuid.uuid4())
- end = time.time() + expire_time
- while time.time() < end:
- # 当获取锁的行为超过有效时间, 则退出循环, 本次取锁失败, 返回 False
- if rs.setnx(lock_name, identifier): # 尝试取得锁
- # print('锁已设置: %s' % identifier)
- rs.expire(lock_name, locked_time)
- return identifier
- time.sleep(.001)
- return False
优化: 给分布式锁加超时时间防止死锁
来源: http://www.bubuko.com/infodetail-3475503.html