先來看看Hadoop是怎麼處理的:
Editlog是可以被多個線程并發寫入的,每個線程維護了自己最新的一個事務ID:
privatestaticfinalThreadLocal<TransactionId> myTransactionId = newThreadLocal<TransactionId>() {
protectedsynchronizedTransactionId initialValue() {
returnnewTransactionId(Long.MAX_VALUE);
}
};
在送出的時候,首先獲得送出時最新的事務ID:
synchronized(this){
TransactionId id = myTransactionId.get();
id.txid= txid;
然後開始同步(代碼被删減):
//拿到自己的事務ID
longmytxid = myTransactionId.get().txid;
booleansync = false;
try{
EditLogOutputStream logStream = null;
try {
//如果自己的事務未被同步,但是同步正在被其他線程處理,那麼就阻塞
while (mytxid > synctxid && isSyncRunning) {
wait(1000);
} catch (InterruptedException ie) {}
//當被喚醒或者逾時發現自己的事務已經被group commit了,那麼就傳回
if (mytxid <= synctxid) {
return;
//否則開始進行sync
isSyncRunning = true;
sync = true;
//Hadoop的editlog使用了double buffer來達到重新整理和寫不阻塞;這裡置換buffer
editLogStream.setReadyToFlush();
logStream = editLogStream;
if (logStream != null) {
logStream.flush();
} catch(IOException ex) {}
} finally{
if (sync) {
isSyncRunning = false;
//重新整理完成,喚醒阻塞線程
this.notifyAll();
而在Flume File-channel裡的group commit也是類似的方式,不過更為簡潔:
一樣是分兩個階段,每個階段都是同步方法,并且Flume的transactionid和position是分開的,每次隻需同步檔案末尾位置:
Commit();
Sync();
//在送出的時候更新最後送出位置
synchronizedvoidcommit(ByteBuffer buffer) throws IOException {
write(buffer);
lastCommitPosition= position();
//若已經被同步了則什麼也不做,傳回
synchronizedvoidsync() throwsIOException {
if(lastSyncPosition< lastCommitPosition){
getFileChannel().force(false);
lastSyncPosition = position();
syncCount++;
本文轉自MIKE老畢 51CTO部落格,原文連結:http://blog.51cto.com/boylook/1300543,如需轉載請自行聯系原作者