百度 uid-generator 源码
https://github.com/baidu/uid-generator
snowflake 算法
uid-generator 是基于 Twitter 开源的 snowflake 算法实现的.
snowflake 将 long 的 64 位分为了 3 部分, 时间戳, 工作机器 id 和序列号, 位数分配如下.
其中, 时间戳部分的时间单位一般为毫秒. 也就是说 1 台工作机器 1 毫秒可产生 4096 个 id(2 的 12 次方).
源码实现分析
与原始的 snowflake 算法不同, uid-generator 支持自定义时间戳, 工作机器 id 和序列号等各部分的位数, 以应用于不同场景. 默认分配方式如下.
sign(1bit)
固定 1bit 符号标识, 即生成的 UID 为正数.
delta seconds (28 bits)
当前时间, 相对于时间基点 "2016-05-20" 的增量值, 单位: 秒, 最多可支持约 8.7 年 (注意: 1. 这里的单位是秒, 而不是毫秒! 2. 注意这里的用词, 是 "最多" 可支持 8.7 年, 为什么是 "最多", 后面会讲)
worker id (22 bits)
机器 id, 最多可支持约 420w 次机器启动. 内置实现为在启动时由数据库分配, 默认分配策略为用后即弃, 后续可提供复用策略.
sequence (13 bits)
每秒下的并发序列, 13 bits 可支持每秒 8192 个并发.(注意下这个地方, 默认支持 qps 最大为 8192 个)
DefaultUidGenerator
DefaultUidGenerator 的产生 id 的方法与基本上就是常见的 snowflake 算法实现, 仅有一些不同, 如以秒为为单位而不是毫秒.
DefaultUidGenerator 的产生 id 的方法如下.
- protected synchronized long nextId() {
- long currentSecond = getCurrentSecond();
- // Clock moved backwards, refuse to generate uid
- if (currentSecond <lastSecond) {
- long refusedSeconds = lastSecond - currentSecond;
- throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
- }
- // At the same second, increase sequence
- if (currentSecond == lastSecond) {
- sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
- // Exceed the max sequence, we wait the next second to generate uid
- if (sequence == 0) {
- currentSecond = getNextSecond(lastSecond);
- }
- // At the different second, sequence restart from zero
- } else {
- sequence = 0L;
- }
- lastSecond = currentSecond;
- // Allocate bits for UID
- return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
- }
- CachedUidGenerator
CachedUidGenerator 支持缓存生成的 id.
基本实现原理
关于 CachedUidGenerator, 文档上是这样介绍的.
在实现上, UidGenerator 通过借用未来时间来解决 sequence 天然存在的并发限制; 采用 RingBuffer 来缓存已生成的 UID, 并行化 UID 的生产和消费, 同时对 CacheLine 补齐, 避免了由 RingBuffer 带来的硬件级「伪共享」问题. 最终单机 QPS 可达 600 万.
[采用 RingBuffer 来缓存已生成的 UID, 并行化 UID 的生产和消费]
使用 RingBuffer 缓存生成的 id.RingBuffer 是个环形数组, 默认大小为 8192 个, 里面缓存着生成的 id.
获取 id
会从 ringbuffer 中拿一个 id, 支持并发获取
填充 id
RingBuffer 填充时机
程序启动时, 将 RingBuffer 填充满, 缓存着 8192 个 id
在调用 getUID() 获取 id 时, 检测到 RingBuffer 中的剩余 id 个数小于总个数的 50%, 将 RingBuffer 填充满, 使其缓存 8192 个 id
定时填充 (可配置是否使用以及定时任务的周期)
[UidGenerator 通过借用未来时间来解决 sequence 天然存在的并发限制]
因为 delta seconds 部分是以秒为单位的, 所以 1 个 worker 1 秒内最多生成的 id 书为 8192 个 (2 的 13 次方).
从上可知, 支持的最大 qps 为 8192, 所以通过缓存 id 来提高吞吐量.
为什么叫借助未来时间?
因为每秒最多生成 8192 个 id, 当 1 秒获取 id 数多于 8192 时, RingBuffer 中的 id 很快消耗完毕, 在填充 RingBuffer 时, 生成的 id 的 delta seconds 部分只能使用未来的时间.
(因为使用了未来的时间来生成 id, 所以上面说的是,[最多] 可支持约 8.7 年)
源码剖析
获取 id
- @Override
- public long getUID() {
- try {
- return ringBuffer.take();
- } catch (Exception e) {
- LOGGER.error("Generate unique id exception.", e);
- throw new UidGenerateException(e);
- }
- }
RingBuffer 缓存已生成的 id
(注意: 这里的 RingBuffer 不是 Disruptor 框架中的 RingBuffer, 但是借助了很多 Disruptor 中 RingBuffer 的设计思想, 比如使用缓存行填充解决伪共享问题)
RingBuffer 为环形数组, 默认容量为 sequence 可容纳的最大值 (8192 个), 可以通过 boostPower 参数设置大小.
tail 指针, Cursor 指针用于环形数组上读写 slot:
Tail 指针
表示 Producer 生产的最大序号 (此序号从 0 开始, 持续递增).Tail 不能超过 Cursor, 即生产者不能覆盖未消费的 slot. 当 Tail 已赶上 curosr, 此时可通过 rejectedPutBufferHandler 指定 PutRejectPolicy
Cursor 指针
表示 Consumer 消费到的最小序号 (序号序列与 Producer 序列相同).Cursor 不能超过 Tail, 即不能消费未生产的 slot. 当 Cursor 已赶上 tail, 此时可通过 rejectedTakeBufferHandler 指定 TakeRejectPolicy
CachedUidGenerator 采用了双 RingBuffer,Uid-RingBuffer 用于存储 Uid,Flag-RingBuffer 用于存储 Uid 状态 (是否可填充, 是否可消费)
由于数组元素在内存中是连续分配的, 可最大程度利用 CPU cache 以提升性能. 但同时会带来「伪共享」FalseSharing 问题, 为此在 Tail,Cursor 指针, Flag-RingBuffer 中采用了 CacheLine 补齐方式.
- public class RingBuffer {
- private static final Logger LOGGER = LoggerFactory.getLogger(RingBuffer.class);
- /** Constants */
- private static final int START_POINT = -1;
- private static final long CAN_PUT_FLAG = 0L; // 用于标记当前 slot 的状态, 表示可以 put 一个 id 进去
- private static final long CAN_TAKE_FLAG = 1L; // 用于标记当前 slot 的状态, 表示可以 take 一个 id
- public static final int DEFAULT_PADDING_PERCENT = 50; // 用于控制何时填充 slots 的默认阈值: 当剩余的可用的 slot 的个数, 小于 bufferSize 的 50% 时, 需要生成 id 将 slots 填满
- /** The size of RingBuffer's slots, each slot hold a UID */
- private final int bufferSize; //slots 的大小, 默认为 sequence 可容量的最大值, 即 8192 个
- private final long indexMask;
- private final long[] slots; //slots 用于缓存已经生成的 id
- private final PaddedAtomicLong[] flags; //flags 用于存储 id 的状态 (是否可填充, 是否可消费)
- /** Tail: last position sequence to produce */
- //Tail 指针
- // 表示 Producer 生产的最大序号 (此序号从 0 开始, 持续递增).Tail 不能超过 Cursor, 即生产者不能覆盖未消费的 slot. 当 Tail 已赶上 curosr, 此时可通过 rejectedPutBufferHandler 指定 PutRejectPolicy
- private final AtomicLong tail = new PaddedAtomicLong(START_POINT); //
- /** Cursor: current position sequence to consume */
- // 表示 Consumer 消费到的最小序号 (序号序列与 Producer 序列相同).Cursor 不能超过 Tail, 即不能消费未生产的 slot. 当 Cursor 已赶上 tail, 此时可通过 rejectedTakeBufferHandler 指定 TakeRejectPolicy
- private final AtomicLong cursor = new PaddedAtomicLong(START_POINT);
- /** Threshold for trigger padding buffer*/
- private final int paddingThreshold; // 用于控制何时填充 slots 的阈值
- /** Reject put/take buffer handle policy */
- // 当 slots 满了, 无法继续 put 时的处理策略. 默认实现: 无法进行 put, 仅记录日志
- private RejectedPutBufferHandler rejectedPutHandler = this::discardPutBuffer;
- // 当 slots 空了, 无法继续 take 时的处理策略. 默认实现: 仅抛出异常
- private RejectedTakeBufferHandler rejectedTakeHandler = this::exceptionRejectedTakeBuffer;
- /** Executor of padding buffer */
- // 用于运行 [生成 id 将 slots 填满] 任务
- private BufferPaddingExecutor bufferPaddingExecutor;
RingBuffer 填充时机
程序启动时, 将 RingBuffer 填充满, 缓存着 8192 个 id
在调用 getUID() 获取 id 时, 检测到 RingBuffer 中的剩余 id 个数小于总个数的 50%, 将 RingBuffer 填充满, 使其缓存 8192 个 id
定时填充 (可配置是否使用以及定时任务的周期)
填充 RingBuffer
- /**
- * Padding buffer fill the slots until to catch the cursor
- */
- public void paddingBuffer() {
- LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
- // is still running
- if (!running.compareAndSet(false, true)) {
- LOGGER.info("Padding buffer is still running. {}", ringBuffer);
- return;
- }
- // fill the REST slots until to catch the cursor
- boolean isFullRingBuffer = false;
- while (!isFullRingBuffer) {
- // 获取生成的 id, 放到 RingBuffer 中.
- List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
- for (Long uid : uidList) {
- isFullRingBuffer = !ringBuffer.put(uid);
- if (isFullRingBuffer) {
- break;
- }
- }
- }
- // not running now
- running.compareAndSet(true, false);
- LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
- }
生成 id(上面代码中的 uidProvider.provide 调用的就是这个方法)
- /**
- * Get the UIDs in the same specified second under the max sequence
- *
- * @param currentSecond
- * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1
- */
- protected List<Long> nextIdsForOneSecond(long currentSecond) {
- // Initialize result list size of (max sequence + 1)
- int listSize = (int) bitsAllocator.getMaxSequence() + 1;
- List<Long> uidList = new ArrayList<>(listSize);
- // Allocate the first sequence of the second, the others can be calculated with the offset
- // 这里的实现很取巧
- // 因为 1 秒内生成的 id 是连续的, 所以利用第 1 个 id 来生成后面的 id, 而不用频繁调用 snowflake 算法
- long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
- for (int offset = 0; offset <listSize; offset++) {
- uidList.add(firstSeqUid + offset);
- }
- return uidList;
- }
填充缓存行解决 "伪共享"
关于伪共享, 可以参考这篇文章《伪共享 (false sharing), 并发编程无声的性能杀手》
- // 数组在物理上是连续存储的, flags 数组用来保存 id 的状态 (是否可消费, 是否可填充), 在填入 id 和消费 id 时, 会被频繁的修改.
- // 如果不进行缓存行填充, 会导致频繁的缓存行失效, 直接从内存中读数据.
- private final PaddedAtomicLong[] flags;
- //tail 和 cursor 都使用缓存行填充, 是为了避免 tail 和 cursor 落到同一个缓存行上.
- /** Tail: last position sequence to produce */
- private final AtomicLong tail = new PaddedAtomicLong(START_POINT);
- /** Cursor: current position sequence to consume */
- private final AtomicLong cursor = new PaddedAtomicLong(START_POINT)
- /**
- * Represents a padded {@link AtomicLong} to prevent the FalseSharing problem<p>
- *
- * The CPU cache line commonly be 64 bytes, here is a sample of cache line after padding:<br>
- * 64 bytes = 8 bytes (object reference) + 6 * 8 bytes (padded long) + 8 bytes (a long value)
- * @author yutianbao
- */
- public class PaddedAtomicLong extends AtomicLong {
- private static final long serialVersionUID = -3415778863941386253L;
- /** Padded 6 long (48 bytes) */
- public volatile long p1, p2, p3, p4, p5, p6 = 7L;
- /**
- * Constructors from {@link AtomicLong}
- */
- public PaddedAtomicLong() {
- super();
- }
- public PaddedAtomicLong(long initialValue) {
- super(initialValue);
- }
- /**
- * To prevent GC optimizations for cleaning unused padded references
- */
- public long sumPaddingToPreventOptimization() {
- return p1 + p2 + p3 + p4 + p5 + p6;
- }
- }
PaddedAtomicLong 为什么要这么设计?
可以参考下面文章
一个 Java 对象到底占用多大内存? https://www.cnblogs.com/magialmoon/p/3757767.html
写 Java 也得了解 CPU-- 伪共享 https://www.cnblogs.com/techyc/p/3625701.html
来源: https://www.cnblogs.com/yeyang/p/10226284.html