一, 官方文档
简单介绍下 Redis 的几个事务命令:
Redis 事务四大指令: MULTI,EXEC,DISCARD,WATCH.
这四个指令构成了 Redis 事务处理的基础.
1.MULTI 用来组装一个事务;
2.EXEC 用来执行一个事务;
3.DISCARD 用来取消一个事务;
4.WATCH 类似于乐观锁机制里的版本号.
被 WATCH 的 key 如果在事务执行过程中被并发修改, 则事务失败. 需要重试或取消.
以后单独介绍.
下面是最新版本的 spring-data-Redis(2.1.3)的官方手册.
这里, 我们注意这么一句话:
Redis provides support for http://redis.io/topics/transactions through the multi, exec, and discard commands. These operations are available on RedisTemplate. However, RedisTemplate is not guaranteed to execute all operations in the transaction with the same connection.
意思是 Redis 服务器通过 multi,exec,discard 提供事务支持. 这些操作在 RedisTemplate 中已经实现. 然而, RedisTemplate 不保证在同一个连接中执行所有的这些一个事务中的操作.
另外一句话:
Spring Data Redis provides the SessionCallback interface for use when multiple operations need to be performed with the same connection, such as when using Redis transactions. The following example uses the multi method:
意思是: spring-data-Redis 也提供另外一种方式, 这种方式可以保证多个操作 (比如使用 Redis 事务) 可以在同一个连接中进行. 示例如下:
- //execute a transaction
- List<Object> txResults = redisTemplate.execute(new SessionCallback<List<Object>>() {
- public List<Object> execute(RedisOperations operations) throws DataAccessException {
- operations.multi();
- operations.opsForSet().add("key", "value1");
- // This will contain the results of all operations in the transaction
- return operations.exec();
- }
- });
- System.out.println("Number of items added to set:" + txResults.get(0));
二, 实现事务的方式 --RedisTemplate 直接操作
在前言中我们说, 通过 RedisTemplate 直接调用 multi,exec,discard, 不能保证在同一个连接中进行.
这几个操作都会调用 RedisTemplate#execute(RedisCallback<T>, boolean), 比如 multi:
- public void multi() {
- execute(connection -> {
- connection.multi();
- return null;
- }, true);
- }
我们看看 RedisTemplate 的 execute 方法的源码:
- public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
- Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
- Assert.notNull(action, "Callback object must not be null");
- RedisConnectionFactory factory = getRequiredConnectionFactory();
- RedisConnection conn = null;
- try {
9 -- 开启了 enableTransactionSupport 选项, 则会将获取到的连接绑定到当前线程
- if (enableTransactionSupport) {
- // only bind resources in case of potential transaction synchronization
- conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
- } else {
-- 未开启, 就会去获取新的连接
- conn = RedisConnectionUtils.getConnection(factory);
- }
- boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
- RedisConnection connToUse = preProcessConnection(conn, existingConnection);
... 忽略无关代码...
26 RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
27 T result = action.doInRedis(connToExpose); -- 使用获取到的连接, 执行定义在业务回调中的代码
28
... 忽略无关代码...
- // TODO: any other connection processing?
- return postProcessResult(result, connToUse, existingConnection);
- } finally {
- RedisConnectionUtils.releaseConnection(conn, factory);
- }
- }
查看以上源码, 我们发现,
不启用 enableTransactionSupport, 默认每次获取新连接, 代码如下:
- RedisTemplate<String, Object> template = new RedisTemplate<>();
- template.multi();
- template.opsForValue().set("test_long", 1);
- template.opsForValue().increment("test_long", 1);
- template.exec();
启用 enableTransactionSupport, 每次获取与当前线程绑定的连接, 代码如下:
- RedisTemplate<String, Object> template = new RedisTemplate<>();
- template.setEnableTransactionSupport(true);
- template.multi();
- template.opsForValue().set("test_long", 1);
- template.opsForValue().increment("test_long", 1);
- template.exec();
三, 实现事务的方式 --SessionCallback
采用这种方式, 默认就会将所有操作放在同一个连接, 因为在 execute(SessionCallback<T> session)(注意, 这里是重载函数, 参数和上面不一样)源码中:
- public <T> T execute(SessionCallback<T> session) {
- Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
- Assert.notNull(session, "Callback object must not be null");
- RedisConnectionFactory factory = getRequiredConnectionFactory();
- // 在执行业务回调前, 手动进行了绑定
- RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
- try { // 业务回调
- return session.execute(this);
- } finally {
- RedisConnectionUtils.unbindConnection(factory);
- }
- }
四, SessionCallback 方式的示例代码:
- RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration("192.168.19.90");
- JedisConnectionFactory factory = new JedisConnectionFactory(configuration);
- factory.afterPropertiesSet();
- RedisTemplate<String, Object> template = new RedisTemplate<>();
- template.setConnectionFactory(factory);
- template.setDefaultSerializer(new GenericFastJsonRedisSerializer());
- StringRedisSerializer serializer = new StringRedisSerializer();
- template.setKeySerializer(serializer);
- template.setHashKeySerializer(serializer);
- template.afterPropertiesSet();
- try {
- List<Object> txResults = template.execute(new SessionCallback<List<Object>>() {
- @Override
- public List<Object> execute(RedisOperations operations) throws DataAccessException {
- operations.multi();
- operations.opsForValue().set("test_long", 1);
- int i = 1/0;
- operations.opsForValue().increment("test_long", 1);
- // This will contain the results of all ops in the transaction
- return operations.exec();
- }
- });
- } catch (Exception e) {
- System.out.println("error");
- e.printStackTrace();
- }
有几个值得注意的点:
1, 为什么加 try catch
先说结论: 只是为了防止调用的主线程失败.
因为事务里运行到 23 行,(int i = 1/0)时, 会抛出异常.
但是在 template.execute(SessionCallback<T> session)中未对其进行捕获, 只在 finally 块进行了连接释放.
所以会导致调用线程 (这里是 main 线程) 中断.
2.try-catch 了, 事务到底得到保证了没
我们来测试下, 测试需要, 省略非关键代码
2.1 事务执行过程, 抛出异常的情况:
- List<Object> txResults = template.execute(new SessionCallback<List<Object>>() {
- @Override
- public List<Object> execute(RedisOperations operations) throws DataAccessException {
- operations.multi();
- operations.opsForValue().set("test_long", 1);
- int i = 1/0;
- operations.opsForValue().increment("test_long", 1);
- // This will contain the results of all ops in the transaction
- return operations.exec();
- }
- });
执行上述代码, 执行到 int i = 1/0 时, 会抛出异常. 我们需要检查, 抛出异常后, 是否发送了 "discard" 命令给 Redis 服务器?
下面是我的执行结果, 从最后的抓包可以看到, 是发送了 discard 命令的:
2.2 事务执行过程, 不抛出异常的情况:
这次我们注释了抛错的那行, 可以看到 "EXEC" 命令已经发出去了:
3 抛出异常, 不捕获异常的情况:
有些同学可能比较奇怪, 为啥网上那么多教程, 都是没有捕获异常的, 我这里要捕获呢?
其实我也奇怪, 但在我目前测试来看, 不捕获的话, 执行线程就中断了, 因为 template.execute 是同步执行的.
来, 看看:
从上图可以看到, 主线程被未捕获的异常给中断了, 但是, 查看网络抓包, 发现 "DISCARD" 命令还是发出去了的.
4. 总结
从上面可以看出来, 不管捕获异常没, 事务都能得到保证. 只是不捕获异常, 会导致主线程中断.
不保证所有版本如此, 在我这, spring-data-Redis 2.1.3 是这样的.
我跟了 n 趟代码, 发现:
1, 在执行 sessionCallBack 中的代码时, 我们一般会先执行 multi 命令.
multi 命令的代码如下:
- public void multi() {
- execute(connection -> {
- connection.multi();
- return null;
- }, true);
- }
即调用了当前线程绑定的 connection 的 multi 方法.
进入 JedisConnection 的 multi 方法, 可以看到:
- private @Nullable Transaction transaction;
- public void multi() {
- if (isQueueing()) {
- return;
- }
- try {
- if (isPipelined()) {
- getRequiredPipeline().multi();
- return;
- }
- // 赋值给了 connection 的实例变量
- this.transaction = jedis.multi();
- } catch (Exception ex) {
- throw convertJedisAccessException(ex);
- }
- }
2, 在有异常抛出时, 直接进入 finally 块, 会去关闭 connection, 当然, 这里的关闭只是还回到连接池.
大概的逻辑如下:
3. 在没有异常抛出时, 执行 exec, 在 exec 中会先将状态变量修改, 后边进入 finally 的时候, 就不会发送 discard 命令了.
最后的结论就是:
所有这一切的前提是, 共有同一个连接.(使用 SessionCallBack 的方式就能保证, 总是共用同一个连接), 否则 multi 用到的连接 1 里 transcation 是有值的, 但是后面获取到的其他连接 2,3,4, 里面的 transaction 是空的,
还怎么保证事务呢?
五, 思考
在不开启 redisTemplate 的 enableTransactionSupport 选项时, 每执行一次 Redis 操作, 就会向服务器发送相应的命令.
但是, 在开启了 redisTemplate 的 enableTransactionSupport 选项, 或者使用 SessionCallback 方式时, 会像下面这样发送命令:
后来, 我在《Redis 实战》这本书里的 4.4 节, Redis 事务这一节里, 找到了答案:
归根到底呢, 因为重用同一个连接, 所以可以延迟发; 如果每次都不一样的连接, 只能马上发了.
这里另外说一句, 不是所有客户端都这样, Redis 自带的 Redis-cli 是不会延迟发送的.
六, 源码
来源: https://www.cnblogs.com/grey-wolf/p/10142937.html