天天看點

音視訊開發(四十三):Android 消息機制(Looper Handler MessageQueue Message )

目錄

  1. Android消息機制流程
  2. Handler
  3. Message
  4. MessageQueue
  5. Looper
  6. HandleThread

篇外話

在“音視訊開發之旅系列”之外,把自己比較薄弱的Java&Android基礎也抽時間進行學習加強些,這也更符合自己的内心追求和自我期待。并行的開始另外一段學習旅程,從Handler消息機制開啟,結合消息機制的流程以及源碼進行學習分析。

一、Android消息機制流程

我們先通過下面兩張圖來對Android消息機制流程以及關鍵類之間的關系有個了解,後面我們再結合源碼一一進行分析。

消息機制的流程

音視訊開發(四十三):Android 消息機制(Looper Handler MessageQueue Message )

Handler、Message、MessageQueue、Looper之間的關系 

音視訊開發(四十三):Android 消息機制(Looper Handler MessageQueue Message )

二、Handler

Handler有兩個主要的用途:

  1. 排程消息在某個時間點執行;
  2. 不同線程之間通信

2.1 全局變量

final Looper mLooper;     //有Looper的引用
 final MessageQueue mQueue;//有MessageQueue的引用
 final Callback mCallback;
 final boolean mAsynchronous;
 IMessenger mMessenger;
           

2.2 構造方法

public Handler() {
        this(null, false);
    }
    public Handler(@NonNull Looper looper) {
        this(looper, null, false);
    }

 public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
        mLooper = looper;
        mQueue = looper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
    }
           

2.3 擷取Message

//從Message複用池中擷取一個Message   
 public final Message obtainMessage()
    {
        return Message.obtain(this);
    }

//和上面的方法基本一緻,差異在于從複用池中擷取到Message後給what指派
    public final Message obtainMessage(int what)
    {
        return Message.obtain(this, what);
    }
//...其他obtainMessage類似
           

2.4 發送消息

音視訊開發(四十三):Android 消息機制(Looper Handler MessageQueue Message )

 下面我們挑幾個發送方法來看下

sendMessage: 發送一個Message,when為目前的時間

MessageQueue根據when進行比對插入位置

public final boolean sendMessage(Message msg)
    {
        return sendMessageDelayed(msg, 0);
    }

   public final boolean sendMessageDelayed(Message msg, long delayMillis)
    {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }

    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        ......
        return enqueueMessage(queue, msg, uptimeMillis);
    }
           

post:從消息複用池中擷取Message,設定Message的Callback

public final boolean post(Runnable r)
    {
       return  sendMessageDelayed(getPostMessage(r), 0);
    }
    
    private static Message getPostMessage(Runnable r) {
        Message m = Message.obtain();
        m.callback = r;
        return m;
    }
           

postAtFrontOfQueue(): 将消息插入到隊列頭部

通過調用sendMessageAtFrontOfQueue 加入一個when為0的message到隊列,即插入到隊列的頭部,需要注意的是 MessageQueue#enqueueMessage的插入到連結清單中時是根據when比較的(when < p.when),如果之前已經有多個when等于0的消息在隊列中,這個新的會加入到前面when也為0的後面。

public final boolean postAtFrontOfQueue(Runnable r)
    {
        return sendMessageAtFrontOfQueue(getPostMessage(r));
    }

    public final boolean sendMessageAtFrontOfQueue(Message msg) {
        MessageQueue queue = mQueue;
        ......
        //第三個參數為0,即Message的when為0,插入到隊列的頭部,注意到MessageQueue#enqueueMessage的插入到連結清單中時是根據when比較的(when < p.when),如果之前已經有多個when等于0的消息在隊列中,這個新的會加入到前面when也為0的後面。
        return enqueueMessage(queue, msg, 0);
    }
           

2.5 派發消息 dispatchMessage

優先級如下:

Message的回調方法callback.run() >

Handler的回調方法mCallback.handleMessage(msg) > Handler的預設方法handleMessage(msg)

public void dispatchMessage(@NonNull Message msg) {
      //Message的回調方法,優先級最高  
    if (msg.callback != null) {
            handleCallback(msg);
        } else {
            //Handler的mCallBack優先級次之
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            //Handler的handleMessage方法優先級最低(大部分都是在該方法中實作Message的處理)
            handleMessage(msg);
        }
    }
           
本文福利, 免費領取C++音視訊學習資料包、技術視訊,内容包括(音視訊開發,面試題,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,編解碼,推拉流,srs)↓↓↓↓↓↓見下面↓↓文章底部點選免費領取↓↓

三、Message

全局變量

//一些重要的變量

    public int arg1;
    public int arg2;
    public Object obj;
    public long when;
    Bundle data;
    Handler target;  //Message中有個Handler的引用
    Runnable callback;
    
    //Message有next指針,可以組成單向連結清單
    Message next;


    public static final Object sPoolSync = new Object();
    
    //複用池中的第一個Message
    private static Message sPool;
    
    //複用池的大小,預設最大50個(如果短時間内有超過複用池最大數量的Message會怎樣,重新new)
    private static int sPoolSize = 0;
    private static final int MAX_POOL_SIZE = 50;
           

構造方法

檢視下是否有可以複用的message,如果有,複用池的中可複用的Message個數減一,傳回該Message;如果沒有重新new一個。注意複用池預設最大數量為50。

public static Message obtain() {
       synchronized (sPoolSync) {
           //檢視下是否有可以複用的message
           if (sPool != null) {
               //取出第一個Message
               Message m = sPool;
               sPool = m.next;
               m.next = null;
               m.flags = 0; // clear in-use flag
               //複用池的中可複用的Message個數減一
               sPoolSize--;
               return m;
           }
       }
       //如果複用池中沒有Message了重新new
       return new Message();
   }
           

recycleUnchecked

//标記一個Message時異步消息,正常的情況都是同步的Message,當遇到同步屏障的時候,優先執行第一個異步消息。關于同步屏障,我們在MessageQueue中在結合next等方法再介紹。

public void setAsynchronous(boolean async) {
        if (async) {
            flags |= FLAG_ASYNCHRONOUS;
        } else {
            flags &= ~FLAG_ASYNCHRONOUS;
        }
    }


void recycleUnchecked() {
        // Mark the message as in use while it remains in the recycled object pool.
        // Clear out all other details.
        flags = FLAG_IN_USE;
        what = 0;
        arg1 = 0;
        arg2 = 0;
        obj = null;
        replyTo = null;
        sendingUid = UID_NONE;
        workSourceUid = UID_NONE;
        when = 0;
        target = null;
        callback = null;
        data = null;

        synchronized (sPoolSync) {
            //可以複用的message為50個,如果超過了就不會再複用
            if (sPoolSize < MAX_POOL_SIZE) {
                next = sPool;
                sPool = this;
                sPoolSize++;
            }
        }
    }
           

//toString和dumpDebug可以Dump出message資訊,遇到一些問題時可以幫助分析

android.os.Message#toString(long)

android.os.Message#dumpDebug

四、MessageQueue

MessageQueue是一個單連結清單優先隊列

Message不能直接添加到MessageQueue中,要通過Handler以及相對應的Looper進行添加。

變量

//MessageQueue連結清單中的第一個Message
Message mMessages;
           

next:從消息隊列中取出下一條要執行的消息

如果是同步屏障消息,找到第一個隊列中中第一個異步消息

如果第一個Message的執行時間比目前時間見還要晚,記錄還要多久開始執行;否則就找到下一條要執行的Message。

後面的Looper的loop方法會從過queue.next調用該方法,擷取需要執行的下一個Message,其中會調用到阻塞的native方法nativePollOnce,該方法用于“等待”, 直到下一條消息可用為止. 如果在此調用期間花費的時間很長, 表明對應線程沒有實際工作要做,不會是以會出現ANR,ANR和這個沒有半毛錢關系。

關鍵代碼如下:

Message next() {

        //native層MessageQueue的指針
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

     ......
        for (;;) {
           
            //阻塞操作,當等待nextPollTimeoutMillis時長,或者消息隊列被喚醒
            //nativePollOnce用于“等待”, 直到下一條消息可用為止. 如果在此調用期間花費的時間很長, 表明對應線程沒有實際工作要做,不會是以會出現ANR,ANR和這個沒有半毛錢關系。
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                
                //建立一個新的Message指向 目前消息隊列的頭
                Message msg = mMessages;
                
                //如果是同步屏障消息,找到第一個隊列中中第一個異步消息
                if (msg != null && msg.target == null) {
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                
                if (msg != null) {
                    //如果第一個Message的執行時間比目前時間見還要晚,記錄還要多久開始執行
                    if (now < msg.when) {                    
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        //否則從連結清單中取出目前的Message ,并且把連結清單中next指向指向下一個Message
                        
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        //取出目前的Message的值,next置為空
                        msg.next = null;
                        msg.markInUse();
                        return msg;
                    }
                } 
                .....

                //android.os.MessageQueue#quit時mQuitting為true
                //如果需要退出,立即執行并傳回一個null的Message,android.os.Looper.loop收到一個null的message後退出Looper循環
                if (mQuitting) {
                    dispose();
                    return null;
                }
        ......
            if (pendingIdleHandlerCount <= 0) {
                                // 注意這裡,如果沒有消息需要執行,mBlocked标記為true,在enqueueMessage會根據該标記判斷是否調用nativeWake喚醒
                                mBlocked = true;
                                continue;
                            }
        ......
    }
    ......
}
           

enqueueMessage:向消息隊列中插入一條Message

如果消息連結清單為空,或者插入的Message比消息連結清單第一個消息要執行的更早,直接插入到頭部

否則在連結清單中找到合适位置插入,通常情況下不需要喚醒事件隊列,以下兩個情況除外:

  1. 消息連結清單中隻有剛插入的這一個Message,并且mBlocked為true即,正在阻塞狀态,收到一個消息後也進入喚醒
  2. 連結清單的頭是一個同步屏障,并且該條消息是第一條異步消息

喚醒誰?MessageQueue.next中被阻塞的nativePollOnce

具體實作如下,

關于如何找到合适的位置?這涉及到連結清單的插入算法:引入一個prev變量,該變量指向p也message(如果是for循環的内部第一次執行),然後把p進行向next移動,和需要插入的Message進行比較when

關鍵代碼如下:

boolean enqueueMessage(Message msg, long when) {
        ......

        synchronized (this) {
          
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            
            //如果消息連結清單為空,或者插入的Message比消息連結清單第一個消息要執行的更早,直接插入到頭部
            if (p == null || when == 0 || when < p.when) {
                
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
               
                //否則在連結清單中找到合适位置插入
                //通常情況下不需要喚醒事件隊列,除非連結清單的頭是一個同步屏障,并且該條消息是第一條異步消息
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                //具體實作如下,這個畫張圖來說明
                //連結清單引入一個prev變量,該變量指向p也message(如果是for循環的内部第一次執行),然後把p進行向next移動,和需要插入的Message進行比較when
                Message prev;
                
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; 
                prev.next = msg;
            }

            //如果插入的是異步消息,并且消息連結清單第一條消息是同步屏障消息。
    //或者消息連結清單中隻有剛插入的這一個Message,并且mBlocked為true即,正在阻塞狀态,收到一個消息後也進入喚醒
喚醒誰?MessageQueue.next中被阻塞的nativePollOnce
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
           

簡單着看下native的epoll (這塊還沒有深入分析,後面篇章補上吧)

nativePollOnce 和 nativeWake 利用 epoll 系統調用, 該系統調用可以監視檔案描述符中的 IO 事件. nativePollOnce 在某個檔案描述符上調用 

epoll_wait

, 而 nativeWake 寫入一個 IO 操作到描述符

epoll屬于IO複用模式調用,調用

epoll_wait

等待. 然後 核心從等待狀态中取出 epoll 等待線程, 并且該線程繼續處理新消息

 removeMessages: 移除消息連結清單中對應的消息

需要注意的是,在該函數的實作中分為了頭部meg的移除,和非頭部的msg的移除。

移除消息連結清單中頭部的和需要移除相同的msg

eg:msg1.what=0;msg2.what=0;msg3.what=0; msg4.what=1; 需要移除what為0的msg,即移除前三個

移除消息連結清單中非頭部的對應的消息,eg:msg1.what=1;msg2.what=0;msg3.what=0; 需要移除what為0的消息,即移除後續的消息,處處展現連結清單的查詢和移除算法

關鍵代碼如下:

void removeMessages(Handler h, int what, Object object) {
  ......
        synchronized (this) {
            Message p = mMessages;

           
            //移除消息連結清單中頭部的和需要移除相同的msg eg:msg1.what=0;msg2.what=0;msg3.what=0; msg4.what=1; 需要移除what為0的msg,即移除前三個
            while (p != null && p.target == h && p.what == what
                   && (object == null || p.obj == object)) {
                Message n = p.next;
                mMessages = n;
                p.recycleUnchecked();
                p = n;
            }

         
            //移除消息連結清單中非頭部的對應的消息,eg:msg1.what=1;msg2.what=0;msg3.what=0; 需要移除what為0的消息,即移除後續的消息,處處展現連結清單的查詢和移除算法
            while (p != null) {
                Message n = p.next;
                if (n != null) {
                    if (n.target == h && n.what == what
                        && (object == null || n.obj == object)) {
                        Message nn = n.next;
                        n.recycleUnchecked();
                        p.next = nn;
                        continue;
                    }
                }
                p = n;
            }
        }
    }
           

postSyncBarrier:發送同步屏障消息 

同步屏障也是一個message,隻不過這個Message的target為null,. 通過ViewRootImpl#scheduleTraversals()發送同步屏障消息

同步屏障消息的插入位置并不是都是消息連結清單的頭部,而是根據when等資訊而定:如果when不為0,消息連結清單也不空,在消息連結清單中找到同步屏障要插入入的位置;如果prev為空,該條同步消息插入到隊列的頭部。

關鍵代碼如下:

/**
     * android.view.ViewRootImpl#scheduleTraversals()發送同步屏障消息
     * @param when
     * @return
     */
    private int postSyncBarrier(long when) {
        // Enqueue a new sync barrier token.
        // We don't need to wake the queue because the purpose of a barrier is to stall it.
        synchronized (this) {
            final int token = mNextBarrierToken++;

            //同步屏障也是一個message,隻不過這個Message的target為null

            final Message msg = Message.obtain();
            msg.markInUse();
            msg.when = when;
            msg.arg1 = token;

            Message prev = null;
            Message p = mMessages;
            if (when != 0) {
                //如果when不為0,消息連結清單也不空,在消息連結清單中找到同步屏障要插入入的位置
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
            }
            if (prev != null) { // invariant: p == prev.next
                msg.next = p;
                prev.next = msg;
            } else {
                //如果prev為空,該條同步消息插入到隊列的頭部
                msg.next = p;
                mMessages = msg;
            }
            return token;
        }
    }
           

dump:MessageQueue資訊

有時候我們需要dump出目前looper的Message資訊來分析一些問題,比不,是否Queue中有很多消息,如果太多就影響隊列中後面的Message的執行,可能造成邏輯處理比較慢,甚至可能導緻ANR等情況,MessageQueue的預設複用池是50個,如果太多排隊的Message也會影響性能。通過dump Message資訊可以幫助分析。

mHandler.getLooper().dump(new PrintWriterPrinter(writer), prefix);

void dump(Printer pw, String prefix, Handler h) {
        synchronized (this) {
            long now = SystemClock.uptimeMillis();
            int n = 0;
            for (Message msg = mMessages; msg != null; msg = msg.next) {
                if (h == null || h == msg.target) {
                    pw.println(prefix + "Message " + n + ": " + msg.toString(now));
                }
                n++;
            }
            pw.println(prefix + "(Total messages: " + n + ", polling=" + isPollingLocked()
                    + ", quitting=" + mQuitting + ")");
        }
    }
           

五、Looper

Looper主要涉及到構造、prepare和loop幾個重要的方法,在保證一個線程有且隻有一個Looper的設計上,采用了ThreadLocal以及代碼邏輯的控制。

變量

//一些重要的變量  
    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

    final MessageQueue mQueue;

    final Thread mThread;
           

構造方法

在構造Looper的時候 建立和Looper一一對應的MessageQueue

private Looper(boolean quitAllowed) {
        //在構造Looper的時候 new一一對應的MessageQueue
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }
           

prepare

我們這裡可以看到消息機制是 如何保證一個線程隻有一個Looper。

//quitAllowed參數是否允許quit,UI線程的Looper不允許退出,其他的允許退出
private static void prepare(boolean quitAllowed) {
    //保證一個線程隻能有一個Looper,這裡的sThreadLocal
    if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }
           

loop

我們在MessageQueue的next方法已經分析過nativePollOnce這個方法可能會阻塞,直到拿到message。

如果next傳回一個null的Message退出Looper循環,否則進行msg的派發。

取出的msg執行完之後,會加入到回收池中等待複用。recycleUnchecked我們在Message中也已經分析過了。不清楚的可以再回看。

public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
    ……
    for (;;) {
            //next方法是一個會阻塞的方法,MessageQueue的next方法前面我們已經分析過nativePollOnce這個方法會可能阻塞,直到拿到message。
            Message msg = queue.next(); 
            //收到為空的msg,Loop循環退出。那麼何時會收到為空的msg呐?quit
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }
            //msg的派發,msg.target就是Handler,即調用Handler的dispatchMessage派發消息
            msg.target.dispatchMessage(msg);

            ……
            //msg回收
            msg.recycleUnchecked();
        }
           

六、HandleThread

HandlerThread是一個帶有Looper的Thread。

全局變量

public class HandlerThread extends Thread {
    int mPriority;//線程優先級
    int mTid = -1;//線程id
    Looper mLooper;
    private Handler mHandler;
    ......
}
           

構造方法

public HandlerThread(String name) {
        super(name);
        //用于run時設定線程的優先級Process.setThreadPriority(mPriority);

        mPriority = Process.THREAD_PRIORITY_DEFAULT;
    }

    public HandlerThread(String name, int priority) {
        super(name);
        mPriority = priority;
    }
           

run方法

進行Looper的prepare和loop的調用,配置好Looper環境

@Override
    public void run() {
        //線程id
        mTid = Process.myTid();
        //調用Looper的prepare方法,把目前該線程關聯的唯一的Looper加入到sThreadLocal中
        Looper.prepare();
        
        synchronized (this) {
            //從sThreadLocal中擷取Looper
            mLooper = Looper.myLooper();
            
            notifyAll();
        }
        //設定線程的優先級,預設THREAD_PRIORITY_DEFAULT,如果是背景業務可以配置為THREAD_PRIORITY_BACKGROUND,根據具體場景進行設定
        Process.setThreadPriority(mPriority);
        //可以做一些預設定的操作
        onLooperPrepared();
        //開始looper循環
        Looper.loop();
        mTid = -1;
    }
           

使用HandlerThread的一般流程如下

// Step 1: 建立并啟動HandlerThread線程,内部包含Looper
HandlerThread handlerThread = new HandlerThread("xxx");
handlerThread.start();

// Step 2: 建立Handler
Handler handler = new Handler(handlerThread.getLooper());

handler.sendMessage(msg);
           

這樣有一個弊端,就是每次使用Handler都要new HandlerThread,而Thread又是比較占用記憶體,

能不能減少Thread的建立,或者說是Thread的複用.

并且實作Message能夠得到及時執行,不被隊列中前面的Message阻塞;

這的确是一個有很有意思很有挑戰的事情。

本文福利, 免費領取C++音視訊學習資料包、技術視訊,内容包括(音視訊開發,面試題,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,編解碼,推拉流,srs)↓↓↓↓↓↓見下面↓↓文章底部點選免費領取↓↓

繼續閱讀