android消息机制解析

前言

  • 我们知道,android应用程序是支持多线程的。android应用程序线程有主线程和子线程之分,其中,主线程是由Activity管理服务ActivityManagerService请求Zygote进程创建的,而子线程是由主线程或者其他子线程创建的。android应用程序在启动完成之后,会主动进入一个消息循环中,即android应用程序主线程是具有消息循环的。
  • android应用程序的主线程又称为UI线程,它是负责响应界面事件的。因此,我们要避免在它里面执行耗时操作,因为执行耗时的操作会导致主线程长时间不能响应界面事件,从而影响用户体验。我们一般把一个耗时的操作看作是一个后台任务,并且放在一个子线程中执行。在这些后台任务中,有的是一次性的,有的是需要不定期执行的。
  • 对于不定期执行的后台任务来说,它们有两种主要方式。第一种方式就是每当条件满足的时候,就创建一个子线程来执行一个不定期的后台任务;而当这个不定期的任务执行完成之后,新创建的子线程就随之退出。第二种方式就是创建一个具有消息循环的子线程,每当条件满足的时候,就将一个不定期的后台任务封装成一个消息发送到这个子线程的消息队列中去执行,每当条件不满足时,这个子线程就会因为它的消息队列为空而进入睡眠等待状态。虽然第一种方式创建的子线程不需要具有消息循环,但是不断地创建和销毁子线程是有代价的,因此,我们更倾向于采用第二种方式来执行那些不定期的后台任务。从这个角度来看,我们就希望android应用程序子线程像主线程一样具有消息循环。
  • 无论是对于一次性的后台任务来说,还是对于定期性的后台任务来说,它们在执行的过程中可能都会涉及到界面操作。例如,我们通常把一个下载任务放在一个子线程中来执行,而在下载过程中,一般需要将下载进度显示在应用程序的界面上。在这种情况下,用来执行它们的子线程一般就会将一个与界面相关的操作封装成一个消息,并且发送到主线程的消息队列中,以便在主线程中处理这个消息时,可以执行一个相关的页面操作。为了简化子线程执行与界面相关的操作,我们通常将它们与主线程的消息循环关联起来。从这个角度来讲,我们也可以说android应用程序子线程需要有消息循环。
  • 综上所述,我们就可以得到android应用程序线程的三个消息循环模型。第一种是应用程序主线程消息循环模型。第二种是与界面无关的应用程序子线程消息循环的。第三种是与界面相关的应用程序子线程消息循环模型。我们可以使用ActivityThread、HandlerThread和AsnycTask这三个类来分别实现上述三种消息循环模型。

android消息机制原理

在android框架或者应用程序开发中,随处可见Handler类和Looper类的使用。android框架Java层核心服务运行在SystemServer进程,SystemServer进程由Zygote进程启动,当进入SystemServer进程后就会进入它的main()方法,在main()方法中会调用run()方法。

1
2
3
4
5
6
7
private void run() {
...
Looper.prepareMainLooper(); // 创建消息队列
...
Looper.loop(); // 创建消息循环
...
}

在这个方法中,首先通过调用Looper的prepareMainLooper()方法创建一个消息队列,然后调用loop()方法让进程的主线程进入消息循环。而对于应用程序而言,它的主类,也就是入口,是框架层的ActivityThread.java类。当应用程序被启动之后,就会进入ActivityThread.java的main()方法。

1
2
3
4
5
6
7
8
9
10
11
public final class ActivityThread {
...
public static void main(String[] args) {
...
Looper.prepareMainLooper();
...
Looper.loop();
...
}
...
}

在ActivityThread的main()方法和SystemServer的run()方法里调用了相同的代码,即它们分别调用了Looper类的prepareMainLooper()方法和loop()方法来创建一个消息队列和消息循环。

因此,可以总结出,系统SystemServer进程的主线程通过循环不断地从这个消息队列中获取消息,然后对获取的消息进行处理,这样就实现了通过消息来驱动应用程序和SystemServer进程的执行,这就是android的消息处理机制。由此可见,android的消息机制由4个重要的类组成,分别是消息Message、消息队列MessageQueue、消息循环Looper、消息的发送者和处理者Handler。

android消息机制

在消息处理机制中,消息都存放在一个消息队列中,应用程序和SystemServer进程的主线程就是围绕这个消息队列进入一个无限循环中,直到应用程序和SystemServer进程退出。当应用程序和SystemServer的Handler向消息队列发送一个消息之后,消息队列就有消息了,这时候应用程序和SystemServer进程的主线程就会把它取出来,并分发给相应的Handler进行处理。如果队列中没有消息,应用程序的主线程就会进入空闲等待状态,等待下一个消息的到来。因此,android系统消息机制的核心是以下三个方面:

  • 创建消息队列,之后进入无限循环读取消息
  • 发送消息
  • 处理消息,当发送消息之后,无限循环读到这个消息,之后就分发给相应的处理者来处理

创建消息队列和进入消息循环过程

创建消息队列和消息循环以ActivityThread中创建消息队列、消息循环为例,因为这是为每个应用程序主线程默认的消息循环和消息队列。

下面开始分析这个流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class ActivityThread {
...
public static void main(String[] args) {
...
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false);
...
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
...
}

ActivityThread的main()方法首先创建一个ActivityThread实例thread,然后通过Looper.prepareMainLooper()创建一个消息队列,最后通过Looper.loop()进入消息循环。接下来看Looper的prepareMainLooper()方法和loop()方法创建消息队列和消息循环的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public final class Looper {
private static final String TAG = "Looper";
// 表示当前线程,即应用程序的主线程
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private static Looper sMainLooper;
final MessageQueue mQueue; // 消息队列,所有消息都放在这个队列中
final Thread mThread;
private Printer mLogging;
private long mTraceTag;
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
public static Looper getMainLooper() {
synchronized (Looper.class) {
return sMainLooper;
}
}
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
Message msg = queue.next();
if (msg == null) {
return;
}
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
final long traceTag = me.mTraceTag;
if (traceTag != 0) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
try {
msg.target.dispatchMessage(msg);
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
msg.recycleUnchecked();
}
}
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
public static @NonNull MessageQueue myQueue() {
return myLooper().mQueue;
}
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
...
}

prepareMainLooper()方法通过调用prepare()方法在线程中创建一个Looper对象,并且把创建的这个Looper对象保存在sThreadLocal变量中。在调用prepare()方法创建Looper对象的时候,传入的参数是false,这个false最后会传入到MessageQueue中去。当创建Looper对象的时候会调用它的构造方法来创建一个消息队列MessageQueue,并且赋值给Looper类的变量mQueue。而对于创建消息队列MessageQueue,就会进入到它的构造方法。

1
2
3
4
5
6
7
8
public final class MessageQueue {
...
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
...
}

参数quitAllowed通过looper的构造函数来传递,值为false,把这个值赋给mQuitAllowed变量,如果它的值为true,就表示消息队列退出。接着MessageQueue的构造方法调用了nativeInit()方法,这是一个native方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static const JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
{ "nativeSetFileDescriptorEvents", "(JII)V",
(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}

变量gMessageQueueMethods是jni里一个方法注册表变量,包含了Java方法对应到CPP文件里的方法映射表,映射表里的内容是一一对应的关系,在Java调用native方法的时候,通过它在CPP文件里找到对应的方法。在这里,nativeInit()对应android_os_MessageQueue_nativeInit()方法,而在这个方法中通过new关键字又新创建了一个NativeMessageQueue消息队列,于是进入NativeMessageQueue的构造函数。

1
2
3
4
5
6
7
8
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}

在NativeMessageQueue()函数中创建了一个native的looper对象,它与Java层中的Looper不一样,但是是一一对应的。通过看native的looper源代码,发现android消息机制的核心就在native的Looper中。下面是native的Looper对象创建过程。

1
2
3
4
5
6
7
8
9
10
11
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();
}

在创建native的Looper对象的过程中,会进入native的Looper的构造函数,native的Looper的构造函数首先调用了Linux的eventfd()函数来创建一个文件描述符mWakeEventFd。这个文件描述符被用户空间当作一个事件等待/响应机制,靠内核去响应用户事件。而参数EFD_NONBLOCK类似O_NONBLOCK标志,用来设置文件描述符,主要是为了节省对fcntl的额外调用。最后,Looper的构造函数调用rebuildEpollLocked()函数来进一步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void Looper::rebuildEpollLocked() {
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event));
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}

这个函数就是android消息机制的核心,用了Linux中的epoll机制。

  • epoll机制:它是一种I/O多路复用技术,是select、poll的加强版,使用epoll会涉及epoll_create()、epoll_ctl()、epoll_wait()这三个重要的函数
  • epoll_create():创建epoll文件描述符
  • epoll_ctl():设置监控文件描述符
  • epoll_wait():等待监控的文件描述符发生IO事件

由于select/poll能监控的文件描述符的数量限制,一般情况是1024,并且随着文件描述符的增加,效率线性下降。而在android系统中,Looper要监控触摸事件、按键事件和键盘事件等文件描述符,由此可见,他监控的文件描述符非常多。epoll是为了大批量文件描述符而作改进的poll,使用它可以很好地避免效率低下的问题。

继续看Looper类的rebuildEpollLocked()函数,首先通过epoll_create()函数来创建一个epoll专用的文件描述符,EPOLL_SIZE_HINT = 8,表示在mEpollFd这个文件描述符上能监控的最大文件描述符数是8。接着调用epoll_ctl()函数来告诉epoll要监控mEpollFd文件描述符的是什么事件,在这里就是监控mWakeEventFd文件描述符的EPOLLIN事件,当mWakeEventFd有事件触发的时候,就唤醒当前正在等待的线程。而一般情况下有两个线程,一个线程是否有事件发生,如果没有,这个线程就进入等待状态。另外一个线程写一个事件,来唤醒等待的线程。

mWakeEventFd就是在上一步Looper构造函数中通过eventfd()创建的。Eventfd早期用来完成两个线程之间事件触发,现在已经支持两个线程之间的事件触发。它类似管道pipe,其实在android早期的版本中,就是使用管道pipe而不是Eventfd。

经过上述过程,消息队列的创建就完成了,它的核心是在native里使用了Linux的eventfd和epoll机制。

回到前面Looper.java类的loop()方法,它使应用程序进入了消息循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public static void loop() {
// 首先获取前面创建的looper对象
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 通过looper对象来获得MessageQueue对象
final MessageQueue queue = me.mQueue;
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
// 在for死循环里通过MessageQueue的next()方法不断从消息队列中获取下一个要处理的消息msg
Message msg = queue.next();
// 如果消息msg为空就直接return回去
if (msg == null) {
return;
}
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
final long traceTag = me.mTraceTag;
if (traceTag != 0) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
try {
// 如果消息msg不为空,就调用msg的target对象的dispatchMessage()方法来处理该消息,target是一个Handler对象,它的值就是应用程序中用来send message的Handler对象,Handler在消息发送的时候,框架会把消息的Handler赋值给target
msg.target.dispatchMessage(msg);
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
msg.recycleUnchecked();
}
}

接下来是MessageQueue的next()方法从消息队列中获取下一个要处理消息的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
Message next() {
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1;
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// 去检查下一个消息,如果没有发现就返回
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
// now为现在的事件,when表示msg的执行时间
if (now < msg.when) {
// 如果现在事件小于消息要执行的时间,就等到消息要执行的时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 如果现在时间大于消息要执行的时间,就通过return msg,直接把msg返回给Looper的 loop()方法
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
nextPollTimeoutMillis = -1;
}
if (mQuitting) {
dispose();
return null;
}
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null;
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
pendingIdleHandlerCount = 0;
nextPollTimeoutMillis = 0;
}
}

next()方法也是通过for循环不断获取msg,循环中nativePollOnce()是一个native方法,它查看当前消息队列中是否有消息,如果有消息就获得,如果没有消息就等待。

接下来看看nativePollOnce()方法的实现。

1
2
3
4
5
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

android_os_MessageQueue_nativePollOnce()函数首先得到NativeMessageQueue指针对象,然后调用它的pollOnce()函数来进一步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}

NativeMessageQueue的pollOnce()函数中,变量mLooper是C++层的Looper的对象,它在创建消息队列时被创建,这里调用它的pollOnce()函数来进一步处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);
}
}

Looper的pollOnce()函数通过调用pollInner()函数来进一步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
int Looper::pollInner(int timeoutMillis) {
...
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
mPolling = true;
// 调用epoll机制的epoll_wait()函数来看mEpollFd所监控的文件描述符是否有事件发生。这里的timeoutMillis表示设置监控的超时时间,当mEpollFd所监控的文件描述符发生了要监控的事件或监控时间超出了超时时间timeoutMillis,线程就从epoll_wait()函数返回,否则,线程就会在epoll_wait()函数中一直进入睡眠状态
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
mPolling = false;
mLock.lock();
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// 如果返回值eventCount小于0,就说明发生了错误
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
// 如果返回值eventCount等于0,就说明超时了
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
// 如果返回值eventCount大于0,说明监控有事件发生了。
// mEpollFd所监控的文件描述符是mWakeEventFd,而mWakeEventFd是通过eventfd创建的。如果mWakeEventFd发生了EPOLLIN事件,就说明应用程序的消息队列里有新的消息需要处理,接下来就会返回。awoken()函数用来读取事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
...
return result;
}

下面是awoken()函数。

1
2
3
4
5
6
7
8
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
uint64_t counter;
TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}

awoken()函数通过read()函数独处这个事件,这个事件在应用程序发送消息时写进。

消息的发送过程

当消息循环和消息队列创建好了之后,就可以往消息队列发送消息。在了解这个过程之前,先学习整个Handler的构成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
public class Handler {
...
// 子类来实现这个方法接收msg
public void handleMessage(Message msg) {
}
// 此为looper循环的msg.target.dispatchMessage()调用过来
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
public Handler() {
this(null, false);
}
public Handler(Callback callback) {
this(callback, false);
}
public Handler(Looper looper) {
this(looper, null, false);
}
public Handler(Looper looper, Callback callback) {
this(looper, callback, false);
}
public Handler(boolean async) {
this(null, async);
}
public Handler(Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public Handler(Looper looper, Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public final boolean sendMessage(Message msg)
{
return sendMessageDelayed(msg, 0);
}
public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
// 这个方法首先设置了这个消息的目标处理对象target,即设置这个消息最终由谁来处理,这里将它赋值为this,表示这个消息最终由目前发送消息的Handler对象来处理,接着调用queue的enqueueMessage()方法来把这个消息加入到应用程序队列中去,queue是一个MessageQueue对象
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
...
final Looper mLooper;
final MessageQueue mQueue;
final Callback mCallback;
final boolean mAsynchronous;
IMessenger mMessenger;
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 表示目前消息队列为空,这时应用程序的主线程处于空闲等待状态,也就是说它在等待消息的发送,这时候如果有消息就把消息放在消息队列的前面,接着唤醒应用程序的主线程
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 表示消息的队列不为空,也就是说这时候它还有消息没有处理完,在忙着处理其它消息,这时不要唤醒应用程序的主线程
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p;
prev.next = msg;
}
// 当把消息加入到消息队列后,如果应用程序的主线程正处于空闲等待状态,就调用nativeWake()方法来唤醒主线程,它是一个jni调用
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
1
2
3
4
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}

android_os_MessageQueue_nativeWake()函数首先得到NativeMessageQueue指针对象,然后调用它的wake()函数进一步操作。

1
2
3
void NativeMessageQueue::wake() {
mLooper->wake();
}

NativeMessageQueue类的wake()函数,又通过C++层mLooper对象的wake()函数进一步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal: %s", strerror(errno));
}
}
}

Looper.cpp里的wake()函数通过调用TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)))实现。这句代码调用了一个write()函数,于是mWakeEventFd就发生了一个event事件,这时前面监听的epoll机制就起作用了,会唤醒应用程序的主线程,于是应用程序的主线程就会从前面C++层的Looper类的pollInner()函数一层层返回,最后返回到Java层的MessageQueue类的next()方法中。

消息的处理过程

当发送消息之后,C++层的Looper唤醒了应用程序的主线程,之后会一层层地返回,最后返回到MessageQueue的next()方法。next()方法是在Looper.java的loop()方法循环中被调用的,于是next()方法就会在loop()方法中返回。接下来就看看loop()方法怎样来进行消息的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void loop() {
...
for (;;) {
Message msg = queue.next();
if (msg == null) {
return;
}
...
try {
msg.target.dispatchMessage(msg);
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
...
}
}

loop()方法通过queue的next()方法获得了消息msg,如果它不为空,就会调用target对象的dispatchMessage()方法来处理这个消息。接下来看看dispatchMessage()方法的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}

如果没有对mCallback进行设置,就调用handleMessage()方法来进一步处理,而handleMessage()方法又被子类实现,也就是mHandler实现了,于是进入它的handleMessage()方法。

1
2
3
4
5
6
Handler mHandler = new Handler {
@Override
public void handleMessage(Message msg) {
// 子类的具体实现
}
}

至此,android的消息机制就分析完毕。