從mina中學習逾時程式編寫
在很多情況下,程式需要使用計時器定,在指定的時間内檢查連接配接過期。例如,要實作一個mqtt服務,為了保證QOS,在服務端發送消息後,需要等待用戶端的ack,確定用戶端接收到消息,當服務端等待一段時間後,仍未獲得用戶端ack,就會将消息重新傳送。在Mina中,每個連結都可以設定read ideal 和write ideal 時間,當連結read ideal或者write ideal逾時後,可以觸發使用者自定義的一個動作(比如關閉連結等),本文主要是探讨mina中如何實作這種逾時政策,也給大家帶來參考(IndexIdleChecker.java)。
1. 主要的資料結構
在IndexIdleChecker中,有如下屬性定義:
private final Set<AbstractIoSession>[] readIdleSessionIndex = new Set[MAX_IDLE_TIME_IN_SEC];
private final Set<AbstractIoSession>[] writeIdleSessionIndex = new Set[MAX_IDLE_TIME_IN_SEC];
分别用于記錄readIdle和writeIdle,以下僅以readIdle為例,因為writeIdle與readIdle流程基本一樣。
注意到readIdleSessionIndex的類型是數組,并且數組中的每個元素是Set類型,Set中的每一個元素都是Session(AbstractIoSession的簡寫)類型,readIdle數組實際大小為3600,mina預設的session最大過期時間為一小時,是以,數組的每一個元素記錄了這一秒内将要過期的session集合(注意,算法使用的是循環數組,因為預設最多一個小時就過期)。
IndexIdleChecker還有一個屬性:
private long lastCheckTimeMs = System.currentTimeMillis();
用于記錄上次處理過期請求的時間。
每個session對象中都有一個property,用于記錄這個session目前在readIdle數組中的位置(READ_IDLE_INDEX)。
2. Read(或者write)請求ideal處理政策
Read事件到來的方法簽名如下:
public void sessionRead(AbstractIoSession session, long timeInMs) ;
先通過session的READ_IDLE_INDEX 屬性,擷取session在readIdeal中的位置(oldIndex),之後删除readIdeal中對應oldIndex的set中的目前session,然後計算目前session下次的過期時間,将過期時間%3600後,求出其對應的readIdeal數組下标(index),将session放入readIdeal的index位置的set中,并且設定session的READ_IDLE_INDEX 的屬性值為index。
3. Worker線程
在運作時Mina會啟動一個Worker守護線程,代碼如下:
@Override
public void run() {
while (running) {
try {
sleep(GRANULARITY_IN_MS);
processIdleSession(System.currentTimeMillis());
} catch (InterruptedException e) {
break;
}
}
}
Worker程序在每次處理完session過期後,都會sleep 1s,然後進行下一次的過期處理,在processIdleSession方法中,僅需每次處理
int startIdx = ((int) (Math.max(lastCheckTimeMs, timeMs - MAX_IDLE_TIME_IN_MS + 1) / 1000L))
% MAX_IDLE_TIME_IN_SEC;
int endIdx = ((int) (timeMs / 1000L)) % MAX_IDLE_TIME_IN_SEC;
startIdx和endIdx之間的readIdeal數組即可。針對每個idx,拿出set,将set中的所有session的READ_IDLE_INDEX 置為null,并調用session中的使用者處理過程即可(比如關閉session)。(注意,如果使用者設定過期時間為-1,表示永遠不過期,此時不做任何處理)。需要注意的是,worker線程每次至少會處理一個index,有時會處理多個index,比如,目前這次處理index的時間超過了3s,之後worker又sleep 1s,那麼下次worker被喚醒,将處理這4s内的4個index。
小結:
在大多數情況下,連結線程比較活躍的情況下,session的存儲位置會在數組中不斷的向後移動(因為是循環數組,是以沒有邊界),是以當worker要處理啟動處理過期時,活躍的session的index一定會在過期處理的index之前,是以,僅有那些不活躍的到期的session才會被查詢和處理掉,worker的處理代價并不高。
使用這種方式不需要計時器等支援,實作簡單,是一種比較好的逾時處理方式。
參考:
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>3.0.0-M2</version>
</dependency>
源碼:
public class IndexedIdleChecker implements IdleChecker {
private static final int MAX_IDLE_TIME_IN_SEC = 60 * 60;
private static final long MAX_IDLE_TIME_IN_MS = MAX_IDLE_TIME_IN_SEC * 1000L;
private static final Logger LOG = LoggerFactory.getLogger(IndexedIdleChecker.class);
// Aspeedup for logs
private static final boolean IS_DEBUG = LOG.isDebugEnabled();
private static final AttributeKey<Integer> READ_IDLE_INDEX = AttributeKey.createKey(Integer.class,
"idle.read.index");
private static final AttributeKey<Integer> WRITE_IDLE_INDEX = AttributeKey.createKey(Integer.class,
"idle.write.index");
private long lastCheckTimeMs = System.currentTimeMillis();
@SuppressWarnings("unchecked")
private final Set<AbstractIoSession>[] readIdleSessionIndex = new Set[MAX_IDLE_TIME_IN_SEC];
@SuppressWarnings("unchecked")
private final Set<AbstractIoSession>[] writeIdleSessionIndex = new Set[MAX_IDLE_TIME_IN_SEC];
private static final int GRANULARITY_IN_MS = 1000;
private final Worker worker = new Worker();
private volatile boolean running = true;
@Override
public void start() {
worker.start();
}
@Override
public void destroy() {
running = false;
try {
// interrupt the sleep
worker.interrupt();
// wait for worker to stop
worker.join();
} catch (InterruptedException e) {
// interrupted, we don't care much
}
}
@Override
public void sessionRead(AbstractIoSession session, long timeInMs) {
if (IS_DEBUG) {
LOG.debug("session read event, compute idle index of session {}", session);
}
// remove from the old index position
Integer oldIndex =session.getAttribute(READ_IDLE_INDEX);
if (oldIndex != null && readIdleSessionIndex[oldIndex] != null) {
if (IS_DEBUG) {
LOG.debug("remove for old index {}", oldIndex);
}
readIdleSessionIndex[oldIndex].remove(session);
}
long idleTimeInMs = session.getConfig().getIdleTimeInMillis(IdleStatus.READ_IDLE);
// is idle enabled ?
if (idleTimeInMs <= 0L) {
if (IS_DEBUG) {
LOG.debug("no read idle configuration");
}
} else {
int nextIdleTimeInSeconds = (int) ((timeInMs +idleTimeInMs) / 1000L);
int index = nextIdleTimeInSeconds % MAX_IDLE_TIME_IN_SEC;
if (IS_DEBUG) {
LOG.debug("computed index : {}", index);
}
if (readIdleSessionIndex[index] == null) {
readIdleSessionIndex[index] =Collections
.newSetFromMap(newConcurrentHashMap<AbstractIoSession, Boolean>());
}
if (IS_DEBUG) {
LOG.debug("marking session {} idle for index {}", session, index);
}
readIdleSessionIndex[index].add(session);
session.setAttribute(READ_IDLE_INDEX, index);
}
}
@Override
public void sessionWritten(AbstractIoSession session, long timeInMs) {
if (IS_DEBUG) {
LOG.debug("session write event, compute idle index of session {}", session);
}
// remove from the old index position
Integer oldIndex =session.getAttribute(WRITE_IDLE_INDEX);
if (oldIndex != null && writeIdleSessionIndex[oldIndex] != null) {
if (IS_DEBUG) {
LOG.debug("remove for old index {}", oldIndex);
}
writeIdleSessionIndex[oldIndex].remove(session);
}
long idleTimeInMs = session.getConfig().getIdleTimeInMillis(IdleStatus.WRITE_IDLE);
// is idle enabled ?
if (idleTimeInMs <= 0L) {
if (IS_DEBUG) {
LOG.debug("no write idle configuration");
}
} else {
int nextIdleTimeInSeconds = (int) ((timeInMs +idleTimeInMs) / 1000L);
int index = nextIdleTimeInSeconds % MAX_IDLE_TIME_IN_SEC;
if (writeIdleSessionIndex[index] == null) {
writeIdleSessionIndex[index] =Collections
.newSetFromMap(newConcurrentHashMap<AbstractIoSession, Boolean>());
}
writeIdleSessionIndex[index].add(session);
session.setAttribute(WRITE_IDLE_INDEX, index);
}
}
@Override
public int processIdleSession(long timeMs) {
int counter = 0;
long delta = timeMs - lastCheckTimeMs;
if (LOG.isDebugEnabled()) {
LOG.debug("checking idle time, last = {}, now = {}, delta = {}", new Object[] { lastCheckTimeMs, timeMs,
delta });
}
if (delta < 1000) {
LOG.debug("not a second between the last checks, abort");
return 0;
}
// if (lastCheckTimeMs == 0) {
// LOG.debug("first check, we start now");
// lastCheckTimeMs = System.currentTimeMillis() - 1000;
// }
int startIdx = ((int) (Math.max(lastCheckTimeMs, timeMs - MAX_IDLE_TIME_IN_MS + 1) / 1000L))
% MAX_IDLE_TIME_IN_SEC;
int endIdx = ((int) (timeMs / 1000L)) % MAX_IDLE_TIME_IN_SEC;
LOG.debug("scaning from index {} to index {}", startIdx, endIdx);
int index = startIdx;
do {
LOG.trace("scanningindex {}", index);
// look at the read idle index
counter += processIndex(readIdleSessionIndex, index, IdleStatus.READ_IDLE);
counter += processIndex(writeIdleSessionIndex, index, IdleStatus.WRITE_IDLE);
index = (index + 1) % MAX_IDLE_TIME_IN_SEC;
} while (index != endIdx);
// save last check time for next call
lastCheckTimeMs = timeMs;
LOG.debug("detected {} idleing sessions", counter);
return counter;
}
private int processIndex(Set<AbstractIoSession>[]indexByTime, int position, IdleStatus status) {
Set<AbstractIoSession> sessions =indexByTime[position];
if (sessions == null) {
return 0;
}
int counter = 0;
for (AbstractIoSession idleSession : sessions) {
idleSession.setAttribute(status ==IdleStatus.READ_IDLE ? READ_IDLE_INDEX : WRITE_IDLE_INDEX, null);
// check if idle detection wasn't disabled since the index update
if (idleSession.getConfig().getIdleTimeInMillis(status)> 0) {
idleSession.processSessionIdle(status);
}
counter++;
}
// clear the processed index entry
indexByTime[position] = null;
return counter;
}
private class Worker extends Thread {
public Worker() {
super("IdleChecker");
setDaemon(true);
}
@Override
public void run() {
while (running) {
try {
sleep(GRANULARITY_IN_MS);
processIdleSession(System.currentTimeMillis());
} catch (InterruptedException e) {
break;
}
}
}
}
}