前言
之前一段時間寫了篇文章DataNode資料進行中心DataXceiver從大的方向了解了下datanode讀寫操作的過程.但是并沒有具體細粒度的去關注讀寫操作中的細節以及可能存在的問題,本篇文章算是對這方面的一個補充吧.盡管本文所涉及的範圍面看起來很窄,但是所呈現出來的結果一定會讓你有所收獲的.
DFSOutputStream寫資料以及周邊相關類,變量
本文主要闡述的datanode寫資料的過程,而寫資料過程中,第一個聯系到的就是DFSOutputStream對象類.但其實這隻是其中的一個大類,内部還包括了與其内部對象類的各種互動,協同的操作.下面花簡短的篇幅介紹這幾個類.
DataStreamer
資料流類,這是資料寫操作時調用的主要類,DFSOutputStream的start()方法調用的就是dataStreamer的線程run方法,DFSOutputStream的主操作都是依靠内部對象類dataStreamer完成實作,可以說,這二者的聯系最為緊密.
ResponseProcessor
ResponseProcessor類是DataStreamer中的内部類,主要作用是接收pipeline中datanode的ack回複,它是一個線程類.給出源碼中的注釋:
//
// Processes responses from the datanodes. A packet is removed
// from the ackQueue when its response arrives.
//
DFSPacket
資料包類,在DataStreamer和DFSOutputStream中都是用的這個類進行資料的傳輸的,給出源碼中的注釋:
/****************************************************************
* DFSPacket is used by DataStreamer and DFSOutputStream.
* DFSOutputStream generates packets and then ask DatStreamer
* to send them to datanodes.
****************************************************************/
除了以上3個大類需要了解之外,還有幾個變量同樣需要重視,因為這些變量會在後面的分析中經常出現.
1.dataQueue(List<DFSPacket>)
待發送資料包清單
2.ackQueue(List<DFSPacket>)
資料包回複清單,資料包發送成功後,dfsPacket将會從dataQueue移到ackQueue中.
3.pipeline
pipeline是一個經常看見的名詞,中文翻譯的意思是"管道",但是這個詞我在網上也搜了相關的更好的解釋,稍稍比較好了解的方式是"流水線模型",也有些人把它與設計模式中的責任鍊模式相挂鈎,是以這個詞用中文翻譯總是不能很好的表達他的原意,在後面的篇幅中還會繼續提到.
DataStreamer資料流對象
了解寫資料的具體細節,需要首先了解DataStreamer的實作機理,因為DFSOutputStream的主操作無非是調用了dataStreamer的内部方法.DataStreamer源碼中的注釋很好的解釋了DataStreamer所做的事,學習DataStreamer可以從閱讀他的注釋開始.
/*********************************************************************
*
* The DataStreamer class is responsible for sending data packets to the
* datanodes in the pipeline. It retrieves a new blockid and block locations
* from the namenode, and starts streaming packets to the pipeline of
* Datanodes. Every packet has a sequence number associated with
* it. When all the packets for a block are sent out and acks for each
* if them are received, the DataStreamer closes the current block.
*
* The DataStreamer thread picks up packets from the dataQueue, sends it to
* the first datanode in the pipeline and moves it from the dataQueue to the
* ackQueue. The ResponseProcessor receives acks from the datanodes. When an
* successful ack for a packet is received from all datanodes, the
* ResponseProcessor removes the corresponding packet from the ackQueue.
*
* In case of error, all outstanding packets are moved from ackQueue. A new
* pipeline is setup by eliminating the bad datanode from the original
* pipeline. The DataStreamer now starts sending packets from the dataQueue.
*
*********************************************************************/
如果看不懂這麼多的英文,沒有關系,我特地對其進行了翻譯,幫助大家了解:
DataStreamer對象類是負責發送data packets資料包到pipeline中的各個datanode中.
它會從namenode中尋求一個新的blockId和block的位置資訊,然後開始以流式的方式在pipeline
的datanode中進行packet資料包的傳輸.每個包有屬于它自己的一個數字序列号.當屬于一個block
塊的所有的資料包發生完畢并且對應的ack回複都被接收到了, 則表明此次的block寫入完成,dataStreamer
将會關閉目前block塊.
DataStreamer線程從dataQueue中選取packets資料包,發送此資料包給pipeline中的首個datanode,
然後移動此資料從dataQueue清單到ackQueue.ResponseProcessor會從各個datanode中接收ack回複.
當對于一個packet的成功的ack回複被所有的datanode接收到了,ResponseProcessor将會從ackQueue列
表中移除相應的packet包.
當出現錯誤的時候,所有的未完成的packet資料包将會從ackQueue中移除掉.一個新的
pipeline會被重建立立,建立立的pipeline會除掉壞的datanode.DataStreamer會從dataQueue
中重新發送資料包
OK,讀完官方注釋,想必或多或少已經對其中的機理有所了解.下圖是我做的一張結構簡圖:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICMygTM1MjM0EDOxIDM2EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
這張圖對應的程式邏輯在run()方法中,首先在while循環中會擷取一個dataPacket資料包:
one = dataQueue.getFirst(); // regular data packet
然後在接下來的操作中會出現packet的轉移
// send the packet
SpanId spanId = SpanId.INVALID;
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
if (scope != null) {
spanId = scope.getSpanId();
scope.detach();
one.setTraceScope(scope);
}
scope = null;
dataQueue.removeFirst();
ackQueue.addLast(one);
dataQueue.notifyAll();
}
}
然後發送資料到遠端datanode節點
// write out data to remote datanode
try (TraceScope ignored = dfsClient.getTracer().
newScope("DataStreamer#writeTo", spanId)) {
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
...
dataStreamer發送完資料包之後,responseProcessor程序會收到來自datanode的ack回複,如果對于一個block塊,收到了pipeline中datanode所有的ack回複資訊,則代表這個block塊發送完成了.pipeline的datanode的建構分為2種情形,代表着2種情形的資料傳輸
BlockConstructionStage.PIPELINE_SETUP_CREATE
BlockConstructionStage.PIPELINE_SETUP_APPEND
第一種情況在新配置設定塊的時候進行的,從namenode上擷取新的blockId和位置,然後連接配接上第一個datanode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if (LOG.isDebugEnabled()) {
LOG.debug("Allocating new block: " + this);
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
}
/**
* Open a DataStreamer to a DataNode so that it can be written to.
* This happens when a file is created and each time a new block is allocated.
* Must get block ID and the IDs of the destinations from the namenode.
* Returns the list of target datanodes.
*/
protected LocatedBlock nextBlockOutputStream() throws IOException {
...
//
// Connect to first DataNode in the list.
//
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
...
pipeline的第一階段可以用下圖表示
然後另外一個階段是第一個datanode節點向其他剩餘節點建立連接配接
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
if (LOG.isDebugEnabled()) {
LOG.debug("Append to block {}", block);
}
setupPipelineForAppendOrRecovery();
if (streamerClosed) {
continue;
}
initDataStreaming();
}
後面是建立連接配接的代碼
/**
* Open a DataStreamer to a DataNode pipeline so that
* it can be written to.
* This happens when a file is appended or data streaming fails
* It keeps on trying until a pipeline is setup
*/
private void setupPipelineForAppendOrRecovery() throws IOException {
// Check number of datanodes. Note that if there is no healthy datanode,
// this must be internal error because we mark external error in striped
// outputstream only when all the streamers are in the DATA_STREAMING stage
...
setupPipelineInternal(nodes, storageTypes);
}
protected void setupPipelineInternal(DatanodeInfo[] datanodes,
StorageType[] nodeStorageTypes) throws IOException {
...
// set up the pipeline again with the remaining nodes
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
...
用圖形展示的效果如下
pipeline的異常重建發生在datanode io處理這塊
public void run() {
long lastPacket = Time.monotonicNow();
TraceScope scope = null;
while (!streamerClosed && dfsClient.clientRunning) {
...
DFSPacket one;
try {
// process datanode IO errors if any
boolean doSleep = processDatanodeOrExternalError();
...
/**
* If this stream has encountered any errors, shutdown threads
* and mark the stream as closed.
*
* @return true if it should sleep for a while after returning.
*/
private boolean processDatanodeOrExternalError() throws IOException {
if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) {
return false;
}
...
if (response != null) {
LOG.info("Error Recovery for " + block +
" waiting for responder to exit. ");
return true;
}
closeStream();
// move packets from ack queue to front of the data queue
synchronized (dataQueue) {
dataQueue.addAll(0, ackQueue);
ackQueue.clear();
}
// If we had to recover the pipeline five times in a row for the
// same packet, this client likely has corrupt data or corrupting
// during transmission.
if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet.");
lastException.set(new IOException("Failing write. Tried pipeline " +
"recovery 5 times without success."));
streamerClosed = true;
return false;
}
setupPipelineForAppendOrRecovery();
...
ResponseProcessor回複擷取類
進入responseProcessor類的主運作方法:
@Override
public void run() {
setName("ResponseProcessor for block " + block);
PipelineAck ack = new PipelineAck();
TraceScope scope = null;
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
// read an ack from the pipeline
long begin = Time.monotonicNow();
ack.readFields(blockReplyStream);
...
這裡會從blockReplyStream輸入流中讀取ack傳回資訊,要特别注意的是,這裡的讀到的ack與之前的ackQueue中的ack并不是指同一個對象.這個ack指的是PipelineAck,主要的作用是擷取其中的seqno序列号.
long seqno = ack.getSeqno();
判斷是否是有效的block回複
assert seqno != PipelineAck.UNKOWN_SEQNO :
"Ack for unknown seqno should be a failed ack: " + ack;
if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
continue;
}
然後取出ack DFSPacket資料包,比較序列号,判斷是否一緻
// a success ack for a data packet
DFSPacket one;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
if (one.getSeqno() != seqno) {
throw new IOException("ResponseProcessor: Expecting seqno " +
" for block " + block +
one.getSeqno() + " but received " + seqno);
}
此ack回複包判斷完畢後,會進行相應的Packet移除
synchronized (dataQueue) {
scope = one.getTraceScope();
if (scope != null) {
scope.reattach();
one.setTraceScope(null);
}
lastAckedSeqno = seqno;
pipelineRecoveryCount = 0;
ackQueue.removeFirst();
dataQueue.notifyAll();
one.releaseBuffer(byteArrayManager);
}
ackQueue中的packet就被徹底移除掉了,從最開始的加入到dataQueue,到move到ackQueue,到最後回複确認完畢,進行最終的移除.
在這些操作執行期間,還會進行一項判斷
isLastPacketInBlock = one.isLastPacketInBlock();
如果此packet是發送block塊的最後一個packet,則此responseProcessor将會退出循環.
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock)
當然,期間發生異常的時候,會導緻responderClosed設定為true,導緻循環的退出
catch (Exception e) {
if (!responderClosed) {
lastException.set(e);
errorState.setInternalError();
errorState.markFirstNodeIfNotMarked();
synchronized (dataQueue) {
dataQueue.notifyAll();
}
if (!errorState.isRestartingNode()) {
LOG.warn("Exception for " + block, e);
}
responderClosed = true;
}
同樣地,我做了一張結構圖簡單的展示了其中的流程
DataStreamer與DFSOutputStream的關系
在前文中已經或多或少提到了這2個類之間的關系.可簡要概況為4大關系:
1.建立與被建立的關系.
2.啟動與被啟動的關系.
3.關閉與被關閉的關系.
4.生産者與消費者的關系.
下面一一做簡要的分析.建立與被建立的關系,可以從DFSOutputStream的構造函數中進行展現
/** Construct a new output stream for append. */
private DFSOutputStream(DFSClient dfsClient, String src,
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
throws IOException {
this(dfsClient, src, progress, stat, checksum);
initialFileSize = stat.getLen(); // length of file when opened
this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
// The last partial block of the file has to be filled.
if (!toNewBlock && lastBlock != null) {
// indicate that we are appending to an existing block
streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress,
checksum, cachingStrategy, byteArrayManager);
getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
adjustPacketChunkSize(stat);
getStreamer().setPipelineInConstruction(lastBlock);
} else {
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum);
streamer = new DataStreamer(stat,
lastBlock != null ? lastBlock.getBlock() : null, dfsClient, src,
progress, checksum, cachingStrategy, byteArrayManager, favoredNodes);
}
}
第二點,啟動與被啟動的關系
啟動指的是start()方法
protected synchronized void start() {
getStreamer().start();
}
getStreamer方法用于擷取内部對象變量dataStreamer.
/**
* Returns the data streamer object.
*/
protected DataStreamer getStreamer() {
return streamer;
}
第三點,關閉與被關閉的關系.
public void close() throws IOException {
synchronized (this) {
try (TraceScope ignored = dfsClient.newPathTraceScope(
"DFSOutputStream#close", src)) {
closeImpl();
}
}
dfsClient.endFileLease(fileId);
}
protected synchronized void closeImpl() throws IOException {
...
closeThreads(true);
...
}
// shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true
protected void closeThreads(boolean force) throws IOException {
try {
getStreamer().close(force);
getStreamer().join();
getStreamer().closeSocket();
} catch (InterruptedException e) {
throw new IOException("Failed to shutdown streamer");
} finally {
getStreamer().setSocketToNull();
setClosed();
}
}
在這裡就會把streamer相關的類進行關閉.
第四點,生成者與消費者的關系,這個關系有點意思,那消費對象是什麼呢,答案就是DFSPacket,dataQueue中所存儲的對象. 也就是說,DFSOutputStream中的方法會往dataQueue中put入DFSPacket,然後dataStreamer會在主方法中區擷取,也就是上文分析的場景.其中在DFSOutputStream中寫入資料包的方法如下
// @see FSOutputSummer#writeChunk()
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
dfsClient.checkOpen();
checkClosed();
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
if (cklen != 0 && cklen != getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " +
getChecksumSize() + " but found to be " + cklen);
}
if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
.getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
...
}
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.incNumChunks();
getStreamer().incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
}
enqueueCurrentPacketFull方法就會将packet寫入dataQueue中.
void enqueueCurrentPacket() throws IOException {
getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
}
其實在DFSOutputStream的close方法中,也會觸發一次flush data最後清洗資料的操作到各個detained中,也會調用到enqueueCurrentPacket方法.
protected synchronized void closeImpl() throws IOException {
...
flushBuffer(); // flush from all upper layers
if (currentPacket != null) {
enqueueCurrentPacket();
}
if (getStreamer().getBytesCurBlock() != 0) {
setCurrentPacketToEmpty();
}
flushInternal(); // flush all data to Datanodes
// get last block before destroying the streamer
// If exception happened before, the last block will be null
lastBlock = getStreamer().getBlock();
...
同樣地,我也設計了一張關系結果圖展現上述的4種關系.
Streamer線程洩漏問題
Streamer線程洩漏問題是在學習DFSOutputStream相關機理時發現的,過程算是比較意外吧.線程洩漏問題可以類比于記憶體洩漏,就是該釋放的空間沒釋放,線程洩漏問題同理,該關閉的線程對象沒有及時關閉,發生的方法自然而然地在DFSOutputStream的close方法中了,重新調出這段程式.
/**
* Closes this output stream and releases any system
* resources associated with this stream.
*/
@Override
public void close() throws IOException {
synchronized (this) {
try (TraceScope ignored = dfsClient.newPathTraceScope(
"DFSOutputStream#close", src)) {
closeImpl();
}
}
dfsClient.endFileLease(fileId);
}
再次進入closeImpl實質的關閉方法,仔細觀察每步操作可能存在的問題
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
getStreamer().getLastException().check(true);
return;
}
try {
flushBuffer(); // flush from all upper layers
if (currentPacket != null) {
enqueueCurrentPacket();
}
if (getStreamer().getBytesCurBlock() != 0) {
setCurrentPacketToEmpty();
}
flushInternal(); // flush all data to Datanodes
// get last block before destroying the streamer
// If exception happened before, the last block will be null
ExtendedBlock lastBlock = getStreamer().getBlock();
closeThreads(true);
try (TraceScope ignored =
dfsClient.getTracer().newScope("completeFile")) {
completeFile(lastBlock);
}
} catch (ClosedChannelException ignored) {
} finally {
setClosed();
}
}
因為可能存在streamer線程對象未關閉的問題,是以我們得要找到可能在closeThreads方法之前可能有問題的代碼.如果你比較細心的話,應該馬上發現問題所在了.
flushBuffer(); // flush from all upper layers
if (currentPacket != null) {
enqueueCurrentPacket();
}
if (getStreamer().getBytesCurBlock() != 0) {
setCurrentPacketToEmpty();
}
flushInternal(); // flush all data to Datanodes
從flushBuffer到flushInternal中的操作都可能抛出IO異常,一旦抛出異常,自然就直接跳到finally處進行處理,中間的closeThread将不會被執行到,進而導緻dataStreamer線程洩漏.這個bug我已經送出開源社群,并且提供相應的patch,編号HDFS-9812,解決辦法很簡單,在這層代碼中再包一層try,catch,把closeThread方法放入新增try,catch方法的末尾進行處理,詳細資訊可以看文章末尾的連結.
相關連結
Issue連結: https://issues.apache.org/jira/browse/HDFS-9812
Github patch連結: https://github.com/linyiqun/open-source-patch/tree/master/hdfs/HDFS-9812