天天看點

Editlog與FileChannel Log的Group Commit

先來看看Hadoop是怎麼處理的:

Editlog是可以被多個線程并發寫入的,每個線程維護了自己最新的一個事務ID:

privatestaticfinalThreadLocal<TransactionId> myTransactionId = newThreadLocal<TransactionId>() {

protectedsynchronizedTransactionId initialValue() {

returnnewTransactionId(Long.MAX_VALUE);

}

};

在送出的時候,首先獲得送出時最新的事務ID:

synchronized(this){

TransactionId id = myTransactionId.get();

id.txid= txid;

然後開始同步(代碼被删減):

//拿到自己的事務ID

longmytxid = myTransactionId.get().txid;

booleansync = false;

try{

EditLogOutputStream logStream = null;

try {

//如果自己的事務未被同步,但是同步正在被其他線程處理,那麼就阻塞

while (mytxid > synctxid && isSyncRunning) {

wait(1000);

} catch (InterruptedException ie) {}

//當被喚醒或者逾時發現自己的事務已經被group commit了,那麼就傳回

if (mytxid <= synctxid) {

return;

//否則開始進行sync

isSyncRunning = true;

sync = true;

//Hadoop的editlog使用了double buffer來達到重新整理和寫不阻塞;這裡置換buffer

editLogStream.setReadyToFlush();

logStream = editLogStream;

if (logStream != null) {

logStream.flush();

} catch(IOException ex) {}

} finally{

if (sync) {

isSyncRunning = false;

//重新整理完成,喚醒阻塞線程

this.notifyAll();

而在Flume File-channel裡的group commit也是類似的方式,不過更為簡潔:

一樣是分兩個階段,每個階段都是同步方法,并且Flume的transactionid和position是分開的,每次隻需同步檔案末尾位置:

Commit();

Sync();

//在送出的時候更新最後送出位置

synchronizedvoidcommit(ByteBuffer buffer) throws IOException {

write(buffer);

lastCommitPosition= position();

//若已經被同步了則什麼也不做,傳回

synchronizedvoidsync() throwsIOException {

if(lastSyncPosition< lastCommitPosition){

getFileChannel().force(false);

lastSyncPosition = position();

syncCount++;

本文轉自MIKE老畢 51CTO部落格,原文連結:http://blog.51cto.com/boylook/1300543,如需轉載請自行聯系原作者

繼續閱讀