學習設計模式不光要學習設計模式的思想,還要去深入了解,為什麼要用這個設計模式。
如何深入了解?讀優秀的架構代碼,看别人代碼,了解它們的使用場景。 - - - 部落客老師(感謝他)
本文先介紹了觀察者模式的概念及簡單實作。再貼了netty中對觀察者的實作。最後再分享了netty這麼設計的原因。
觀察者模式 - 概念、簡單實作及netty中的觀察者
-
- 一、觀察模式的概念及意圖
- 二、觀察者模式簡單實作
- 三、netty中的觀察者模式
-
- netty觀察者的思考
一、觀察模式的概念及意圖
定義對象間的一種一對多的依賴關系,當一個對象的狀态發生改變時,所有依賴于它的對象都得到通知并被自動更新。
二、觀察者模式簡單實作
實作:定義觀察者和被觀察者兩種角色,被觀察者持有一個觀察者集合,當被觀察者完成某個動作後,周遊觀察者集合,調用update方法,逐個通知到注冊在被觀察者上的觀察者。
Java的util包中定義了Observable類和Observer接口可以幫助我們實作觀察者模式
定義一個被觀察的對象(繼承Observable類)
import java.util.Observable;
public class ObservableData extends Observable {
public void changeNum(int num) {
// Observable的方法
setChanged();
// Observable的方法
notifyObservers(num);
}
}
定義一個觀察者(實作Observer接口)
public class DataObserver implements Observer {
@Override
public void update(Observable o, Object arg) {
if (o instanceof ObservableData) {
System.out.println("觀察到了:" + arg);
}
}
}
測試類
public class Test {
public static void main(String[] args) {
ObservableData observableData = new ObservableData();
DataObserver dataObserver = new DataObserver();
// 可以加好多觀察者,觀察者觀察到資料變化後,可以做不同的事
observableData.addObserver(dataObserver);
observableData.changeNum(1);
}
}
輸出:觀察到了:1
可以看看Observer接口和Observable類的源碼
package java.util;
public interface Observer {
void update(Observable o, Object arg);
}
package java.util;
public class Observable {
private boolean changed = false;
private Vector<Observer> obs;
public Observable() {
obs = new Vector<>();
}
public synchronized void addObserver(Observer o) {
if (o == null)
throw new NullPointerException();
if (!obs.contains(o)) {
obs.addElement(o);
}
}
public synchronized void deleteObserver(Observer o) {
obs.removeElement(o);
}
public void notifyObservers() {
notifyObservers(null);
}
public void notifyObservers(Object arg) {
Object[] arrLocal;
synchronized (this) {
if (!changed)
return;
arrLocal = obs.toArray();
clearChanged();
}
for (int i = arrLocal.length-1; i>=0; i--)
((Observer)arrLocal[i]).update(this, arg);
}
public synchronized void deleteObservers() {
obs.removeAllElements();
}
protected synchronized void setChanged() {
changed = true;
}
protected synchronized void clearChanged() {
changed = false;
}
public synchronized boolean hasChanged() {
return changed;
}
public synchronized int countObservers() {
return obs.size();
}
}
我們可以看到Obserable的實作非常簡單,持有一個觀察者集合,在notifyObservers的時候,周遊集合,調用update方法。另外,Obserable類在操作觀察者集合的時候,都加了鎖,保證了線程安全。
三、netty中的觀察者模式
netty中的觀察者:futrue的listener
Future最早源于jdk中的java.util.concurrent.Future
DefaultPromise 是netty中異步channel i/o操作的結果,提供了操作結果相關的一系列方法。繼承Future,又對Futrue做了些擴充(Promise接口)。這個類非常重要,Futrue中的listener就是通過它實作的。
類結構:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL90zdOdXR650dFRVZvhmMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwIzN1ETNxQTM5EzNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
源碼:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private Object listeners;
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
}
DefaultPromise就是我們的被觀察者,持有了觀察者的集合。觀察者的集合就是這裡的listeners, 還實作了addListener、removeListener等方法。
至于這裡的listenners為什麼是Object,我們可以看下addListener0方法
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
再看DefaultFutureListeners,其實持有了一個listener數組(其實我沒搞懂他為什麼要自己實作,直接用List不香嗎?有大佬知道的,能否告知)
final class DefaultFutureListeners {
private GenericFutureListener<? extends Future<?>>[] listeners;
private int size;
private int progressiveSize;
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
listeners[size] = l;
this.size = size + 1;
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
再回到DefaultPromise, 他在setSuccess成功之後,調用了notifyListeners();
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
}
暫時忽略netty裡的線程模型,這裡不管是不是目前線程發起的,notifyListenersNow();
繼續看notifyListenersNow()
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
又調用了 notifyListener0, 在notifyListener0中,周遊了觀察者清單,最終調用了operationComplete(future)方法。通知觀察者,操作完成了。
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
觀察者接口
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
void operationComplete(F future) throws Exception;
}
netty中可以通過監聽器的方式擷取I/O操作結果,可以同時添加一個或多個。在I/O操作完成,會回調operationComplete方法,并把future作為入參傳進去。
就此,我們了解了觀察者模式,也看到了netty中觀察者模式的實作。
netty觀察者的思考
對于netty中的這個場景,觀察者模式被用在了I/O結果監聽上。首先netty是基于NIO,它的結構是異步的,I/O操作的完成時間無法預測。Future的get會直接阻塞,而逾時時間又不能精準的設定。異步通知回調operationComplete是非常棒的解決方案。
其實Android中,對一個按鈕綁定監聽時間,也是觀察者的思想
OnClickListener是觀察者,mBtn是被觀察者。
mBtn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
}
});
當對象發生改變,要通知其他對象的時候,就可以使用觀察者。
[1] https://www.runoob.com/design-pattern/observer-pattern.html
[2] 李林鋒·netty權威指南 中國工信出版社