When HBase handles a put, the data is first written into memory (the memstore). The in-memory structure is a ConcurrentSkipListMap whose Comparator is KVComparator.
KeyValue object structure
How KVComparator compares two KeyValue objects:
1. Compare the rowkeys with KeyComparator; rowkeys sort in ascending byte order.
2. If the rowkeys are equal, compare the column families; families sort in ascending byte order.
3. If the column families are equal, compare family + qualifier; qualifiers sort in ascending byte order.
4. If the qualifiers are also equal, sort by timestamp, from largest to smallest (newest first).
5. If the timestamps are also equal, sort by type; deletes sort before puts.
6. If everything above is equal, sort by memstoreTS. memstoreTS is an atomically increasing id, so two cells can never tie on it; it sorts in descending order, so the newest modification comes first, which is convenient for scans.
So KeyValue objects are in fact already sorted in memory. When a flush generates a file, it simply scans the snapshot with maxVersions applied (puts beyond maxVersions are dropped at this point) and writes each KeyValue object to HDFS. A minimal sketch of this ordering follows.
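Below is a minimal sketch of that six-step ordering and of the memstore-style skip list. SimpleCell, SimpleCellComparator and all their fields are illustrative stand-ins, not HBase's actual KeyValue/KVComparator.

import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListMap;

// Simplified stand-in for KeyValue; field names are illustrative, not HBase's.
final class SimpleCell {
    final byte[] row, family, qualifier;
    final long timestamp;
    final int type;        // delete markers carry a higher type code than puts
    final long memstoreTS; // atomically increasing sequence id

    SimpleCell(byte[] row, byte[] family, byte[] qualifier,
               long timestamp, int type, long memstoreTS) {
        this.row = row; this.family = family; this.qualifier = qualifier;
        this.timestamp = timestamp; this.type = type; this.memstoreTS = memstoreTS;
    }
}

// Sketch of the six-step ordering described above; not HBase's real KVComparator.
final class SimpleCellComparator implements Comparator<SimpleCell> {
    private static int compareBytes(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);   // unsigned byte order
            if (d != 0) return d;
        }
        return a.length - b.length;
    }

    @Override
    public int compare(SimpleCell l, SimpleCell r) {
        int d = compareBytes(l.row, r.row);              // 1. rowkey ascending
        if (d != 0) return d;
        d = compareBytes(l.family, r.family);            // 2. column family ascending
        if (d != 0) return d;
        d = compareBytes(l.qualifier, r.qualifier);      // 3. qualifier ascending
        if (d != 0) return d;
        d = Long.compare(r.timestamp, l.timestamp);      // 4. timestamp descending (newest first)
        if (d != 0) return d;
        d = Integer.compare(r.type, l.type);             // 5. type: delete (higher code) before put
        if (d != 0) return d;
        return Long.compare(r.memstoreTS, l.memstoreTS); // 6. memstoreTS descending
    }
}

class MemstoreOrderSketch {
    public static void main(String[] args) {
        // The memstore-like structure: a skip list map kept sorted by the comparator,
        // so a flush only needs to iterate it in order.
        ConcurrentSkipListMap<SimpleCell, SimpleCell> memstore =
            new ConcurrentSkipListMap<>(new SimpleCellComparator());
        SimpleCell put = new SimpleCell("r1".getBytes(), "cf".getBytes(), "q".getBytes(), 100L, 4, 7L);
        SimpleCell del = new SimpleCell("r1".getBytes(), "cf".getBytes(), "q".getBytes(), 100L, 8, 8L);
        memstore.put(put, put);
        memstore.put(del, del);
        memstore.keySet().forEach(c -> System.out.println(c.type)); // prints 8 (delete) before 4 (put)
    }
}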
The flush process that generates an HFile looks roughly like this:
1. Construct a writer; the latest implementation is HFileWriterV2 (HFile format version 2).
2. Append the KeyValue objects to the writer in a loop. Data is buffered block by block, 64 KB by default; every 64 KB a new block is started. Each time a block is finished, an index entry is added to the block index. Once the block index exceeds a limit (128 KB by default), it is written out as a special inline block, i.e. an index block. An HFile is therefore an alternation of data blocks and inline blocks (see the sketch after this list).
3. After all KeyValue objects are written, the remaining index data is written out as inline blocks, and finally the root index, file info and other metadata are written.
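To make the alternation concrete, here is a toy model of the write loop under the default thresholds. The 36-byte index-entry cost and all names are rough assumptions of this sketch, not HBase code.

import java.util.ArrayList;
import java.util.List;

// Toy model of the data-block / inline-index-block alternation; not HBase's writer.
public class BlockLayoutSketch {
    static final int DATA_BLOCK_SIZE = 64 * 1024;   // default data block size
    static final int MAX_INDEX_CHUNK = 128 * 1024;  // default inline index chunk limit

    public static void main(String[] args) {
        List<String> fileLayout = new ArrayList<>();
        int dataBlockBytes = 0;   // bytes in the currently open data block
        int indexChunkBytes = 0;  // approximate size of the pending index chunk

        // pretend we append 10,000,000 cells of ~100 bytes each
        for (int i = 0; i < 10_000_000; i++) {
            if (dataBlockBytes >= DATA_BLOCK_SIZE) {
                fileLayout.add("DATA block (" + dataBlockBytes + " bytes)");
                dataBlockBytes = 0;
                indexChunkBytes += 36;  // rough cost of one index entry (offset + size + key)
                if (indexChunkBytes >= MAX_INDEX_CHUNK) {
                    fileLayout.add("LEAF_INDEX inline block (" + indexChunkBytes + " bytes)");
                    indexChunkBytes = 0;
                }
            }
            dataBlockBytes += 100;
        }
        // a long run of DATA blocks, with a LEAF_INDEX block after roughly every 3,600 of them
        fileLayout.forEach(System.out::println);
    }
}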
HFile V2 structure
The implementation class diagram is as follows.
Main flow
Scan scan = new Scan();
//maximum number of versions to keep, default 3
scan.setMaxVersions(scanInfo.getMaxVersions());
// Use a store scanner to find which rows to flush.
// Note that we need to retain deletes, hence
// treat this as a minor compaction.
InternalScanner scanner = new StoreScanner(this, scan, Collections
.singletonList(new CollectionBackedScanner(set, this.comparator)),
ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
try {
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
synchronized (flushLock) {
status.setStatus("Flushing " + this + ": creating writer");
// A. Write the map out to the disk
writer = createWriterInTmp(set.size());
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
pathName = writer.getPath();
try {
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
do {
hasMore = scanner.next(kvs);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0. This will help us save space when writing to disk.
if (kv.getMemstoreTS() <= smallestReadPoint) {
// let us not change the original KV. It could be in the memstore
// changing its memstoreTS could affect other threads/scanners.
kv = kv.shallowCopy();
kv.setMemstoreTS(0);
}
//append each KeyValue to the writer
writer.append(kv);
flushed += this.memstore.heapSizeChange(kv, true);
}
kvs.clear();
}
} while (hasMore);
} finally {
// Write out the log sequence number that corresponds to this output
// hfile. The hfile is current up to and including logCacheFlushId.
status.setStatus("Flushing " + this + ": appending metadata");
writer.appendMetadata(logCacheFlushId, false);
status.setStatus("Flushing " + this + ": closing flushed file");
//write out the remaining metadata and close the file
writer.close();
}
}
} finally {
flushedSize.set(flushed);
scanner.close();
}
The append process
private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
final byte[] value, final int voffset, final int vlength)
throws IOException {
//check key ordering; returns whether this key is the same as the previous one
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
//only check the block boundary for a new key, which guarantees identical keys stay in the same data block
//if the block exceeds the 64 KB limit, write it to the HDFS output stream (without flushing) and update the block index
if (!dupKey) {
checkBlockBoundary();
}
//reset state on initialization, ready to start writing a data block
if (!fsBlockWriter.isWriting())
newBlock();
// Write length of key and value and then actual key and value bytes.
// Additionally, we may also write down the memstoreTS.
{
//userDataStream is a temporary buffer; once the block is full its contents are flushed to the HDFS output stream
//KeyValue objects are written into it sequentially here
DataOutputStream out = fsBlockWriter.getUserDataStream();
out.writeInt(klength);
totalKeyLength += klength;
out.writeInt(vlength);
totalValueLength += vlength;
out.write(key, koffset, klength);
out.write(value, voffset, vlength);
if (this.includeMemstoreTS) {
WritableUtils.writeVLong(out, memstoreTS);
}
}
// Are we the first key in this block?
//first key of the block; it later becomes part of the data index entry
if (firstKeyInBlock == null) {
// Copy the key.
firstKeyInBlock = new byte[klength];
System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
}
//remember the last key written
lastKeyBuffer = key;
lastKeyOffset = koffset;
lastKeyLength = klength;
entryCount++;
}
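So inside a data block each cell is stored as: key length (int), value length (int), key bytes, value bytes and, when includeMemstoreTS is enabled, the memstoreTS as a variable-length long. Below is a small sketch of that record layout, with a simplified varint encoder standing in for Hadoop's WritableUtils.writeVLong (the encoding details here are illustrative only). It also shows why the flush code above zeroes memstoreTS when possible: a zero encodes in a single byte.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the per-cell record layout inside a data block; not HBase code.
public class CellRecordSketch {

    // Simplified stand-in for WritableUtils.writeVLong: small values take few bytes.
    static void writeVLongSketch(DataOutputStream out, long v) throws IOException {
        while ((v & ~0x7FL) != 0) {
            out.writeByte((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.writeByte((int) v);
    }

    static byte[] encodeCell(byte[] key, byte[] value, long memstoreTS,
                             boolean includeMemstoreTS) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(key.length);              // key length
        out.writeInt(value.length);            // value length
        out.write(key);                        // key bytes (row/family/qualifier/ts/type packed inside)
        out.write(value);                      // value bytes
        if (includeMemstoreTS) {
            writeVLongSketch(out, memstoreTS); // memstoreTS == 0 costs only one byte
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] key = "row1/cf:q1/1234567890/Put".getBytes();
        byte[] value = "v1".getBytes();
        System.out.println(encodeCell(key, value, 0L, true).length);         // smallest form
        System.out.println(encodeCell(key, value, 123456789L, true).length); // a few bytes larger
    }
}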
Initialization: starting to write a block
/**
* Starts writing into the block. The previous block's data is discarded.
*
* @return the stream the user can write their data into
* @throws IOException
*/
public DataOutputStream startWriting(BlockType newBlockType)
throws IOException {
if (state == State.BLOCK_READY && startOffset != -1) {
// We had a previous block that was written to a stream at a specific
// offset. Save that offset as the last offset of a block of that type.
//remember the offset of the previous block of the same type
prevOffsetByType[blockType.getId()] = startOffset;
}
//offset of the current block (unknown until it is written out)
startOffset = -1;
//block type: mainly data block, index block, meta block
blockType = newBlockType;
//temporary in-memory buffer
baosInMemory.reset();
//header placeholder; the real header is written when the block is finished
baosInMemory.write(DUMMY_HEADER);
//start writing
state = State.WRITING;
// We will compress it later in finishBlock()
//temporary stream
userDataStream = new DataOutputStream(baosInMemory);
return userDataStream;
}
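In other words the block writer cycles through a small state machine: startWriting puts it into WRITING, finishing the block moves it to BLOCK_READY, and writeHeaderAndData emits the bytes before the next startWriting begins again. A stripped-down sketch of that lifecycle follows (names and behavior are simplified, not the real HFileBlock.Writer).

// Minimal sketch of the block writer lifecycle; not the real HFileBlock.Writer.
public class BlockWriterLifecycleSketch {
    enum State { INIT, WRITING, BLOCK_READY }

    private State state = State.INIT;
    private final StringBuilder buffer = new StringBuilder();  // stands in for baosInMemory

    // startWriting: discard the previous block's buffer and accept new data.
    void startWriting() {
        buffer.setLength(0);          // reset the in-memory buffer
        state = State.WRITING;
    }

    void write(String data) {
        if (state != State.WRITING) throw new IllegalStateException("not writing");
        buffer.append(data);
    }

    // finishBlock: freeze the buffer (the real code also encodes, compresses and checksums here).
    void finishBlock() {
        if (state != State.WRITING) throw new IllegalStateException("nothing to finish");
        state = State.BLOCK_READY;
    }

    // writeHeaderAndData: emit the finished block; the next startWriting() begins a new one.
    String writeHeaderAndData() {
        if (state != State.BLOCK_READY) throw new IllegalStateException("block not ready");
        return "HEADER+" + buffer;
    }

    public static void main(String[] args) {
        BlockWriterLifecycleSketch w = new BlockWriterLifecycleSketch();
        w.startWriting();
        w.write("cells...");
        w.finishBlock();
        System.out.println(w.writeHeaderAndData());
        w.startWriting();   // the next block reuses the same buffer
    }
}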
As writing goes on the block eventually fills up; checking whether the data block is full:
private void checkBlockBoundary() throws IOException {
//64 KB by default
if (fsBlockWriter.blockSizeWritten() < blockSize)
return;
//将之前寫入的userDataStream裡的data block寫入HDFS的outputStream,添加索引記錄
finishBlock();
//if the index chunk is full, write the index block to the HDFS output stream
writeInlineBlocks(false);
//reset state back to 'WRITING', ready for more data
newBlock();
}
The finishBlock process in detail: flushing the data to the HDFS output stream
/** Clean up the current block */
private void finishBlock() throws IOException {
//precondition: state is 'WRITING' and userDataStream has data in it
if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
return;
long startTimeNs = System.nanoTime();
// Update the first data block offset for scanning.
//offset of the first data block
if (firstDataBlockOffset == -1) {
firstDataBlockOffset = outputStream.getPos();
}
// Update the last data block offset
//offset of the last data block
lastDataBlockOffset = outputStream.getPos();
//這裡将userDataStream裡的資料flush到HDFS的outputStream
fsBlockWriter.writeHeaderAndData(outputStream);
//number of bytes written on HDFS, possibly compressed and including checksum data
int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
//update the data block index
dataBlockIndexWriter.addEntry(firstKeyInBlock, lastDataBlockOffset,
onDiskSize);
//number of uncompressed bytes
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
HFile.offerWriteData(onDiskSize);
//update the block cache
if (cacheConf.shouldCacheDataOnWrite()) {
doCacheOnWrite(lastDataBlockOffset);
}
}
Writing to the HDFS stream
public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
long offset = out.getPos();
if (startOffset != -1 && offset != startOffset) {
throw new IOException("A " + blockType + " block written to a "
+ "stream twice, first at offset " + startOffset + ", then at "
+ offset);
}
//start position of this block
startOffset = offset;
//write it out
writeHeaderAndData((DataOutputStream) out);
}
private void writeHeaderAndData(DataOutputStream out) throws IOException {
//this method matters: before a block is flushed to the HDFS stream the data is processed (compression, encoding, etc.) and the state is set to 'BLOCK_READY'
ensureBlockReady();
//onDiskBytesWithHeader is the processed data, written directly to the HDFS stream
out.write(onDiskBytesWithHeader);
if (compressAlgo == NONE) {
if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
throw new IOException("A " + blockType
+ " without compression should have checksums "
+ " stored separately.");
}
//without compression the checksums are stored separately and written here
out.write(onDiskChecksum);
}
}
Processing the buffered data, including encoding and compression
private void finishBlock() throws IOException {
//flush the stream first
userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array.
//grab everything written into the buffer so far, still uncompressed
uncompressedBytesWithHeader = baosInMemory.toByteArray();
//offset of the previous block of the same type
prevOffset = prevOffsetByType[blockType.getId()];
// We need to set state before we can package the block up for
// cache-on-write. In a way, the block is ready, but not yet encoded or
// compressed.
//BLOCK_READY: about to be flushed
state = State.BLOCK_READY;
//encode
encodeDataBlockForDisk();
//compression and checksums
doCompressionAndChecksumming();
}
Compression and checksums: with compression, the checksum data is written directly into onDiskBytesWithHeader; otherwise it goes into onDiskChecksum. Either way the block header is written.
private void doCompressionAndChecksumming() throws IOException {
// do the compression
if (compressAlgo != NONE) {
compressedByteStream.reset();
compressedByteStream.write(DUMMY_HEADER);
compressionStream.resetState();
compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
uncompressedBytesWithHeader.length - HEADER_SIZE);
compressionStream.flush();
compressionStream.finish();
// generate checksums
onDiskDataSizeWithHeader = compressedByteStream.size(); // data size
// reserve space for checksums in the output byte stream
ChecksumUtil.reserveSpaceForChecksums(compressedByteStream,
onDiskDataSizeWithHeader, bytesPerChecksum);
onDiskBytesWithHeader = compressedByteStream.toByteArray();
putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
// generate checksums for header and data. The checksums are
// part of onDiskBytesWithHeader itself.
ChecksumUtil.generateChecksums(
onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,
onDiskBytesWithHeader, onDiskDataSizeWithHeader,
checksumType, bytesPerChecksum);
// Checksums are already part of onDiskBytesWithHeader
onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
//set the header for the uncompressed bytes (for cache-on-write)
putHeader(uncompressedBytesWithHeader, 0,
onDiskBytesWithHeader.length + onDiskChecksum.length,
uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
} else {
// If we are not using any compression, then the
// checksums are written to its own array onDiskChecksum.
onDiskBytesWithHeader = uncompressedBytesWithHeader;
onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
//number of checksum bytes
int numBytes = (int)ChecksumUtil.numBytes(
uncompressedBytesWithHeader.length,
bytesPerChecksum);
//checksum data
onDiskChecksum = new byte[numBytes];
//set the header for the uncompressed bytes
//update the header
putHeader(uncompressedBytesWithHeader, 0,
onDiskBytesWithHeader.length + onDiskChecksum.length,
uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
ChecksumUtil.generateChecksums(
uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,
onDiskChecksum, 0,
checksumType, bytesPerChecksum);
}
}
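The size of the checksum data follows from bytesPerChecksum: one 4-byte checksum per chunk of data, i.e. roughly 4 * ceil(length / bytesPerChecksum) bytes. Here is a small sketch of that sizing and of per-chunk CRC32 generation; the 16 KB default and the byte packing are assumptions of this sketch, not taken from ChecksumUtil.

import java.util.zip.CRC32;

// Sketch of per-chunk checksum sizing and generation; not HBase's ChecksumUtil.
public class ChecksumSketch {
    static final int CHECKSUM_SIZE = 4;  // one CRC32 value per chunk

    // How many checksum bytes are needed for `dataLength` bytes of block data.
    static int numChecksumBytes(int dataLength, int bytesPerChecksum) {
        int chunks = (dataLength + bytesPerChecksum - 1) / bytesPerChecksum;  // ceil
        return chunks * CHECKSUM_SIZE;
    }

    // Compute one CRC32 per chunk and pack the values into a byte array.
    static byte[] generateChecksums(byte[] data, int bytesPerChecksum) {
        byte[] out = new byte[numChecksumBytes(data.length, bytesPerChecksum)];
        int chunk = 0;
        for (int off = 0; off < data.length; off += bytesPerChecksum, chunk++) {
            int len = Math.min(bytesPerChecksum, data.length - off);
            CRC32 crc = new CRC32();
            crc.update(data, off, len);
            int v = (int) crc.getValue();
            int p = chunk * CHECKSUM_SIZE;
            out[p] = (byte) (v >>> 24);
            out[p + 1] = (byte) (v >>> 16);
            out[p + 2] = (byte) (v >>> 8);
            out[p + 3] = (byte) v;
        }
        return out;
    }

    public static void main(String[] args) {
        // A 65 KB block with a (presumed default) 16 KB bytesPerChecksum needs 5 chunks -> 20 bytes.
        System.out.println(numChecksumBytes(65 * 1024, 16 * 1024));  // 20
    }
}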
After a data block is processed, the index is updated. An index entry consists of the block's first key, its start offset and its on-disk data size. There are two kinds of index chunks: leaf-level chunks and the root index chunk.
void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
long curTotalNumSubEntries) {
// Record the offset for the secondary index
//secondary index: records the offset of each entry
secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
//offset of the next entry
curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
+ firstKey.length;
//size accounting for the root-chunk format
curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
+ WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
//record the index entry
blockKeys.add(firstKey);
blockOffsets.add(blockOffset);
onDiskDataSizes.add(onDiskDataSize);
//only the root index chunk passes sub-entry counts (curTotalNumSubEntries != -1)
if (curTotalNumSubEntries != -1) {
numSubEntriesAt.add(curTotalNumSubEntries);
// Make sure the parallel arrays are in sync.
if (numSubEntriesAt.size() != blockKeys.size()) {
throw new IllegalStateException("Only have key/value count " +
"stats for " + numSubEntriesAt.size() + " block index " +
"entries out of " + blockKeys.size());
}
}
}
Back to the main flow: after finishBlock, the data has been flushed from the buffer into the HDFS stream. At this point the index block gets a chance to check whether it is full; if it is, the index chunk is flushed to HDFS.
private void writeInlineBlocks(boolean closing) throws IOException {
for (InlineBlockWriter ibw : inlineBlockWriters) {
//flush the inline block if it says it should be written
while (ibw.shouldWriteBlock(closing)) {
long offset = outputStream.getPos();
boolean cacheThisBlock = ibw.cacheOnWrite();
//same as writing a data block: startWriting first, then write
ibw.writeInlineBlock(fsBlockWriter.startWriting(
ibw.getInlineBlockType()));
fsBlockWriter.writeHeaderAndData(outputStream);
ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
fsBlockWriter.getUncompressedSizeWithoutHeader());
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
if (cacheThisBlock) {
doCacheOnWrite(offset);
}
}
}
}
For BlockIndexWriter the rule is the following (maxChunkSize is 128 KB by default):
curInlineChunk.getNonRootSize() >= maxChunkSize;
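getNonRootSize() here works out to roughly 4 bytes for the entry count, 4 * (n + 1) bytes of secondary index, plus (8 + 4 + keyLength) bytes per entry. A rough back-of-the-envelope sketch of how much data a single leaf index chunk ends up covering follows; the constants and the 40-byte key length are assumptions, not exact HBase figures.

// Rough estimate of how much data one leaf-level index chunk covers; constants are approximate.
public class LeafIndexSizeSketch {
    public static void main(String[] args) {
        int maxChunkSize = 128 * 1024;  // default maxChunkSize
        int dataBlockSize = 64 * 1024;  // default data block size
        int keyLength = 40;             // assumed average first-key length per data block

        // per entry: 8 (block offset) + 4 (on-disk size) + key bytes, plus 4 bytes of secondary index
        int perEntry = 8 + 4 + keyLength + 4;
        int entries = (maxChunkSize - 2 * 4) / perEntry;  // minus entry count and the extra offset int

        System.out.println("entries per leaf index chunk: " + entries);
        System.out.println("data covered by one leaf index chunk: "
            + (long) entries * dataBlockSize / (1024 * 1024) + " MB");
    }
}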
Now let's look at how BlockIndexWriter writes its block:
public void writeInlineBlock(DataOutput out) throws IOException {
if (singleLevelOnly)
throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
// Write the inline block index to the output stream in the non-root
// index block format.
//write it as a leaf chunk (non-root format)
curInlineChunk.writeNonRoot(out);
// Save the first key of the inline block so that we can add it to the
// parent-level index.
//keep the first key of this index block for the parent level of the multi-level index
firstKey = curInlineChunk.getBlockKey(0);
// Start a new inline index block
//reset curInlineChunk so subsequent index entries keep accumulating
curInlineChunk.clear();
}
Writing the leaf chunk
void writeNonRoot(DataOutput out) throws IOException {
// The number of entries in the block.
//number of index entries
out.writeInt(blockKeys.size());
if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
throw new IOException("Corrupted block index chunk writer: " +
blockKeys.size() + " entries but " +
secondaryIndexOffsetMarks.size() + " secondary index items");
}
// For each entry, write a "secondary index" of relative offsets to the
// entries from the end of the secondary index. This works, because at
// read time we read the number of entries and know where the secondary
// index ends.
//write the secondary index: each index entry is variable-length, so this secondary index records every entry's offset to make lookups easy
for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
out.writeInt(currentSecondaryIndex);
// We include one other element in the secondary index to calculate the
// size of each entry more easily by subtracting secondary index elements.
//total size of the entries
out.writeInt(curTotalNonRootEntrySize);
//the index entries themselves
for (int i = 0; i < blockKeys.size(); ++i) {
out.writeLong(blockOffsets.get(i));
out.writeInt(onDiskDataSizes.get(i));
out.write(blockKeys.get(i));
}
}
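So a non-root (leaf) index block is laid out as: an int entry count, (count + 1) ints of offsets relative to the start of the entries section, then the entries themselves (a long block offset, an int on-disk size and the key bytes). Below is a small sketch of writing that format and of using the secondary index to jump straight to entry i; it is illustrative only, not HBase's reader.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the non-root index block format and how an entry is located; not HBase code.
public class NonRootIndexSketch {

    static byte[] writeNonRoot(List<byte[]> keys, List<Long> offsets, List<Integer> sizes)
            throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(keys.size());                       // number of entries
        int relOffset = 0;
        for (byte[] k : keys) {                          // secondary index: relative entry offsets
            out.writeInt(relOffset);
            relOffset += 8 + 4 + k.length;               // long offset + int size + key bytes
        }
        out.writeInt(relOffset);                         // one extra element: total size of the entries
        for (int i = 0; i < keys.size(); i++) {          // the entries themselves
            out.writeLong(offsets.get(i));
            out.writeInt(sizes.get(i));
            out.write(keys.get(i));
        }
        return baos.toByteArray();
    }

    // Jump straight to entry i using the secondary index.
    static long blockOffsetOfEntry(byte[] block, int i) throws IOException {
        int numEntries = readIntAt(block, 0);
        int rel = readIntAt(block, 4 + i * 4);           // secondaryIndex[i]
        int entriesStart = 4 + (numEntries + 1) * 4;     // entries begin after the secondary index
        return readLongAt(block, entriesStart + rel);    // first field of the entry: block offset
    }

    static int readIntAt(byte[] b, int pos) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(b, pos, 4)).readInt();
    }

    static long readLongAt(byte[] b, int pos) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(b, pos, 8)).readLong();
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> keys = new ArrayList<>();
        List<Long> offsets = new ArrayList<>();
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            keys.add(("row" + i).getBytes());
            offsets.add(i * 65536L);
            sizes.add(65536);
        }
        byte[] block = writeNonRoot(keys, offsets, sizes);
        System.out.println(blockOffsetOfEntry(block, 2));  // 131072
    }
}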
After an index block has been written to the HDFS stream, the rootChunk index is updated. rootChunk is an index over the data block index blocks; it is only flushed to the HDFS stream after all KeyValues have been written, at which point it may be split further into a multi-level structure, but during the write loop there are only two levels.
public void blockWritten(long offset, int onDiskSize, int uncompressedSize)
{
// Add leaf index block size
totalBlockOnDiskSize += onDiskSize;
totalBlockUncompressedSize += uncompressedSize;
if (singleLevelOnly)
throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
if (firstKey == null) {
throw new IllegalStateException("Trying to add second-level index " +
"entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
"but the first key was not set in writeInlineBlock");
}
//during writing there are only two levels; a 128 KB rootChunk is enough to index everything flushed from a memstore
if (rootChunk.getNumEntries() == 0) {
// We are writing the first leaf block, so increase index level.
expectNumLevels(1);
numLevels = 2;
}
// Add another entry to the second-level index. Include the number of
// entries in all previous leaf-level chunks for mid-key calculation.
//add an entry to the root index; totalNumEntries is the total number of entries across the leaf-level chunks written so far (used for mid-key calculation)
rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
firstKey = null;
}
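To get a rough feel for when more than two levels become necessary: with 128 KB chunks and the same ~56-byte entries assumed in the earlier sketch, one chunk holds a couple of thousand entries, so only very large files need a third level when the root is split at close time. All constants below are assumptions.

// Rough estimate of how many index levels a file needs; all constants are assumptions.
public class IndexLevelsSketch {
    public static void main(String[] args) {
        long dataBlockSize = 64L * 1024;          // default data block size
        int perEntry = 8 + 4 + 40 + 4;            // offset + size + assumed 40-byte key + secondary index
        int entriesPerChunk = (128 * 1024) / perEntry;

        long totalDataBytes = 1L << 40;           // pretend the file holds 1 TB of data blocks
        long entries = totalDataBytes / dataBlockSize;  // one leaf entry per data block
        int levels = 1;
        while (entries > entriesPerChunk) {       // keep splitting until the root fits in one chunk
            entries = (entries + entriesPerChunk - 1) / entriesPerChunk;
            levels++;
        }
        // prints 3: leaf chunks, one intermediate level and the root, under these assumptions
        System.out.println(levels);
    }
}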
That is roughly the write loop: data blocks and data index blocks are written alternately. Once all data has been written, the close operation runs; here is HFileWriterV2's close:
public void close() throws IOException {
if (outputStream == null) {
return;
}
// Write out the end of the data blocks, then write meta data blocks.
// followed by fileinfo, data block index and meta block index.
//write the final data block to the stream
finishBlock();
//write the remaining index block data
writeInlineBlocks(true);
//trailer information
FixedFileTrailer trailer = new FixedFileTrailer(2,
HFileReaderV2.MAX_MINOR_VERSION);
// Write out the metadata blocks if any.
//write meta blocks; in V2 they are unused (in V1 they stored the bloom filter)
if (!metaNames.isEmpty()) {
for (int i = 0; i < metaNames.size(); ++i) {
// store the beginning offset
long offset = outputStream.getPos();
// write the metadata content
DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
metaData.get(i).write(dos);
fsBlockWriter.writeHeaderAndData(outputStream);
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
// Add the new meta block to the meta index.
metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
fsBlockWriter.getOnDiskSizeWithHeader());
}
}
// Load-on-open section.
// Data block index.
//
// In version 2, this section of the file starts with the root level data
// block index. We call a function that writes intermediate-level blocks
// first, then root level, and returns the offset of the root level block
// index.
//這裡将rootChunk分裂成子樹,如果rootChunk很大,按照128K分裂成子block并寫入HDFS,如果一開始索引樹有2層的話,結果索引樹會有3層
//也就是多了一個中間層
long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
trailer.setLoadOnOpenOffset(rootIndexOffset);
// Meta block index.
//write the meta block index
metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
BlockType.ROOT_INDEX), "meta");
fsBlockWriter.writeHeaderAndData(outputStream);
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
if (this.includeMemstoreTS) {
appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
}
//write the file info
// File info
writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
fsBlockWriter.writeHeaderAndData(outputStream);
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
// Load-on-open data supplied by higher levels, e.g. Bloom filters.
for (BlockWritable w : additionalLoadOnOpenData){
fsBlockWriter.writeBlock(w, outputStream);
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
}
//write the trailer
// Now finish off the trailer.
trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
trailer.setUncompressedDataIndexSize(
dataBlockIndexWriter.getTotalUncompressedSize());
trailer.setFirstDataBlockOffset(firstDataBlockOffset);
trailer.setLastDataBlockOffset(lastDataBlockOffset);
trailer.setComparatorClass(comparator.getClass());
trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
finishClose(trailer);
fsBlockWriter.releaseCompressor();
}