天天看點

設計模式:觀察者模式 - 概念、簡單實作及netty中的觀察者

學習設計模式不光要學習設計模式的思想,還要去深入了解,為什麼要用這個設計模式。

如何深入了解?讀優秀的架構代碼,看别人代碼,了解它們的使用場景。 - - - 部落客老師(感謝他)

本文先介紹了觀察者模式的概念及簡單實作。再貼了netty中對觀察者的實作。最後再分享了netty這麼設計的原因。

觀察者模式 - 概念、簡單實作及netty中的觀察者

    • 一、觀察模式的概念及意圖
    • 二、觀察者模式簡單實作
    • 三、netty中的觀察者模式
      • netty觀察者的思考

一、觀察模式的概念及意圖

定義對象間的一種一對多的依賴關系,當一個對象的狀态發生改變時,所有依賴于它的對象都得到通知并被自動更新。

二、觀察者模式簡單實作

實作:定義觀察者和被觀察者兩種角色,被觀察者持有一個觀察者集合,當被觀察者完成某個動作後,周遊觀察者集合,調用update方法,逐個通知到注冊在被觀察者上的觀察者。

Java的util包中定義了Observable類和Observer接口可以幫助我們實作觀察者模式

定義一個被觀察的對象(繼承Observable類)

import java.util.Observable;
public class ObservableData extends Observable {
    
    public void changeNum(int num) {
        // Observable的方法
        setChanged();
        // Observable的方法
        notifyObservers(num);
    }

}
           

定義一個觀察者(實作Observer接口)

public class DataObserver implements Observer {

    @Override
    public void update(Observable o, Object arg) {
        if (o instanceof ObservableData) {
            System.out.println("觀察到了:" + arg);
        }
    }
} 
           

測試類

public class Test {

    public static void main(String[] args) {
        ObservableData observableData = new ObservableData();
        DataObserver dataObserver = new DataObserver();
        // 可以加好多觀察者,觀察者觀察到資料變化後,可以做不同的事
        observableData.addObserver(dataObserver);
        observableData.changeNum(1);
    }
}
           

輸出:觀察到了:1

可以看看Observer接口和Observable類的源碼

package java.util;
public interface Observer {
    void update(Observable o, Object arg);
}

           
package java.util;

public class Observable {
    private boolean changed = false;
    private Vector<Observer> obs;

    public Observable() {
        obs = new Vector<>();
    }

    public synchronized void addObserver(Observer o) {
        if (o == null)
            throw new NullPointerException();
        if (!obs.contains(o)) {
            obs.addElement(o);
        }
    }
    public synchronized void deleteObserver(Observer o) {
        obs.removeElement(o);
    }
    public void notifyObservers() {
        notifyObservers(null);
    }
    public void notifyObservers(Object arg) {
        Object[] arrLocal;

        synchronized (this) {
            if (!changed)
                return;
            arrLocal = obs.toArray();
            clearChanged();
        }

        for (int i = arrLocal.length-1; i>=0; i--)
            ((Observer)arrLocal[i]).update(this, arg);
    }
    public synchronized void deleteObservers() {
        obs.removeAllElements();
    }
    protected synchronized void setChanged() {
        changed = true;
    }
    protected synchronized void clearChanged() {
        changed = false;
    }
    public synchronized boolean hasChanged() {
        return changed;
    }
    public synchronized int countObservers() {
        return obs.size();
    }
}

           

我們可以看到Obserable的實作非常簡單,持有一個觀察者集合,在notifyObservers的時候,周遊集合,調用update方法。另外,Obserable類在操作觀察者集合的時候,都加了鎖,保證了線程安全。

三、netty中的觀察者模式

netty中的觀察者:futrue的listener

Future最早源于jdk中的java.util.concurrent.Future

DefaultPromise 是netty中異步channel i/o操作的結果,提供了操作結果相關的一系列方法。繼承Future,又對Futrue做了些擴充(Promise接口)。這個類非常重要,Futrue中的listener就是通過它實作的。

類結構:

設計模式:觀察者模式 - 概念、簡單實作及netty中的觀察者

源碼:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {


    private Object listeners;  
    
    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }
}
           

DefaultPromise就是我們的被觀察者,持有了觀察者的集合。觀察者的集合就是這裡的listeners, 還實作了addListener、removeListener等方法。

至于這裡的listenners為什麼是Object,我們可以看下addListener0方法

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}
           

再看DefaultFutureListeners,其實持有了一個listener數組(其實我沒搞懂他為什麼要自己實作,直接用List不香嗎?有大佬知道的,能否告知)

final class DefaultFutureListeners {

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; 
    
    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        listeners[size] = l;
        this.size = size + 1;

        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }
           

再回到DefaultPromise, 他在setSuccess成功之後,調用了notifyListeners();

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    @Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }  
}  
           

暫時忽略netty裡的線程模型,這裡不管是不是目前線程發起的,notifyListenersNow();

繼續看notifyListenersNow()

private void notifyListenersNow() {
        Object listeners;
        synchronized (this) {
            // Only proceed if there are listeners to notify and we are not already notifying listeners.
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                    // need to be in a finally block.
                    notifyingListeners = false;
                    return;
                }
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }
           

又調用了 notifyListener0, 在notifyListener0中,周遊了觀察者清單,最終調用了operationComplete(future)方法。通知觀察者,操作完成了。

private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i ++) {
            notifyListener0(this, a[i]);
        }
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
           

觀察者接口

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    void operationComplete(F future) throws Exception;
}
           

netty中可以通過監聽器的方式擷取I/O操作結果,可以同時添加一個或多個。在I/O操作完成,會回調operationComplete方法,并把future作為入參傳進去。

就此,我們了解了觀察者模式,也看到了netty中觀察者模式的實作。

netty觀察者的思考

對于netty中的這個場景,觀察者模式被用在了I/O結果監聽上。首先netty是基于NIO,它的結構是異步的,I/O操作的完成時間無法預測。Future的get會直接阻塞,而逾時時間又不能精準的設定。異步通知回調operationComplete是非常棒的解決方案。

其實Android中,對一個按鈕綁定監聽時間,也是觀察者的思想

OnClickListener是觀察者,mBtn是被觀察者。

mBtn.setOnClickListener(new View.OnClickListener() {
      @Override
      public void onClick(View v) {
      }
});
           

當對象發生改變,要通知其他對象的時候,就可以使用觀察者。

[1] https://www.runoob.com/design-pattern/observer-pattern.html

[2] 李林鋒·netty權威指南 中國工信出版社