Android6.0 消息机制原理解析
消息都存放在一个消息队列中,而消息循环线程就是围绕这个消息队列进入一个无限循环,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的Handler进行处理;如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来。在编写Android应用程序时,当程序执行的任务比较繁重时,为了不阻塞UI主线程而导致ANR的发生,我们通常的做法是创建一个子线程来完成特定的任务。在创建子线程时,有两种选择:一种是通过创建Thread对象来创建一个无消息循环的子线程;另一种是创建一个带有消息循环的子线程。创建带有消息循环的子线程又有两种实现方法:一种是直接利用Android给我们封装好的HandlerThread类来生成一个带有消息循环的线程对象;另一种是在线程的run()方法内先调用Looper.prepare()、再调用Looper.loop()来启动一个消息循环(下文HandlerThread的run()方法就是这样做的)。
一、消息机制使用
通常消息机制都是由一个消息线程和一个Handler组成,下面我们看PowerManagerService中的一个消息Handler:
mHandlerThread=newServiceThread(TAG, Process.THREAD_PRIORITY_DISPLAY,false/*allowIo*/); mHandlerThread.start(); mHandler=newPowerManagerHandler(mHandlerThread.getLooper());
这里的ServiceThread就是一个HandlerThread,创建Handler的时候,必须把HandlerThread的looper传进去,否则就是默认当前线程的looper。
而每个handler,大致如下:
privatefinalclassPowerManagerHandlerextendsHandler{
publicPowerManagerHandler(Looperlooper){
super(looper,null,true/*async*/);
}
@Override
publicvoidhandleMessage(Messagemsg){
switch(msg.what){
caseMSG_USER_ACTIVITY_TIMEOUT:
handleUserActivityTimeout();
break;
caseMSG_SANDMAN:
handleSandman();
break;
caseMSG_SCREEN_BRIGHTNESS_BOOST_TIMEOUT:
handleScreenBrightnessBoostTimeout();
break;
caseMSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT:
checkWakeLockAquireTooLong();
Messagem=mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);
m.setAsynchronous(true);
mHandler.sendMessageDelayed(m,WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
break;
}
}
}
二、消息机制原理
那我们先来看下HandlerThread的主函数run函数:
publicvoidrun(){
mTid=Process.myTid();
Looper.prepare();
synchronized(this){
mLooper=Looper.myLooper();//赋值后notifyall,主要是getLooper函数返回的是mLooper
notifyAll();
}
Process.setThreadPriority(mPriority);
onLooperPrepared();
Looper.loop();
mTid=-1;
}
再来看看Looper的prepare函数,最后新建了一个Looper对象,并且放在线程局部变量(sThreadLocal)中。
publicstaticvoidprepare(){
prepare(true);
}
privatestaticvoidprepare(booleanquitAllowed){
if(sThreadLocal.get()!=null){
thrownewRuntimeException("OnlyoneLoopermaybecreatedperthread");
}
sThreadLocal.set(newLooper(quitAllowed));
}
Looper的构造函数中创建了MessageQueue
privateLooper(booleanquitAllowed){
mQueue=newMessageQueue(quitAllowed);
mThread=Thread.currentThread();
}
我们再来看下MessageQueue的构造函数,其中nativeInit是一个native方法,它的返回值保存在mPtr中,显然mPtr是用long型变量保存的native层指针。
MessageQueue(booleanquitAllowed){
mQuitAllowed=quitAllowed;
mPtr=nativeInit();
}
native函数中主要创建了NativeMessageQueue对象,并且把指针变量返回了。
staticjlongandroid_os_MessageQueue_nativeInit(JNIEnv*env,jclassclazz){
NativeMessageQueue*nativeMessageQueue=newNativeMessageQueue();
if(!nativeMessageQueue){
jniThrowRuntimeException(env,"Unabletoallocatenativequeue");
return0;
}
nativeMessageQueue->incStrong(env);
returnreinterpret_cast<jlong>(nativeMessageQueue);
}
NativeMessageQueue构造函数就是获取mLooper,如果没有就是新建一个Looper
NativeMessageQueue::NativeMessageQueue():
mPollEnv(NULL),mPollObj(NULL),mExceptionObj(NULL){
mLooper=Looper::getForThread();
if(mLooper==NULL){
mLooper=newLooper(false);
Looper::setForThread(mLooper);
}
}
然后我们再看下Looper的构造函数,首先调用eventfd创建了一个fd。eventfd主要用于进程或者线程间的通信(事件通知),可以参考这篇博客《eventfd介绍》。
Looper::Looper(boolallowNonCallbacks):
mAllowNonCallbacks(allowNonCallbacks),mSendingMessage(false),
mPolling(false),mEpollFd(-1),mEpollRebuildRequired(false),
mNextRequestSeq(0),mResponseIndex(0),mNextMessageUptime(LLONG_MAX){
mWakeEventFd=eventfd(0,EFD_NONBLOCK);
LOG_ALWAYS_FATAL_IF(mWakeEventFd<0,"Couldnotmakewakeeventfd.errno=%d",errno);
AutoMutex_l(mLock);
rebuildEpollLocked();
}
2.1c层创建epoll
我们再来看下rebuildEpollLocked函数,创建了epoll,并且把mWakeEventFd加入epoll,而且把mRequests的fd也加入epoll
voidLooper::rebuildEpollLocked(){
//Closeoldepollinstanceifwehaveone.
if(mEpollFd>=0){
#ifDEBUG_CALLBACKS
ALOGD("%p~rebuildEpollLocked-rebuildingepollset",this);
#endif
close(mEpollFd);
}
//Allocatethenewepollinstanceandregisterthewakepipe.
mEpollFd=epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd<0,"Couldnotcreateepollinstance.errno=%d",errno);
structepoll_eventeventItem;
memset(&eventItem,0,sizeof(epoll_event));//zerooutunusedmembersofdatafieldunion
eventItem.events=EPOLLIN;
eventItem.data.fd=mWakeEventFd;
intresult=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,mWakeEventFd,&eventItem);
LOG_ALWAYS_FATAL_IF(result!=0,"Couldnotaddwakeeventfdtoepollinstance.errno=%d",
errno);
for(size_ti=0;i<mRequests.size();i++){
constRequest&request=mRequests.valueAt(i);
structepoll_eventeventItem;
request.initEventItem(&eventItem);
intepollResult=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,request.fd,&eventItem);
if(epollResult<0){
ALOGE("Erroraddingepolleventsforfd%dwhilerebuildingepollset,errno=%d",
request.fd,errno);
}
}
}
继续回到HandlerThread的run函数,我们继续分析Looper的loop函数
publicvoidrun(){
mTid=Process.myTid();
Looper.prepare();
synchronized(this){
mLooper=Looper.myLooper();
notifyAll();
}
Process.setThreadPriority(mPriority);
onLooperPrepared();
Looper.loop();
mTid=-1;
}
我们看看Looper的loop函数:
publicstaticvoidloop(){
finalLooperme=myLooper();
if(me==null){
thrownewRuntimeException("NoLooper;Looper.prepare()wasn'tcalledonthisthread.");
}
finalMessageQueuequeue=me.mQueue;//得到Looper的mQueue
//Makesuretheidentityofthisthreadisthatofthelocalprocess,
//andkeeptrackofwhatthatidentitytokenactuallyis.
Binder.clearCallingIdentity();
finallongident=Binder.clearCallingIdentity();
for(;;){
Messagemsg=queue.next();//mightblock这个函数会阻塞,阻塞主要是epoll_wait
if(msg==null){
//Nomessageindicatesthatthemessagequeueisquitting.
return;
}
//Thismustbeinalocalvariable,incaseaUIeventsetsthelogger
Printerlogging=me.mLogging;//自己打的打印
if(logging!=null){
logging.println(">>>>>Dispatchingto"+msg.target+""+
msg.callback+":"+msg.what);
}
msg.target.dispatchMessage(msg);
if(logging!=null){
logging.println("<<<<<Finishedto"+msg.target+""+msg.callback);
}
//Makesurethatduringthecourseofdispatchingthe
//identityofthethreadwasn'tcorrupted.
finallongnewIdent=Binder.clearCallingIdentity();
if(ident!=newIdent){
Log.wtf(TAG,"Threadidentitychangedfrom0x"
+Long.toHexString(ident)+"to0x"
+Long.toHexString(newIdent)+"whiledispatchingto"
+msg.target.getClass().getName()+""
+msg.callback+"what="+msg.what);
}
msg.recycleUnchecked();
}
}
MessageQueue类的next函数主要是调用了nativePollOnce函数,后面就是从消息队列中取出一个Message
Messagenext(){
//Returnhereifthemessageloophasalreadyquitandbeendisposed.
//Thiscanhappeniftheapplicationtriestorestartalooperafterquit
//whichisnotsupported.
finallongptr=mPtr;//之前保留的指针
if(ptr==0){
returnnull;
}
intpendingIdleHandlerCount=-1;//-1onlyduringfirstiteration
intnextPollTimeoutMillis=0;
for(;;){
if(nextPollTimeoutMillis!=0){
Binder.flushPendingCommands();
}
nativePollOnce(ptr,nextPollTimeoutMillis);
下面我们主要看下nativePollOnce这个native函数,把之前的指针强制转换成NativeMessageQueue,然后调用其pollOnce函数
staticvoidandroid_os_MessageQueue_nativePollOnce(JNIEnv*env,jobjectobj,
jlongptr,jinttimeoutMillis){
NativeMessageQueue*nativeMessageQueue=reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env,obj,timeoutMillis);
}
2.2c层epoll_wait阻塞
pollOnce函数前面的while循环只处理ident大于等于0的response,这种情况一般很少出现,所以我们可以直接看pollInner函数。
intLooper::pollOnce(inttimeoutMillis,int*outFd,int*outEvents,void**outData){
intresult=0;
for(;;){
while(mResponseIndex<mResponses.size()){
constResponse&response=mResponses.itemAt(mResponseIndex++);
intident=response.request.ident;
if(ident>=0){
intfd=response.request.fd;
intevents=response.events;
void*data=response.request.data;
#ifDEBUG_POLL_AND_WAKE
ALOGD("%p~pollOnce-returningsignalledidentifier%d:"
"fd=%d,events=0x%x,data=%p",
this,ident,fd,events,data);
#endif
if(outFd!=NULL)*outFd=fd;
if(outEvents!=NULL)*outEvents=events;
if(outData!=NULL)*outData=data;
returnident;
}
}
if(result!=0){
#ifDEBUG_POLL_AND_WAKE
ALOGD("%p~pollOnce-returningresult%d",this,result);
#endif
if(outFd!=NULL)*outFd=0;
if(outEvents!=NULL)*outEvents=0;
if(outData!=NULL)*outData=NULL;
returnresult;
}
result=pollInner(timeoutMillis);
}
}
pollInner函数主要就是调用epoll_wait阻塞,java层会计算每次阻塞的时间并传到c层。只有当mWakeEventFd或者之前addFd的fd有事件到来,或者等待超时,epoll_wait才会返回。
intLooper::pollInner(inttimeoutMillis){
#ifDEBUG_POLL_AND_WAKE
ALOGD("%p~pollOnce-waiting:timeoutMillis=%d",this,timeoutMillis);
#endif
//Adjustthetimeoutbasedonwhenthenextmessageisdue.
if(timeoutMillis!=0&&mNextMessageUptime!=LLONG_MAX){
nsecs_tnow=systemTime(SYSTEM_TIME_MONOTONIC);
intmessageTimeoutMillis=toMillisecondTimeoutDelay(now,mNextMessageUptime);
if(messageTimeoutMillis>=0
&&(timeoutMillis<0||messageTimeoutMillis<timeoutMillis)){
timeoutMillis=messageTimeoutMillis;
}
#ifDEBUG_POLL_AND_WAKE
ALOGD("%p~pollOnce-nextmessagein%"PRId64"ns,adjustedtimeout:timeoutMillis=%d",
this,mNextMessageUptime-now,timeoutMillis);
#endif
}
//Poll.
intresult=POLL_WAKE;
mResponses.clear();//清空mResponses
mResponseIndex=0;
//Weareabouttoidle.
mPolling=true;
structepoll_eventeventItems[EPOLL_MAX_EVENTS];
inteventCount=epoll_wait(mEpollFd,eventItems,EPOLL_MAX_EVENTS,timeoutMillis);//epoll_wait主要线程阻塞在这,这个阻塞的时间也是有java层传过来的
//Nolongeridling.
mPolling=false;
//Acquirelock.
mLock.lock();
//Rebuildepollsetifneeded.
if(mEpollRebuildRequired){
mEpollRebuildRequired=false;
rebuildEpollLocked();
gotoDone;
}
//Checkforpollerror.
if(eventCount<0){
if(errno==EINTR){
gotoDone;
}
ALOGW("Pollfailedwithanunexpectederror,errno=%d",errno);
result=POLL_ERROR;
gotoDone;
}
//Checkforpolltimeout.
if(eventCount==0){
#ifDEBUG_POLL_AND_WAKE
ALOGD("%p~pollOnce-timeout",this);
#endif
result=POLL_TIMEOUT;
gotoDone;
}
//Handleallevents.
#ifDEBUG_POLL_AND_WAKE
ALOGD("%p~pollOnce-handlingeventsfrom%dfds",this,eventCount);
#endif
for(inti=0;i<eventCount;i++){
intfd=eventItems[i].data.fd;
uint32_tepollEvents=eventItems[i].events;
if(fd==mWakeEventFd){//通知唤醒线程的事件
if(epollEvents&EPOLLIN){
awoken();
}else{
ALOGW("Ignoringunexpectedepollevents0x%xonwakeeventfd.",epollEvents);
}
}else{
ssize_trequestIndex=mRequests.indexOfKey(fd);//之前addFd的事件
if(requestIndex>=0){
intevents=0;
if(epollEvents&EPOLLIN)events|=EVENT_INPUT;
if(epollEvents&EPOLLOUT)events|=EVENT_OUTPUT;
if(epollEvents&EPOLLERR)events|=EVENT_ERROR;
if(epollEvents&EPOLLHUP)events|=EVENT_HANGUP;
pushResponse(events,mRequests.valueAt(requestIndex));//放在mResponses中
}else{
ALOGW("Ignoringunexpectedepollevents0x%xonfd%dthatis"
"nolongerregistered.",epollEvents,fd);
}
}
}
Done:;
//Invokependingmessagecallbacks.
mNextMessageUptime=LLONG_MAX;
while(mMessageEnvelopes.size()!=0){//这块主要是c层的消息,java层的消息是自己管理的
nsecs_tnow=systemTime(SYSTEM_TIME_MONOTONIC);
constMessageEnvelope&messageEnvelope=mMessageEnvelopes.itemAt(0);
if(messageEnvelope.uptime<=now){
//Removetheenvelopefromthelist.
//WekeepastrongreferencetothehandleruntilthecalltohandleMessage
//finishes.Thenwedropitsothatthehandlercanbedeleted*before*
//wereacquireourlock.
{//obtainhandler
sp<MessageHandler>handler=messageEnvelope.handler;
Messagemessage=messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage=true;
mLock.unlock();
#ifDEBUG_POLL_AND_WAKE||DEBUG_CALLBACKS
ALOGD("%p~pollOnce-sendingmessage:handler=%p,what=%d",
this,handler.get(),message.what);
#endif
handler->handleMessage(message);
}//releasehandler
mLock.lock();
mSendingMessage=false;
result=POLL_CALLBACK;
}else{
//Thelastmessageleftattheheadofthequeuedeterminesthenextwakeuptime.
mNextMessageUptime=messageEnvelope.uptime;
break;
}
}
//Releaselock.
mLock.unlock();
//Invokeallresponsecallbacks.
for(size_ti=0;i<mResponses.size();i++){//这是之前addFd的事件的处理,主要是遍历mResponses,然后调用其回调
Response&response=mResponses.editItemAt(i);
if(response.request.ident==POLL_CALLBACK){
intfd=response.request.fd;
intevents=response.events;
void*data=response.request.data;
#ifDEBUG_POLL_AND_WAKE||DEBUG_CALLBACKS
ALOGD("%p~pollOnce-invokingfdeventcallback%p:fd=%d,events=0x%x,data=%p",
this,response.request.callback.get(),fd,events,data);
#endif
//Invokethecallback.Notethatthefiledescriptormaybeclosedby
//thecallback(andpotentiallyevenreused)beforethefunctionreturnsso
//weneedtobealittlecarefulwhenremovingthefiledescriptorafterwards.
intcallbackResult=response.request.callback->handleEvent(fd,events,data);
if(callbackResult==0){
removeFd(fd,response.request.seq);
}
//Clearthecallbackreferenceintheresponsestructurepromptlybecausewe
//willnotcleartheresponsevectoritselfuntilthenextpoll.
response.request.callback.clear();
result=POLL_CALLBACK;
}
}
returnresult;
}
继续分析Looper的loop函数:可以增加自己的打印来调试代码,之后调用Message的target的dispatchMessage来分发消息。
for(;;){
Messagemsg=queue.next();//mightblock
if(msg==null){
//Nomessageindicatesthatthemessagequeueisquitting.
return;
}
//Thismustbeinalocalvariable,incaseaUIeventsetsthelogger
Printerlogging=me.mLogging;//自己的打印
if(logging!=null){
logging.println(">>>>>Dispatchingto"+msg.target+""+
msg.callback+":"+msg.what);
}
msg.target.dispatchMessage(msg);
if(logging!=null){
logging.println("<<<<<Finishedto"+msg.target+""+msg.callback);
}
//Makesurethatduringthecourseofdispatchingthe
//identityofthethreadwasn'tcorrupted.
finallongnewIdent=Binder.clearCallingIdentity();
if(ident!=newIdent){
Log.wtf(TAG,"Threadidentitychangedfrom0x"
+Long.toHexString(ident)+"to0x"
+Long.toHexString(newIdent)+"whiledispatchingto"
+msg.target.getClass().getName()+""
+msg.callback+"what="+msg.what);
}
msg.recycleUnchecked();
}
}
2.3增加调试打印
我们先来看如何添加自己的打印,可以通过Looper的setMessageLogging函数来设置打印用的Printer
publicvoidsetMessageLogging(@NullablePrinterprinter){
mLogging=printer;
}
Printer就是一个interface
publicinterfacePrinter{
/**
*Writealineoftexttotheoutput.Thereisnoneedtoterminate
*thegivenstringwithanewline.
*/
voidprintln(Stringx);
}
2.4java层消息分发处理
再来看消息的分发,先是调用Handler的obtainMessage函数
Messagemsg=mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT); msg.setAsynchronous(true); mHandler.sendMessageDelayed(msg,WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
先看obtainMessage调用了Message的obtain函数
publicfinalMessageobtainMessage(intwhat)
{
returnMessage.obtain(this,what);
}
Message的obtain函数就是新建一个Message,然后其target就是设置成其Handler
publicstaticMessageobtain(Handlerh,intwhat){
Messagem=obtain();//就是新建一个Message
m.target=h;
m.what=what;
returnm;
}
我们再联系之前分发消息
msg.target.dispatchMessage(msg); 这里调用的就是Handler的dispatchMessage函数,Handler会根据不同的情况对消息进行处理。
publicvoiddispatchMessage(Messagemsg){
if(msg.callback!=null){
handleCallback(msg);//这种就是用post形式发送,带Runnable的
}else{
if(mCallback!=null){//这种是handler传参的时候就是传入了mCallback回调了
if(mCallback.handleMessage(msg)){
return;
}
}
handleMessage(msg);//最后就是在自己实现的handleMessage处理
}
}
2.5java层消息发送
我们再看下java层的消息发送,主要也是调用Handler的sendMessage、post之类的函数,最终都会调用下面这个函数
publicbooleansendMessageAtTime(Messagemsg,longuptimeMillis){
MessageQueuequeue=mQueue;
if(queue==null){
RuntimeExceptione=newRuntimeException(
this+"sendMessageAtTime()calledwithnomQueue");
Log.w("Looper",e.getMessage(),e);
returnfalse;
}
returnenqueueMessage(queue,msg,uptimeMillis);
}
我们再来看java层发送消息最终都会调用enqueueMessage函数
privatebooleanenqueueMessage(MessageQueuequeue,Messagemsg,longuptimeMillis){
msg.target=this;
if(mAsynchronous){
msg.setAsynchronous(true);
}
returnqueue.enqueueMessage(msg,uptimeMillis);
}
最终在enqueueMessage中,把消息加入消息队列,然后需要的话就调用c层的nativeWake函数
booleanenqueueMessage(Messagemsg,longwhen){
if(msg.target==null){
thrownewIllegalArgumentException("Messagemusthaveatarget.");
}
if(msg.isInUse()){
thrownewIllegalStateException(msg+"Thismessageisalreadyinuse.");
}
synchronized(this){
if(mQuitting){
IllegalStateExceptione=newIllegalStateException(
msg.target+"sendingmessagetoaHandleronadeadthread");
Log.w(TAG,e.getMessage(),e);
msg.recycle();
returnfalse;
}
msg.markInUse();
msg.when=when;
Messagep=mMessages;
booleanneedWake;
if(p==null||when==0||when<p.when){
//Newhead,wakeuptheeventqueueifblocked.
msg.next=p;
mMessages=msg;
needWake=mBlocked;
}else{
//Insertedwithinthemiddleofthequeue.Usuallywedon'thavetowake
//uptheeventqueueunlessthereisabarrierattheheadofthequeue
//andthemessageistheearliestasynchronousmessageinthequeue.
needWake=mBlocked&&p.target==null&&msg.isAsynchronous();
Messageprev;
for(;;){
prev=p;
p=p.next;
if(p==null||when<p.when){
break;
}
if(needWake&&p.isAsynchronous()){
needWake=false;
}
}
msg.next=p;//invariant:p==prev.next
prev.next=msg;
}
//WecanassumemPtr!=0becausemQuittingisfalse.
if(needWake){
nativeWake(mPtr);
}
}
returntrue;
}
我们看下这个native方法,最后也是调用了Looper的wake函数
staticvoidandroid_os_MessageQueue_nativeWake(JNIEnv*env,jclassclazz,jlongptr){
NativeMessageQueue*nativeMessageQueue=reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
voidNativeMessageQueue::wake(){
mLooper->wake();
}
Looper类的wake函数只是往mWakeEventFd中写入了一个uint64_t类型的1,这个fd只是起通知作用,类似pipe。写入之后epoll_wait会被唤醒,线程不再阻塞,继续先发送c层消息,然后处理之前addFd的事件,最后处理java层的消息。
voidLooper::wake(){
#ifDEBUG_POLL_AND_WAKE
ALOGD("%p~wake",this);
#endif
uint64_tinc=1;
ssize_tnWrite=TEMP_FAILURE_RETRY(write(mWakeEventFd,&inc,sizeof(uint64_t)));
if(nWrite!=sizeof(uint64_t)){
if(errno!=EAGAIN){
ALOGW("Couldnotwritewakesignal,errno=%d",errno);
}
}
}
2.6c层发送消息
在c层也是可以发送消息的,主要是调用Looper的sendMessageAtTime函数,参数中有一个handler作为回调,消息会被放入mMessageEnvelopes中。
voidLooper::sendMessageAtTime(nsecs_tuptime,constsp<MessageHandler>&handler,
constMessage&message){
#ifDEBUG_CALLBACKS
ALOGD("%p~sendMessageAtTime-uptime=%"PRId64",handler=%p,what=%d",
this,uptime,handler.get(),message.what);
#endif
size_ti=0;
{//acquirelock
AutoMutex_l(mLock);
size_tmessageCount=mMessageEnvelopes.size();
while(i<messageCount&&uptime>=mMessageEnvelopes.itemAt(i).uptime){
i+=1;
}
MessageEnvelopemessageEnvelope(uptime,handler,message);
mMessageEnvelopes.insertAt(messageEnvelope,i,1);
//Optimization:IftheLooperiscurrentlysendingamessage,thenwecanskip
//thecalltowake()becausethenextthingtheLooperwilldoafterprocessing
//messagesistodecidewhenthenextwakeuptimeshouldbe.Infact,itdoes
//notevenmatterwhetherthiscodeisrunningontheLooperthread.
if(mSendingMessage){
return;
}
}//releaselock
//Wakethepolllooponlywhenweenqueueanewmessageatthehead.
if(i==0){
wake();
}
}
在pollInner中,epoll_wait返回之后,会遍历mMessageEnvelopes中的消息,然后调用其handler的handleMessage函数
while(mMessageEnvelopes.size()!=0){
nsecs_tnow=systemTime(SYSTEM_TIME_MONOTONIC);
constMessageEnvelope&messageEnvelope=mMessageEnvelopes.itemAt(0);
if(messageEnvelope.uptime<=now){
//Removetheenvelopefromthelist.
//WekeepastrongreferencetothehandleruntilthecalltohandleMessage
//finishes.Thenwedropitsothatthehandlercanbedeleted*before*
//wereacquireourlock.
{//obtainhandler
sp<MessageHandler>handler=messageEnvelope.handler;
Messagemessage=messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage=true;
mLock.unlock();
#ifDEBUG_POLL_AND_WAKE||DEBUG_CALLBACKS
ALOGD("%p~pollOnce-sendingmessage:handler=%p,what=%d",
this,handler.get(),message.what);
#endif
handler->handleMessage(message);
}//releasehandler
mLock.lock();
mSendingMessage=false;
result=POLL_CALLBACK;
}else{
//Thelastmessageleftattheheadofthequeuedeterminesthenextwakeuptime.
mNextMessageUptime=messageEnvelope.uptime;
break;
}
}
有一个Looper_test.cpp文件,里面介绍了很多Looper的使用方法,我们来看下
sp<StubMessageHandler>handler=newStubMessageHandler();
mLooper->sendMessageAtTime(now+ms2ns(100),handler,Message(MSG_TEST1));
StubMessageHandler继承MessageHandler就必须实现handleMessage方法
classStubMessageHandler:publicMessageHandler{
public:
Vector<Message>messages;
virtualvoidhandleMessage(constMessage&message){
messages.push(message);
}
};
我们再顺便看下Message和MessageHandler类
structMessage{
Message():what(0){}
Message(intwhat):what(what){}
/*Themessagetype.(interpretationisleftuptothehandler)*/
intwhat;
};
/**
*InterfaceforaLoopermessagehandler.
*
*TheLooperholdsastrongreferencetothemessagehandlerwheneverithas
*amessagetodelivertoit.MakesuretocallLooper::removeMessages
*toremoveanypendingmessagesdestinedforthehandlersothatthehandler
*canbedestroyed.
*/
classMessageHandler:publicvirtualRefBase{
protected:
virtual~MessageHandler(){}
public:
/**
*Handlesamessage.
*/
virtualvoidhandleMessage(constMessage&message)=0;
};
2.7c层addFd
我们也可以通过Looper.cpp的addFd把fd加入线程的epoll中,当fd有数据到来时我们就可以处理相应的数据。下面我们先来看下addFd函数,注意其中有一个callback回调
intLooper::addFd(intfd,intident,intevents,Looper_callbackFunccallback,void*data){
returnaddFd(fd,ident,events,callback?newSimpleLooperCallback(callback):NULL,data);
}
intLooper::addFd(intfd,intident,intevents,constsp<LooperCallback>&callback,void*data){
#ifDEBUG_CALLBACKS
ALOGD("%p~addFd-fd=%d,ident=%d,events=0x%x,callback=%p,data=%p",this,fd,ident,
events,callback.get(),data);
#endif
if(!callback.get()){
if(!mAllowNonCallbacks){
ALOGE("InvalidattempttosetNULLcallbackbutnotallowedforthislooper.");
return-1;
}
if(ident<0){
ALOGE("InvalidattempttosetNULLcallbackwithident<0.");
return-1;
}
}else{
ident=POLL_CALLBACK;
}
{//acquirelock
AutoMutex_l(mLock);
Requestrequest;
request.fd=fd;
request.ident=ident;
request.events=events;
request.seq=mNextRequestSeq++;
request.callback=callback;
request.data=data;
if(mNextRequestSeq==-1)mNextRequestSeq=0;//reservesequencenumber-1
structepoll_eventeventItem;
request.initEventItem(&eventItem);
ssize_trequestIndex=mRequests.indexOfKey(fd);
if(requestIndex<0){
intepollResult=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,fd,&eventItem);//加入epoll
if(epollResult<0){
ALOGE("Erroraddingepolleventsforfd%d,errno=%d",fd,errno);
return-1;
}
mRequests.add(fd,request);//放入mRequests中
}else{
intepollResult=epoll_ctl(mEpollFd,EPOLL_CTL_MOD,fd,&eventItem);//更新
if(epollResult<0){
if(errno==ENOENT){
//TolerateENOENTbecauseitmeansthatanolderfiledescriptorwas
//closedbeforeitscallbackwasunregisteredandmeanwhileanew
//filedescriptorwiththesamenumberhasbeencreatedandisnow
//beingregisteredforthefirsttime.Thiserrormayoccurnaturally
//whenacallbackhastheside-effectofclosingthefiledescriptor
//beforereturningandunregisteringitself.Callbacksequencenumber
//checksfurtherensurethattheraceisbenign.
//
//Unfortunatelyduetokernellimitationsweneedtorebuildtheepoll
//setfromscratchbecauseitmaycontainanoldfilehandlethatweare
//nowunabletoremovesinceitsfiledescriptorisnolongervalid.
//Nosuchproblemwouldhaveoccurredifwewereusingthepollsystem
//callinstead,butthatapproachcarriesothersdisadvantages.
#ifDEBUG_CALLBACKS
ALOGD("%p~addFd-EPOLL_CTL_MODfailedduetofiledescriptor"
"beingrecycled,fallingbackonEPOLL_CTL_ADD,errno=%d",
this,errno);
#endif
epollResult=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,fd,&eventItem);
if(epollResult<0){
ALOGE("Errormodifyingoraddingepolleventsforfd%d,errno=%d",
fd,errno);
return-1;
}
scheduleEpollRebuildLocked();
}else{
ALOGE("Errormodifyingepolleventsforfd%d,errno=%d",fd,errno);
return-1;
}
}
mRequests.replaceValueAt(requestIndex,request);
}
}//releaselock
return1;
}
在pollInner函数中,我们先寻找mRequests中匹配的fd,然后在pushResponse中新建一个Response,并把Response和Request匹配起来。
}else{
ssize_trequestIndex=mRequests.indexOfKey(fd);
if(requestIndex>=0){
intevents=0;
if(epollEvents&EPOLLIN)events|=EVENT_INPUT;
if(epollEvents&EPOLLOUT)events|=EVENT_OUTPUT;
if(epollEvents&EPOLLERR)events|=EVENT_ERROR;
if(epollEvents&EPOLLHUP)events|=EVENT_HANGUP;
pushResponse(events,mRequests.valueAt(requestIndex));
}else{
ALOGW("Ignoringunexpectedepollevents0x%xonfd%dthatis"
"nolongerregistered.",epollEvents,fd);
}
}
下面我们就会遍历mResponses中的Response,然后调用其request中的回调
for(size_ti=0;i<mResponses.size();i++){
Response&response=mResponses.editItemAt(i);
if(response.request.ident==POLL_CALLBACK){
intfd=response.request.fd;
intevents=response.events;
void*data=response.request.data;
#ifDEBUG_POLL_AND_WAKE||DEBUG_CALLBACKS
ALOGD("%p~pollOnce-invokingfdeventcallback%p:fd=%d,events=0x%x,data=%p",
this,response.request.callback.get(),fd,events,data);
#endif
//Invokethecallback.Notethatthefiledescriptormaybeclosedby
//thecallback(andpotentiallyevenreused)beforethefunctionreturnsso
//weneedtobealittlecarefulwhenremovingthefiledescriptorafterwards.
intcallbackResult=response.request.callback->handleEvent(fd,events,data);
if(callbackResult==0){
removeFd(fd,response.request.seq);
}
//Clearthecallbackreferenceintheresponsestructurepromptlybecausewe
//willnotcleartheresponsevectoritselfuntilthenextpoll.
response.request.callback.clear();
result=POLL_CALLBACK;
}
}
同样我们再来看看Looper_test.cpp是如何使用的?
Pipepipe; StubCallbackHandlerhandler(true); handler.setCallback(mLooper,pipe.receiveFd,Looper::EVENT_INPUT);
我们看下handler的setCallback函数
classCallbackHandler{
public:
voidsetCallback(constsp<Looper>&looper,intfd,intevents){
looper->addFd(fd,0,events,staticHandler,this);//就是调用了looper的addFd函数,并且回调
}
protected:
virtual~CallbackHandler(){}
virtualinthandler(intfd,intevents)=0;
private:
staticintstaticHandler(intfd,intevents,void*data){//这个就是回调函数
returnstatic_cast<CallbackHandler*>(data)->handler(fd,events);
}
};
classStubCallbackHandler:publicCallbackHandler{
public:
intnextResult;
intcallbackCount;
intfd;
intevents;
StubCallbackHandler(intnextResult):nextResult(nextResult),
callbackCount(0),fd(-1),events(-1){
}
protected:
virtualinthandler(intfd,intevents){//这个是通过回调函数再调到这里的
callbackCount+=1;
this->fd=fd;
this->events=events;
returnnextResult;
}
};
我们结合Looper的addFd一起来看,当callback不为空时,会新建一个SimpleLooperCallback
intLooper::addFd(intfd,intident,intevents,Looper_callbackFunccallback,void*data){
returnaddFd(fd,ident,events,callback?newSimpleLooperCallback(callback):NULL,data);
}
这里的Looper_callbackFunc是一个typedef
typedefint(*Looper_callbackFunc)(intfd,intevents,void*data);
我们再来看SimpleLooperCallback
classSimpleLooperCallback:publicLooperCallback{
protected:
virtual~SimpleLooperCallback();
public:
SimpleLooperCallback(Looper_callbackFunccallback);
virtualinthandleEvent(intfd,intevents,void*data);
private:
Looper_callbackFuncmCallback;
};SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunccallback):
mCallback(callback){
}
SimpleLooperCallback::~SimpleLooperCallback(){
}
intSimpleLooperCallback::handleEvent(intfd,intevents,void*data){
returnmCallback(fd,events,data);
}
最后我们是调用callback->handleEvent(fd,events,data),而callback就是SimpleLooperCallback,这里的data,之前传进来的就是CallbackHandler的this指针
因此最后就是调用了staticHandler,其中的data->handler就是this->handler,由于handler是虚函数,最终就调用到了StubCallbackHandler的handler函数中了。
当然我们也可以不用这么复杂,直接使用第二个addFd函数,只需要自己定义一个类来实现LooperCallback作为callback就行了,这样就简单多了。
intaddFd(intfd,intident,intevents,constsp<LooperCallback>&callback,void*data);
2.8java层addFd
一直以为只能在c层的Looper中才能addFd,原来在java层也通过jni做了这个功能。
我们可以在MessageQueue中的addOnFileDescriptorEventListener来实现这个功能
publicvoidaddOnFileDescriptorEventListener(@NonNullFileDescriptorfd,
@OnFileDescriptorEventListener.Eventsintevents,
@NonNullOnFileDescriptorEventListenerlistener){
if(fd==null){
thrownewIllegalArgumentException("fdmustnotbenull");
}
if(listener==null){
thrownewIllegalArgumentException("listenermustnotbenull");
}
synchronized(this){
updateOnFileDescriptorEventListenerLocked(fd,events,listener);
}
}
我们再来看看OnFileDescriptorEventListener这个回调
publicinterfaceOnFileDescriptorEventListener{
publicstaticfinalintEVENT_INPUT=1<<0;
publicstaticfinalintEVENT_OUTPUT=1<<1;
publicstaticfinalintEVENT_ERROR=1<<2;
/**@hide*/
@Retention(RetentionPolicy.SOURCE)
@IntDef(flag=true,value={EVENT_INPUT,EVENT_OUTPUT,EVENT_ERROR})
public@interfaceEvents{}
@EventsintonFileDescriptorEvents(@NonNullFileDescriptorfd,@Eventsintevents);
}
接着调用了updateOnFileDescriptorEventListenerLocked函数
privatevoidupdateOnFileDescriptorEventListenerLocked(FileDescriptorfd,intevents,
OnFileDescriptorEventListenerlistener){
finalintfdNum=fd.getInt$();
intindex=-1;
FileDescriptorRecordrecord=null;
if(mFileDescriptorRecords!=null){
index=mFileDescriptorRecords.indexOfKey(fdNum);
if(index>=0){
record=mFileDescriptorRecords.valueAt(index);
if(record!=null&&record.mEvents==events){
return;
}
}
}
if(events!=0){
events|=OnFileDescriptorEventListener.EVENT_ERROR;
if(record==null){
if(mFileDescriptorRecords==null){
mFileDescriptorRecords=newSparseArray<FileDescriptorRecord>();
}
record=newFileDescriptorRecord(fd,events,listener);//fd保存在FileDescriptorRecord对象
mFileDescriptorRecords.put(fdNum,record);//mFileDescriptorRecords然后保存在
}else{
record.mListener=listener;
record.mEvents=events;
record.mSeq+=1;
}
nativeSetFileDescriptorEvents(mPtr,fdNum,events);//调用native函数
}elseif(record!=null){
record.mEvents=0;
mFileDescriptorRecords.removeAt(index);
}
}
native最后调用了NativeMessageQueue的setFileDescriptorEvents函数
staticvoidandroid_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv*env,jclassclazz,
jlongptr,jintfd,jintevents){
NativeMessageQueue*nativeMessageQueue=reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->setFileDescriptorEvents(fd,events);
}
setFileDescriptorEvents函数,这个addFd就是调用的第二个addFd,因此我们可以肯定NativeMessageQueue继承了LooperCallback
voidNativeMessageQueue::setFileDescriptorEvents(intfd,intevents){
if(events){
intlooperEvents=0;
if(events&CALLBACK_EVENT_INPUT){
looperEvents|=Looper::EVENT_INPUT;
}
if(events&CALLBACK_EVENT_OUTPUT){
looperEvents|=Looper::EVENT_OUTPUT;
}
mLooper->addFd(fd,Looper::POLL_CALLBACK,looperEvents,this,
reinterpret_cast<void*>(events));
}else{
mLooper->removeFd(fd);
}
}
果然是,需要实现handleEvent函数
classNativeMessageQueue:publicMessageQueue,publicLooperCallback{
public:
NativeMessageQueue();
virtual~NativeMessageQueue();
virtualvoidraiseException(JNIEnv*env,constchar*msg,jthrowableexceptionObj);
voidpollOnce(JNIEnv*env,jobjectobj,inttimeoutMillis);
voidwake();
voidsetFileDescriptorEvents(intfd,intevents);
virtualinthandleEvent(intfd,intevents,void*data);
handleEvent就是在looper中epoll_wait之后,当我们增加的fd有数据就会调用这个函数
intNativeMessageQueue::handleEvent(intfd,intlooperEvents,void*data){
intevents=0;
if(looperEvents&Looper::EVENT_INPUT){
events|=CALLBACK_EVENT_INPUT;
}
if(looperEvents&Looper::EVENT_OUTPUT){
events|=CALLBACK_EVENT_OUTPUT;
}
if(looperEvents&(Looper::EVENT_ERROR|Looper::EVENT_HANGUP|Looper::EVENT_INVALID)){
events|=CALLBACK_EVENT_ERROR;
}
intoldWatchedEvents=reinterpret_cast<intptr_t>(data);
intnewWatchedEvents=mPollEnv->CallIntMethod(mPollObj,
gMessageQueueClassInfo.dispatchEvents,fd,events);//调用回调
if(!newWatchedEvents){
return0;//unregisterthefd
}
if(newWatchedEvents!=oldWatchedEvents){
setFileDescriptorEvents(fd,newWatchedEvents);
}
return1;
}
最后在java的MessageQueue中的dispatchEvents就是在jni层反调过来的,然后调用之前注册的回调函数
//Calledfromnativecode.
privateintdispatchEvents(intfd,intevents){
//Getthefiledescriptorrecordandanystatethatmightchange.
finalFileDescriptorRecordrecord;
finalintoldWatchedEvents;
finalOnFileDescriptorEventListenerlistener;
finalintseq;
synchronized(this){
record=mFileDescriptorRecords.get(fd);//通过fd得到FileDescriptorRecord
if(record==null){
return0;//spurious,nolistenerregistered
}
oldWatchedEvents=record.mEvents;
events&=oldWatchedEvents;//filtereventsbasedoncurrentwatchedset
if(events==0){
returnoldWatchedEvents;//spurious,watchedeventschanged
}
listener=record.mListener;
seq=record.mSeq;
}
//Invokethelisteneroutsideofthelock.
intnewWatchedEvents=listener.onFileDescriptorEvents(//listener回调
record.mDescriptor,events);
if(newWatchedEvents!=0){
newWatchedEvents|=OnFileDescriptorEventListener.EVENT_ERROR;
}
//Updatethefiledescriptorrecordifthelistenerchangedthesetof
//eventstowatchandthelisteneritselfhasn'tbeenupdatedsince.
if(newWatchedEvents!=oldWatchedEvents){
synchronized(this){
intindex=mFileDescriptorRecords.indexOfKey(fd);
if(index>=0&&mFileDescriptorRecords.valueAt(index)==record
&&record.mSeq==seq){
record.mEvents=newWatchedEvents;
if(newWatchedEvents==0){
mFileDescriptorRecords.removeAt(index);
}
}
}
}
//Returnthenewsetofeventstowatchfornativecodetotakecareof.
returnnewWatchedEvents;
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。