本文基于最新的 3.4.2 的版本文档进行翻译, 翻译自:
Disruptor 简介
最好的方法去理解 Disruptor 就是将它和容易理解并且相似的队列, 例如 BlockingQueue.Disruptor 其实就像一个队列一样, 用于在不同的线程之间迁移数据, 但是 Disruptor 也实现了一些其他队列没有的特性, 如:
同一个 "事件" 可以有多个消费者, 消费者之间既可以并行处理, 也可以相互依赖形成处理的先后次序(形成一个依赖图);
预分配用于存储事件内容的内存空间;
针对极高的性能目标而实现的极度优化和无锁的设计;
Disruptor 核心架构组件
Ring Buffer:Ring Buffer 在 3.0 版本以前被认为是 Disruptor 的核心组件, 但是在之后的版本中只是负责存储和更新数据. 在一些高级使用案例中用户也能进行自定义
Sequence:Disruptor 使用一组 Sequence 来作为一个手段来标识特定的组件的处理进度 ( RingBuffer/Consumer ). 每个消费者和 Disruptor 本身都会维护一个 Sequence. 虽然一个 AtomicLong 也可以用于标识进度, 但定义 Sequence 来负责该问题还有另一个目的, 那就是防止不同的 Sequence 之间的 CPU 缓存伪共享(Flase Sharing) 问题.
Sequencer:Sequencer 是 Disruptor 的真正核心. 此接口有两个实现类 SingleProducerSequencer,MultiProducerSequencer , 它们定义在生产者和消费者之间快速, 正确地传递数据的并发算法.
Sequence Barrier: 保持 Sequencer 和 Consumer 依赖的其它 Consumer 的 Sequence 的引用. 除此之外还定义了决定 Consumer 是否还有可处理的事件的逻辑.
Wait Strategy:Wait Strategy 决定了一个消费者怎么等待生产者将事件 (Event) 放入 Disruptor 中.
Event: 从生产者到消费者传递的数据叫做 Event. 它不是一个被 Disruptor 定义的特定类型, 而是由 Disruptor 的使用者定义并指定.
EventProcessor: 持有特定的消费者的 Sequence, 并且拥有一个主事件循环 (main event loop) 用于处理 Disruptor 的事件. 其中 BatchEventProcessor 是其具体实现, 实现了事件循环(event loop), 并且会回调到实现了 EventHandler 的已使用过的实例中.
EventHandler: 由用户实现的接口, 用于处理事件, 是 Consumer 的真正实现
Producer: 生产者, 只是泛指调用 Disruptor 发布事件的用户代码, Disruptor 没有定义特定接口或类型.
事件广播(Multicast Events)
这是 Disruptor 和队列最大的区别. 当你有多个消费者监听了一个 Disruptor, 所有的事件将会被发布到所有的消费者中, 相比之下队列的一个事件只能被发到一个消费者中. Disruptor 这一特性被用来需要对同一数据进行多个并行操作的情况. 如在 LMAX 系统中有三个操作可以同时进行: 日志(将数据持久到日志文件中), 复制(将数据发送到其他的机器上, 以确保存在数据远程副本), 业务逻辑处理. 也可以使用 WokrerPool 来并行处理不同的事件.
消费者依赖关系图(Consumer Dependency Graph)
为了支持真实世界中的业务并行处理流程, Disruptor 提供了多个消费者之间的协助功能. 回到上面的 LMAX 的例子, 我们可以让日志处理和远程副本赋值先执行完之后再执行业务处理流程, 这个功能被称之为 gating.gating 发生在两种场景中. 第一, 我们需要确保生产者不要超过消费者. 通过调用 RingBuffer.addGatingConsumers()增加相关的消费者至 Disruptor 来完成. 第二, 就是之前所说的场景, 通过构造包含需要必须先完成的消费者的 Sequence 的 SequenceBarrier 来实现.
引用上面的例子来说, 有三个消费者监听来自 RingBuffer 的事件. 这里有一个依赖关系图. ApplicationConsumer 依赖 JournalConsumer 和 ReplicationConsumer. 这个意味着 JournalConsumer 和 ReplicationConsumer 可以自由的并发运行. 依赖关系可以看成是从 ApplicationConsumer 的 SequenceBarrier 到 JournalConsumer 和 ReplicationConsumer 的 Sequence 的连接. 还有一点值得关注, Sequencer 与下游的消费者之间的关系. 它的角色是确保发布不会包裹 RingBuffer. 为此, 所有下游消费者的 Sequence 不能比 ring buffer 的 Sequence 小且不能比 ring buffer 的大小小. 因为 ApplicationConsumers 的 Sequence 是确保比 JournalConsumer 和 ReplicationConsumer 的 Sequence 小或等于, 所以 Sequencer 只需要检查 ApplicationConsumers 的 Sequence. 在更为普遍的应用场景中, Sequencer 只需要意识到消费者树中的叶子节点的的 Sequence 即可.
事件预分配(Event Preallocation)
Disruptor 的一个目标之一是被用在低延迟的环境中. 在一个低延迟系统中有必要去减少和降低内存的占用. 在基于 Java 的系统中, 需要减少由于 GC 导致的停顿次数(在低延迟的 C/C++ 系统中, 由于内存分配器的争用, 大量的内存分配也会导致问题).
为了满足这点, 用户可以在 Disruptor 中为事件预分配内存. 所以 EventFactory 是用户来提供, 并且 Disruptor 的 Ring Buffer 每个 entry 中都会被调用. 当将新的数据发布到 Disruptor 中时, Disruptor 的 API 将会允许用户持有所构造的对象, 以便用户可以调用这些对象的方法和更新字段到这些对象中. Disruptor 将确保这些操作是线程安全.
可选择的无锁
无锁算法实现的 Disruptor 的所有内存可见性和正确性都使用内存屏障和 CAS 操作实现. 只仅仅一个场景 BlockingWaitStrategy 中使用到了 lock. 而这仅仅是为了使用 Condition, 以便消费者线程能被 park 住当在等待一个新的事件到来的时候. 许多低延迟系统都使用自旋 (busy-wait) 来避免使用 Condition 造成的抖动. 但是自旋 (busy-wait) 的数量变多时将会导致性能的下降, 特别是 CPU 资源严重受限的情况下. 例如, 在虚拟环境中的 web 服务器.
Disruptor 使用
我们使用一个简单的例子来体验一下 Disruptor. 生产者会传递一个 long 类型的值到消费者, 消费者接受到这个值后会打印出这个值.
定义 Event
- public class LongEvent
- {
- private long value;
- public void set(long value)
- {
- this.value = value;
- }
- }
为了使用 Disruptor 的内存预分配 event, 我们需要定义一个 EventFactory:
- import com.lmax.disruptor.EventFactory;
- public class LongEventFactory implements EventFactory<LongEvent>
- {
- public LongEvent newInstance()
- {
- return new LongEvent();
- }
- }
为了让消费者处理这些事件, 所以我们这里定义一个事件处理器, 负责打印 event:
- import com.lmax.disruptor.EventHandler;
- public class LongEventHandler implements EventHandler<LongEvent>
- {
- public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
- {
- System.out.println("Event:" + event);
- }
- }
使用 Translators 发布事件
在 Disruptor 的 3.0 版本中, 由于加入了丰富的 Lambda 风格的 API, 可以用来帮组开发人员简化流程. 所以在 3.0 版本后首选使用 Event Publisher/Event Translator 来发布事件.
- import com.lmax.disruptor.RingBuffer;
- import com.lmax.disruptor.EventTranslatorOneArg;
- public class LongEventProducerWithTranslator
- {
- private final RingBuffer<LongEvent> ringBuffer;
- public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
- {
- this.ringBuffer = ringBuffer;
- }
- private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
- new EventTranslatorOneArg<LongEvent, ByteBuffer>()
- {
- public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
- {
- event.set(bb.getLong(0));
- }
- };
- public void onData(ByteBuffer bb)
- {
- ringBuffer.publishEvent(TRANSLATOR, bb);
- }
- }
这种方法的另一个优点是可以将 translator 代码放入一个单独的类中, 并且可以轻松地对它们进行独立的单元测试.
使用过时的 API 发布事件
- import com.lmax.disruptor.RingBuffer;
- public class LongEventProducer
- {
- private final RingBuffer<LongEvent> ringBuffer;
- public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
- {
- this.ringBuffer = ringBuffer;
- }
- public void onData(ByteBuffer bb)
- {
- long sequence = ringBuffer.next(); // Grab the next sequence
- try
- {
- LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
- // for the sequence
- event.set(bb.getLong(0)); // Fill with data
- }
- finally
- {
- ringBuffer.publish(sequence);
- }
- }
- }
这里我们需要把发布包裹在 try/finally 代码块中. 如果某个请求的 sequence 未被提交, 将会堵塞后续的发布操作或者其它的 producer. 特别地在多生产中如果没有提交 Sequence, 那么会造成消费者停滞, 导致只能重启消费者才能恢复.
整合
- import com.lmax.disruptor.DSL.Disruptor;
- import com.lmax.disruptor.RingBuffer;
- import com.lmax.disruptor.util.DaemonThreadFactory;
- import java.nio.ByteBuffer;
- public class LongEventMain
- {
- public static void main(String[] args) throws Exception
- {
- // The factory for the event
- LongEventFactory factory = new LongEventFactory();
- // Specify the size of the ring buffer, must be power of 2.
- int bufferSize = 1024;
- // Construct the Disruptor
- Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
- // Connect the handler
- disruptor.handleEventsWith(new LongEventHandler());
- // Start the Disruptor, starts all threads running
- disruptor.start();
- // Get the ring buffer from the Disruptor to be used for publishing.
- RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
- LongEventProducer producer = new LongEventProducer(ringBuffer);
- ByteBuffer bb = ByteBuffer.allocate(8);
- for (long l = 0; true; l++)
- {
- bb.putLong(0, l);
- producer.onData(bb);
- Thread.sleep(1000);
- }
- }
- }
我们也可以使用 Java 8 的函数式编程来写这个例子:
- import com.lmax.disruptor.DSL.Disruptor;
- import com.lmax.disruptor.RingBuffer;
- import com.lmax.disruptor.util.DaemonThreadFactory;
- import java.nio.ByteBuffer;
- public class LongEventMain
- {
- public static void main(String[] args) throws Exception
- {
- // Specify the size of the ring buffer, must be power of 2.
- int bufferSize = 1024;
- // Construct the Disruptor
- Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
- // Connect the handler
- disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event:" + event));
- // Start the Disruptor, starts all threads running
- disruptor.start();
- // Get the ring buffer from the Disruptor to be used for publishing.
- RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
- ByteBuffer bb = ByteBuffer.allocate(8);
- for (long l = 0; true; l++)
- {
- bb.putLong(0, l);
- ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
- Thread.sleep(1000);
- }
- }
- }
使用函数式编程我们可以发现很多的类都不需要了, 如: handler,translator 等.
上面的代码还可以再简化一下:
- ByteBuffer bb = ByteBuffer.allocate(8);
- for (long l = 0; true; l++)
- {
- bb.putLong(0, l);
- ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
- Thread.sleep(1000);
- }
不过这样将实例化一个对象去持有 ByteBuffer bb 变量传入 lambda 的值. 这会产生不必要的垃圾. 因此, 如果要求低 GC 压力, 则应首选将参数传递给 lambda 的调用.
提升性能的两个参数
如果想要让 Disruptor 拥有更好的性能这里有两个选项可以调整, wait strategy 和 producer 的类型.
单生产者 vs 多生产者
最好的方法在并发环境下提高性能是坚持单独写原则( Single Writer Principle). 如果你的业务场景中只有一个线程写入数据到 Disruptor, 那么你可以设置成单生产者来提升性能:
- public class LongEventMain
- {
- public static void main(String[] args) throws Exception
- {
- //.....
- // Construct the Disruptor with a SingleProducerSequencer
- Disruptor<LongEvent> disruptor = new Disruptor(
- factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
- //.....
- }
- }
性能测试:
- Multiple Producer
- Run 0, Disruptor=26,553,372 ops/sec
- Run 1, Disruptor=28,727,377 ops/sec
- Run 2, Disruptor=29,806,259 ops/sec
- Run 3, Disruptor=29,717,682 ops/sec
- Run 4, Disruptor=28,818,443 ops/sec
- Run 5, Disruptor=29,103,608 ops/sec
- Run 6, Disruptor=29,239,766 ops/sec
- Single Producer
- Run 0, Disruptor=89,365,504 ops/sec
- Run 1, Disruptor=77,579,519 ops/sec
- Run 2, Disruptor=78,678,206 ops/sec
- Run 3, Disruptor=80,840,743 ops/sec
- Run 4, Disruptor=81,037,277 ops/sec
- Run 5, Disruptor=81,168,831 ops/sec
- Run 6, Disruptor=81,699,346 ops/sec
等待策略
BlockingWaitStrategy
Disruptor 的默认策略是 BlockingWaitStrategy. 在 BlockingWaitStrategy 内部是使用锁和 condition 来控制线程的唤醒. BlockingWaitStrategy 是最低效的策略, 但其对 CPU 的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
SleepingWaitStrategy
SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多, 对 CPU 的消耗也类似, 但其对生产者线程的影响最小, 通过使用 LockSupport.parkNanos(1)来实现循环等待. 一般来说 Linux 系统会暂停一个线程约 60µs, 这样做的好处是, 生产线程不需要采取任何其他行动就可以增加适当的计数器, 也不需要花费时间信号通知条件变量. 但是, 在生产者线程和使用者线程之间移动事件的平均延迟会更高. 它在不需要低延迟并且对生产线程的影响较小的情况最好. 一个常见的用例是异步日志记录.
YieldingWaitStrategy
YieldingWaitStrategy 是可以使用在低延迟系统的策略之一. YieldingWaitStrategy 将自旋以等待序列增加到适当的值. 在循环体内, 将调用 Thread.yield(), 以允许其他排队的线程运行. 在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中, 推荐使用此策略; 例如, CPU 开启超线程的特性.
BusySpinWaitStrategy
性能最好, 适合用于低延迟的系统. 在要求极高性能且事件处理线程数小于 CPU 逻辑核心树的场景中, 推荐使用此策略; 例如, CPU 开启超线程的特性.
清除 Ring Buffer 中的对象
通过 Disruptor 传递数据时, 对象的生存期可能比预期的更长. 为避免发生这种情况, 可能需要在处理事件后清除事件. 如果只有一个事件处理程序, 则需要在处理器中清除对应的对象. 如果您有一连串的事件处理程序, 则可能需要在该链的末尾放置一个特定的处理程序来处理清除对象.
- class ObjectEvent<T>
- {
- T val;
- void clear()
- {
- val = null;
- }
- }
- public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
- {
- public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
- {
- // Failing to call clear here will result in the
- // object associated with the event to live until
- // it is overwritten once the ring buffer has wrapped
- // around to the beginning.
- event.clear();
- }
- }
- public static void main(String[] args)
- {
- Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
- () -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);
- disruptor
- .handleEventsWith(new ProcessingEventHandler())
- .then(new ClearingObjectHandler());
- }
来源: https://www.cnblogs.com/luozhiyun/p/11631305.html