
Lucene Indexing Source Code Analysis, Part 2

The previous article gave an overview of how indexing flows from IndexWriter to DefaultIndexChain and analyzed the basic workflow of DefaultIndexChain,

which, in short, is:

each document received by a DWPT is processed one at a time -> each document is then processed field by field -> each field is handled differently depending on whether it is tokenized, whether it is stored, and whether it has doc values.

In other words, the DWPTs work in parallel with one another, while the work inside each DWPT is serial.
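
Before going deeper, here is a minimal sketch of the public indexing API whose call path eventually reaches DefaultIndexChain and the invert() method analyzed below. This is only for orientation: the directory path, field name, and text are made up, and exact signatures may differ slightly between Lucene versions.

import java.nio.file.Paths;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class IndexingEntryPoint {
  public static void main(String[] args) throws Exception {
    try (Directory dir = FSDirectory.open(Paths.get("/tmp/lucene-demo-index"));
         IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
      Document doc = new Document();
      // a tokenized, indexed field: its terms are what flow through PerField.invert()
      doc.add(new TextField("body", "lucene builds its inverted index in memory first", Field.Store.NO));
      writer.addDocument(doc);  // handed to a DocumentsWriterPerThread (DWPT)
      writer.commit();          // flush: each DWPT's in-memory buffer becomes a segment
    }
  }
}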

Processing each field comes down to writing data into its TermsHashPerField, and that data is managed centrally by TermsHash. You can think of TermsHash as a buffer shared within a DWPT: it is used to build the index in memory and is written to disk when a flush is needed. Each flushed DWPT effectively becomes a segment; if segment size limits are configured, segment merging and the like may follow. That is a topic for a later article devoted to flushing. In this article we continue the previous analysis with pf.invert(); the code is as follows.

public void invert(IndexableField field, boolean first) throws IOException, AbortingException {
      if (first) {
        // First time we're seeing this field (indexed) in
        // this document:
        invertState.reset();      // first time we see this field: reset the invert state
      }
      
      IndexableFieldType fieldType = field.fieldType();
      
      IndexOptions indexOptions = fieldType.indexOptions();
      fieldInfo.setIndexOptions(indexOptions);
      
      if (fieldType.omitNorms()) {
        fieldInfo.setOmitsNorms();
      }
      
      final boolean analyzed = fieldType.tokenized() && docState.analyzer != null;
      
      // only bother checking offsets if something will consume them.
      // TODO: after we fix analyzers, also check if termVectorOffsets will be indexed.
      final boolean checkOffsets = indexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS;
      
      /*
       * To assist people in tracking down problems in analysis components, we wish to write the field name to the
       * infostream when we fail. We expect some caller to eventually deal with the real exception, so we don't want any
       * 'catch' clauses, but rather a finally that takes note of the problem.
       */
      boolean succeededInProcessingField = false;
      try (TokenStream stream = tokenStream = field.tokenStream(docState.analyzer, tokenStream)) {    // obtain the token stream over the field's content
        // reset the TokenStream to the first token
        stream.reset();
        invertState.setAttributeSource(stream);
        termsHashPerField.start(field, first);
        
        while (stream.incrementToken()) {   // process each term the analyzer produces for this field
          
          // If we hit an exception in stream.next below
          // (which is fairly common, e.g. if analyzer
          // chokes on a given document), then it's
          // non-aborting and (above) this one document
          // will be marked as deleted, but still
          // consume a docID
          
          int posIncr = invertState.posIncrAttribute.getPositionIncrement();
          invertState.position += posIncr;
          if (invertState.position < invertState.lastPosition) {
            if (posIncr == 0) {
              throw new IllegalArgumentException("first position increment must be > 0 (got 0) for field '"
                  + field.name() + "'");
            } else {
              throw new IllegalArgumentException("position increments (and gaps) must be >= 0 (got " + posIncr
                  + ") for field '" + field.name() + "'");
            }
          } else if (invertState.position > IndexWriter.MAX_POSITION) {
            throw new IllegalArgumentException("position " + invertState.position + " is too large for field '"
                + field.name() + "': max allowed position is " + IndexWriter.MAX_POSITION);
          }
          invertState.lastPosition = invertState.position;
          if (posIncr == 0) {
            invertState.numOverlap++;
          }
          
          if (checkOffsets) {
            int startOffset = invertState.offset + invertState.offsetAttribute.startOffset();
            int endOffset = invertState.offset + invertState.offsetAttribute.endOffset();
            if (startOffset < invertState.lastStartOffset || endOffset < startOffset) {
              throw new IllegalArgumentException(
                  "startOffset must be non-negative, and endOffset must be >= startOffset, and offsets must not go backwards "
                      + "startOffset=" + startOffset + ",endOffset=" + endOffset + ",lastStartOffset="
                      + invertState.lastStartOffset + " for field '" + field.name() + "'");
            }
            invertState.lastStartOffset = startOffset;
          }
          
          invertState.length++;
          if (invertState.length < 0) {
            throw new IllegalArgumentException("too many tokens in field '" + field.name() + "'");
          }
          // System.out.println("  term=" + invertState.termAttribute);
          
          // If we hit an exception in here, we abort
          // all buffered documents since the last
          // flush, on the likelihood that the
          // internal state of the terms hash is now
          // corrupt and should not be flushed to a
          // new segment:
          try {
            termsHashPerField.add();      // add the term to the in-memory index; two consumer chains handle it: FreqProxTermsWriterPerField and TermVectorsConsumerPerField
          } catch (MaxBytesLengthExceededException e) {
            byte[] prefix = new byte[30];
            BytesRef bigTerm = invertState.termAttribute.getBytesRef();
            System.arraycopy(bigTerm.bytes, bigTerm.offset, prefix, 0, 30);
            String msg = "Document contains at least one immense term in field=\""
                + fieldInfo.name
                + "\" (whose UTF8 encoding is longer than the max length "
                + DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8
                + "), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is: '"
                + Arrays.toString(prefix) + "...', original message: " + e.getMessage();
            if (docState.infoStream.isEnabled("IW")) {
              docState.infoStream.message("IW", "ERROR: " + msg);
            }
            // Document will be deleted above:
            throw new IllegalArgumentException(msg, e);
          } catch (Throwable th) {
            throw AbortingException.wrap(th);
          }
        }
        
        // trigger streams to perform end-of-stream operations
        stream.end();   // the token stream is finished
        
        // TODO: maybe add some safety? then again, it's already checked
        // when we come back around to the field...
        invertState.position += invertState.posIncrAttribute.getPositionIncrement();
        invertState.offset += invertState.offsetAttribute.endOffset();
        
        /* if there is an exception coming through, we won't set this to true here: */
        succeededInProcessingField = true;
      } finally {
        if (!succeededInProcessingField && docState.infoStream.isEnabled("DW")) {
          docState.infoStream.message("DW", "An exception was thrown while processing field " + fieldInfo.name);
        }
      }
      
      if (analyzed) {
        invertState.position += docState.analyzer.getPositionIncrementGap(fieldInfo.name);
        invertState.offset += docState.analyzer.getOffsetGap(fieldInfo.name);
      }
      
      invertState.boost *= field.boost();
    }
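
The loop above is Lucene's standard attribute-based TokenStream consumption pattern. As a standalone, hedged illustration (the field name and text are made up), the same attributes that invert() reads can be consumed directly from an analyzer:

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;

public class TokenStreamWalkthrough {
  public static void main(String[] args) throws Exception {
    try (Analyzer analyzer = new StandardAnalyzer();
         TokenStream stream = analyzer.tokenStream("body", "the quick brown fox")) {
      CharTermAttribute term = stream.addAttribute(CharTermAttribute.class);
      PositionIncrementAttribute posIncr = stream.addAttribute(PositionIncrementAttribute.class);
      OffsetAttribute offset = stream.addAttribute(OffsetAttribute.class);
      stream.reset();                    // same contract as stream.reset() in invert()
      int position = -1;
      while (stream.incrementToken()) {  // one term per iteration, like the loop in invert()
        position += posIncr.getPositionIncrement();
        System.out.println(term + " position=" + position
            + " offsets=[" + offset.startOffset() + "," + offset.endOffset() + ")");
      }
      stream.end();                      // records the final offset, as invert() does afterwards
    }
  }
}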

The termsHashPerField.add() method is important: it uses each token to fill the in-memory buffers. The code is as follows:

void add() throws IOException {
    // We are first in the chain so we must "intern" the
    // term text into textStart address
    // Get the text & hash of this term.
    int termID = bytesHash.add(termAtt.getBytesRef()); // new term: returns its termID (>= 0); existing term: returns -(termID + 1)
    
    // System.out.println("add term=" + termBytesRef.utf8ToString() + " doc=" + docState.docID + " termID=" + termID);
    
    if (termID >= 0) {// New posting
      bytesHash.byteStart(termID);
      // Init stream slices
      if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
        intPool.nextBuffer(); // allocate a new buffer in intPool
      }
      
      if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt * ByteBlockPool.FIRST_LEVEL_SIZE) {
        bytePool.nextBuffer(); // allocate a new buffer in bytePool
      }
      
      intUptos = intPool.buffer;
      intUptoStart = intPool.intUpto;
      intPool.intUpto += streamCount;
      
      postingsArray.intStarts[termID] = intUptoStart + intPool.intOffset; // record intStart for this term
      
      for (int i = 0; i < streamCount; i++) {
        final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
        intUptos[intUptoStart + i] = upto + bytePool.byteOffset;
      }
      postingsArray.byteStarts[termID] = intUptos[intUptoStart]; // record byteStart for this term
      
      newTerm(termID); // brand-new term: delegates to FreqProxTermsWriterPerField.newTerm()
      
    } else {
      termID = (-termID) - 1;
      int intStart = postingsArray.intStarts[termID];
      intUptos = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
      intUptoStart = intStart & IntBlockPool.INT_BLOCK_MASK;
      addTerm(termID);
    }
    
    if (doNextCall) {
      nextPerField.add(postingsArray.textStarts[termID]); // pass the term on to TermVectorsConsumerPerField, which runs its own add()
    }
  }
           

This code involves postingsArray, intPool, and bytePool. The relationship between the three can be shown in a diagram:

[Figure omitted: diagram of postingsArray, intPool, and bytePool]

To put it as briefly as possible: postingsArray is conceptually a table with one row per term; textStart records the offset of the term's text in the bytePool, byteStart records where the term's posting slices start in the bytePool, and intStart is the term's offset into the intPool. The intPool and bytePool are shared by all fields (within a DWPT), while each field has its own postingsArray, so the data that actually ends up in a segment flows through the bytePool and intPool. This is important: writes to these two pools must be serial, and Lucene's execution flow guarantees exactly that.
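
As a conceptual aid only (this is not the real Lucene code, which uses ParallelPostingsArray, IntBlockPool, and ByteBlockPool with slice allocation), the bookkeeping described above can be sketched like this:

// Simplified model of the per-term bookkeeping; the array sizes and field names
// here are illustrative, not Lucene's.
class PostingsLayoutSketch {
  // per-field, indexed by termID (Lucene keeps these as parallel int arrays)
  int[] textStarts = new int[8];  // offset in bytePool where the term's bytes (length + UTF-8) were written
  int[] intStarts  = new int[8];  // offset in intPool of the term's per-stream write pointers
  int[] byteStarts = new int[8];  // offset in bytePool where the term's first posting slice starts

  // shared across all fields of a DWPT (in Lucene these are block pools, not flat arrays)
  byte[] bytePool = new byte[1 << 15];
  int[]  intPool  = new int[1 << 13];
}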

For a more detailed explanation, see this article: http://blog.csdn.net/liweisnake/article/details/11364597

These relationships are not easy to grasp, so here is a borrowed diagram:

[Figure omitted: borrowed diagram of the postingsArray / intPool / bytePool memory layout]

Continuing from the code above: addTerm() and newTerm() do the actual writing into the bytePool and intPool, and newTerm() in turn calls the newTerm() methods of FreqProxTermsWriterPerField and TermVectorsConsumerPerField. Let's first look at FreqProxTermsWriterPerField.newTerm():

void newTerm(final int termID) {
    // First time we're seeing this term since the last
    // flush
    final FreqProxPostingsArray postings = freqProxPostingsArray;

    postings.lastDocIDs[termID] = docState.docID;
    if (!hasFreq) {
      assert postings.termFreqs == null;
      postings.lastDocCodes[termID] = docState.docID;
    } else {
      postings.lastDocCodes[termID] = docState.docID << 1;
      postings.termFreqs[termID] = 1;
      if (hasProx) {
        writeProx(termID, fieldState.position);   // write position information into the bytePool
        if (hasOffsets) {
          writeOffsets(termID, fieldState.offset);  // write offsets into the bytePool
        }
      } else {
        assert !hasOffsets;
      }
    }
    fieldState.maxTermFrequency = Math.max(1, fieldState.maxTermFrequency);
    fieldState.uniqueTermCount++;
  }
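
Whether the second chain (TermVectorsConsumerPerField, reached through nextPerField.add() when doNextCall is true) runs at all depends on the field's term-vector settings. A hedged sketch of enabling them with the public FieldType API (the field name and index options are illustrative):

import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.IndexOptions;

public class TermVectorFieldFactory {
  static Field bodyFieldWithTermVectors(String text) {
    FieldType type = new FieldType();
    type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
    type.setTokenized(true);                  // run the analyzer, so invert() sees a token stream
    type.setStoreTermVectors(true);           // routes each term through TermVectorsConsumerPerField
    type.setStoreTermVectorPositions(true);
    type.setStoreTermVectorOffsets(true);
    type.freeze();                            // prevent further changes (optional but conventional)
    return new Field("body", text, type);
  }
}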
           

TermVectorsConsumerPerField.newTerm() works similarly, also writing into an intPool and a bytePool, but the two newTerm() methods write different content: the former writes term frequency and position information, while the latter writes the term length and the term bytes. Going back to the add() function above, it calls int termID = bytesHash.add(termAtt.getBytesRef()). If the term already exists, this returns a negative number, namely -(termID + 1); if it does not exist, the term's length and bytes are written into the bytePool. The code is as follows:

public int add(BytesRef bytes) {
    assert bytesStart != null : "Bytesstart is null - not initialized";
    final int length = bytes.length;
    // final position
    final int hashPos = findHash(bytes);
    int e = ids[hashPos];
    
    if (e == -1) {
      // new entry
      final int len2 = 2 + bytes.length;
      if (len2 + pool.byteUpto > BYTE_BLOCK_SIZE) {
        if (len2 > BYTE_BLOCK_SIZE) {
          throw new MaxBytesLengthExceededException("bytes can be at most "
              + (BYTE_BLOCK_SIZE - 2) + " in length; got " + bytes.length);
        }
        pool.nextBuffer();
      }
      final byte[] buffer = pool.buffer;
      final int bufferUpto = pool.byteUpto;
      if (count >= bytesStart.length) {
        bytesStart = bytesStartArray.grow();
        assert count < bytesStart.length + 1 : "count: " + count + " len: "
            + bytesStart.length;
      }
      e = count++;

      bytesStart[e] = bufferUpto + pool.byteOffset;     // record where this term's bytes start in the pool (textStart)

      // We first encode the length, followed by the
      // bytes. Length is encoded as vInt, but will consume
      // 1 or 2 bytes at most (we reject too-long terms,
      // above).
      if (length < 128) {
        // 1 byte to store length
        buffer[bufferUpto] = (byte) length;     // the term's length (encoded in 1 byte)
        pool.byteUpto += length + 1;
        assert length >= 0: "Length must be positive: " + length;
        System.arraycopy(bytes.bytes, bytes.offset, buffer, bufferUpto + 1, 
            length);                            // the term's bytes
      } else {
        // 2 byte to store length
        buffer[bufferUpto] = (byte) (0x80 | (length & 0x7f));
        buffer[bufferUpto + 1] = (byte) ((length >> 7) & 0xff);
        pool.byteUpto += length + 2;
        System.arraycopy(bytes.bytes, bytes.offset, buffer, bufferUpto + 2,
            length);
      }
      assert ids[hashPos] == -1;
      ids[hashPos] = e;       // store the termID in the hash slot
      
      if (count == hashHalfSize) {
        rehash(2 * hashSize, true);
      }
      return e;   // return the new termID
    }
    return -(e + 1);  // term already exists: return -(termID + 1)
  }
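
BytesRefHash is a public utility class in org.apache.lucene.util, so the return-value convention can be observed directly. A minimal, hedged usage sketch:

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;

public class BytesRefHashDemo {
  public static void main(String[] args) {
    BytesRefHash hash = new BytesRefHash();        // backed internally by a ByteBlockPool
    int first = hash.add(new BytesRef("lucene"));  // new entry: returns its id (here 0)
    int again = hash.add(new BytesRef("lucene"));  // existing entry: returns -(id + 1) (here -1)
    int decoded = -again - 1;                      // recover the existing id, as TermsHashPerField.add() does
    System.out.println(first + " " + again + " " + decoded);  // prints: 0 -1 0
  }
}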
           

At this point we can see that the inverted (postings) information has been written into memory. How it maps onto the actual index files will be analyzed in the next article.

If anything here is incorrect, corrections are welcome.
