部分参考链接
- Transaction https://redis.io/topics/transactions
- StackExchange.Redis Transaction
- hashest
正文
Redis 是一种基于内存的单线程数据库. 意味着所有的命令是一个接一个的执行.
考虑只有一个 Redis 实例, 也就是 Redis 本身没有做分布式.
通过 SETNX https://redis.io/commands/setnx 命令, set if not exist 的缩写. 那么多个服务在调用的时候可以通过同一个 key 申请一个 lock(也就是调用命令成功返回 1), 然后根据相应条件做释放 (比如时间到期, or 手动释放), 也就是 delete key.
Redis 本身有 MULTI https://redis.io/commands/multi 命令, 标记开启一个事务. 开启之后后面的命令会在调用 EXEC https://redis.io/commands/exec 命令的时候以一个集合的方式整体执行, 也就是原子性 (都成功 or 失败).
现在有个需求, 用 Redis 实现 Check and Set, 也就是先读取里面的值, 然后设置 (比如做个 +=val); 并发的问题是必须要考虑的.
用 Redis 描述大致是这样的. 这里假设 Redis 没有 incr 这个自增命令.
- val = GET mykey
- val = val + 1
- SET mykey $val
直接这样做, 并发问题是肯定有的. 所以, 按照上面的知识, 应该有 2 种方法来避免这个并发问题.
基于 SENTX 命令.
copy 一下文档的 demo
- Redis> SETNX mykey "Hello"
- (integer) 1
- Redis> SETNX mykey "World"
- (integer) 0
- Redis> GET mykey
- "Hello"
- Redis>
第一次调用 setnx, 设置 mykey 的 value 为 hello, 返回 1, 表示成功.
第二次调用 setnx, 设置 mykey 的 value 为 world, 因为第一次调用并没有释放 mykey, 所以返回 0, 表示设置失败.
最后获取 mykey 的值, 返回的是 hello.
最后记得要去释放 mykey.
这其实是一个悲观锁, 也就是一个进程获取到锁之后要等释放别的进程才能继续.
基于 MULTI 命令.
先看一个简单的应用
- 127.0.0.1:6379> multi
- OK
- 127.0.0.1:6379> incr foo
- QUEUED
- 127.0.0.1:6379> incr bar
- QUEUED
- 127.0.0.1:6379> exec
- 1) (integer) 1
- 2) (integer) 1
第一步调用 MULTI 命令, 表示开始多个命令的输入. 返回 OK, 表示开始接收.
第二步调用 incr foo, 给 foo 对应的值做自增. 返回 queued, 表示已加入队列.
第二步调用 incr bar, 给 bar 对应的值做资政, 返回 queued, 表示已加入队列.
最后调用 exec 命令, 表示执行队列中的命令. 返回每个命令的结果.
有错误了怎么办
首先错误分两种
在 enqueue 的时候出错, 最常见的就是参数错误. 比如下面这个例子
- 127.0.0.1:6379> multi
- OK
- 127.0.0.1:6379> set a 1234
- QUEUED
- 127.0.0.1:6379> set a 1 1 1 1 1 1 11
- QUEUED
- 127.0.0.1:6379> exec
- 1) OK
- 2) (error) ERR syntax error
- 127.0.0.1:6379>
第二个 set a 1 1 1 1 1 1 11 命令是有语法错误, 所以, 在执行 exec 的时候会返回语法错误. 第一个是成功的. 所以, 如果在后面 get a 是会返回 1234, 为成功的设置.
假设报错的命令在中间, 后面的命令也是会执行的.
还有就是直接命令就不对的. 看个例子
- 127.0.0.1:6379> multi
- OK
- 127.0.0.1:6379> set a 11
- QUEUED
- 127.0.0.1:6379> aaa
- (error) ERR unknown command `aaa`, with args beginning with:
- 127.0.0.1:6379> exec
- (error) EXECABORT Transaction discarded because of previous errors.
先 set a, 进入队列.
执行 aaa 命令, 这个命令不存在. 直接报错.
执行 exec, 事务因为之前的错误, exec 中止.
为什么没有回滚
通过上面的例子, 看到 Redis 对 multi 的操作是没有回滚的, 或许有点奇怪. 根据文档描述, 有两个原因.
Redis 的命令执行只有在语法错误或者数据类型出错的时候会失败, 而不是在 enqueue 的时候. 这意味着失败是由程序设置错误导致的. 那么, 这种错误肯定是在开发环境中就应该容易被发现, 而不是在生产环境.
为了快.
WATCH 命令的乐观锁
结合 https://redis.io/commands/watch 命令我们也可以实现上面的需求.
- WATCH mykey
- --Begin---
- ## 下面两行是客户端命令
- val = GET mykey
- val = val + 1
- --End---
- MULTI
- SET mykey $val
- EXEC
解释一下, 先获取一下 mykey 的监控. 然后客户端获取 mykey 的值,(是客户端, 不是命令服务端). 然后赋值自增. 然后服务端开启 MULTI, 设置新的值. 执行.
假设在 MULTI 和 Exec 之间, mykey 的值被别的 client 修改, exec 会返回 (nil).
下面做个演示:
先在 Redis-cli 上执行以下命令
- 127.0.0.1:6379> watch a
- OK
- 127.0.0.1:6379> multi
- OK
- 127.0.0.1:6379> set a 13
- QUEUED
如上, 已经开启 WATCH, 然后设置 a =13 进入队列.
然后在本地的 Redis desktop manager 上去修改这个值.
然后再在服务器上执行 exec,
- 127.0.0.1:6379> exec
- (nil)
返回的是 nil, 表示没有成功. 如果没有客户端去更新, 执行 exec 是返回 OK.
Redis-scripting-and-transactions
在 Redis 2.6 之后, 引入了 Redis script https://redis.io/commands/eval 来实现事务的功能. 通常来说 script 方式速度会相对快一点 (没有做测试). 不过既然 multi 已经出来很久了, 所以, 不太可能会移除这个命令.
在 StackExchange.Redis 中使用
显然, 也分两种, 基于 setnx 或者 MULTI + WATCH. 分别对应的是 IDatabaseAsync.LockTakeAsync 和 IDatabaseAsync.CreateTransaction 这里结合了 https://github.com/App-vNext/Polly 这个库用于重试, 毕竟, 悲观锁, 我多拿几次总能拿到的; 乐观锁, 执行的命令, 我多试几次, 总能成功的.
- LockTakeAsync
- public async Task<T> TakeLockAsync<T>(string key, string token, Func<object, Task<T>> func, object obj)
- where T : class
- {
- var db = GetDb(redisConfigModel.LockDbIndex);// 获取 IDatabaseAsync 对象
- // 定义获取锁的策略
- var policy = Policy
- .HandleResult<bool>(w => !w)
- .WaitAndRetryForeverAsync(
- sleepDurationProvider: attemp => TimeSpan.FromSeconds(3), // 两次重复尝试的间隔
- onRetry: (delegeteRst, ts) =>
- {
- // 可以记录日志啥的
- }
- );
- // 竞争获取锁.
- await policy.ExecuteAsync(async () => await db.LockTakeAsync(key, token, TimeSpan.MaxValue));
- try
- {
- return await func(obj);// 获取到锁之后的具体执行的方法.
- }
- finally
- {
- await db.LockReleaseAsync(key, token); // 最后一定要释放
- }
- }
LockTakeAsync 的时候根据 key 对应的 token 值是否已经被获取来作为条件.
CreateTransaction
StackExchange.Redis 用 multiplexer 类实现 Redis 的一些列命令. 我们的代码不能直接简单的映射到 watch 命令, 因为, 单纯调用 watch 是肯定成功的, 这样会导致大家都 "成功"(假的). 这里用的 Condition 的方式来实现.
- public async Task AddAfterReadAsync(string key, int value, string hashField = "hash_field")
- {
- // 处理 policy 的结果为 false 的情况, 一直重试.
- var policy = Policy.HandleResult<bool>(w => !w).RetryForeverAsync();
- // 执行
- await policy.ExecuteAsync(async () =>
- {
- var db = GetDb(redisConfigModel.LockDbIndex);
- var trans = db.CreateTransaction();
- var oldValue = Convert.ToInt32(await db.StringGetAsync(key));
- trans.AddCondition(Condition.HashNotExists(key,
- hashField)); // 这里确保 hashField 不存在. 也可以用 Condition.KeyNotExists(key)
- // 这里不能 await, 因为每个命令的结果只有在执行了 execute 后才知道.
- trans.StringSetAsync(key, (oldValue + value).ToString());
- var execSuccess = await trans.ExecuteAsync();
- return execSuccess;
- });
- }
小结
这是一篇和 Redis 有关的锁, 事务的文章. 写了我一整个下午. 看完, 感觉也没有多少东西. 感觉开头链接中关于 hashset 还是有点意思的.
来源: http://www.bubuko.com/infodetail-3053797.html