今天讨论一个关于分布式集群中如何防止重复操作的问题, 重复操作在单机应用就显得很重要了, 何况是在分布式系统中. 小编举个栗子, 我们首先模拟这么一个场景, 加入我们有一个只有 2 台机器的小集群, 每台机器上面部署了同一个应用服务系统, 每个系统中定义了 1 个相同的定时任务 (我们假设它是 ----- 在每天 23 点执行对同一个数据库某个操作), 因为是在集群环境中, 我们怎么保证这两个完全相同的定时任务只执行其中一个呢?
我们想象最坏的一种情况, 那就是两个定时任务都执行了, 结果必然影响相应的业务, 下面我们假设是两个定时任务:
- task
- import java.util.Date;
- import java.util.UUID;
- import javax.annotation.Resource;
- import org.apache.commons.lang3.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.slf4j.MDC;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.EnableScheduling;
- import org.springframework.scheduling.annotation.SchedulingConfigurer;
- import org.springframework.scheduling.config.ScheduledTaskRegistrar;
- import org.springframework.scheduling.support.CronTrigger;
- import org.springframework.stereotype.Component;
- @Component
- @EnableScheduling
- public class PayRecordTradeQueryTask implements SchedulingConfigurer {
- private static final Logger LOGGER = LoggerFactory.getLogger(PayRecordTradeQueryTask.class);
- private static String cron = "0 0/1 * * * ?";
- @Override
- public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
- taskRegistrar.addTriggerTask(() -> {
- // 任务逻辑
- LOGGER.debug("PayOrderTradeQueryTask 启动");
- // 此处不加防重复操作, 两台机器必然执行两次该核心业务
- payRecordSvc.handleProcessRecord();
- } catch (Exception e) {
- LOGGER.error("orderRecordExpiredJob error!", e);
- }
- LOGGER.debug("payOrderTradeQueryTask 处理完毕");
- }, (triggerContext) -> {
- MDC.put(CommonCst.TRANSACTION_ID, UUID.randomUUID().toString());
- // 任务触发, 可修改任务的执行周期
- if (StringUtils.isNotBlank(lifecSysConfig.getPayRecordJobQueryCron())) {
- cron = lifecSysConfig.getPayRecordJobQueryCron();
- }
- CronTrigger trigger = new CronTrigger(cron);
- Date nextExec = trigger.nextExecutionTime(triggerContext);
- return nextExec;
- });
- }
- }
既然是分布式系统, 我们可以使用某种共享资源来实现并发控制, 于是乎 codis 出现了, codis 是 redis 的分布式版本, 当然下面讲的使用它来防止重复操作, 采用 redis 也是可行的. 当第一台机器执行时, 我们在 redis 中插入某个键值, 然后执行定时任务, 当第二台机器准备执行定时任务时, 我们可以判断 redis 中是否已经存在该键, 如果存在, 则不执行定时任务.
看下面改进代码:
- @Component
- @EnableScheduling
- public class PayRecordTradeQueryTask implements SchedulingConfigurer {
- private static final Logger LOGGER = LoggerFactory.getLogger(PayRecordTradeQueryTask.class);
- private static String cron = "0 0/1 * * * ?";
- @Resource(name = "jedisCodis3Pool")
- private JedisResourcePool jedisPool;
- @Autowired
- private LifecSysConfig lifecSysConfig;
- @Resource(name = "queryPayRecordSvc")
- private PayRecordSvc payRecordSvc;
- @Override
- public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
- taskRegistrar.addTriggerTask(() -> {
- // 任务逻辑
- LOGGER.debug("PayOrderTradeQueryTask 启动");
- try {
- MDC.put(CommonCst.TRANSACTION_ID, UUID.randomUUID().toString());
- // 防止多台服务器重复处理
- boolean isRepeated = RedisAvoidRepeatUtils.repeatOpreat(LifecCommonCst.ORDER_PAY_QUERY_JOB_REPEATED_KEY, "1", jedisPool, 10);
- if (!isRepeated) {
- payRecordSvc.handleProcessRecord();
- }
- } catch (Exception e) {
- LOGGER.error("orderRecordExpiredJob error!", e);
- }
- LOGGER.debug("payOrderTradeQueryTask 处理完毕");
- }, (triggerContext) -> {
- MDC.put(CommonCst.TRANSACTION_ID, UUID.randomUUID().toString());
- // 任务触发, 可修改任务的执行周期
- if (StringUtils.isNotBlank(lifecSysConfig.getPayRecordJobQueryCron())) {
- cron = lifecSysConfig.getPayRecordJobQueryCron();
- }
- CronTrigger trigger = new CronTrigger(cron);
- Date nextExec = trigger.nextExecutionTime(triggerContext);
- return nextExec;
- });
- }
- }
我们进入 repeatOpreat() 方法看看怎么实现:
- /**
- * 是否重复操作
- * true : 重复操作
- * false : 没有重复操作
- */
- public static boolean repeatOpreat(String key, String value, JedisResourcePool jedisPool, int time) {
- Jedis jedis = null;
- try {
- jedis = getJedis(jedisPool);
- // 这句话的意思是当 key 存在时, 则返回 null, 否则插入该键值, 并返回 OK
- String status = jedis.set(key, value, "NX", "EX", time);
- LOG.info("key =" + key + "; value =" + value +"; jedis status ->" + status);
- if ("OK".equalsIgnoreCase(status)) {
- return false;
- }
- return true;
- } catch (Exception e) {
- LOG.error("repeatOpreat faild", e);
- } finally {
- if (jedis != null) {
- jedis.close();
- }
- }
- return false;
- }
注意 jedis 提供了很多个版本的 set 方法, 我们应该使用注入上面的方法, 其中参数值 NX 指示 redis, 当 key 存在时, 则返回 null, 否则插入该键值, 并返回 OK, 否则当你重复插入相同键值时, 都会返回 OK.
最后, 其实利用缓存实现防止重复操作的用途非常广泛, 不一定只适用于分布式定时任务的防重, 具体问题具体分析!
来源: https://juejin.im/post/5b1de4dd5188257d5a30cb3e