1. Disruptor
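Disruptor is a high-performance framework for asynchronous processing.
Disruptor is a key component of LMAX's online trading platform, which uses it to process orders at about 6 million TPS. Outside finance, Disruptor can be used in ordinary applications as well, where it can bring a significant performance improvement. In fact, Disruptor is less a framework than a design approach: for programs that combine concurrency, buffers, the producer-consumer model and transaction processing, it offers a way to raise throughput (TPS) substantially.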
2. In practice
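https://github.com/fengzhizi715/NetDiscovery is a web crawler framework built on Vert.x, RxJava 2 and other frameworks.
By default, https://github.com/fengzhizi715/NetDiscovery uses the JDK's ConcurrentLinkedQueue as its message queue. Because every component of the crawler framework can be replaced, the following sections implement the crawler's Queue on top of Disruptor.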
2.1 Wrapping the event
The crawler's request is wrapped in a RequestEvent, which is the event that travels through the Disruptor.
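```java
import com.cv4j.netdiscovery.core.domain.Request;
import lombok.Data;

/**
 * Created by tony on 2018/9/1.
 */
@Data // Lombok generates getRequest()/setRequest()
public class RequestEvent {

    private Request request;

    @Override
    public String toString() {
        // String.valueOf avoids an NPE for a slot whose request has not been set yet
        return String.valueOf(request);
    }
}
```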
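RequestEvent is a mutable holder on purpose. In Disruptor, the RingBuffer is filled once with event objects created by an EventFactory, and producers overwrite the fields of an existing event instead of allocating a new object per message. The plain-JDK sketch below (no Disruptor dependency) illustrates only that preallocation idea; `PreallocatedSlots`, its nested `Event` and the `String` payload are hypothetical stand-ins for RingBuffer, RequestEvent and Request.

```java
import java.util.function.Supplier;

// Toy illustration, not the Disruptor API: every slot is created once by a factory
// and then reused, the way a RingBuffer is filled by an EventFactory.
final class PreallocatedSlots<E> {
    private final Object[] slots;
    private final int mask;

    PreallocatedSlots(int size, Supplier<E> factory) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new Object[size];
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get(); // allocated once, up front
        }
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) slots[(int) (sequence & mask)]; // sequences wrap around the array
    }

    // Hypothetical stand-in for RequestEvent; a String stands in for Request.
    static final class Event {
        String request;
    }

    public static void main(String[] args) {
        PreallocatedSlots<Event> ring = new PreallocatedSlots<>(4, Event::new);
        Event slotZero = ring.get(0);
        ring.get(0).request = "https://example.com/a";
        ring.get(4).request = "https://example.com/b"; // sequence 4 maps back to slot 0
        System.out.println(slotZero.request);           // prints https://example.com/b
    }
}
```

Because the same object is reused, anything a consumer needs after the slot is overwritten has to be copied out of the event first.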
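2.2 Publishing events
Next comes event publishing: get the next writable sequence from the RingBuffer, set the crawler's request on the RequestEvent stored at that sequence, and finally publish the event to the RingBuffer.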
```java
import com.cv4j.netdiscovery.core.domain.Request;
import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by tony on 2018/9/2.
 */
public class Producer {

    private final RingBuffer<RequestEvent> ringBuffer;
    private final AtomicInteger count = new AtomicInteger(0); // counter of pushed requests

    public Producer(RingBuffer<RequestEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(Request request) {
        long sequence = ringBuffer.next(); // claim the next writable slot
        try {
            RequestEvent event = ringBuffer.get(sequence); // preallocated event in that slot
            event.setRequest(request);
        } finally {
            ringBuffer.publish(sequence); // a claimed sequence must always be published
            count.incrementAndGet();
        }
    }

    /**
     * Number of Requests sent to the queue.
     * @return the number of published requests
     */
    public int getCount() {
        return count.get();
    }
}
```
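pushData() follows Disruptor's two-phase publishing: next() claims a sequence, the event in that slot is filled, and publish() makes it visible to consumers. Calling publish() in finally matters because a sequence that is claimed but never published blocks every consumer that reads in order. The plain-JDK sketch below models that protocol for several producers; `ClaimPublish` and its methods are hypothetical, and it deliberately omits the check that stops producers from overwriting unconsumed slots.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Toy multi-producer claim/publish protocol, not the Disruptor implementation.
// Assumption: producers never write more than `size` events in total, so the
// wrap-around check against consumers is deliberately left out.
final class ClaimPublish {
    private final String[] slots;
    private final AtomicLongArray published; // last sequence published into each slot
    private final AtomicLong claimed = new AtomicLong(-1);
    private final int mask;

    ClaimPublish(int size) {
        slots = new String[size];
        published = new AtomicLongArray(size);
        for (int i = 0; i < size; i++) {
            published.set(i, -1L);
        }
        mask = size - 1; // size is assumed to be a power of 2
    }

    // Phase 1: reserve a unique sequence; the atomic increment is what MULTI needs.
    long next() {
        return claimed.incrementAndGet();
    }

    // Phase 2: mark the slot readable. The volatile write orders the slot write before it.
    void publish(long seq) {
        published.set((int) (seq & mask), seq);
    }

    boolean isAvailable(long seq) {
        return published.get((int) (seq & mask)) == seq;
    }

    // Same shape as Producer.pushData: claim, fill, publish in finally.
    void push(String value) {
        long seq = next();
        try {
            slots[(int) (seq & mask)] = value;
        } finally {
            publish(seq); // an in-order reader would otherwise wait on seq forever
        }
    }

    // Runs `producers` threads that each push `perProducer` values and
    // returns how many of the claimed sequences ended up published.
    static int demo(int producers, int perProducer) {
        ClaimPublish ring = new ClaimPublish(1024);
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            final int id = p;
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    ring.push("producer" + id + "-" + i);
                }
            });
            threads[p].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        int visible = 0;
        for (long s = 0; s < (long) producers * perProducer; s++) {
            if (ring.isAvailable(s)) {
                visible++;
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 100)); // 400: every claimed sequence was published
    }
}
```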
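2.3 Consuming events
Once a RequestEvent carries a request, a consumer has to handle the event. The Consumer below only logs the consumer's thread name and the request. The actual "consumption" happens elsewhere: the Spider obtains the request from DisruptorQueue's poll() and processes it there.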
```java
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Created by tony on 2018/9/2.
 */
@Slf4j
public class Consumer implements WorkHandler<RequestEvent> {

    // Called by the WorkerPool; each event is handed to exactly one Consumer
    @Override
    public void onEvent(RequestEvent requestEvent) throws Exception {
        log.info("consumer: {} requestEvent: value={}", Thread.currentThread().getName(), requestEvent);
    }
}
```
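2.4 Implementing DisruptorQueue
Disruptor supports single producer/single consumer, multiple producers, multiple consumers, consumer groups and other topologies.
https://github.com/fengzhizi715/NetDiscovery uses multiple producers and multiple consumers.
When the RingBuffer is created, ProducerType.MULTI declares that several threads may publish. The RingBuffer is created with YieldingWaitStrategy, one of several WaitStrategy implementations; the wait strategy decides how consumers wait for new events, and different strategies trade latency against CPU usage.
YieldingWaitStrategy is one of the best-performing strategies and suits low-latency systems. It is recommended when very high performance is required and the number of event-handler threads is smaller than the number of logical CPU cores, for example when hyper-threading is enabled.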
```java
ringBuffer = RingBuffer.create(ProducerType.MULTI, // several threads may publish
        RequestEvent::new,                         // EventFactory that preallocates every slot
        ringBufferSize,
        new YieldingWaitStrategy());
```
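According to the Disruptor documentation, YieldingWaitStrategy first busy-spins and then falls back to Thread.yield() while a consumer waits for the next sequence. The sketch below shows that waiting pattern with plain JDK classes; `YieldingWait`, `waitFor` and the spin count of 100 are assumptions for illustration, not the library's implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy "spin, then yield" wait loop in the spirit of YieldingWaitStrategy.
// SPIN_TRIES is an illustrative value, not taken from the Disruptor source.
final class YieldingWait {
    private static final int SPIN_TRIES = 100;

    // Waits until `cursor` reaches `sequence` without ever parking the thread,
    // then returns the highest sequence that is available.
    static long waitFor(long sequence, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter > 0) {
                counter--;       // busy spin: lowest latency, keeps the core fully busy
            } else {
                Thread.yield();  // let other runnable threads use the core
            }
        }
        return available;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong cursor = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            for (long s = 0; s <= 9; s++) {
                cursor.set(s); // "publish" sequences 0..9
            }
        });
        producer.start();
        long seen = waitFor(9, cursor);
        producer.join();
        System.out.println("consumer reached sequence " + seen); // consumer reached sequence 9
    }
}
```

Because a waiting thread never parks, each consumer thread keeps a logical core busy, which is why this strategy is advised only when handler threads are fewer than the logical cores.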
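An EventProcessor takes events from the Disruptor and hands them to handlers.
Its implementations include BatchEventProcessor, which runs one handler on one thread and processes events in batches (every BatchEventProcessor sees every event), and WorkProcessor, which lets several threads share the event stream so that each event is handled by exactly one worker.
A WorkerPool manages a group of WorkProcessors. After the ringBuffer is created, create the workerPool: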
```java
SequenceBarrier barriers = ringBuffer.newBarrier();
for (int i = 0; i < consumers.length; i++) {
    consumers[i] = new Consumer();
}
workerPool = new WorkerPool<RequestEvent>(ringBuffer,
        barriers,
        new EventExceptionHandler(),
        consumers);
```
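To make the difference between the two EventProcessor styles concrete, here is a small simulation with plain JDK classes (no Disruptor dependency). `ConsumptionStyles` and its methods are hypothetical: `broadcast` mimics handlers that each track their own sequence, as with BatchEventProcessor, and `workPool` mimics WorkProcessors sharing one work sequence, as in the WorkerPool above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy comparison of the two EventProcessor styles, not Disruptor code.
final class ConsumptionStyles {

    // BatchEventProcessor style: every handler follows its own sequence,
    // so each handler processes every event.
    static int[] broadcast(int events, int handlers) {
        int[] seen = new int[handlers];
        for (long seq = 0; seq < events; seq++) {
            for (int h = 0; h < handlers; h++) {
                seen[h]++;
            }
        }
        return seen;
    }

    // WorkProcessor style: workers race on one shared work sequence,
    // so each event is claimed by exactly one worker.
    static int[] workPool(int events, int workers) {
        AtomicLong workSequence = new AtomicLong(-1);
        int[] seen = new int[workers];
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                while (workSequence.incrementAndGet() < events) {
                    seen[id]++; // each thread only touches its own counter
                }
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // join also makes the counters visible to this thread
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    static int sum(int[] counts) {
        int total = 0;
        for (int c : counts) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(broadcast(10, 3))); // 30: 3 handlers x 10 events
        System.out.println(sum(workPool(10, 3)));  // 10: each event handled once
    }
}
```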
Start the workerPool:
```java
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(Executors.newFixedThreadPool(threadNum));
```
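addGatingSequences registers the workers' sequences with the RingBuffer so that a producer calling next() cannot wrap around and overwrite an event the slowest worker has not processed yet. A minimal sketch of that wrap-around check, assuming each gating sequence holds the last sequence its consumer finished; `GatingCheck` and `canClaim` are hypothetical names.

```java
// Toy version of the wrap-around check a producer performs against gating sequences.
final class GatingCheck {

    // True if `nextSequence` can be claimed without overwriting an event that the
    // slowest gating consumer has not handled yet. With no gating sequences the
    // check always passes, which is why the workers' sequences must be registered.
    static boolean canClaim(long nextSequence, int bufferSize, long... gatingSequences) {
        long slowest = Long.MAX_VALUE;
        for (long g : gatingSequences) {
            slowest = Math.min(slowest, g);
        }
        long wrapPoint = nextSequence - bufferSize; // older sequence that shares this slot
        return wrapPoint <= slowest;
    }

    public static void main(String[] args) {
        // 4 slots; two workers that have finished up to -1 (nothing) and 2.
        System.out.println(canClaim(3, 4, -1, 2)); // true: slot 3 was never used
        System.out.println(canClaim(4, 4, -1, 2)); // false: slot 0 still holds unconsumed seq 0
        System.out.println(canClaim(4, 4, 0, 2));  // true: both workers are past seq 0
    }
}
```

In Disruptor the producer does not fail when this condition is false; next() waits until the slowest consumer has caught up.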
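Finally, the complete DisruptorQueue code (it uses the Disruptor 3.x API; WorkerPool and WorkHandler no longer exist in Disruptor 4.0):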
```java
import com.cv4j.netdiscovery.core.domain.Request;
import com.cv4j.netdiscovery.core.queue.AbstractQueue;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by tony on 2018/9/1.
 */
@Slf4j
public class DisruptorQueue extends AbstractQueue {

    private RingBuffer<RequestEvent> ringBuffer;
    private Consumer[] consumers = null;
    private Producer producer = null;
    private WorkerPool<RequestEvent> workerPool = null;
    private int ringBufferSize = 1024 * 1024; // RingBuffer size, must be a power of 2
    private AtomicInteger consumerCount = new AtomicInteger(0);
    // Next sequence handed out by poll(), kept separate from the producers' cursor
    private final AtomicLong pollSequence = new AtomicLong(0);

    private static final int CONSUME_NUM = 2;
    private static final int THREAD_NUM = 4;

    public DisruptorQueue() {
        this(CONSUME_NUM, THREAD_NUM);
    }

    public DisruptorQueue(int consumerNum, int threadNum) {
        consumers = new Consumer[consumerNum];

        // Create the ringBuffer: multiple producers, slots preallocated by the factory
        ringBuffer = RingBuffer.create(ProducerType.MULTI,
                RequestEvent::new,
                ringBufferSize,
                new YieldingWaitStrategy());

        SequenceBarrier barriers = ringBuffer.newBarrier();
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer();
        }

        // Each event is delivered to exactly one Consumer in the pool
        workerPool = new WorkerPool<RequestEvent>(ringBuffer,
                barriers,
                new EventExceptionHandler(),
                consumers);

        // Producers must not wrap past events the workers have not processed
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        workerPool.start(Executors.newFixedThreadPool(threadNum));

        producer = new Producer(ringBuffer);
    }

    @Override
    protected void pushWhenNoDuplicate(Request request) {
        producer.pushData(request);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    @Override
    public synchronized Request poll(String spiderName) {
        // Read with a dedicated cursor instead of calling ringBuffer.next(): next() claims
        // a slot that is never published, which stalls the WorkerPool and, once the ring
        // wraps, the producers. poll() is not a gating sequence, so it assumes it never
        // falls more than ringBufferSize events behind the producers.
        if (getLeftRequests(spiderName) <= 0) {
            return null;
        }
        Request request = ringBuffer.get(pollSequence.getAndIncrement()).getRequest();
        consumerCount.incrementAndGet();
        return request;
    }

    @Override
    public int getLeftRequests(String spiderName) {
        return producer.getCount() - consumerCount.get();
    }

    @Override
    public int getTotalRequests(String spiderName) {
        return super.getTotalRequests(spiderName);
    }

    static class EventExceptionHandler implements ExceptionHandler<RequestEvent> {

        @Override
        public void handleEventException(Throwable ex, long sequence, RequestEvent event) {
            log.error("handleEventException, sequence=" + sequence, ex);
        }

        @Override
        public void handleOnStartException(Throwable ex) {
            log.error("handleOnStartException", ex);
        }

        @Override
        public void handleOnShutdownException(Throwable ex) {
            log.error("handleOnShutdownException", ex);
        }
    }
}
```
Here, pushWhenNoDuplicate() sends a request into the ringBuffer, and poll() takes the corresponding request out of the ringBuffer so that the crawler can issue the network request, parse the response, and so on.
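The comment on ringBufferSize requires a power of two. One reason is that a slot index can then be computed with a bit mask instead of a modulo, since `sequence & (size - 1)` equals `sequence % size` for non-negative sequences; Disruptor itself rejects other sizes with an IllegalArgumentException. A small plain-JDK check, where `PowerOfTwoIndex` is a hypothetical helper:

```java
// Why a power-of-two size helps: the slot index becomes a bit mask instead of a modulo.
final class PowerOfTwoIndex {

    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    static int index(long sequence, int bufferSize) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        return (int) (sequence & (bufferSize - 1)); // equals sequence % bufferSize for sequence >= 0
    }

    public static void main(String[] args) {
        int size = 1024 * 1024;                      // the ringBufferSize used in DisruptorQueue
        System.out.println(isPowerOfTwo(size));      // true
        System.out.println(index(size + 5L, size));  // 5: the sequence wrapped around once
        System.out.println(isPowerOfTwo(1000));      // false
    }
}
```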
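Summary:
Crawler framework on GitHub: https://github.com/fengzhizi715/NetDiscovery
The code above is a fairly classic Disruptor multi-producer, multi-consumer setup and can also serve as boilerplate.
Finally, because the crawler framework is programmed against interfaces, replacing any of its components is straightforward.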
Source: https://juejin.im/post/5c0727dfe51d451d8b7bd1f5