天天看點

從mina中學習逾時程式編寫

從mina中學習逾時程式編寫

在很多情況下,程式需要使用計時器定,在指定的時間内檢查連接配接過期。例如,要實作一個mqtt服務,為了保證QOS,在服務端發送消息後,需要等待用戶端的ack,確定用戶端接收到消息,當服務端等待一段時間後,仍未獲得用戶端ack,就會将消息重新傳送。在Mina中,每個連結都可以設定read ideal 和write ideal 時間,當連結read ideal或者write ideal逾時後,可以觸發使用者自定義的一個動作(比如關閉連結等),本文主要是探讨mina中如何實作這種逾時政策,也給大家帶來參考(IndexIdleChecker.java)。

1.    主要的資料結構

在IndexIdleChecker中,有如下屬性定義:

    private final Set<AbstractIoSession>[] readIdleSessionIndex = new Set[MAX_IDLE_TIME_IN_SEC];

    private final Set<AbstractIoSession>[] writeIdleSessionIndex = new Set[MAX_IDLE_TIME_IN_SEC];

       分别用于記錄readIdle和writeIdle,以下僅以readIdle為例,因為writeIdle與readIdle流程基本一樣。

       注意到readIdleSessionIndex的類型是數組,并且數組中的每個元素是Set類型,Set中的每一個元素都是Session(AbstractIoSession的簡寫)類型,readIdle數組實際大小為3600,mina預設的session最大過期時間為一小時,是以,數組的每一個元素記錄了這一秒内将要過期的session集合(注意,算法使用的是循環數組,因為預設最多一個小時就過期)。

IndexIdleChecker還有一個屬性:

   private long lastCheckTimeMs = System.currentTimeMillis();

用于記錄上次處理過期請求的時間。

每個session對象中都有一個property,用于記錄這個session目前在readIdle數組中的位置(READ_IDLE_INDEX)。

2.    Read(或者write)請求ideal處理政策

Read事件到來的方法簽名如下:

    public void sessionRead(AbstractIoSession session, long timeInMs) ;

先通過session的READ_IDLE_INDEX 屬性,擷取session在readIdeal中的位置(oldIndex),之後删除readIdeal中對應oldIndex的set中的目前session,然後計算目前session下次的過期時間,将過期時間%3600後,求出其對應的readIdeal數組下标(index),将session放入readIdeal的index位置的set中,并且設定session的READ_IDLE_INDEX 的屬性值為index。

3.    Worker線程

在運作時Mina會啟動一個Worker守護線程,代碼如下:

       @Override

        public void run() {

            while (running) {

                try {

                    sleep(GRANULARITY_IN_MS);

                    processIdleSession(System.currentTimeMillis());

                } catch (InterruptedException e) {

                    break;

                }

            }

       }

Worker程序在每次處理完session過期後,都會sleep 1s,然後進行下一次的過期處理,在processIdleSession方法中,僅需每次處理

        int startIdx = ((int) (Math.max(lastCheckTimeMs, timeMs - MAX_IDLE_TIME_IN_MS + 1) / 1000L))

                % MAX_IDLE_TIME_IN_SEC;

        int endIdx = ((int) (timeMs / 1000L)) % MAX_IDLE_TIME_IN_SEC;

startIdx和endIdx之間的readIdeal數組即可。針對每個idx,拿出set,将set中的所有session的READ_IDLE_INDEX 置為null,并調用session中的使用者處理過程即可(比如關閉session)。(注意,如果使用者設定過期時間為-1,表示永遠不過期,此時不做任何處理)。需要注意的是,worker線程每次至少會處理一個index,有時會處理多個index,比如,目前這次處理index的時間超過了3s,之後worker又sleep 1s,那麼下次worker被喚醒,将處理這4s内的4個index。

小結:

在大多數情況下,連結線程比較活躍的情況下,session的存儲位置會在數組中不斷的向後移動(因為是循環數組,是以沒有邊界),是以當worker要處理啟動處理過期時,活躍的session的index一定會在過期處理的index之前,是以,僅有那些不活躍的到期的session才會被查詢和處理掉,worker的處理代價并不高。

使用這種方式不需要計時器等支援,實作簡單,是一種比較好的逾時處理方式。

參考:

       <dependency>

           <groupId>org.apache.mina</groupId>

           <artifactId>mina-core</artifactId>

           <version>3.0.0-M2</version>

       </dependency>

源碼:

public class IndexedIdleChecker implements IdleChecker {

    private static final int MAX_IDLE_TIME_IN_SEC = 60 * 60;

    private static final long MAX_IDLE_TIME_IN_MS = MAX_IDLE_TIME_IN_SEC * 1000L;

    private static final Logger LOG = LoggerFactory.getLogger(IndexedIdleChecker.class);

    // Aspeedup for logs

    private static final boolean IS_DEBUG = LOG.isDebugEnabled();

    private static final AttributeKey<Integer> READ_IDLE_INDEX = AttributeKey.createKey(Integer.class,

            "idle.read.index");

    private static final AttributeKey<Integer> WRITE_IDLE_INDEX = AttributeKey.createKey(Integer.class,

            "idle.write.index");

    private long lastCheckTimeMs = System.currentTimeMillis();

    @SuppressWarnings("unchecked")

    private final Set<AbstractIoSession>[] readIdleSessionIndex = new Set[MAX_IDLE_TIME_IN_SEC];

    @SuppressWarnings("unchecked")

    private final Set<AbstractIoSession>[] writeIdleSessionIndex = new Set[MAX_IDLE_TIME_IN_SEC];

    private static final int GRANULARITY_IN_MS = 1000;

    private final Worker worker = new Worker();

    private volatile boolean running = true;

    @Override

    public void start() {

        worker.start();

    }

    @Override

    public void destroy() {

        running = false;

        try {

            // interrupt the sleep

            worker.interrupt();

            // wait for worker to stop

            worker.join();

        } catch (InterruptedException e) {

            // interrupted, we don't care much

        }

    }

    @Override

    public void sessionRead(AbstractIoSession session, long timeInMs) {

        if (IS_DEBUG) {

            LOG.debug("session read event, compute idle index of session {}", session);

        }

        // remove from the old index position

        Integer oldIndex =session.getAttribute(READ_IDLE_INDEX);

        if (oldIndex != null && readIdleSessionIndex[oldIndex] != null) {

            if (IS_DEBUG) {

                LOG.debug("remove for old index {}", oldIndex);

            }

            readIdleSessionIndex[oldIndex].remove(session);

        }

        long idleTimeInMs = session.getConfig().getIdleTimeInMillis(IdleStatus.READ_IDLE);

        // is idle enabled ?

        if (idleTimeInMs <= 0L) {

            if (IS_DEBUG) {

                LOG.debug("no read idle configuration");

            }

        } else {

            int nextIdleTimeInSeconds = (int) ((timeInMs +idleTimeInMs) / 1000L);

            int index = nextIdleTimeInSeconds % MAX_IDLE_TIME_IN_SEC;

            if (IS_DEBUG) {

                LOG.debug("computed index : {}", index);

            }

            if (readIdleSessionIndex[index] == null) {

                readIdleSessionIndex[index] =Collections

                        .newSetFromMap(newConcurrentHashMap<AbstractIoSession, Boolean>());

            }

            if (IS_DEBUG) {

                LOG.debug("marking session {} idle for index {}", session, index);

            }

            readIdleSessionIndex[index].add(session);

            session.setAttribute(READ_IDLE_INDEX, index);

        }

    }

    @Override

    public void sessionWritten(AbstractIoSession session, long timeInMs) {

        if (IS_DEBUG) {

            LOG.debug("session write event, compute idle index of session {}", session);

        }

        // remove from the old index position

        Integer oldIndex =session.getAttribute(WRITE_IDLE_INDEX);

        if (oldIndex != null && writeIdleSessionIndex[oldIndex] != null) {

            if (IS_DEBUG) {

                LOG.debug("remove for old index {}", oldIndex);

            }

            writeIdleSessionIndex[oldIndex].remove(session);

        }

        long idleTimeInMs = session.getConfig().getIdleTimeInMillis(IdleStatus.WRITE_IDLE);

        // is idle enabled ?

        if (idleTimeInMs <= 0L) {

            if (IS_DEBUG) {

                LOG.debug("no write idle configuration");

            }

        } else {

            int nextIdleTimeInSeconds = (int) ((timeInMs +idleTimeInMs) / 1000L);

            int index = nextIdleTimeInSeconds % MAX_IDLE_TIME_IN_SEC;

            if (writeIdleSessionIndex[index] == null) {

                writeIdleSessionIndex[index] =Collections

                        .newSetFromMap(newConcurrentHashMap<AbstractIoSession, Boolean>());

            }

            writeIdleSessionIndex[index].add(session);

            session.setAttribute(WRITE_IDLE_INDEX, index);

        }

    }

    @Override

    public int processIdleSession(long timeMs) {

        int counter = 0;

        long delta = timeMs - lastCheckTimeMs;

        if (LOG.isDebugEnabled()) {

            LOG.debug("checking idle time, last = {}, now = {}, delta = {}", new Object[] { lastCheckTimeMs, timeMs,

                    delta });

        }

        if (delta < 1000) {

            LOG.debug("not a second between the last checks, abort");

            return 0;

        }

        // if (lastCheckTimeMs == 0) {

        // LOG.debug("first check, we start now");

        // lastCheckTimeMs = System.currentTimeMillis() - 1000;

        // }

        int startIdx = ((int) (Math.max(lastCheckTimeMs, timeMs - MAX_IDLE_TIME_IN_MS + 1) / 1000L))

                % MAX_IDLE_TIME_IN_SEC;

        int endIdx = ((int) (timeMs / 1000L)) % MAX_IDLE_TIME_IN_SEC;

        LOG.debug("scaning from index {} to index {}", startIdx, endIdx);

        int index = startIdx;

        do {

            LOG.trace("scanningindex {}", index);

            // look at the read idle index

            counter += processIndex(readIdleSessionIndex, index, IdleStatus.READ_IDLE);

            counter += processIndex(writeIdleSessionIndex, index, IdleStatus.WRITE_IDLE);

            index = (index + 1) % MAX_IDLE_TIME_IN_SEC;

        } while (index != endIdx);

        // save last check time for next call

        lastCheckTimeMs = timeMs;

        LOG.debug("detected {} idleing sessions", counter);

        return counter;

    }

    private int processIndex(Set<AbstractIoSession>[]indexByTime, int position, IdleStatus status) {

        Set<AbstractIoSession> sessions =indexByTime[position];

        if (sessions == null) {

            return 0;

        }

        int counter = 0;

        for (AbstractIoSession idleSession : sessions) {

            idleSession.setAttribute(status ==IdleStatus.READ_IDLE ? READ_IDLE_INDEX : WRITE_IDLE_INDEX, null);

            // check if idle detection wasn't disabled since the index update

            if (idleSession.getConfig().getIdleTimeInMillis(status)> 0) {

               idleSession.processSessionIdle(status);

            }

            counter++;

        }

        // clear the processed index entry

        indexByTime[position] = null;

        return counter;

    }

    private class Worker extends Thread {

        public Worker() {

            super("IdleChecker");

            setDaemon(true);

        }

        @Override

        public void run() {

            while (running) {

                try {

                    sleep(GRANULARITY_IN_MS);

                    processIdleSession(System.currentTimeMillis());

                } catch (InterruptedException e) {

                    break;

                }

            }

        }

    }

}