
[HBase] KeyValue and HFile Creation

When HBase serves a put, the data is first written to memory (the MemStore). The in-memory structure is a ConcurrentSkipListMap whose Comparator is KVComparator.
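Conceptually, the memstore behaves like the following minimal sketch (this is not the MemStore source; in this era of HBase the real memstore wraps a similar map in a KeyValueSkipListSet):

import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.hadoop.hbase.KeyValue;

// Sketch only: keying a concurrent skip list by KeyValue.COMPARATOR (a KVComparator)
// is what keeps the memstore sorted, so a flush can simply iterate the map in order.
public class MemstoreSketch {
    private final ConcurrentSkipListMap<KeyValue, KeyValue> kvset =
        new ConcurrentSkipListMap<KeyValue, KeyValue>(KeyValue.COMPARATOR);

    public void add(KeyValue kv) {
        kvset.put(kv, kv);        // lands in sorted position at insert time
    }

    public Iterable<KeyValue> inOrder() {
        return kvset.keySet();    // iteration order == KVComparator order
    }
}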

The KeyValue object structure

(figure: KeyValue object layout)

How KVComparator compares two KeyValue objects

1. Compare the rowkeys with KeyComparator; rowkeys sort in ascending byte order.

2. If the rowkeys are equal, compare the column families; families sort in ascending byte order.

3. If the column families are equal, compare family + qualifier; qualifiers sort in ascending byte order.

4. If the qualifiers are also equal, sort by timestamp, in descending order (newest first).

5. If the timestamps are equal too, sort by type; a Delete sorts before a Put.

6. If everything above is equal, sort by memstoreTS. memstoreTS is an atomically incremented id, so two entries can never tie; it sorts in descending order, so newer modifications come first, which is convenient for scans (see the sketch below).
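The six rules can be condensed into a small self-contained sketch (the Cell class below is a simplified stand-in, not HBase's KeyValue, and this is not the KVComparator source):

import java.util.Comparator;

// Simplified illustration of the comparison order described above.
class KeyOrderSketch {
    static class Cell {
        byte[] row, family, qualifier;
        long timestamp, memstoreTS;
        byte type;                               // e.g. Put = 4, Delete = 8
    }

    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;              // shorter byte array sorts first
    }

    static final Comparator<Cell> ORDER = (a, b) -> {
        int c = compareBytes(a.row, b.row);                // 1. rowkey ascending
        if (c != 0) return c;
        c = compareBytes(a.family, b.family);              // 2. column family ascending
        if (c != 0) return c;
        c = compareBytes(a.qualifier, b.qualifier);        // 3. qualifier ascending
        if (c != 0) return c;
        c = Long.compare(b.timestamp, a.timestamp);        // 4. timestamp descending
        if (c != 0) return c;
        c = (b.type & 0xff) - (a.type & 0xff);             // 5. higher type byte first: Delete before Put
        if (c != 0) return c;
        return Long.compare(b.memstoreTS, a.memstoreTS);   // 6. memstoreTS descending
    };
}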

So KeyValue objects are effectively already sorted in memory. When a flush generates a file, the memstore is simply scanned with maxVersions set (any put beyond maxVersions is silently dropped at this point), and each KeyValue object is written to HDFS.

Flushing a memstore into an HFile roughly proceeds as follows:

1. Construct a writer; the latest implementation is HFileWriterV2, i.e. format version 2.

2. Append KeyValue objects to the writer in a loop. Data is buffered and written out block by block (64 KB by default): every 64 KB a new block is started, and every time a block is finished an index entry is added to the block index. Once the block index exceeds its own limit (128 KB by default), it is written out as a special inline block that marks an index block, so an HFile is an alternating sequence of data blocks and inline blocks. A sketch of where these two thresholds can be tuned follows this list.

3. After all KeyValue objects have been written, the remaining index data is written out as inline blocks, followed by the root index, the file info, and the rest of the metadata.
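For reference, a rough sketch of where the two thresholds above can be tuned (treat the hfile.index.block.max.size key name as an assumption; the data block size is a per-column-family attribute):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;

// Sketch: adjusting the data block size and the index chunk limit.
public class BlockSizeTuning {
    public static void main(String[] args) {
        // Data block target size is set per column family (64 KB by default).
        HColumnDescriptor cf = new HColumnDescriptor("f");
        cf.setBlocksize(64 * 1024);

        // Index chunk limit (assumed config key, 128 KB by default).
        Configuration conf = HBaseConfiguration.create();
        conf.setInt("hfile.index.block.max.size", 128 * 1024);
    }
}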

HFile V2 structure

(figure: HFile v2 file layout)

The implementation class diagram is as follows:

(figure: writer implementation class diagram)

Main flow

Scan scan = new Scan();
    // maximum number of versions to keep (default 3)
    scan.setMaxVersions(scanInfo.getMaxVersions());
    // Use a store scanner to find which rows to flush.
    // Note that we need to retain deletes, hence
    // treat this as a minor compaction.
    InternalScanner scanner = new StoreScanner(this, scan, Collections
        .singletonList(new CollectionBackedScanner(set, this.comparator)),
        ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
        HConstants.OLDEST_TIMESTAMP);
    try {
      // TODO:  We can fail in the below block before we complete adding this
      // flush to list of store files.  Add cleanup of anything put on filesystem
      // if we fail.
      synchronized (flushLock) {
        status.setStatus("Flushing " + this + ": creating writer");
        // A. Write the map out to the disk
        writer = createWriterInTmp(set.size());
        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
        pathName = writer.getPath();
        try {
          List<KeyValue> kvs = new ArrayList<KeyValue>();
          boolean hasMore;
          do {
            hasMore = scanner.next(kvs);
            if (!kvs.isEmpty()) {
              for (KeyValue kv : kvs) {
                // If we know that this KV is going to be included always, then let us
                // set its memstoreTS to 0. This will help us save space when writing to disk.
                if (kv.getMemstoreTS() <= smallestReadPoint) {
                  // let us not change the original KV. It could be in the memstore
                  // changing its memstoreTS could affect other threads/scanners.
                  kv = kv.shallowCopy();
                  kv.setMemstoreTS(0);
                }
                // append the KeyValue to the writer
                writer.append(kv);
                flushed += this.memstore.heapSizeChange(kv, true);
              }
              kvs.clear();
            }
          } while (hasMore);
        } finally {
          // Write out the log sequence number that corresponds to this output
          // hfile.  The hfile is current up to and including logCacheFlushId.
          status.setStatus("Flushing " + this + ": appending metadata");
          writer.appendMetadata(logCacheFlushId, false);
          status.setStatus("Flushing " + this + ": closing flushed file");
          // closing the writer writes out the remaining metadata (index blocks, file info, trailer)
          writer.close();
        }
      }
    } finally {
      flushedSize.set(flushed);
      scanner.close();
    }
           

The append process

private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
      final byte[] value, final int voffset, final int vlength)
      throws IOException {
    // check key ordering; returns whether the key is identical to the previous one
    boolean dupKey = checkKey(key, koffset, klength);
    checkValue(value, voffset, vlength);
    // only check the block boundary for a new key, which guarantees identical keys stay in the same data block
    // if the block exceeds the 64 KB limit, it is written to the HDFS output stream (without flushing) and the block index is updated
    if (!dupKey) {
      checkBlockBoundary();
    }
    // if not already writing, reset state and start a new data block
    if (!fsBlockWriter.isWriting())
      newBlock();

    // Write length of key and value and then actual key and value bytes.
    // Additionally, we may also write down the memstoreTS.
    {
      // userDataStream is a temporary buffer; once the block is full its contents are flushed to the HDFS output stream
      // the KeyValue is written here field by field
      DataOutputStream out = fsBlockWriter.getUserDataStream();
      out.writeInt(klength);
      totalKeyLength += klength;
      out.writeInt(vlength);
      totalValueLength += vlength;
      out.write(key, koffset, klength);
      out.write(value, voffset, vlength);
      if (this.includeMemstoreTS) {
        WritableUtils.writeVLong(out, memstoreTS);
      }
    }

    // Are we the first key in this block?
    // the block's first key; it later becomes part of the data index entry
    if (firstKeyInBlock == null) {
      // Copy the key.
      firstKeyInBlock = new byte[klength];
      System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
    }
    // remember the last key written
    lastKeyBuffer = key;
    lastKeyOffset = koffset;
    lastKeyLength = klength;
    entryCount++;
  }
           

Starting to write into a new block

/**
     * Starts writing into the block. The previous block's data is discarded.
     *
     * @return the stream the user can write their data into
     * @throws IOException
     */
    public DataOutputStream startWriting(BlockType newBlockType)
        throws IOException {
      if (state == State.BLOCK_READY && startOffset != -1) {
        // We had a previous block that was written to a stream at a specific
        // offset. Save that offset as the last offset of a block of that type.
        // remember the previous offset of a block of this type
        prevOffsetByType[blockType.getId()] = startOffset;
      }
      // reset the current block's offset (filled in when the block is written out)
      startOffset = -1;
      // block type: mainly data block, index block, or meta block
      blockType = newBlockType;
      // temporary in-memory buffer
      baosInMemory.reset();
      // dummy header as a placeholder; the real header is written when the block is finished
      baosInMemory.write(DUMMY_HEADER);
      // enter the WRITING state
      state = State.WRITING;

      // We will compress it later in finishBlock()
      // temporary stream over the in-memory buffer
      userDataStream = new DataOutputStream(baosInMemory);
      return userDataStream;
    }
           

As writes accumulate the block eventually fills up; check whether the data block is full

private void checkBlockBoundary() throws IOException {
    // default 64 KB
    if (fsBlockWriter.blockSizeWritten() < blockSize)
      return;
    // write the data block buffered in userDataStream to the HDFS output stream and add an index entry
    finishBlock();
    // if an index chunk is large enough, write it to the HDFS output stream as an inline block
    writeInlineBlocks(false);
    // reset state back to WRITING, ready for the next block
    newBlock();
  }
           

The finishBlock step: flushing data to the HDFS output stream

/** Clean up the current block */
  private void finishBlock() throws IOException {
	//前置狀态’WRITING‘,userDataStream有資料寫入
    if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
      return;

    long startTimeNs = System.nanoTime();

    // Update the first data block offset for scanning.
    // offset of the first data block
    if (firstDataBlockOffset == -1) {
      firstDataBlockOffset = outputStream.getPos();
    }

    // Update the last data block offset
    // offset of the last data block
    lastDataBlockOffset = outputStream.getPos();

    // flush the userDataStream contents to the HDFS output stream
    fsBlockWriter.writeHeaderAndData(outputStream);
    // bytes written on HDFS: possibly compressed and including checksum data
    int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
    // add an entry to the data block index
    dataBlockIndexWriter.addEntry(firstKeyInBlock, lastDataBlockOffset,
        onDiskSize);
    // running total of uncompressed bytes
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
    HFile.offerWriteData(onDiskSize);
    // optionally populate the block cache (cache-on-write)
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }
           

Writing to the HDFS stream

public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
      long offset = out.getPos();
      if (startOffset != -1 && offset != startOffset) {
        throw new IOException("A " + blockType + " block written to a "
            + "stream twice, first at offset " + startOffset + ", then at "
            + offset);
      }
      // start offset of this block
      startOffset = offset;
      // write header and data
      writeHeaderAndData((DataOutputStream) out);
    }

    private void writeHeaderAndData(DataOutputStream out) throws IOException {
      // important: before a block is flushed to the HDFS stream its data is processed (encoding, compression, checksums) and the state becomes BLOCK_READY
      ensureBlockReady();
      // onDiskBytesWithHeader holds the processed bytes; write them straight to the HDFS stream
      out.write(onDiskBytesWithHeader);
      if (compressAlgo == NONE) {
        if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
          throw new IOException("A " + blockType 
              + " without compression should have checksums " 
              + " stored separately.");
        }
        // without compression, the checksums are written out separately
        out.write(onDiskChecksum);
      }
    }
           

Processing the buffered data, including encoding and compression

private void finishBlock() throws IOException {
      // flush the user data stream first
      userDataStream.flush();

      // This does an array copy, so it is safe to cache this byte array.
      // grab everything written so far (still uncompressed)
      uncompressedBytesWithHeader = baosInMemory.toByteArray();
      // offset of the previous block of the same type
      prevOffset = prevOffsetByType[blockType.getId()];

      // We need to set state before we can package the block up for
      // cache-on-write. In a way, the block is ready, but not yet encoded or
      // compressed.
      // BLOCK_READY: ready to be flushed
      state = State.BLOCK_READY;
      // data block encoding
      encodeDataBlockForDisk();
      // compression and checksums
      doCompressionAndChecksumming();
    }
           

Compression and checksums: when compression is enabled, the checksum bytes are appended directly to onDiskBytesWithHeader; otherwise they go into the separate onDiskChecksum array. In both cases the block header is rewritten. A worked size example follows the code below.

private void doCompressionAndChecksumming() throws IOException {
      // do the compression
      if (compressAlgo != NONE) {
        compressedByteStream.reset();
        compressedByteStream.write(DUMMY_HEADER);

        compressionStream.resetState();

        compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
            uncompressedBytesWithHeader.length - HEADER_SIZE);

        compressionStream.flush();
        compressionStream.finish();

        // generate checksums
        onDiskDataSizeWithHeader = compressedByteStream.size(); // data size

        // reserve space for checksums in the output byte stream
        ChecksumUtil.reserveSpaceForChecksums(compressedByteStream, 
          onDiskDataSizeWithHeader, bytesPerChecksum);


        onDiskBytesWithHeader = compressedByteStream.toByteArray();
        putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
            uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);

       // generate checksums for header and data. The checksums are
       // part of onDiskBytesWithHeader itself.
       ChecksumUtil.generateChecksums(
         onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,
         onDiskBytesWithHeader, onDiskDataSizeWithHeader,
         checksumType, bytesPerChecksum);

        // Checksums are already part of onDiskBytesWithHeader
        onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;

        //set the header for the uncompressed bytes (for cache-on-write)
        putHeader(uncompressedBytesWithHeader, 0,
          onDiskBytesWithHeader.length + onDiskChecksum.length,
          uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);

      } else {
        // If we are not using any compression, then the
        // checksums are written to its own array onDiskChecksum.
        onDiskBytesWithHeader = uncompressedBytesWithHeader;

        onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
        // number of checksum bytes required
        int numBytes = (int)ChecksumUtil.numBytes(
                          uncompressedBytesWithHeader.length,
                          bytesPerChecksum);
        // buffer for the checksum data
        onDiskChecksum = new byte[numBytes];

        //set the header for the uncompressed bytes
        // rewrite the header
        putHeader(uncompressedBytesWithHeader, 0,
          onDiskBytesWithHeader.length + onDiskChecksum.length,
          uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);

        ChecksumUtil.generateChecksums(
          uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,
          onDiskChecksum, 0,
          checksumType, bytesPerChecksum);
      }
    }
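As a back-of-the-envelope example of how much checksum data the uncompressed branch produces (the 16 KB bytesPerChecksum default and the 4-byte CRC32 size are assumptions here; the calculation simply mirrors what ChecksumUtil.numBytes computes):

// Worked example: checksum bytes for an uncompressed block of roughly 64 KB.
public class ChecksumSizeExample {
    public static void main(String[] args) {
        int dataSizeWithHeader = 64 * 1024;    // about one full data block
        int bytesPerChecksum   = 16 * 1024;    // assumed default chunk size
        int checksumSize       = 4;            // CRC32 emits 4 bytes per chunk
        int chunks = (dataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;
        System.out.println(chunks * checksumSize);   // 4 chunks * 4 bytes = 16
    }
}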
           

After a data block is processed, the index is updated. An index entry consists of the block's first key, its start offset, and its on-disk data size. There are two kinds of index chunks: leaf-level chunks and the root index chunk.

void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
        long curTotalNumSubEntries) {
      // Record the offset for the secondary index
      // secondary index: record each entry's relative offset
      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
      // offset at which the next entry will start
      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
          + firstKey.length;
      // size accounting used when this chunk acts as the root chunk
      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
      // record the index entry itself
      blockKeys.add(firstKey);
      blockOffsets.add(blockOffset);
      onDiskDataSizes.add(onDiskDataSize);
      // only the root index chunk tracks sub-entry counts
      if (curTotalNumSubEntries != -1) {
        numSubEntriesAt.add(curTotalNumSubEntries);

        // Make sure the parallel arrays are in sync.
        if (numSubEntriesAt.size() != blockKeys.size()) {
          throw new IllegalStateException("Only have key/value count " +
              "stats for " + numSubEntriesAt.size() + " block index " +
              "entries out of " + blockKeys.size());
        }
      }
    }
           

Back to the main path: after finishBlock, the buffered data has been flushed into the HDFS stream. The index blocks now get a chance to check whether they are full; if so, the index chunk is flushed to HDFS as well.

private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      // flush every inline block that says it should be written
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.cacheOnWrite();
        // same pattern as a data block: startWriting, then writeHeaderAndData
        ibw.writeInlineBlock(fsBlockWriter.startWriting(
            ibw.getInlineBlockType()));
        fsBlockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
            fsBlockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }
           

For the BlockIndexWriter, the rule for when an inline index block should be written is (maxChunkSize defaults to 128 KB):

curInlineChunk.getNonRootSize() >= maxChunkSize;
           

Here is how the BlockIndexWriter writes its inline block:

public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      // Write the inline block index to the output stream in the non-root
      // index block format.
      // write the chunk in the non-root (leaf) format
      curInlineChunk.writeNonRoot(out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index.
      // keep this index block's first key for the parent-level (multi-level) index
      firstKey = curInlineChunk.getBlockKey(0);
	
      // Start a new inline index block
      // reset curInlineChunk; subsequent index entries start a new chunk
      curInlineChunk.clear();
    }
           

Writing a leaf chunk

void writeNonRoot(DataOutput out) throws IOException {
      // The number of entries in the block.
      // number of index entries in the chunk
      out.writeInt(blockKeys.size());
	
      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
        throw new IOException("Corrupted block index chunk writer: " +
            blockKeys.size() + " entries but " +
            secondaryIndexOffsetMarks.size() + " secondary index items");
      }

      // For each entry, write a "secondary index" of relative offsets to the
      // entries from the end of the secondary index. This works, because at
      // read time we read the number of entries and know where the secondary
      // index ends.
      // write the secondary index: entries are variable-length, so these relative offsets make lookups easy
      for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
        out.writeInt(currentSecondaryIndex);

      // We include one other element in the secondary index to calculate the
      // size of each entry more easily by subtracting secondary index elements.
      // total size of all entries
      out.writeInt(curTotalNonRootEntrySize);
      // the index entries themselves
      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        out.write(blockKeys.get(i));
      }
    }
           

After an index block is written to the HDFS stream, rootChunk is updated. rootChunk is an index over the data block index blocks; it is only flushed to the HDFS stream after all KeyValues have been written, at which point it may be split further into a multi-level structure. During the write loop, however, there are only two levels.
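A back-of-the-envelope estimate shows why two levels are enough during the loop (the ~50-byte average first key is an assumption; the per-entry layout follows the add and writeNonRoot code above):

// Estimate how much data a single 128 KB leaf index chunk can cover.
public class IndexDepthEstimate {
    public static void main(String[] args) {
        int dataBlockSize = 64 * 1024;    // default data block size
        int maxChunkSize  = 128 * 1024;   // default index chunk limit
        int avgFirstKey   = 50;           // assumed average first-key length
        // per entry: offset (8) + on-disk size (4) + key bytes + secondary-index slot (4)
        int entrySize = 8 + 4 + avgFirstKey + 4;
        int entriesPerChunk = maxChunkSize / entrySize;              // ~1985 entries
        long covered = (long) entriesPerChunk * dataBlockSize;       // ~124 MB
        System.out.println(covered / (1024 * 1024) + " MB of data blocks per leaf chunk");
    }
}

One leaf-level chunk therefore already covers on the order of an entire memstore flush, so the root chunk only needs a handful of entries while writing; close() may still split a large root index into an extra intermediate level, as described below.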

public void blockWritten(long offset, int onDiskSize, int uncompressedSize)
    {
      // Add leaf index block size
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      if (firstKey == null) {
        throw new IllegalStateException("Trying to add second-level index " +
            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
            "but the first key was not set in writeInlineBlock");
      }
      // during the write phase there are only two levels: a 128 KB rootChunk is enough to index a whole memstore flush
      if (rootChunk.getNumEntries() == 0) {
        // We are writing the first leaf block, so increase index level.
        expectNumLevels(1);
        numLevels = 2;
      }

      // Add another entry to the second-level index. Include the number of
      // entries in all previous leaf-level chunks for mid-key calculation.
      // add an entry to the root index; totalNumEntries counts the entries in all leaf-level chunks so far (used for mid-key calculation)
      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
      firstKey = null;
    }
           

That is roughly the main loop: data blocks and data index blocks are written out alternately. Once all data has been written, the writer is closed; here is HFileWriterV2's close operation.

public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Write out the end of the data blocks, then write meta data blocks.
    // followed by fileinfo, data block index and meta block index.
    // write out the last data block
    finishBlock();
    // write out any remaining index blocks
    writeInlineBlocks(true);
    // fixed file trailer
    FixedFileTrailer trailer = new FixedFileTrailer(2, 
                                 HFileReaderV2.MAX_MINOR_VERSION);

    // Write out the metadata blocks if any.
    // write meta blocks; largely unused in V2 (V1 stored bloom filters here)
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        fsBlockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            fsBlockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    // here rootChunk is split into a tree: if it is large, it is broken into 128 KB child blocks and written to HDFS;
    // an index tree that had 2 levels during the write phase can end up with 3 (one extra intermediate level)
    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    // meta block index
    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    if (this.includeMemstoreTS) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }
    // File info
    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData){
      fsBlockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(comparator.getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());


    finishClose(trailer);

    fsBlockWriter.releaseCompressor();
  }
           
