时间轮算法
时间轮是一种高效, 低延迟的调度数据结构. 其在 Linux 内核中广泛使用, 是 Linux 内核定时器的实现方法和基础之一. 按使用场景, 大致可以分为两种时间轮: 原始时间轮和分层时间轮. 分层时间轮是原始时间轮的升级版本, 来应对时间 "槽" 数量比较大的情况, 对内存和精度都有很高要求的情况. 延迟任务的场景一般只需要用到原始时间轮就可以了.
代码案例
推荐使用 Netty 提供的 HashedWheelTimer 工具类来实现延迟任务.
引入依赖:
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- <version>4.1.23.Final</version>
- </dependency>
红包过期队列信息:
- /**
- * 红包过期队列信息
- */
- public class RedPacketTimerTask implements TimerTask {
- private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
- /**
- * 红包 ID
- */
- private final long redPacketId;
- /**
- * 创建时间戳
- */
- private final long timestamp;
- public RedPacketTimerTask(long redPacketId) {
- this.redPacketId = redPacketId;
- this.timestamp = System.currentTimeMillis();
- }
- @Override
- public void run(Timeout timeout) {
- // 异步处理任务
- System.out.println(String.format("任务执行时间:%s, 红包创建时间:%s, 红包 ID:%s",
- LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), redPacketId));
- }
- }
测试用例:
- /**
- * 基于 netty 的时间轮算法 HashedWheelTimer 实现的延迟任务
- */
- public class RedPacketHashedWheelTimer {
- private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
- public static void main(String[] args) throws Exception {
- ThreadFactory factory = r -> {
- Thread thread = new Thread(r);
- thread.setDaemon(true);
- thread.setName("RedPacketHashedWheelTimerWorker");
- return thread;
- };
- /**
- * @param tickDuration - 每 tick 一次的时间间隔
- * @param unit - tickDuration 的时间单位
- * @param ticksPerWheel - 时间轮中的槽数
- * @param leakDetection - 检查内存溢出
- */
- Timer timer = new HashedWheelTimer(factory, 1,
- TimeUnit.SECONDS, 100,true);
- System.out.println(String.format("开始任务时间:%s",LocalDateTime.now().format(F)));
- for(int i=1;i<10;i++){
- TimerTask timerTask = new RedPacketTimerTask(i);
- timer.newTimeout(timerTask, i, TimeUnit.SECONDS);
- }
- Thread.sleep(Integer.MAX_VALUE);
- }
- }
打印任务执行日志:
开始任务时间: 2020-02-12 15:22:23.404
任务执行时间: 2020-02-12 15:22:25.410, 红包创建时间: 2020-02-12 15:22:23.409, 红包 ID:1
任务执行时间: 2020-02-12 15:22:26.411, 红包创建时间: 2020-02-12 15:22:23.414, 红包 ID:2
任务执行时间: 2020-02-12 15:22:27.424, 红包创建时间: 2020-02-12 15:22:23.414, 红包 ID:3
任务执行时间: 2020-02-12 15:22:28.410, 红包创建时间: 2020-02-12 15:22:23.414, 红包 ID:4
任务执行时间: 2020-02-12 15:22:29.411, 红包创建时间: 2020-02-12 15:22:23.414, 红包 ID:5
任务执行时间: 2020-02-12 15:22:30.409, 红包创建时间: 2020-02-12 15:22:23.414, 红包 ID:6
任务执行时间: 2020-02-12 15:22:31.411, 红包创建时间: 2020-02-12 15:22:23.414, 红包 ID:7
任务执行时间: 2020-02-12 15:22:32.409, 红包创建时间: 2020-02-12 15:22:23.414, 红包 ID:8
任务执行时间: 2020-02-12 15:22:33.411, 红包创建时间: 2020-02-12 15:22:23.414, 红包 ID:9
源码相关
其核心是 workerThread 线程, 主要负责每过 tickDuration 时间就累加一次 tick. 同时也负责执行到期的 timeout 任务以及添加 timeout 任务到指定的 wheel 中.
构造方法:
- public HashedWheelTimer(
- ThreadFactory threadFactory,
- long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
- long maxPendingTimeouts) {
- if (threadFactory == null) {
- throw new NullPointerException("threadFactory");
- }
- if (unit == null) {
- throw new NullPointerException("unit");
- }
- if (tickDuration <= 0) {
- throw new IllegalArgumentException("tickDuration must be greater than 0:" + tickDuration);
- }
- if (ticksPerWheel <= 0) {
- throw new IllegalArgumentException("ticksPerWheel must be greater than 0:" + ticksPerWheel);
- }
- // Normalize ticksPerWheel to power of two and initialize the wheel.
- wheel = createWheel(ticksPerWheel);
- mask = wheel.length - 1;
- // Convert tickDuration to nanos.
- this.tickDuration = unit.toNanos(tickDuration);
- // Prevent overflow.
- if (this.tickDuration>= Long.MAX_VALUE / wheel.length) {
- throw new IllegalArgumentException(String.format(
- "tickDuration: %d (expected: 0 <tickDuration in nanos < %d",
- tickDuration, Long.MAX_VALUE / wheel.length));
- }
- // 这里 - 爪洼笔记
- workerThread = threadFactory.newThread(worker);
- leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
- this.maxPendingTimeouts = maxPendingTimeouts;
- if (INSTANCE_COUNTER.incrementAndGet()> INSTANCE_COUNT_LIMIT &&
- WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
- reportTooManyInstances();
- }
- }
新增任务, 创建即启动:
- public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
- if (task == null) {
- throw new NullPointerException("task");
- }
- if (unit == null) {
- throw new NullPointerException("unit");
- }
- long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
- if (maxPendingTimeouts> 0 && pendingTimeoutsCount> maxPendingTimeouts) {
- pendingTimeouts.decrementAndGet();
- throw new RejectedExecutionException("Number of pending timeouts ("
- + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending"
- + "timeouts (" + maxPendingTimeouts + ")");
- }
- // 这里 - 爪洼笔记
- start();
- // Add the timeout to the timeout queue which will be processed on the next tick.
- // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
- long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
- // Guard against overflow.
- if (delay> 0 && deadline <0) {
- deadline = Long.MAX_VALUE;
- }
- HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
- timeouts.add(timeout);
- return timeout;
- }
线程启动:
- /**
- * Starts the background thread explicitly. The background thread will
- * start automatically on demand even if you did not call this method.
- *
- * @throws IllegalStateException if this timer has been
- * {@linkplain #stop() stopped} already
- */
- public void start() {
- switch (WORKER_STATE_UPDATER.get(this)) {
- case WORKER_STATE_INIT:
- if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
- workerThread.start();
- }
- break;
- case WORKER_STATE_STARTED:
- break;
- case WORKER_STATE_SHUTDOWN:
- throw new IllegalStateException("cannot be started once stopped");
- default:
- throw new Error("Invalid WorkerState");
- }
- // Wait until the startTime is initialized by the worker.
- while (startTime == 0) {
- try {
- startTimeInitialized.await();
- } catch (InterruptedException ignore) {
- // Ignore - it will be ready very soon.
- }
- }
- }
执行相关操作:
- public void run() {
- // Initialize the startTime.
- startTime = System.nanoTime();
- if (startTime == 0) {
- // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
- startTime = 1;
- }
- // Notify the other threads waiting for the initialization at start().
- startTimeInitialized.countDown();
- do {
- final long deadline = waitForNextTick();
- if (deadline> 0) {
- int idx = (int) (tick & mask);
- processCancelledTasks();
- HashedWheelBucket bucket =
- wheel[idx];
- transferTimeoutsToBuckets();
- bucket.expireTimeouts(deadline);
- tick++;
- }
- } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
- // Fill the unprocessedTimeouts so we can return them from stop() method.
- for (HashedWheelBucket bucket: wheel) {
- bucket.clearTimeouts(unprocessedTimeouts);
- }
- for (;;) {
- HashedWheelTimeout timeout = timeouts.poll();
- if (timeout == null) {
- break;
- }
- if (!timeout.isCancelled()) {
- unprocessedTimeouts.add(timeout);
- }
- }
- processCancelledTasks();
- }
小结
以上方案并没有实现持久化和分布式, 生产环境可根据实际业务需求选择使用.
源码
https://gitee.com/52itstyle/spring-boot-seckill
来源: https://www.cnblogs.com/smallSevens/p/12312273.html