在项目中,我们大多数开发者可能都使用过 EventBus, 即使没有使用过但我可以确定 Android 开发者也听说过这个牛 X 的库,从诞生到目前 EventBus 已经更新到 3.X 版本,可见生命力极强呀。那么这篇博文就从 EventBus3.0 源码的角度分析一下其内部处理流程。
注册:
- EventBus.getDefault().register(obj)
订阅 (消息接收):
- @Subscribe
- public void receive(Object event){
- }
发布消息:
- EventBus.getDefault().post(event)
注销:
- EventBus.getDefault().unregister(obj)
- EventBus.getDefault().register(obj)
这段代码做了两件事情:① EventBus.getDefault() 创建 EventBus 对象;② register(obj) 方法为 obj 该类对象注册 EventBus。 那这两个方法究竟在 EventBus 中究竟做了哪些工作呢?我们打开 EventBus 的源码看一下:
1、EventBus.getDefault() 源码如下:看到了吧,EventBus 采用单例模式创建 EventBus 对象,接下来它在构造方法中又做了什么事情呢?
- public static EventBus getDefault() {
- if (defaultInstance == null) {
- synchronized (EventBus.class) {
- if (defaultInstance == null) {
- defaultInstance = new EventBus();
- }
- }
- }
- return defaultInstance;
- }
在构造方法中其调用了有参构造方法:EventBus(EventBusBuilder builder),我们再跟进去看一看:
- public EventBus() {
- this(DEFAULT_BUILDER);
- }
这段代码对一些变量进行了初始化,现在就挑重要的变量解释一下。首先,初始化了 3 个 Map,这 3 个 Map 有什么用呢?我们再来一一详细说一下: ① Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType:key: 事件类型(如:String 类型或者自定义的事件类型),value: 该事件的订阅者的 list 集合。当发送 event 消息的时候,都是去这里找对应的订阅者。② Map当
- EventBus(EventBusBuilder builder) {
- subscriptionsByEventType = new HashMap<>();
- typesBySubscriber = new HashMap<>();
- stickyEvents = new ConcurrentHashMap<>();
- mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
- backgroundPoster = new BackgroundPoster(this);
- asyncPoster = new AsyncPoster(this);
- indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
- //默认情况下参数为(null,false,false)
- subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
- builder.strictMethodVerification, builder.ignoreGeneratedIndex);
- logSubscriberExceptions = builder.logSubscriberExceptions;
- logNoSubscriberMessages = builder.logNoSubscriberMessages;
- sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
- sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
- throwSubscriberException = builder.throwSubscriberException;
- eventInheritance = builder.eventInheritance;
- executorService = builder.executorService;
- }
和
- register()
的时候都是操作这个 Map。 ③ Map<Class<?>, Object> stickyEvents:维护的是粘性事件的集合,粘性事件也就是当 event 发送出去之后再注册粘性事件的话,该粘性事件也能接收到之前发送出去的 event 消息。 其次,初始化 3 个消息发送器如下:mainThreadPoster :该类继承自 Handler,而且在 EventBus 中 mainThreadPoster 属于主线程 Handler,这是因为 mainThreadPoster 就是为处理 "消息接收方法在主线程而消息发送在子线程" 这个问题而设计的,所以子线程向主线程发送消息必须使用主线程的 Handler。mainThreadPoster 继承 Handler 也是为了效率考虑的。 backgroundPoster:该类继承自 Runnable,重写了 run() 方法。在 run()方法中将子线程中的消息通过 EventBus 发送到主线程。所以这个消息发送器作用就是处理" 消息接收方法在子线程接而消息的发布在主线程 " 这样的问题。 asyncPoster:该类继承自 Runnable,也重写了 run() 方法,不过就像名字一样" 异步 ",也就是说不管订阅者是不是在主线程,消息接收方法都会另外开启一个线程处理消息。 然后,一个重要的初始化对象为 subscriberMethodFinder,这个对象利用反射的方法查找每一个接收消息者的方法(也即是添加了 "@Subscribe" 注解的方法)。最后就是一些对 EventBusBuilder 的一些配置信息。其中 eventInheritance 和 executorService 在接下来分析源码时会经常碰到:①eventInheritance 表示我们自定义的待发布消息事件是否允许继承,,默认情况下 eventInheritance==true。它的作用就是处理业务时后来新增加业务后不必再修改代码,只需要继承就 OK 啦(这也符合程序的 "开闭原则")如下:
- unregister()
②executorService:这个线程池是给 backgroundPoster 和 asyncPoster 用来处理消息发送的。这样做也能够提高消息发送的效率。 2、注册 register(Object subscriber) EventBus 的初始化工作已经完毕,我们继续看一下 EventBus 是怎么进行注册的,在注册过程中又搞了哪些事情?
- public class MessageEvent {
- public final String message;
- public MessageEvent(String message) {
- this.message = message;
- }
- }
- public class SubMessageEvent extends MessageEvent {
- public SubMessageEvent(String message) {
- super(message);
- }
- }
在该方法中首先取得注册者的运行时类对象,拿到运行时类对象后通过注册者注册方法查找器 SubscriberMethodFinder 利用反射的方法找到注册者类中所有的接收消息的方法,也即是所有添加了注解 "Subscribe" 的方法。最后进行通过方法 subscribe(subscriber, subscriberMethod) 为每一个接收消息的方法进行注册。流程大致就是这样的,首先 我们先看一下 findSubscriberMethods 这个方法:
- public void register(Object subscriber) {
- Class<?> subscriberClass = subscriber.getClass();
- List<SubscriberMethod> subscriberMethods = subscriberMethodFinder
- .findSubscriberMethods(subscriberClass);
- synchronized (this) {
- for (SubscriberMethod subscriberMethod : subscriberMethods) {
- subscribe(subscriber, subscriberMethod);
- }
- }
- }
- /**
- * 方法描述:获取该运行时类的所有@Subscribe注解的所有方法
- *
- * @param subscriberClass @Subscribe注解所属类的运行时类对象
- * @return 注册EventBus类中@Subscribe注解的所有方法的List集合
- */
- List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
- //先从缓存中查找是否存在消息接收的方法
- List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
- if (subscriberMethods != null) {
- return subscriberMethods;
- }
- if (ignoreGeneratedIndex) {
- //使用反射方法拿到订阅者中的订阅方法
- subscriberMethods = findUsingReflection(subscriberClass);
- } else {
- //使用apt处理器拿到订阅者中的订阅方法
- subscriberMethods = findUsingInfo(subscriberClass);
- }
- //如果该消息事件没有被订阅则抛出异常
- if (subscriberMethods.isEmpty()) {
- throw new EventBusException("Subscriber " + subscriberClass
- + " and its super classes have no public methods with the @Subscribe annotation");
- } else {
- //将查到的方法放到缓存中,以便下次使用
- METHOD_CACHE.put(subscriberClass, subscriberMethods);
- return subscriberMethods;
- }
- }
这个方法很重要,在初次使用 EventBus3.0 的时候也容易出错的一个点(3.0 新增注解):就是在订阅事件的方法上没有添加
@Subscribe 注解,所以会碰到下面这个异常:
- Caused by: org.greenrobot.eventbus.EventBusException: Subscriber class XXX and its super classes
- have no public methods with the @Subscribe annotation
说到这我们还是没有最终看到 EventBus 是怎么进行注册的,
OK,回过头来我们继续看注册:
- private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
- Class<?> eventType = subscriberMethod.eventType;
- Log.e(TAG, eventType.getSimpleName());
- //将订阅类Object对象subscriber封装为EventBus的订阅类Subscription
- Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
- CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
- if (subscriptions == null) {
- subscriptions = new CopyOnWriteArrayList<>();
- subscriptionsByEventType.put(eventType, subscriptions);
- } else {
- if (subscriptions.contains(newSubscription)) {
- throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
- + eventType);
- }
- }
- /**
- * 方法描述:对CopyOnWriteArrayList中的Subscription根据优先级的高低重新进行排序
- */
- int size = subscriptions.size();
- for (int i = 0; i <= size; i++) {
- if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
- subscriptions.add(i, newSubscription);
- break;
- }
- }
- /**
- * 方法描述:将开发者注册EventBus的类的运行时类添加到subscribedEvents中,并且把该运行时类添加到
- * typesBySubscriber中
- */
- List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
- Log.e(TAG, "typesBySubscriber的");
- if (subscribedEvents == null) {
- subscribedEvents = new ArrayList<>();
- typesBySubscriber.put(subscriber, subscribedEvents);
- }
- subscribedEvents.add(eventType);
- if (subscriberMethod.sticky) {
- if (eventInheritance) {
- // Existing sticky events of all subclasses of eventType have to be considered.
- // Note: Iterating over all events may be inefficient with lots of sticky events,
- // thus data structure should be changed to allow a more efficient lookup
- // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
- Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
- for (Map.Entry<Class<?>, Object> entry : entries) {
- Class<?> candidateEventType = entry.getKey();
- if (eventType.isAssignableFrom(candidateEventType)) {
- Object stickyEvent = entry.getValue();
- checkPostStickyEventToSubscription(newSubscription, stickyEvent);
- }
- }
- } else {
- Object stickyEvent = stickyEvents.get(eventType);
- checkPostStickyEventToSubscription(newSubscription, stickyEvent);
- }
- }
- }
上面的这段代码虽然很多,但主要做了几件事情:① 将注册的订阅者封装为新的 Subscription 类 ②将订阅者存储到 Map 集合
subscriptionsByEventType当中 ③对消息事件接收者根据优先级进行重排序 ④添加粘性消息事件
我们已经分析完了 EventBus 的注册过程,接下来我们再来分析一下 EventBus 的事件发送过程。
那么这段代码是如何实现消息的发送呢?继续源码看一下:
- EventBus.getDefault().post(event);
在这个方法中并没有真正的看到消息的分发,而是查找了待分发事件消息及其子类或者是待分发消息接口及其子类的所有事件(默认情况下我们定义的消息事件是允许继承的。 我们在项目中起初可能考虑的不是很全面,再到后来不可预料的需求到来时我们可能会继续改事件的一种情况,看到这不得不说 EventBus 真心考虑周全呀)。然后调用 postSingleEventForEventType(event, postingState, eventClass) 方法查找该事件及其子类事件的订阅者,如果没有找到就发送空消息并打印日志。好吧,很失望,到现在依然没有看到对消息事件进行分发。那我们继续跟进:postSingleEventForEventType(event, postingState, eventClass);
- public void post(Object event) {
- PostingThreadState postingState = currentPostingThreadState.get();
- List<Object> eventQueue = postingState.eventQueue;
- eventQueue.add(event);//将该事件添加到事件队列当中
- //事件没有分发则开始分发
- if (!postingState.isPosting) {
- //判断消息接收者是否在主线程
- postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
- postingState.isPosting = true;
- if (postingState.canceled) {
- throw new EventBusException("Internal error. Abort state was not reset");
- }
- try {
- //循环发送消息
- while (!eventQueue.isEmpty()) {
- postSingleEvent(eventQueue.remove(0), postingState);
- }
- } finally {
- postingState.isPosting = false;
- postingState.isMainThread = false;
- }
- }
- }
- 从上面的代码中可以得知,待发送的消息首先存储到一个消息list集合当中,然后再不断的循环发送消息。发送消息时利用的方法是postSingleEvent(Object event, PostingThreadState postingState ),OK,我们继续跟进:
- private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
- Class<?> eventClass = event.getClass();
- boolean subscriptionFound = false;
- //默认情况下Event事件允许继承,即默认情况下eventInheritance==true
- if (eventInheritance) {
- //查找event事件及event子类事件
- List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
- int countTypes = eventTypes.size();
- for (int h = 0; h < countTypes; h++) {
- Class<?> clazz = eventTypes.get(h);
- subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
- }
- } else {
- subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
- }
- if (!subscriptionFound) {
- ...
- }
- }
好吧,小眼一瞄仍然没有对消息进行分发,而是查找事件的所有订阅者然后对所有订阅者进行了一层封装,封装成 PostingThreadState。那我们还是继续吧,我们跟进 postToSubscription(subscription, event, postingState.isMainThread) 这个方法:
- private boolean postSingleEventForEventType(Object event, PostingThreadState postingState,
- Class<?> eventClass) {
- CopyOnWriteArrayList<Subscription> subscriptions;
- synchronized (this) {
- //从Map中取出所有订阅了eventClass事件的所有订阅者
- subscriptions = subscriptionsByEventType.get(eventClass);
- }
- //如果该事件的订阅者存在则向每一个订阅者发布消息事件
- if (subscriptions != null && !subscriptions.isEmpty()) {
- for (Subscription subscription : subscriptions) {
- postingState.event = event;
- postingState.subscription = subscription;
- boolean aborted = false;
- try {
- postToSubscription(subscription, event, postingState.isMainThread);
- aborted = postingState.canceled;
- } finally {
- postingState.event = null;
- postingState.subscription = null;
- postingState.canceled = false;
- }
- if (aborted) {
- break;
- }
- }
- return true;
- }
- return false;
- }
看到这是不是有一种 "复行数十步,豁然开朗" 的感觉。是的,在这个方法中我们终于看到了对消息事件进行 4 中不同情况下的分发了。根据消息接收的 threadMode 分别进行了不同的处理: POSTING:EventBus 默认情况下的 threadMode 类型,这里意思就是如果消息发布和消息接收在同一线程情况下就直接调用 invokeSubscriber(subscription, event) 对消息进行发送。这种情况下事件传递是同步完成的,事件传递完成时,所有的订阅者将已经被调用一次了。这个 ThreadMode 意味着最小的开销,因为它完全避免了线程的切换。 MAIN:消息接收在主线程中进行(此种情况适合进行 UI 操作),如果消息发布也在主线程就直接调用 invokeSubscriber(subscription, event) 对消息进行发送(这种情况是 POSTING 的情况),如果消息发布不在主线程中进行,那么调用 mainThreadPoster.enqueue(subscription, event) 进行处理。他是怎么处理的呢?我们跟进去瞧瞧:
- private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
- switch (subscription.subscriberMethod.threadMode) {
- case POSTING:
- invokeSubscriber(subscription, event);
- break;
- case MAIN:
- if (isMainThread) {
- invokeSubscriber(subscription, event);
- } else {
- mainThreadPoster.enqueue(subscription, event);
- }
- break;
- case BACKGROUND:
- if (isMainThread) {
- backgroundPoster.enqueue(subscription, event);
- } else {
- invokeSubscriber(subscription, event);
- }
- break;
- case ASYNC:
- asyncPoster.enqueue(subscription, event);
- break;
- default:
- throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
- }
- }
为了说明问题,我们整个类贴出来:由于是主线程向子线程发送消息所以 Looper 采用的是主线程 Looper,Handler 也就是主线程 Handler,其内部维护了一个 PendingPost 的对象池,这样做也是为了提高内存利用率,这也不是重点,我们直接看重点,在 enqueue(Subscription subscription, Object event) 方法中利用 HandlerPoster 发送空消息,HandlerPoster 也重写了 handleMessage 方法,在 handleMessage 方法中又调用 eventBus.invokeSubscriber(pendingPost) 进行消息发送,我们跟进去之后发现最终还是调用了 invokeSubscriber(subscription, event) 对消息进行发送。 BACKGROUND:这种情况是消息接收在子线程(此种模式下适合在接收者方法中做 IO 等耗时操作)。那么如果消息发布也在某个子线程中进行的就直接调用 invokeSubscriber(subscription, event) 对消息进行发送,如果消息发布在主线程当中应该尽可能快的将消息发送出去以免造成主线程阻塞,所以这时候就交给 backgroundPoster 去处理。它是怎么处理的呢?我们进去看一看:
- final class HandlerPoster extends Handler {
- private final PendingPostQueue queue;
- private final int maxMillisInsideHandleMessage;
- private final EventBus eventBus;
- private boolean handlerActive;
- //默认情况下EventBus创建HandlerPoster的Looper为MainLooper,最大maxMillisInsideHandleMessage==10ms
- HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
- super(looper);
- this.eventBus = eventBus;
- this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
- queue = new PendingPostQueue();
- }
- /**
- * 方法描述:将订阅者与消息实体之间的映射存到队列PendingPostQueue当中
- *
- * @param subscription 订阅者
- * @param event 消息事件
- */
- void enqueue(Subscription subscription, Object event) {
- PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
- synchronized (this) {
- queue.enqueue(pendingPost);
- if (!handlerActive) {
- handlerActive = true;
- if (!sendMessage(obtainMessage())) {
- throw new EventBusException("Could not send handler message");
- }
- }
- }
- }
- @Override
- public void handleMessage(Message msg) {
- boolean rescheduled = false;
- try {
- long started = SystemClock.uptimeMillis();
- while (true) {
- //同步取出消息队列
- PendingPost pendingPost = queue.poll();
- if (pendingPost == null) {
- synchronized (this) {
- // Check again, this time in synchronized
- pendingPost = queue.poll();
- if (pendingPost == null) {
- handlerActive = false;
- return;
- }
- }
- }
- eventBus.invokeSubscriber(pendingPost);
- long timeInMethod = SystemClock.uptimeMillis() - started;
- //消息发送超时
- if (timeInMethod >= maxMillisInsideHandleMessage) {
- if (!sendMessage(obtainMessage())) {
- throw new EventBusException("Could not send handler message");
- }
- rescheduled = true;
- return;
- }
- }
- } finally {
- handlerActive = rescheduled;
- }
- }
- }
backgroundPoster 对 Runnable 进行了重写,而且和 HandlerPoster 一样也采用了对象池提高效率,当然重点是其开启了线程池处理消息的发送,这也是避免阻塞主线程的举措。当然其最终还是调用了 invokeSubscriber()-----》invokeSubscriber(subscription, event) 方法。ASYNC:这种情况是消息接收在子线程(如果消息发布在子线程中进行,那么该子线程既不同于消息发布的子线程,又不在主线程,而是接收消息是一个独立于主线程又不同于消息发布的子线程)。由于在这种模式下每一个新添加的任务都会在线程池中开辟一个新线程执行,所以并发量更高效。而且最终还是会调用 invokeSubscriber(subscription, event) 方法对消息进行分发。
- final class BackgroundPoster implements Runnable {
- ....
- private volatile boolean executorRunning;
- BackgroundPoster(EventBus eventBus) {
- this.eventBus = eventBus;
- queue = new PendingPostQueue();
- }
- public void enqueue(Subscription subscription, Object event) {
- PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
- synchronized (this) {
- ...
- eventBus.getExecutorService().execute(this);
- }
- }
- }
- @Override
- public void run() {
- try {
- while (true) {
- ...
- eventBus.invokeSubscriber(pendingPost);
- }
- } finally {
- executorRunning = false;
- }
- }
- }
既然 4 种模式下均是调用了 invokeSubscriber(subscription, event) 方法,那我们最后再看一下这个方法:
看到了吧,这个方法中就是利用反射将消息发送给每一个消息的订阅者。到此我们就完整的看完了 EventBus 的工作流程及主要代码的分析过程。真心不容易呀,已经被累跪啦!
- void invokeSubscriber(Subscription subscription, Object event) {
- try {
- subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
- } catch (InvocationTargetException e) {
- handleSubscriberException(subscription, event, e.getCause());
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("Unexpected exception", e);
- }
- }
来源: http://blog.csdn.net/u012810020/article/details/70056134