摘要: 消息訂閱與分發機制在系統開發中有非常多的使用場景,有程序内的實作(觀察者模式實作,Event, Listener),也系統間的實作(例如 各種 message queue)。Guava提供了一個程序内非常輕量級的實作 EventBus,可以很好的實作子產品之間的解耦。并且提供了同步和異步的實作版本。
EventBus 看到這個名字,相信你已經大概明白是個什麼東東了。
Event就是事件,Bus 這裡不是巴士汽車的意思,在計算機領域一般翻譯為總線。那麼EventBus就是裝載Event,然後做分發的。若做過java swing開發,肯定記得XXEvent,以及XXEventListener接口,都會有一個方法onEvent(XXEvent event)。沒錯EventBus幹的就是相同的事情,但借着采用jdk1.5引入的注解,使得開發消息釋出與訂閱系統非常簡潔友善,無需實作接口或繼承基類。
關于消息釋出與訂閱系統的應用場景,其實在服務端系統開發中也經常用到,遊戲中的任務系統或者一些電商系統的活動系統中經常會用到。例如,遊戲中收集幾個道具就完成某個任務,擷取獎勵。或者很多平台的簽到系統,累計簽到多少天獲得獎勵。這裡就使用Guava中的EventBus開發一個簽到系統。
public class SignInEvent {
// 簽到天數
private int count;
public SignInEvent(int count) {
this.count = count;
}
public int getCount() {
return count;
}
}
public class SignInProcessor {
@Subscribe
public void signIn(SignInEvent event) {
int count = event.getCount();
// TODO 根據簽到的天數發放獎勵
System.out.println("簽到" + count + "天");
}
}
public class AppTest {
@Test
public void eventBusTest() {
EventBus signInEventBus = new EventBus("SignInEventBus");
SignInProcessor processor = new SignInProcessor();
signInEventBus.register(processor);
signInEventBus.post(new SignInEvent(2));
}
}
事件處理類SignInProcessor并不需要實作某個接口,隻需要在需要處理的方法上加上@Subscribe注解,這裡也并沒有限制SignInProcessor隻能處理SignInEvent,要在SignInProcessor添加其他事件的處理邏輯也隻需要添加一個方法,添加注解@Subscribe,第一個參數傳入需要處理的事件執行個體。
public class SignInProcessor {
@Subscribe
public void signIn(SignInEvent event) {
int count = event.getCount();
// TODO 根據簽到的天數發放獎勵
System.out.println("簽到" + count + "天");
}
@Subscribe
public void logout(LogoutEvent event) {
// 擷取登出的時間
Date date = event.getTime();
// TODO
}
}
這裡先說下,EventBus是如何确定一個類中的某個方法來處理相應的事件的呢,Guava中提供了一個接口
interface SubscriberFindingStrategy {
Multimap<Class<?>, EventSubscriber> findAllSubscribers(Object source);
}
并提供了一個機遇注解的實作
class AnnotatedSubscriberFinder implements SubscriberFindingStrategy
findAllSubscribers的實作邏輯
@Override
public Multimap<Class<?>, EventSubscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, EventSubscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
EventSubscriber subscriber = makeSubscriber(listener, method);
methodsInListener.put(eventType, subscriber);
}
return methodsInListener;
}
1. 擷取listener對象類,以及父類中所有被@Subscriber注解的方法,而且這個方法有且隻有一個參數
2. 擷取這些方法的第一個參數的類型
3. 建立EventSubscriber,儲存listener執行個體,以及對應的Method執行個體,友善以後反射調用方法處理事件
講完如何查找處理方法後,再看下具體的事件對象是如何釋出出去的呢?這裡的具體邏輯就在EventBus中的post方法中
public void post(Object event) {
// 擷取事件對象的所有父類以及父類實作的接口
// 擷取父類的目的為的是可以把event釋出給某些接收父類Event的處理方法
Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());
boolean dispatched = false;
for (Class<?> eventType : dispatchTypes) {
// subscribersByType 是一個非線程安全的集合,是以在操作的時候需要添加鎖
subscribersByTypeLock.readLock().lock();
try {
Set<EventSubscriber> wrappers = subscribersByType.get(eventType);
if (!wrappers.isEmpty()) {
dispatched = true;
for (EventSubscriber wrapper : wrappers) {
// 放入目前線程對應的Queue中,這裡使用到ThreadLocal變量
enqueueEvent(event, wrapper);
}
}
} finally {
subscribersByTypeLock.readLock().unlock();
}
}
// 若未能找到對應Event的處理器而且目前事件的類型不是DeadEvent就把傳入的事件包裝成DeadEvent
if (!dispatched && !(event instanceof DeadEvent)) {
post(new DeadEvent(this, event));
}
// 從目前線程的對應的隊列中事件處理器和事件,并處理事件
dispatchQueuedEvents();
}
從EventBus中的post方法處理邏輯來看,事件的分發和處理是在同一個線程中同步處理的。
但是很多時候事件的處理邏輯比較複雜耗時,需要将事件的分發和處理異步。事件的處理不阻塞分發的主線程。
Guava提供了AsyncEventBus,就是将分發和處理異步化。AsyncEventBus的實作并不複雜。
AsyncEventBus 繼承自 EventBus, 将EventBus中的存放待分發的事件隊列eventsToDispatch從ThreadLocal<Queue<EventWithSubscriber>>換成了ConcurrentLinkedQueue<EventWithSubscriber> 支援多個線程并發通路擷取事件處理,AsyncEventBus的構造函數需要傳入一個Executor,可以根據實際需要傳入定制的線程池。
某些場景下,在事件處理類的執行個體中需要儲存事件相關狀态,多線程并發通路的時候可能出現問題。Guava提供了注解@AllowConcurrentEvents,它的用途标記多個線程能否同時調用同一個事件處理器的處理方法來處理相依的事件。
具體處理邏輯在AnnotatedSubscriberFinder類中的
private static EventSubscriber makeSubscriber(Object listener, Method method) {
EventSubscriber wrapper;
// 這裡判斷事件處理方法是否有被@AllowConcurrentEvents注解
if (methodIsDeclaredThreadSafe(method)) {
wrapper = new EventSubscriber(listener, method);
} else {
// 若沒有被@AllowConcurrentEvents注解,多個線程在處理的時候就需要同步調用該處理器來處理事件
wrapper = new SynchronizedEventSubscriber(listener, method);
}
return wrapper;
}
好了,Guava中的EventBus相關使用及實作基本講完了。其實并不複雜。需要你對下面的相關類和處理機制比較熟悉
1. ThreadLocal
2. ConcurrentLinkedQueue
3. 反射擷取類的父類和接口,這裡使用Guava中的TypeToken封裝類,已經反射方法調用
4. ReentrantReadWriteLock
5. Cache,Guava對常用的緩存做了一些封裝,下一篇将會講到
版權聲明:本文為CSDN部落客「weixin_34326179」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。
原文連結:https://blog.csdn.net/weixin_34326179/article/details/92083966