When HBase handles a put, the data is first written to memory. The in-memory structure is a ConcurrentSkipListMap whose Comparator is KVComparator.
KeyValue object structure
How KVComparator compares two KeyValue objects:
1. Compare the rowkeys with KeyComparator; rowkeys sort ascending by byte order.
2. If the rowkeys are equal, compare the column family; families sort ascending by byte order.
3. If the column family is also equal, compare the qualifier (family + qualifier); qualifiers sort ascending by byte order.
4. If the qualifier is also equal, sort by timestamp, descending (newest first).
5. If the timestamp is also equal, sort by type; a delete sorts before a put.
6. If all of the above are equal, sort by memstoreTS, descending. memstoreTS is an atomically incremented id, so ties are impossible; newer edits sort first, which is convenient for scans.
So KeyValue objects are already sorted while they sit in memory. When a flush produces a file, the memstore is simply scanned with maxVersions applied (puts beyond maxVersions are dropped at this point) and each KeyValue is written to HDFS.
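To make the ordering concrete, here is a minimal, self-contained model of the same rules. SimpleKV and SimpleKVComparator are made-up names for illustration, not HBase's KeyValue/KVComparator, which operate on raw byte arrays.

// Minimal stand-in for KeyValue; fields are already-decoded values.
class SimpleKV {
    final byte[] row, family, qualifier;
    final long timestamp;    // write time
    final byte type;         // in HBase, delete types carry a higher code than Put
    final long memstoreTS;   // atomically increasing write id

    SimpleKV(byte[] row, byte[] family, byte[] qualifier,
             long timestamp, byte type, long memstoreTS) {
        this.row = row; this.family = family; this.qualifier = qualifier;
        this.timestamp = timestamp; this.type = type; this.memstoreTS = memstoreTS;
    }
}

class SimpleKVComparator implements java.util.Comparator<SimpleKV> {
    // unsigned lexicographic byte[] comparison, ascending
    private static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    }

    @Override
    public int compare(SimpleKV l, SimpleKV r) {
        int d = compareBytes(l.row, r.row);               // 1. rowkey, ascending
        if (d != 0) return d;
        d = compareBytes(l.family, r.family);             // 2. column family, ascending
        if (d != 0) return d;
        d = compareBytes(l.qualifier, r.qualifier);       // 3. qualifier, ascending
        if (d != 0) return d;
        d = Long.compare(r.timestamp, l.timestamp);       // 4. timestamp, descending (newest first)
        if (d != 0) return d;
        d = (r.type & 0xff) - (l.type & 0xff);            // 5. type code, descending: deletes before puts
        if (d != 0) return d;
        return Long.compare(r.memstoreTS, l.memstoreTS);  // 6. memstoreTS, descending (newest edit first)
    }
}

// The memstore can then be modeled as a sorted concurrent map:
// new java.util.concurrent.ConcurrentSkipListMap<SimpleKV, SimpleKV>(new SimpleKVComparator());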
A flush generates an HFile roughly as follows:
1. Construct a Writer; the current implementation is HFileWriterV2 (format version 2).
2. Append the KeyValue objects to the writer in a loop. Data is buffered per block (64 KB by default); each time a block fills up, it is finished and a new one is started, and an entry for the finished block is added to the block index. Once the accumulated block index exceeds a limit (128 KB by default), it is written out as a special InlineBlock marking an index block, so the body of an HFile is an alternating sequence of data blocks and inline blocks.
3. After all KeyValue objects are written, the remaining index data is written out as inline blocks, followed by the root index, file info and other metadata.
HFile V2 structure
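Pieced together from the append loop above and the close() sequence at the end of this post, the on-disk layout is roughly:

data block | leaf index block | data block | data block | leaf index block | ...   (alternating during the append loop)
meta blocks (if any)
root data index (plus an intermediate index level if the root chunk had to split)
meta block index
file info
load-on-open data such as Bloom filter blocks
fixed file trailer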
The main implementation classes are HFileWriterV2 (the file-level writer), HFileBlock.Writer (the block-level writer, the fsBlockWriter field below) and HFileBlockIndex.BlockIndexWriter (dataBlockIndexWriter / metaBlockIndexWriter, which accumulate index chunks).
Main flow
Scan scan = new Scan();
// maximum number of versions to keep, 3 by default
scan.setMaxVersions(scanInfo.getMaxVersions());
// Use a store scanner to find which rows to flush.
// Note that we need to retain deletes, hence
// treat this as a minor compaction.
InternalScanner scanner = new StoreScanner(this, scan, Collections
    .singletonList(new CollectionBackedScanner(set, this.comparator)),
    ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
    HConstants.OLDEST_TIMESTAMP);
try {
  // TODO: We can fail in the below block before we complete adding this
  // flush to list of store files. Add cleanup of anything put on filesystem
  // if we fail.
  synchronized (flushLock) {
    status.setStatus("Flushing " + this + ": creating writer");
    // A. Write the map out to the disk
    writer = createWriterInTmp(set.size());
    writer.setTimeRangeTracker(snapshotTimeRangeTracker);
    pathName = writer.getPath();
    try {
      List<KeyValue> kvs = new ArrayList<KeyValue>();
      boolean hasMore;
      do {
        hasMore = scanner.next(kvs);
        if (!kvs.isEmpty()) {
          for (KeyValue kv : kvs) {
            // If we know that this KV is going to be included always, then let us
            // set its memstoreTS to 0. This will help us save space when writing to disk.
            if (kv.getMemstoreTS() <= smallestReadPoint) {
              // let us not change the original KV. It could be in the memstore
              // changing its memstoreTS could affect other threads/scanners.
              kv = kv.shallowCopy();
              kv.setMemstoreTS(0);
            }
            // append the KeyValue to the writer
            writer.append(kv);
            flushed += this.memstore.heapSizeChange(kv, true);
          }
          kvs.clear();
        }
      } while (hasMore);
    } finally {
      // Write out the log sequence number that corresponds to this output
      // hfile. The hfile is current up to and including logCacheFlushId.
      status.setStatus("Flushing " + this + ": appending metadata");
      writer.appendMetadata(logCacheFlushId, false);
      status.setStatus("Flushing " + this + ": closing flushed file");
      // close the writer: this writes the remaining blocks, indexes, file info and trailer
      writer.close();
    }
  }
} finally {
  flushedSize.set(flushed);
  scanner.close();
}
The append path
private void append(final long memstoreTS, final byte[] key, final int koffset,
    final int klength, final byte[] value, final int voffset, final int vlength)
    throws IOException {
  // check key ordering; returns whether this key equals the previous one
  boolean dupKey = checkKey(key, koffset, klength);
  checkValue(value, voffset, vlength);
  // Only check the block boundary for a new key, so that all cells of the same key
  // stay inside the same data block.
  // If the block has exceeded the 64 KB limit, its contents are written to the HDFS
  // output stream (without flushing it) and the block index is updated.
  if (!dupKey) {
    checkBlockBoundary();
  }
  // reset state if needed: ready to start writing a data block
  if (!fsBlockWriter.isWriting())
    newBlock();

  // Write length of key and value and then actual key and value bytes.
  // Additionally, we may also write down the memstoreTS.
  {
    // userDataStream is a temporary buffer; once the block is full, its contents
    // are flushed to the HDFS output stream. KeyValue objects are written here sequentially.
    DataOutputStream out = fsBlockWriter.getUserDataStream();
    out.writeInt(klength);
    totalKeyLength += klength;
    out.writeInt(vlength);
    totalValueLength += vlength;
    out.write(key, koffset, klength);
    out.write(value, voffset, vlength);
    if (this.includeMemstoreTS) {
      WritableUtils.writeVLong(out, memstoreTS);
    }
  }

  // Are we the first key in this block?
  // The block's first key later becomes this block's entry in the data index.
  if (firstKeyInBlock == null) {
    // Copy the key.
    firstKeyInBlock = new byte[klength];
    System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
  }
  // remember the previous key
  lastKeyBuffer = key;
  lastKeyOffset = koffset;
  lastKeyLength = klength;
  entryCount++;
}
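The record format written per cell is therefore: a 4-byte key length, a 4-byte value length, the key bytes, the value bytes, and (when includeMemstoreTS is set) a vlong memstoreTS. A small round-trip sketch of that layout, outside of any HBase class; KvRecordSketch and the sample key/value are made up, and Hadoop's WritableUtils is assumed on the classpath:

import java.io.*;
import org.apache.hadoop.io.WritableUtils;

public class KvRecordSketch {
    public static void main(String[] args) throws IOException {
        byte[] key = "row1/cf:q1/ts".getBytes("UTF-8");  // stands in for the serialized KeyValue key part
        byte[] value = "v1".getBytes("UTF-8");
        long memstoreTS = 42L;

        // encode: klen, vlen, key bytes, value bytes, vlong memstoreTS
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(key.length);
        out.writeInt(value.length);
        out.write(key);
        out.write(value);
        WritableUtils.writeVLong(out, memstoreTS);

        // decode it back in the same order
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        byte[] k = new byte[in.readInt()];
        byte[] v = new byte[in.readInt()];
        in.readFully(k);
        in.readFully(v);
        long ts = WritableUtils.readVLong(in);
        System.out.println(new String(k, "UTF-8") + " -> " + new String(v, "UTF-8") + " @memstoreTS=" + ts);
    }
}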
Initializing a block for writing
/**
 * Starts writing into the block. The previous block's data is discarded.
 *
 * @return the stream the user can write their data into
 * @throws IOException
 */
public DataOutputStream startWriting(BlockType newBlockType)
    throws IOException {
  if (state == State.BLOCK_READY && startOffset != -1) {
    // We had a previous block that was written to a stream at a specific
    // offset. Save that offset as the last offset of a block of that type.
    prevOffsetByType[blockType.getId()] = startOffset;
  }
  // offset of the current block, not known yet
  startOffset = -1;
  // block type: data block, index block, meta block, etc.
  blockType = newBlockType;
  // temporary in-memory buffer
  baosInMemory.reset();
  // header placeholder; the real header is written when the block is finished
  baosInMemory.write(DUMMY_HEADER);
  // start writing
  state = State.WRITING;

  // We will compress it later in finishBlock()
  // temporary stream wrapping the in-memory buffer
  userDataStream = new DataOutputStream(baosInMemory);
  return userDataStream;
}
As appends accumulate, the block may fill up; checkBlockBoundary checks whether the data block is full:
private void checkBlockBoundary() throws IOException {
  // 64 KB by default
  if (fsBlockWriter.blockSizeWritten() < blockSize)
    return;
  // write the data block buffered in userDataStream to the HDFS output stream
  // and add an entry for it to the block index
  finishBlock();
  // if the accumulated index is close to full, write an index (inline) block
  // to the HDFS output stream
  writeInlineBlocks(false);
  // reset state back to 'WRITING', ready for the next data block
  newBlock();
}
The finishBlock step flushes the buffered data to the HDFS output stream:
/** Clean up the current block */
private void finishBlock() throws IOException {
  // precondition: state is 'WRITING' and userDataStream has data in it
  if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
    return;

  long startTimeNs = System.nanoTime();

  // Update the first data block offset for scanning.
  // offset of the first data block
  if (firstDataBlockOffset == -1) {
    firstDataBlockOffset = outputStream.getPos();
  }
  // Update the last data block offset
  // offset of the most recent data block
  lastDataBlockOffset = outputStream.getPos();

  // flush the contents of userDataStream to the HDFS output stream
  fsBlockWriter.writeHeaderAndData(outputStream);

  // bytes written on HDFS: possibly compressed, plus checksum data
  int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
  // add an entry for this block to the data block index
  dataBlockIndexWriter.addEntry(firstKeyInBlock, lastDataBlockOffset,
      onDiskSize);
  // uncompressed byte count
  totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

  HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
  HFile.offerWriteData(onDiskSize);

  // update the block cache
  if (cacheConf.shouldCacheDataOnWrite()) {
    doCacheOnWrite(lastDataBlockOffset);
  }
}
Writing to the HDFS stream
public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
  long offset = out.getPos();
  if (startOffset != -1 && offset != startOffset) {
    throw new IOException("A " + blockType + " block written to a "
        + "stream twice, first at offset " + startOffset + ", then at "
        + offset);
  }
  // start offset of this block
  startOffset = offset;
  // do the actual write
  writeHeaderAndData((DataOutputStream) out);
}

private void writeHeaderAndData(DataOutputStream out) throws IOException {
  // This is the important step: before a block is flushed to the HDFS stream,
  // its data is post-processed (compression, encoding, checksums) and the
  // state moves to 'BLOCK_READY'.
  ensureBlockReady();
  // onDiskBytesWithHeader holds the processed bytes; write them straight to the HDFS stream
  out.write(onDiskBytesWithHeader);
  if (compressAlgo == NONE) {
    if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
      throw new IOException("A " + blockType
          + " without compression should have checksums "
          + " stored separately.");
    }
    // without compression, the checksums are written separately
    out.write(onDiskChecksum);
  }
}
Processing the buffered data: encoding, compression and checksums
private void finishBlock() throws IOException {
  // flush the user data stream first
  userDataStream.flush();

  // This does an array copy, so it is safe to cache this byte array.
  // grab everything written so far from the buffer, still uncompressed
  uncompressedBytesWithHeader = baosInMemory.toByteArray();
  // offset of the previous block of the same type
  prevOffset = prevOffsetByType[blockType.getId()];

  // We need to set state before we can package the block up for
  // cache-on-write. In a way, the block is ready, but not yet encoded or
  // compressed.
  // BLOCK_READY: ready to be flushed
  state = State.BLOCK_READY;
  // data block encoding
  encodeDataBlockForDisk();
  // compression and checksums
  doCompressionAndChecksumming();
}
Compression and checksumming: with compression enabled, the checksum data is appended directly into onDiskBytesWithHeader; without compression it goes into the separate onDiskChecksum array. In both cases the block header is (re)written.
private void doCompressionAndChecksumming() throws IOException {
  // do the compression
  if (compressAlgo != NONE) {
    compressedByteStream.reset();
    compressedByteStream.write(DUMMY_HEADER);

    compressionStream.resetState();

    compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
        uncompressedBytesWithHeader.length - HEADER_SIZE);

    compressionStream.flush();
    compressionStream.finish();

    // generate checksums
    onDiskDataSizeWithHeader = compressedByteStream.size(); // data size

    // reserve space for checksums in the output byte stream
    ChecksumUtil.reserveSpaceForChecksums(compressedByteStream,
        onDiskDataSizeWithHeader, bytesPerChecksum);

    onDiskBytesWithHeader = compressedByteStream.toByteArray();
    putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
        uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);

    // generate checksums for header and data. The checksums are
    // part of onDiskBytesWithHeader itself.
    ChecksumUtil.generateChecksums(
        onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,
        onDiskBytesWithHeader, onDiskDataSizeWithHeader,
        checksumType, bytesPerChecksum);

    // Checksums are already part of onDiskBytesWithHeader
    onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;

    // set the header for the uncompressed bytes (for cache-on-write)
    putHeader(uncompressedBytesWithHeader, 0,
        onDiskBytesWithHeader.length + onDiskChecksum.length,
        uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
  } else {
    // If we are not using any compression, then the
    // checksums are written to its own array onDiskChecksum.
    onDiskBytesWithHeader = uncompressedBytesWithHeader;
    onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
    // number of checksum bytes needed
    int numBytes = (int) ChecksumUtil.numBytes(
        uncompressedBytesWithHeader.length,
        bytesPerChecksum);
    // checksum data
    onDiskChecksum = new byte[numBytes];

    // set the header for the uncompressed bytes
    putHeader(uncompressedBytesWithHeader, 0,
        onDiskBytesWithHeader.length + onDiskChecksum.length,
        uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);

    ChecksumUtil.generateChecksums(
        uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,
        onDiskChecksum, 0,
        checksumType, bytesPerChecksum);
  }
}
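The per-chunk checksum idea is simple: one checksum is computed for every bytesPerChecksum bytes of block data. A rough, self-contained illustration using CRC32; ChunkedChecksumSketch is a made-up name, and HBase's ChecksumUtil supports several checksum types and a specific on-disk layout, so this shows the concept only, not the real format:

import java.util.zip.CRC32;

public class ChunkedChecksumSketch {
    // one 4-byte CRC32 value per bytesPerChecksum bytes of data
    static int[] checksums(byte[] data, int bytesPerChecksum) {
        int chunks = (data.length + bytesPerChecksum - 1) / bytesPerChecksum;
        int[] sums = new int[chunks];
        for (int i = 0; i < chunks; i++) {
            int off = i * bytesPerChecksum;
            int len = Math.min(bytesPerChecksum, data.length - off);
            CRC32 crc = new CRC32();
            crc.update(data, off, len);
            sums[i] = (int) crc.getValue();
        }
        return sums;
    }

    public static void main(String[] args) {
        byte[] block = new byte[150 * 1024];       // pretend this is a finished block
        int[] sums = checksums(block, 16 * 1024);  // 16 KB per checksum chunk -> 10 checksums
        System.out.println(sums.length + " checksum chunks");
    }
}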
After a data block has been processed, the index is updated. An index entry consists of the block's first key, its start offset and its on-disk data size. There are two kinds of index chunks: leaf-level chunks and the root index chunk.
void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
    long curTotalNumSubEntries) {
  // Record the offset for the secondary index
  // the secondary index records the relative offset of every entry
  secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
  // offset where the next entry will start
  curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
      + firstKey.length;
  // size bookkeeping for the root chunk format
  curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
      + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;

  // record the index entry itself
  blockKeys.add(firstKey);
  blockOffsets.add(blockOffset);
  onDiskDataSizes.add(onDiskDataSize);

  // only the root index chunk tracks sub-entry counts (curTotalNumSubEntries != -1)
  if (curTotalNumSubEntries != -1) {
    numSubEntriesAt.add(curTotalNumSubEntries);

    // Make sure the parallel arrays are in sync.
    if (numSubEntriesAt.size() != blockKeys.size()) {
      throw new IllegalStateException("Only have key/value count " +
          "stats for " + numSubEntriesAt.size() + " block index " +
          "entries out of " + blockKeys.size());
    }
  }
}
Back in the main loop: after finishBlock, the buffered data has been flushed to the HDFS stream. At this point the inline (index) blocks get a chance to check whether they are full; if so, the index block is flushed to HDFS as well:
private void writeInlineBlocks(boolean closing) throws IOException {
  for (InlineBlockWriter ibw : inlineBlockWriters) {
    // flush each inline block that says it needs flushing
    while (ibw.shouldWriteBlock(closing)) {
      long offset = outputStream.getPos();
      boolean cacheThisBlock = ibw.cacheOnWrite();
      // same pattern as a data block: startWriting first, then write the payload
      ibw.writeInlineBlock(fsBlockWriter.startWriting(
          ibw.getInlineBlockType()));
      fsBlockWriter.writeHeaderAndData(outputStream);
      ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
          fsBlockWriter.getUncompressedSizeWithoutHeader());
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

      if (cacheThisBlock) {
        doCacheOnWrite(offset);
      }
    }
  }
}
For the data BlockIndexWriter the rule is that the accumulated leaf chunk has outgrown maxChunkSize (128 KB by default):
curInlineChunk.getNonRootSize() >= maxChunkSize;
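Put as a sketch, that check amounts to something like the following. This is simplified and not the exact HBase implementation: the real shouldWriteBlock also handles the single-level-only writer and a small index at close time, where it can skip the leaf level entirely.

// hedged sketch of the flush rule described above
boolean shouldWriteBlock(boolean closing) {
    if (curInlineChunk.getNumEntries() == 0) {
        return false;                                            // nothing buffered, nothing to flush
    }
    if (closing) {
        return true;                                             // on close, push out whatever is left
    }
    return curInlineChunk.getNonRootSize() >= maxChunkSize;      // 128 KB by default
}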
Here is how BlockIndexWriter writes the inline block:
public void writeInlineBlock(DataOutput out) throws IOException {
  if (singleLevelOnly)
    throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

  // Write the inline block index to the output stream in the non-root
  // index block format.
  // written as a leaf-level chunk
  curInlineChunk.writeNonRoot(out);

  // Save the first key of the inline block so that we can add it to the
  // parent-level index.
  // keep this index block's first key for the multi-level index
  firstKey = curInlineChunk.getBlockKey(0);

  // Start a new inline index block
  // reset curInlineChunk; subsequent index entries keep accumulating there
  curInlineChunk.clear();
}
Writing a leaf chunk
void writeNonRoot(DataOutput out) throws IOException {
  // The number of entries in the block.
  // number of index entries
  out.writeInt(blockKeys.size());

  if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
    throw new IOException("Corrupted block index chunk writer: " +
        blockKeys.size() + " entries but " +
        secondaryIndexOffsetMarks.size() + " secondary index items");
  }

  // For each entry, write a "secondary index" of relative offsets to the
  // entries from the end of the secondary index. This works, because at
  // read time we read the number of entries and know where the secondary
  // index ends.
  // The secondary index exists because each index entry is variable-length;
  // recording every entry's relative offset makes lookups easy.
  for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
    out.writeInt(currentSecondaryIndex);

  // We include one other element in the secondary index to calculate the
  // size of each entry more easily by subtracting secondary index elements.
  // total size of all entries
  out.writeInt(curTotalNonRootEntrySize);

  // the index entries themselves
  for (int i = 0; i < blockKeys.size(); ++i) {
    out.writeLong(blockOffsets.get(i));
    out.writeInt(onDiskDataSizes.get(i));
    out.write(blockKeys.get(i));
  }
}
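As a worked example (with made-up key lengths): each entry costs SECONDARY_INDEX_ENTRY_OVERHEAD = 8 (offset) + 4 (on-disk size) = 12 bytes plus its key. For three entries with keys of 3, 5 and 2 bytes, the entry sizes are 15, 17 and 14, so the secondary index holds the relative offsets 0, 15 and 32, followed by the total size 46; a reader can locate and size any entry by subtracting adjacent offsets.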
After an index block has been written to the HDFS stream, the rootChunk index is updated. rootChunk is an index over the data block index blocks; it is only flushed to the HDFS stream after all KeyValues have been written, at which point it may be split further into a multi-level structure, but during the append loop there are only two levels.
public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
  // Add leaf index block size
  totalBlockOnDiskSize += onDiskSize;
  totalBlockUncompressedSize += uncompressedSize;

  if (singleLevelOnly)
    throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

  if (firstKey == null) {
    throw new IllegalStateException("Trying to add second-level index " +
        "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
        "but the first key was not set in writeInlineBlock");
  }

  // While appending there are only two levels: the root chunk easily holds
  // entries for all the leaf chunks produced by one memstore flush.
  if (rootChunk.getNumEntries() == 0) {
    // We are writing the first leaf block, so increase index level.
    expectNumLevels(1);
    numLevels = 2;
  }

  // Add another entry to the second-level index. Include the number of
  // entries in all previous leaf-level chunks for mid-key calculation.
  // totalNumEntries is the running count of entries across all leaf-level chunks
  rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
  firstKey = null;
}
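A rough capacity check (assuming 50-byte first keys, an arbitrary figure): a 128 KB leaf chunk holds about 128 * 1024 / (12 + 50), roughly 2000 entries, which indexes roughly 2000 x 64 KB, about 128 MB of data blocks. A single leaf chunk therefore already covers a typical memstore flush, and the root chunk above it only needs a handful of entries, which is why two levels are always enough during the write loop.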
That is the gist of the write loop: data blocks and data index blocks are written alternately. Once all the data has been written, the writer is closed; here is HFileWriterV2's close:
public void close() throws IOException {
  if (outputStream == null) {
    return;
  }
  // Write out the end of the data blocks, then write meta data blocks.
  // followed by fileinfo, data block index and meta block index.
  // flush the last data block to the stream
  finishBlock();
  // write the remaining index block data
  writeInlineBlocks(true);

  // trailer
  FixedFileTrailer trailer = new FixedFileTrailer(2,
      HFileReaderV2.MAX_MINOR_VERSION);

  // Write out the metadata blocks if any.
  // meta blocks; largely unused in V2 (in V1 they were used to store the bloom filter)
  if (!metaNames.isEmpty()) {
    for (int i = 0; i < metaNames.size(); ++i) {
      // store the beginning offset
      long offset = outputStream.getPos();
      // write the metadata content
      DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
      metaData.get(i).write(dos);
      fsBlockWriter.writeHeaderAndData(outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

      // Add the new meta block to the meta index.
      metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
          fsBlockWriter.getOnDiskSizeWithHeader());
    }
  }

  // Load-on-open section.

  // Data block index.
  //
  // In version 2, this section of the file starts with the root level data
  // block index. We call a function that writes intermediate-level blocks
  // first, then root level, and returns the offset of the root level block
  // index.
  // Here rootChunk may be split into a subtree: if it is large, it is broken
  // into 128 KB child blocks that are written to HDFS first. If the index tree
  // had two levels during the write loop, it can end up with three here,
  // i.e. one extra intermediate level.
  long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
  trailer.setLoadOnOpenOffset(rootIndexOffset);

  // Meta block index.
  // write the meta block index
  metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
      BlockType.ROOT_INDEX), "meta");
  fsBlockWriter.writeHeaderAndData(outputStream);
  totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

  if (this.includeMemstoreTS) {
    appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
    appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
  }

  // File info
  writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
  fsBlockWriter.writeHeaderAndData(outputStream);
  totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

  // Load-on-open data supplied by higher levels, e.g. Bloom filters.
  for (BlockWritable w : additionalLoadOnOpenData) {
    fsBlockWriter.writeBlock(w, outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
  }

  // Now finish off the trailer.
  trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
  trailer.setUncompressedDataIndexSize(
      dataBlockIndexWriter.getTotalUncompressedSize());
  trailer.setFirstDataBlockOffset(firstDataBlockOffset);
  trailer.setLastDataBlockOffset(lastDataBlockOffset);
  trailer.setComparatorClass(comparator.getClass());
  trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

  finishClose(trailer);

  fsBlockWriter.releaseCompressor();
}