一. 概述
在整个 Android 的源码世界里, 有两大利剑, 其一是 Binder IPC 机制,, 另一个便是消息机制(由 Handler/Looper/MessageQueue 等构成的).
Android 有大量的消息驱动方式来进行交互, 比如 Android 的四剑客 Activity, Service, Broadcast, ContentProvider 的启动过程的交互, 都离不开消息机制, Android 某种意义上也可以说成是一个以消息驱动的系统. 消息机制涉及 MessageQueue/Message/Looper/Handler 这 4 个类.
1.1 模型
消息机制主要包含:
Message: 消息分为硬件产生的消息 (如按钮, 触摸) 和软件生成的消息;
MessageQueue: 消息队列的主要功能向消息池投递消息(
MessageQueue.enqueueMessage
)和取走消息池的消息(MessageQueue.next);
Handler: 消息辅助类, 主要功能向消息池发送各种消息事件 (Handler.sendMessage) 和处理相应消息事件(
- Handler.handleMessage
- );
Looper: 不断循环执行(Looper.loop), 按分发机制将消息分发给目标处理者.
1.2 架构图
- 1.3 Demo
- public class MainActivity extends AppCompatActivity {
- private Button mButton;
- private final String TAG="MessageTest";
- private int ButtonCount = 0;
- private MyThread myThread;
- private Handler mHandler;
- private int mMessageCount = 0;
- class MyThread extends Thread {
- private Looper mLooper;
- @Override
- public void run() {
- super.run();
- /* Initialize the current thread as a looper */
- Looper.prepare();
- synchronized (this) {
- mLooper = Looper.myLooper();
- notifyAll();
- }
- /* Run the message queue in this thread */
- Looper.loop();
- }
- public Looper getLooper(){
- if (!isAlive()) {
- return null;
- }
- // If the thread has been started, wait until the looper has been created.
- synchronized (this) {
- while (isAlive() && mLooper == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
- }
- return mLooper;
- }
- }
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_main);
- mButton = (Button)findViewById(R.id.button);
- mButton.setOnClickListener(new View.OnClickListener() {
- public void onClick(View v) {
- // Perform action on click
- Log.d(TAG, "Send Message"+ ButtonCount);
- ButtonCount++;
- /* 按下按键后通过 mHandler 发送一个消息 */
- Message msg = new Message();
- mHandler.sendMessage(msg);
- }
- });
- myThread = new MyThread();
- myThread.start();
- /* 创建一个 handle 实例(详见 4.3.2), 这个 handle 为线程 myThread 服务, 当收到 mesg 时会调用设置的回调函数 */
- mHandler = new Handler(myThread.getLooper(), new Handler.Callback() {
- @Override
- public boolean handleMessage(Message msg) {
- Log.d(TAG, "get Message"+ mMessageCount);
- mMessageCount++;
- return false;
- }
- });
- }
- }
大概流程: 先创建的一个线程, 该线程中调用了 Looper.prepare()(详见 2.1)和 Looper.loop()(详见 2.2)方法, 接着启动了该线程, 紧接着初始化了一个 Handler 实例(详见 4.3.2). 用于服务 message, 在按下按键后通过 mHandler 发送了一个消息(详见 4.2), 此时 handleMessage 被回调(详见 4.1). 接下来进行详细分析.
该 Demo 中有个两点 getLooper 方法, 当外界调用该方法时, 他会判断当前 mLooper 是否为空, 空的话就会一直等待.
为什么要这么做?
因为在创建线程后去获取 mLooper, 此时线程的 run 方法可能还为运行, 所以此时 mLooper 值应该为 null;
当运行了 Looper.prepare()方法创建了 looper 后, 通过 Looper.myLooper()获取到 mLooper, 再 notifyAll;
二. Looper
- 2.1 Looper.prepare()
- public static void prepare() {
- prepare(true); 1
- }
- private static void prepare(boolean quitAllowed) {
- if (sThreadLocal.get() != null) { 2
- throw new RuntimeException("Only one Looper may be created per thread");
- }
- sThreadLocal.set(new Looper(quitAllowed)); 3
- }
1: 无参情况下调用 prepare(true), 形参置 true 表示允许退出.
2: sThreadLocal 会先去获取本地的数据, 如果能获取到说明已经 prepare 过, 则抛出异常.
3: 设置 sThreadLocal 数据
sThreadLocal 是 ThreadLocal 类型(static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();)
ThreadLocal: 线程本地存储区, 每个线程都有自己的私有本地存储区域, 不同的线程之间彼此不能访问对方的存储区.
接下来看下刚保存的 TLS 区域的 Looper 对象:
- private Looper(boolean quitAllowed) {
- mQueue = new MessageQueue(quitAllowed); 1
- mThread = Thread.currentThread(); 2
- }
1: 创建一个消息队列
2: 获取当前线程对象
这里为该线程创建了一个消息队列 MessageQueue 的构造函数中调用的 hal 层的本地方法:
- MessageQueue(boolean quitAllowed) {
- mQuitAllowed = quitAllowed;
- mPtr = nativeInit();
- }
这个流程的分析先到这.
- 2.2 Looper.loop()
- public static void loop() {
- final Looper me = myLooper(); 1
- if (me == null) {
- throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
- }
- final MessageQueue queue = me.mQueue;
- // Make sure the identity of this thread is that of the local process,
- // and keep track of what that identity token actually is.
- Binder.clearCallingIdentity();
- final long ident = Binder.clearCallingIdentity();
- for (;;) {
- Message msg = queue.next(); // might block 2
- if (msg == null) {
- // No message indicates that the message queue is quitting.
- return;
- }
- // This must be in a local variable, in case a UI event sets the logger
- Printer logging = me.mLogging;
- if (logging != null) {
- logging.println(">>>>> Dispatching to" + msg.target + " " +
- msg.callback + ":" + msg.what);
- }
- msg.target.dispatchMessage(msg); 3
- if (logging != null) {
- logging.println("<<<<<Finished to" + msg.target + " " + msg.callback);
- }
- // Make sure that during the course of dispatching the
- // identity of the thread wasn't corrupted.
- final long newIdent = Binder.clearCallingIdentity();
- if (ident != newIdent) {
- Log.wtf(TAG, "Thread identity changed from 0x"
- + Long.toHexString(ident) + "to 0x"
- + Long.toHexString(newIdent) + "while dispatching to"
- + msg.target.getClass().getName() + " "
- + msg.callback + "what=" + msg.what);
- }
- msg.recycleUnchecked(); 4
- }
- }
1: 获取 TLS 中存储的 Looper 对象
2: 获取消息, 没有消息的时候会阻塞(详见 3.1)
3: 分发消息(详见 4.1)
4: 回收消息到消息池(详见 5.1)
三. MesageQueue
- 3.1 next()
- Message next() {
- // Return here if the message loop has already quit and been disposed.
- // This can happen if the application tries to restart a looper after quit
- // which is not supported.
- final long ptr = mPtr;
- if (ptr == 0) {
- return null;
- }
- int pendingIdleHandlerCount = -1; // -1 only during first iteration
- int nextPollTimeoutMillis = 0;
- for (;;) {
- if (nextPollTimeoutMillis != 0) {
- Binder.flushPendingCommands();
- }
- nativePollOnce(ptr, nextPollTimeoutMillis); 1
- synchronized (this) {
- // Try to retrieve the next message. Return if found.
- final long now = SystemClock.uptimeMillis();
- Message prevMsg = null;
- Message msg = mMessages;
- if (msg != null && msg.target == null) {
- // Stalled by a barrier. Find the next asynchronous message in the queue.
- do {
- prevMsg = msg;
- msg = msg.next;
- } while (msg != null && !msg.isAsynchronous()); 2
- }
- if (msg != null) {
- if (now < msg.when) { 3
- // Next message is not ready. Set a timeout to wake up when it is ready.
- nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
- } else {
- // Got a message.
- mBlocked = false;
- if (prevMsg != null) {
- prevMsg.next = msg.next;
- } else {
- mMessages = msg.next;
- }
- msg.next = null;
- if (false) Log.v("MessageQueue", "Returning message:" + msg);
- return msg;
- }
- } else {
- // No more messages.
- nextPollTimeoutMillis = -1; 4
- }
- // Process the quit message now that all pending messages have been handled.
- if (mQuitting) { 5
- dispose();
- return null;
- }
- // If first time idle, then get the number of idlers to run.
- // Idle handles only run if the queue is empty or if the first message
- // in the queue (possibly a barrier) is due to be handled in the future.
- if (pendingIdleHandlerCount < 0
- && (mMessages == null || now < mMessages.when)) { 6
- pendingIdleHandlerCount = mIdleHandlers.size();
- }
- if (pendingIdleHandlerCount <= 0) {
- // No idle handlers to run. Loop and wait some more.
- mBlocked = true;
- continue;
- }
- if (mPendingIdleHandlers == null) { 7
- mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
- }
- mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
- }
- // Run the idle handlers.
- // We only ever reach this code block during the first iteration.
- for (int i = 0; i < pendingIdleHandlerCount; i++) {
- final IdleHandler idler = mPendingIdleHandlers[i];
- mPendingIdleHandlers[i] = null; // release the reference to the handler
- boolean keep = false;
- try {
- keep = idler.queueIdle(); 8
- } catch (Throwable t) {
- Log.wtf("MessageQueue", "IdleHandler threw exception", t);
- }
- if (!keep) {
- synchronized (this) {
- mIdleHandlers.remove(idler);
- }
- }
- }
- // Reset the idle handler count to 0 so we do not run them again.
- pendingIdleHandlerCount = 0; 9
- // While calling an idle handler, a new message could have been delivered
- // so go back and look again for a pending message without waiting.
- nextPollTimeoutMillis = 0;
- }
- }
1: 调用本地 epoll 方法, 当没有消息时会阻塞在这, 阻塞时间为 nextPollTimeoutMillis(详见 6.1.1)
2: 查找消息队列中的异步消息(详见 4.2)
3: 如果当前时间小于异步消息的触发时间, 则设置下一轮 poll 的超时时间(相当于休眠时间), 否则返回将要执行的异步消息.
4: 没有异步消息, 下轮 poll 则无限等待, 直到新的消息来临
5: 检测下退出标志
6: 如果消息队列未空或是第一个 msg(消息刚放进队列且未达到触发时间), 则执行空闲的 handler
7: IdleHandler 一个临时存放数组对象(下面可以看到一个列表转数组的方法被调用)
8: 运行空闲的 handler(只有第一次循环时会运行 idle handle)
9: 重置 idle handler 计数, 防止下次运行
往往在第一次进入 next 函数循环时, 在 nativePollOnce 阻塞之后, 都会执行 idle handle 函数.
获取到异步消息, 立马把该消息返回给上一层, 否则继续循环等待新的消息产生.
- 3.2 enqueueMessage()
- boolean enqueueMessage(Message msg, long when) {
- if (msg.target == null) { 1
- throw new IllegalArgumentException("Message must have a target.");
- }
- if (msg.isInUse()) { 2
- throw new IllegalStateException(msg + "This message is already in use.");
- }
- synchronized (this) {
- if (mQuitting) {
- IllegalStateException e = new IllegalStateException(
- msg.target + "sending message to a Handler on a dead thread");
- Log.w("MessageQueue", e.getMessage(), e);
- msg.recycle();
- return false;
- }
- msg.markInUse();
- msg.when = when;
- Message p = mMessages;
- boolean needWake;
- if (p == null || when == 0 || when < p.when) { 3
- msg.next = p;
- mMessages = msg;
- needWake = mBlocked;
- } else {
- // Inserted within the middle of the queue. Usually we don't have to wake
- // up the event queue unless there is a barrier at the head of the queue
- // and the message is the earliest asynchronous message in the queue.
- needWake = mBlocked && p.target == null && msg.isAsynchronous();
- Message prev;
- for (;;) {
- prev = p;
- p = p.next;
- if (p == null || when < p.when) { 4
- break;
- }
- if (needWake && p.isAsynchronous()) {
- needWake = false;
- }
- }
- msg.next = p; // invariant: p == prev.next
- prev.next = msg;
- }
- // We can assume mPtr != 0 because mQuitting is false.
- if (needWake) {
- nativeWake(mPtr); 5
- }
- }
- return true;
- }
1: 判断该消息是否有 handler, 每个 msg 必须有个对应的 handler;
2: 判断该消息是否已经使用;
3: 判断是否有已经准备好的消息 (表头消息) 或当前发送消息的延时时间为 0 或 next ready msg 延时时间大于当前消息延时时间则将当前消息变为新的表头.;
根据判断当前阻塞标志, 来觉得是否需要唤醒;
4: 根据时间将消息插入到消息对列中;
5: 上文分析在 next()方法中会被阻塞, 在这里就可以唤醒阻塞(详见 6.1.2);
四. Handler
4.1 消息分发
- public void dispatchMessage(Message msg) {
- if (msg.callback != null) {
- handleCallback(msg); 1
- } else {
- if (mCallback != null) { 2
- if (mCallback.handleMessage(msg)) {
- return;
- }
- }
- handleMessage(msg); 3
- }
- }
1: 如果该 msg 设置了回调函数, 则直接调用回调方法
- message.callback.run()
- ;
2: 当 handler 设置了回调函数, 则回调方法
- mCallback.handleMessage(msg)
- ;
3: 调用 handler 自身的方法 handleMessage, 该方法默认为空, 一般通过子类覆盖来完成具体的逻辑;
我们 Demo 程序中, 是使用第二种方法, 设置回调来实现具体的逻辑, 分发消息的本意是响应消息的对应的执行方法.
4.2 消息发送
可以看到调用 sendMessage 方法后, 最终调用的是 enqueueMessage 方法.
- public final boolean sendMessage(Message msg)
- {
- return sendMessageDelayed(msg, 0);
- }
- public final boolean sendMessageDelayed(Message msg, long delayMillis)
- {
- if (delayMillis < 0) {
- delayMillis = 0;
- }
- return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
- }
可以看到发送消息时都有一个时间参数选择, 该参数就是我们前面分析的延时触发时间(相对时间).
- public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
- MessageQueue queue = mQueue; 1
- if (queue == null) {
- RuntimeException e = new RuntimeException(
- this + "sendMessageAtTime() called with no mQueue");
- Log.w("Looper", e.getMessage(), e);
- return false;
- }
- return enqueueMessage(queue, msg, uptimeMillis);
- }
- private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
- msg.target = this; 2
- if (mAsynchronous) {
- msg.setAsynchronous(true);
- }
- return queue.enqueueMessage(msg, uptimeMillis);
- }
1: 判断 handler 创建时, 传进来的消息对列是否为空(详见 4.3)
2: 消息的 target 为该对象本身, handler 类型
这里有对发生的消息进行异步标志设置, 通过判断 mAsynchronous 标志, 该标志是在创建 handler 时初始化的(详见 4.3);
Handler.enqueueMessage 方法调用的是 MessageQueue.enqueueMessage 方法(详见 3.2);
4.3 创建 Handler
4.3.1 无参构造
- public Handler() {
- this(null, false);
- }
- public Handler(Callback callback, boolean async) {
- if (FIND_POTENTIAL_LEAKS) {
- final Class<? extends Handler> klass = getClass();
- if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
- (klass.getModifiers() & Modifier.STATIC) == 0) {
- Log.w(TAG, "The following Handler class should be static or leaks might occur:" +
- klass.getCanonicalName());
- }
- }
- mLooper = Looper.myLooper();
- if (mLooper == null) {
- throw new RuntimeException(
- "Can't create handler inside thread that has not called Looper.prepare()");
- }
- mQueue = mLooper.mQueue;
- mCallback = callback;
- mAsynchronous = async;
- }
无参构造方式比起我们 Demo 中的方式, 它自己回调用 Looper.myLooper()静态方法获取 looper;
4.3.2 有参构造
- public Handler(Looper looper, Callback callback) {
- this(looper, callback, false); 1
- }
- public Handler(Looper looper, Callback callback, boolean async) {
- mLooper = looper;
- mQueue = looper.mQueue;
- mCallback = callback;
- mAsynchronous = async;
- }
1: 调用有参构造函数创建 handler 且异步标志置 false 说明该 handler 发送的消息都为同步消息.
Demo 中的 handler 就是使用该方式创建, 自己传入 looper 参数.
五. Message
- 5.1 recycle()
- public void recycle() {
- if (isInUse()) {
- if (gCheckRecycle) {
- throw new IllegalStateException("This message cannot be recycled because it"
- + "is still in use.");
- }
- return;
- }
- recycleUnchecked();
- }
- void recycleUnchecked() {
- // Mark the message as in use while it remains in the recycled object pool.
- // Clear out all other details.
- flags = FLAG_IN_USE; 1
- what = 0;
- arg1 = 0;
- arg2 = 0;
- obj = null;
- replyTo = null;
- sendingUid = -1;
- when = 0;
- target = null;
- callback = null;
- data = null;
- synchronized (sPoolSync) {
- if (sPoolSize <MAX_POOL_SIZE) { 2
- next = sPool;
- sPool = this;
- sPoolSize++;
- }
- }
- }
1: 将该消息标志置为使用中并清除其他参数为 default
2: 将该消息加入消息池, 当消息池未满时
将消息回收到消息池都是将消息加入到消息池的链表表头.
- 5.2 obtain()
- public static Message obtain() {
- synchronized (sPoolSync) {
- if (sPool != null) {
- Message m = sPool; 1
- sPool = m.next;
- m.next = null;
- m.flags = 0; // clear in-use flag
- sPoolSize--;
- return m;
- }
- }
- return new Message(); 2
- }
1: 从消息池表头拿出一个消息
2: 如果消息池为空则创建一个消息
可以看出每次从消息池取出消息都是从链表的表头取出, 再对消息的计数做减法.
六. HAL 层
native 层本身也有一套完整的消息机制, 用于处理 native 的消息;
在整个消息机制中, MessageQueue 是连接 java 层和 native 层的纽带;
6.1 MessageQueue
文件
- android_os_MessageQueue.c
- static JNINativeMethod gMessageQueueMethods[] = {
- /* name, signature, funcPtr */
- { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
- { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
- { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
- { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
- { "nativeIsIdling", "(J)Z", (void*)android_os_MessageQueue_nativeIsIdling }
- };
以上可以看出上层调用 nativePollOnce 方法实质是调用 HAL 层的 android_os_MessageQueue_nativePollOnce 方法
- 6.1.1 nativePollOnce
- static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jclass clazz,
- jlong ptr, jint timeoutMillis) {
- NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
- nativeMessageQueue->pollOnce(env, timeoutMillis);
- }
- void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
- mInCallback = true;
- mLooper->pollOnce(timeoutMillis);
- mInCallback = false;
- if (mExceptionObj) {
- env->Throw(mExceptionObj);
- env->DeleteLocalRef(mExceptionObj);
- mExceptionObj = NULL;
- }
- }
通过源码可以看出消息队列中的 pollOnce 实质是调用的 looper 中的 pollOnce 方法(详见 6.2.1)
- 6.1.2 nativeWake
- static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
- NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
- return nativeMessageQueue->wake();
- }
- void NativeMessageQueue::wake() {
- mLooper->wake();
- }
通过源码可以看出消息队列中的 wake 实质是调用的 looper 中的 wake 方法(详见 6.2.4)
- 6.1.3 nativeInit
- static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
- NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
- if (!nativeMessageQueue) {
- jniThrowRuntimeException(env, "Unable to allocate native queue");
- return 0;
- }
- nativeMessageQueue->incStrong(env);
- return reinterpret_cast<jlong>(nativeMessageQueue);
- }
- NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
- mLooper = Looper::getForThread();
- if (mLooper == NULL) {
- mLooper = new Looper(false);
- Looper::setForThread(mLooper);
- }
- }
可以看到 hal 层和 java 层中创建 looper 的时序几乎是一样的, 先创建一个消息对列, 再创建一个 looper(Looper 的构造详见 6.2.3);
- 6.1.4 nativeIsIdling
- static jboolean android_os_MessageQueue_nativeIsIdling(JNIEnv* env, jclass clazz, jlong ptr) {
- NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
- return nativeMessageQueue->getLooper()->isIdling();
- }
- bool Looper::isIdling() const {
- return mIdling;
- }
还是调用 looper 中的方法, 来看看这个标志具体表示什么状态:
- // We are about to idle.
- mIdling = true;
- struct epoll_event eventItems[EPOLL_MAX_EVENTS];
- int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
- // No longer idling.
- mIdling = false;
以上代码片为 Looper::pollInner 中的一段, 在 wait 时是空闲, 当有数据来临时是非空闲的;
以前也用过这样的方法来判断线程是否在使用, 想不到在这里也看到了这种方法;
- 6.1.5 nativeDestroy
- static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
- NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
- nativeMessageQueue->decStrong(env);
- }
- 6.2 Looper
- Looper.cpp: system/core/lib/libutils
- 6.2.1 Looper::pollOnce
- int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
- int result = 0;
- for (;;) {
- while (mResponseIndex <mResponses.size()) {
- const Response& response = mResponses.itemAt(mResponseIndex++);
- int ident = response.request.ident;
- if (ident>= 0) {
- int fd = response.request.fd;
- int events = response.events;
- void* data = response.request.data;
- #if DEBUG_POLL_AND_WAKE
- ALOGD("%p ~ pollOnce - returning signalled identifier %d:"
- "fd=%d, events=0x%x, data=%p",
- this, ident, fd, events, data);
- #endif
- if (outFd != NULL) *outFd = fd;
- if (outEvents != NULL) *outEvents = events;
- if (outData != NULL) *outData = data;
- return ident;
- }
- }
- if (result != 0) {
- #if DEBUG_POLL_AND_WAKE
- ALOGD("%p ~ pollOnce - returning result %d", this, result);
- #endif
- if (outFd != NULL) *outFd = 0;
- if (outEvents != NULL) *outEvents = 0;
- if (outData != NULL) *outData = NULL;
- return result;
- }
- result = pollInner(timeoutMillis);
- }
- }
Looper::pollOnce 是通过调用 Looper::pollInner 方法实现;
- 6.2.2 Looper::pollInner
- int Looper::pollInner(int timeoutMillis) {
- #if DEBUG_POLL_AND_WAKE
- ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
- #endif
- // Adjust the timeout based on when the next message is due.
- if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
- nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
- int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
- if (messageTimeoutMillis>= 0
- && (timeoutMillis <0 || messageTimeoutMillis < timeoutMillis)) {
- timeoutMillis = messageTimeoutMillis;
- }
- #if DEBUG_POLL_AND_WAKE
- ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
- this, mNextMessageUptime - now, timeoutMillis);
- #endif
- }
- // Poll.
- int result = POLL_WAKE;
- mResponses.clear();
- mResponseIndex = 0;
- // We are about to idle.
- mIdling = true;
- struct epoll_event eventItems[EPOLL_MAX_EVENTS];
- int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); 1
- // No longer idling.
- mIdling = false;
- // Acquire lock.
- mLock.lock();
- // Check for poll error.
- if (eventCount < 0) { 2
- if (errno == EINTR) {
- goto Done;
- }
- ALOGW("Poll failed with an unexpected error, errno=%d", errno);
- result = POLL_ERROR;
- goto Done;
- }
- // Check for poll timeout.
- if (eventCount == 0) { 3
- #if DEBUG_POLL_AND_WAKE
- ALOGD("%p ~ pollOnce - timeout", this);
- #endif
- result = POLL_TIMEOUT;
- goto Done;
- }
- // Handle all events.
- #if DEBUG_POLL_AND_WAKE
- ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
- #endif
- /* 处理 epoll 后的所有事件 */
- for (int i = 0; i < eventCount; i++) {
- int fd = eventItems[i].data.fd;
- uint32_t epollEvents = eventItems[i].events;
- if (fd == mWakeReadPipeFd) { 4
- if (epollEvents & EPOLLIN) {
- awoken();
- } else {
- ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
- }
- } else {
- ssize_t requestIndex = mRequests.indexOfKey(fd);
- if (requestIndex>= 0) {
- int events = 0;
- if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
- if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
- if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
- if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
- pushResponse(events, mRequests.valueAt(requestIndex));
- } else {
- ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is"
- "no longer registered.", epollEvents, fd);
- }
- }
- }
- Done: ;
- // Invoke pending message callbacks.
- mNextMessageUptime = LLONG_MAX;
- while (mMessageEnvelopes.size() != 0) {
- nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
- const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
- if (messageEnvelope.uptime <= now) {
- // Remove the envelope from the list.
- // We keep a strong reference to the handler until the call to handleMessage
- // finishes. Then we drop it so that the handler can be deleted *before*
- // we reacquire our lock.
- { // obtain handler
- sp<MessageHandler> handler = messageEnvelope.handler;
- Message message = messageEnvelope.message;
- mMessageEnvelopes.removeAt(0);
- mSendingMessage = true;
- mLock.unlock();
- #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
- ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
- this, handler.get(), message.what);
- #endif
- handler->handleMessage(message);
- } // release handler
- mLock.lock();
- mSendingMessage = false;
- result = POLL_CALLBACK;
- } else {
- // The last message left at the head of the queue determines the next wakeup time.
- mNextMessageUptime = messageEnvelope.uptime;
- break;
- }
- }
- // Release lock.
- mLock.unlock();
- // Invoke all response callbacks.
- for (size_t i = 0; i <mResponses.size(); i++) {
- Response& response = mResponses.editItemAt(i);
- if (response.request.ident == POLL_CALLBACK) {
- int fd = response.request.fd;
- int events = response.events;
- void* data = response.request.data;
- #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
- ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
- this, response.request.callback.get(), fd, events, data);
- #endif
- int callbackResult = response.request.callback->handleEvent(fd, events, data);
- if (callbackResult == 0) {
- removeFd(fd);
- }
- // Clear the callback reference in the response structure promptly because we
- // will not clear the response vector itself until the next poll.
- response.request.callback.clear();
- result = POLL_CALLBACK;
- }
- }
- return result;
- }
1: 等待 mEpollFd 有事件产生, 等待时间为 timeoutMilli;
当上层发消息时且判断需要唤醒, 则会往管道的读端写入数据用于唤醒(详见 6.2.3);
2: 检测 poll 是否出错;
3: 检测 poll 是否超时;
4: 如果是因为往管道读端写入数据被唤醒, 则都去并清空管道中的数据;
- 6.2.3 Looper::Looper()
- Looper::Looper(bool allowNonCallbacks) :
- mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
- mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
- int wakeFds[2];
- int result = pipe(wakeFds); 1
- LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
- mWakeReadPipeFd = wakeFds[0];
- mWakeWritePipeFd = wakeFds[1];
- result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); 2
- LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
- errno);
- result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
- LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
- errno);
- mIdling = false;
- // Allocate the epoll instance and register the wake pipe.
- mEpollFd = epoll_create(EPOLL_SIZE_HINT);
- LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
- struct epoll_event eventItem;
- memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
- eventItem.events = EPOLLIN; 3
- eventItem.data.fd = mWakeReadPipeFd;
- result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); 4
- LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
- errno);
- }
1: 创建一个无名管道, wakeFds[0]: 读文件描述符, wakeFds[1]: 写文件描述符;
2: 更改为无阻塞方式;
3: EPOLLIN: 连接到达, 有数据来临;
4: 监测管道读端是否有数据来临;
- 6.2.4 Looper::wake()
- void Looper::wake() {
- #if DEBUG_POLL_AND_WAKE
- ALOGD("%p ~ wake", this);
- #endif
- ssize_t nWrite;
- do {
- nWrite = write(mWakeWritePipeFd, "W", 1);
- } while (nWrite == -1 && errno == EINTR);
- if (nWrite != 1) {
- if (errno != EAGAIN) {
- ALOGW("Could not write wake signal, errno=%d", errno);
- }
- }
- }
唤醒只是向管道的写端写入一个字节数据, epoll_wait 则会得到返回;
总结
在这里做个总结针对 java 层(因为 native 层的消息机制未进行详细分析不过估计和 java 层的流程差不多);
当调用 j 静态方法 Looper.prepare()初始化后, 再调用 Looper.loop()方法进行消息循环处理;
Looper.loop()方法中调用 MesageQueue.next()方法检索新消息, 没有则阻塞, 有则将消息插入消息链表头后立即返回;
阻塞方式是调用本地的 nativePollOnce()方法实现, 其原理是利用 epoll 管道文件描述符实现;
Looper.loop()调用 dispatchMessage 方法实现消息的分发处理;
发送一个消息的实质是调用个 MessageQueue.enqueueMessage()方法往消息链表中插入一个消息, 插入位置的条件为延时时间;
然后再调用一个本地方法 nativeWake 对前面阻塞的进行唤醒, 实质是往管道中写入一个字节数据;
参考
来源: https://segmentfault.com/a/1190000017054343