天天看點

EventBus源碼分析(四):線程模型分析(2.4版本)

EventBus源碼分析(一):入口函數提綱挈領(2.4版本)

EventBus源碼分析(二):register方法儲存事件的訂閱者清單(2.4版本)

EventBus源碼分析(三):post方法釋出事件【擷取事件的所有訂閱者,反射調用訂閱者事件處理方法】(2.4版本)

EventBus源碼分析(四):線程模型分析(2.4版本)

EventBus源碼解讀詳細注釋(1)register的幕後黑手

EventBus源碼解讀詳細注釋(2)MainThread線程模型分析

EventBus源碼解讀詳細注釋(3)PostThread、MainThread、BackgroundThread、Async四種線程模式的差別

EventBus源碼解讀詳細注釋(4)register時重新整理的兩個map

EventBus源碼解讀詳細注釋(5)事件消息繼承性分析 eventInheritance含義

EventBus源碼解讀詳細注釋(6)從事件釋出到事件處理,究竟發生了什麼!

EventBus有四種線程模型

  1. PostThread模式不需線程切換,直接在釋出者線程進行事件處理。
  2. MainThread模式分類讨論:釋出者線程是主線程則直接調用事件處理方法,否則通過Handler進行線程切換,切換到主線程處理事件,該模式下事件是串行執行的。
  3. BackgroundThread模式分類讨論:釋出者線程不是主線程則在釋出者線程直接處理事件,否則線程切換至線程池處理,所有該線程模式下的事件會線上程池中用一個線程排隊串行處理(直到隊列裡邊的事件處理完之後又有新的事件釋出出來才會向線程池擷取一個新的線程)。
  4. Async模式不關心釋出者線程直接線上程池中開辟一個新的線程處理事件,和BackgroundThread模式不同的是,該線程模式的每個事件都是線上程池中開辟一個線程處理,事件之間并發處理,并不排隊。

PendingPostQueue待處理事件隊列

在分析線程模型之前,先看下各種線程模型都用到的資料結構PendingPostQueue,PendingPostQueue顧名思義是一個隊列,對待處理事件進行了排隊。

final class PendingPostQueue {
    private PendingPost head;
    private PendingPost tail;
    ...
           

PendingPostQueue維護了一個隊列連結清單,隊列的資料是PendingPost,PendingPost随後分析。

既然PendingPostQueue是一個隊列,那麼肯定提供了出隊列和入隊列的操作。

synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail != null) {
            tail.next = pendingPost;
            tail = pendingPost;
        } else if (head == null) {
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        notifyAll();
    }
           

可見入隊列操作就是通過隊尾指針操作将新資料插入隊列尾部。如果隊尾tail不為null,說明隊列不空,将新資料作為新的隊尾,修改隊尾指針,如果隊尾tail和隊首head都是null,說明隊列為空,将新資料同時設為隊首和隊尾。

最後通過notifyAll()釋放對象鎖,那麼可以猜測如果隊列為空出隊列操作時會等待這把鎖。

synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }

    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait);
        }
        return poll();
    }
           

出隊列操作就直接擷取隊列頭head即可,隊列為空時擷取的head是null。此時如果調用的是poll(int maxMillisToWait)則會等待maxMillisToWait時長的鎖,逾時還沒有擷取鎖,也就是讀一個空隊列逾時了還沒有入隊列操作,則調用poll(),這時會傳回null。

PendingPost:不變模式和對象池模式

PendingPostQueue維護了一個PendingPost類型資料的隊列,那麼PendingPost是什麼呢?PendingPost是個對象池,通過靜态ArrayList實作。同時PendingPost是一個最終類,被final修飾,語義角度意味着不可被繼承,線程安全方面則表示該類絕對線程安全。

final class PendingPost {
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

    Object event;
    Subscription subscription;
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > ) {
                PendingPost pendingPost = pendingPostPool.remove(size - );
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }

    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < ) {
                pendingPostPool.add(pendingPost);
            }
        }
    }
}
           

既然是個對象池模式,那麼必然提供了向對象池擷取對象的方法和對象使用結束向對象池歸還對象的方法。

PendingPost類通過将構造器設定為私有達到隻能通過對象池擷取PendingPost對象的目的。

obtainPendingPost從對象池擷取對象,對象池中有儲存的對象則從池子中擷取對象(ArrayList尾部擷取),沒有儲存線程的對象的話就通過new建立。

releasePendingPost則将使用後的對象歸還給對象池,歸還的時候要将對象的使用痕迹擦除,同時要限制對象池大小為10000,防止對象池無限增大。

MainThread線程模式:HandlerPoster

MainThread線程模式模式由HandlerPoster實作,通過Handler的線程切換功能做到在主線程中處理事件。從名字就知道HandlerPoster也是一個Handler,是Handler的子類。

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;
    ...
           

queue是待處理事件隊列

maxMillisInsideHandleMessage表示如果事件在maxMillisInsideHandleMessage時間内都沒有處理完成的話就需要重新排程(重新處理rescheduled = true)。

void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }
           

enqueue從PendingPost的對象池擷取一個PendingPost對象,設定subscription和event屬性後就加入PendingPostQueue隊列,如果主線程沒有事件要處理,就發送一條空消息,激活handleMessage方法,在handleMessage方法裡邊對PendingPostQueue隊列裡邊的所有事件反射調用事件處理方法。

@Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
           

可知在handleMessage中時通過eventBus.invokeSubscriber(pendingPost)處理事件

void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }
    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
           

invokeSubscriber方法取出PendingPost對象的event和subscription後先将PendingPost對象歸還給線程池,然後反射調用事件處理方法。

MainThread線程模式總結:通過Handler從工作線程切換到主線程,在主線程中通過反射調用事件處理方法。

BackgroundThread線程模式:BackgroundPoster

BackgroundPoster繼承自Runnable,某一時段内BackgroundThread模式的事件都會在BackgroundPoster的run方法中排隊處理,也就是說該時段内的所有事件是在一個線程中排隊後串行執行的(隊列中的事件處理完之後又有新的事件釋出才會新開線程)。

public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }
           

可以看到如果BackgroundPoster這個Runnable正在被線程池執行,這時候executorRunning==true,那麼在executorRunning==true情況下釋出的事件隻會進隊列,不會再次調用線程池的execute方法。這樣的話在BackgroundPoster覆寫的run方法中一定是通過死循環周遊處理隊列中的事件,全部處理後才退出死循環,設定executorRunning=fasle,此後再釋出事件才會線上程池中開辟一個新線程。

@Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll();
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
           

Async線程模式:AsyncPoster

AsyncPoster同樣也是一個Runnable,與Backgroundposter不同的是AsyncPoster并發度更高,不是在一個線程中對隊列中的事件進行串行處理,而是每一個新添加的任務都會線上程池中開辟一個新線程執行。

public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
           

繼續閱讀