天天看點

lucene添加文檔源碼解析(第二篇)

在lucene中添加文檔是通過IndexWriter.addDocument方法實現的,我們先給出添加文檔的示例代碼:

// Writer configuration for the example.
IndexWriterConfig config = new IndexWriterConfig(new WhitespaceAnalyzer());
// Do not pack newly written segments into a compound (.cfs) file, so each index file is visible.
config.setUseCompoundFile(false);
// Flush buffered in-memory documents as a new segment once 2 documents are buffered.
config.setMaxBufferedDocs(2);

// Field type: stored, tokenized, indexed with docs/freqs/positions/offsets,
// and with term vectors including positions and offsets.
FieldType type = new FieldType();
type.setStored(true);
type.setTokenized(true);
type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
type.setStoreTermVectors(true);
type.setStoreTermVectorPositions(true);
type.setStoreTermVectorOffsets(true);
type.freeze();

// NOTE(review): `dir` is assumed to be a Directory opened elsewhere (e.g. FSDirectory.open(path)).
// IndexWriter is Closeable: try-with-resources guarantees close(), which releases the
// index write lock (and, with the default commitOnClose setting, commits the buffered
// documents) even if addDocument throws.
try (IndexWriter writer = new IndexWriter(dir, config)) {
  Document doc = new Document();
  doc.add(new Field("content", "one", type));
  writer.addDocument(doc);
}

複制

上一篇文章中介紹了lucene添加、修改文檔前的流程,在這一篇文章中,介紹處理文檔的流程。

lucene添加文檔源碼解析(第二篇)

lucene添加文檔的流程

1. 擷取ThreadState源碼解析

lucene在處理文檔前需要先通過obtainAndLock方法擷取一個已加鎖的ThreadState,該方法内部會調用DocumentsWriterPerThreadPool.getAndLock。

/**
 * Obtains a locked ThreadState for an indexing operation.
 *
 * <p>If the state's DWPT was created against a delete queue other than the DocumentsWriter's
 * current one, a flush-all is in progress and the DWPT is stale, so the state is enrolled
 * via addFlushableState. The state is returned either way because its lock is already held.
 * If anything throws before the state is handed out, its lock is released back to the pool.
 *
 * @return the locked ThreadState
 */
ThreadState obtainAndLock() {
  // The pool (DocumentsWriterPerThreadPool) returns the state already locked.
  final ThreadState state = perThreadPool.getAndLock(Thread.currentThread(), documentsWriter);
  boolean handedOut = false;
  try {
    // Stale DWPT: a flush-all swapped the delete queue after this DWPT was initialized.
    final boolean staleDwpt =
        state.isInitialized() && state.dwpt.deleteQueue != documentsWriter.deleteQueue;
    if (staleDwpt) {
      addFlushableState(state);
    }
    handedOut = true;
    // Return it even during a flush-all, since we already hold its lock.
    return state;
  } finally {
    // Never leak the lock when something above failed.
    if (handedOut == false) {
      perThreadPool.release(state);
    }
  }
}

複制

/**
 * Used by DocumentsWriter/FlushControl to obtain a ThreadState for an indexing operation
 * (add/updateDocument). The returned ThreadState is locked.
 *
 * <p>When no free state exists, a new one is created. Otherwise free states are reused LIFO.
 * If the most recently freed state has no DWPT (e.g. it was just flushed), a free state that
 * already has a DWPT is preferred instead. That way, when indexing concurrency drops, buffered
 * documents are not left in RAM indefinitely.
 *
 * @param requestingThread the thread asking for a state (not read by this method)
 * @param documentsWriter the owning DocumentsWriter (not read by this method)
 * @return a locked ThreadState
 */
ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
  ThreadState picked;
  synchronized (this) {
    if (freeList.isEmpty()) {
      // newThreadState() returns a state that is already locked.
      return newThreadState();
    }
    // LIFO: take the most recently released state, so only a limited number of
    // thread states stays in use once concurrency has dropped.
    picked = freeList.remove(freeList.size() - 1);
    if (picked.dwpt == null) {
      // Look for a free state that already has a DWPT. Put the uninitialized one
      // we just took into that state's slot.
      for (int slot = 0; slot < freeList.size(); slot++) {
        final ThreadState candidate = freeList.get(slot);
        if (candidate.dwpt != null) {
          freeList.set(slot, picked);
          picked = candidate;
          break;
        }
      }
    }
  }
  // Locking can take time (e.g. while the state is briefly checked for flushing),
  // so it happens outside the pool's monitor.
  picked.lock();
  return picked;
}

複制

2. DocumentsWriterPerThread.updateDocument更新文檔源碼解析

/**
 * Indexes one document into this DWPT and then calls finishDocument(delTerm).
 *
 * <p>The document is assigned docID = numDocsInRAM. If processing throws, that docID is
 * marked deleted and numDocsInRAM is still advanced, so the next document gets a fresh docID.
 *
 * @param doc the document's fields
 * @param analyzer the analyzer stored in docState for this document
 * @param delTerm the term handed to finishDocument (NOTE(review): presumably null for a plain add — confirm)
 * @return whatever finishDocument(delTerm) returns
 */
public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
    testPoint("DocumentsWriterPerThread addDocument start");
    assert deleteQueue != null;
    // Reserve the document: reserveOneDoc() increments the pendingNumDocs counter.
    reserveOneDoc();
    docState.doc = doc;
    docState.analyzer = analyzer;
    docState.docID = numDocsInRAM;
    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
    }
    // Even on exception, the document is still added (but marked
    // deleted), so we don't need to un-reserve at that point.
    // Aborting exceptions will actually "lose" more than one
    // document, so the counter will be "wrong" in that case, but
    // it's very hard to fix (we can't easily distinguish aborting
    // vs non-aborting exceptions):
    boolean success = false;
    try {
      try {
        // Process the document (every field).
        consumer.processDocument();
      } finally {
        // Clear docState whether or not processing succeeded.
        docState.clear();
      }
      success = true;
    } finally {
      if (!success) {
        // Processing failed: mark this docID as deleted. NOTE(review): deleteDocID presumably
        // buffers it (pending updates) until flush — verify.
        // mark document as deleted
        deleteDocID(docState.docID);
        // Still consume the docID so the next document gets a fresh one.
        numDocsInRAM++;
      }
    }
    // After processing, finishDocument handles delTerm (per the article: updates the delete slice).
    return finishDocument(delTerm);
  }

複制

3. DocConsumer.processDocument源碼解析

該方法會依次處理文檔的每一個field,生成存儲域、倒排索引(反向索引)、term vector、DocValues以及Point類型的數據。

/**
 * Indexes every field of docState.doc.
 *
 * <p>Starts the terms hash and the stored fields for docState.docID, then runs processField on
 * each field. Unless an AbortingException occurred, it then calls finish() on every distinct
 * indexed field and finishes the stored fields. Finally it finishes the terms hash; any
 * Throwable thrown there is rethrown wrapped by AbortingException.wrap.
 *
 * @throws IOException propagated from field processing
 * @throws AbortingException when processing must abort
 */
public void processDocument() throws IOException, AbortingException {

    // How many indexed field names we've seen (collapses
    // multiple field instances by the same name):
    int fieldCount = 0;

    // Per-document generation value passed to processField for every field.
    // NOTE(review): presumably used to detect repeated field names within this document — verify.
    long fieldGen = nextFieldGen++;

    // NOTE: we need two passes here, in case there are
    // multi-valued fields, because we must process all
    // instances of a given field at once, since the
    // analyzer is free to reuse TokenStream across fields
    // (i.e., we cannot have more than one TokenStream
    // running "at once"):
    // Prepare the inverted-index (terms hash) side for a new document.
    termsHash.startDocument();
    // Begin the stored fields for this docID.
    startStoredFields(docState.docID);

    boolean aborting = false;
    try {
      // Process each field of the document in order.
      for (IndexableField field : docState.doc) {
        // Per the article, processField handles inverted indexing, term vectors, stored fields,
        // doc values and points. It returns the updated count of distinct indexed field names.
        fieldCount = processField(field, fieldGen, fieldCount);
      }
    } catch (AbortingException ae) {
      aborting = true;
      throw ae;
    } finally {
      if (aborting == false) {
        // Finish each indexed field name seen in the document:
        for (int i=0;i<fieldCount;i++) {
          fields[i].finish();
        }
        // Finish the stored fields for this document.
        finishStoredFields();
      }
    }

    try {
      // Finish the inverted-index (terms hash) work for this document.
      termsHash.finishDocument();
    } catch (Throwable th) {
      // Must abort, on the possibility that on-disk term
      // vectors are now corrupt:
      throw AbortingException.wrap(th);
    }
  }

複制

4. DocumentsWriterFlushControl.doAfterDocument源碼解析

處理完文檔後,會調用DocumentsWriterFlushControl.doAfterDocument進行後置處理。

/**
 * Post-processing after a document has been indexed by the DWPT of {@code perThread}.
 *
 * <p>Steps:
 * <ol>
 *   <li>Commits the state's RAM usage.</li>
 *   <li>If the state is not flush-pending, lets the FlushPolicy react: onUpdate for
 *       updates, onInsert otherwise.</li>
 *   <li>If the state is still not flush-pending but its bytesUsed exceeds
 *       hardMaxBytesPerDWPT, marks it flush-pending.</li>
 *   <li>Tries to check the state out for flushing.</li>
 * </ol>
 * The stall state is re-evaluated on every exit path.
 *
 * @param perThread the state whose DWPT just indexed a document
 * @param isUpdate whether the operation was an update rather than an insert
 * @return the DWPT to flush, as returned by checkout (may be null)
 */
synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) {
  try {
    // Account the bytes this state used. Per the article, they go to flushBytes when the state
    // is flush-pending and to activeBytes otherwise.
    commitPerThreadBytes(perThread);
    if (!perThread.flushPending) {
      if (isUpdate) {
        flushPolicy.onUpdate(this, perThread);
      } else {
        flushPolicy.onInsert(this, perThread);
      }
      // The policy may already have marked the state pending. If not, enforce the hard
      // per-DWPT RAM cap: a single DWPT cannot address more than 2048 MB.
      if (!perThread.flushPending) {
        if (perThread.bytesUsed > hardMaxBytesPerDWPT) {
          setFlushPending(perThread);
        }
      }
    }
    final DocumentsWriterPerThread flushCandidate = checkout(perThread, false);
    return flushCandidate;
  } finally {
    boolean stalled = updateStallState();
    assert assertNumDocsSinceStalled(stalled) && assertMemory();
  }
}

複制

/**
 * Decides which DWPT, if any, should be handed out for flushing after {@code perThread}
 * finished a document.
 *
 * <p>Outside a full flush, the state is optionally marked flush-pending first and then
 * offered to tryCheckoutForFlush. During a full flush, a state that is not flush-pending
 * yields null. A flush-pending one is checked out and blocked (per the article, parked in
 * blockedFlushes), and the next pending flush is returned.
 *
 * @param perThread the state that just finished indexing
 * @param markPending whether to mark the state flush-pending before trying to check it out
 * @return the DWPT to flush, or null
 */
private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) {
  if (!fullFlush) {
    if (markPending) {
      assert perThread.isFlushPending() == false;
      setFlushPending(perThread);
    }
    return tryCheckoutForFlush(perThread);
  }
  // A full flush is running: only flush-pending states take part.
  if (!perThread.flushPending) {
    return null;
  }
  checkoutAndBlock(perThread);
  return nextPendingFlush();
}

複制

本文描述了處理文檔的流程,並沒有展開介紹field的具體處理邏輯,後面的文章中會介紹不同類型field對應的具體檔案格式。下一篇文章 https://cloud.tencent.com/developer/article/1579684 中我們會介紹處理文檔後的流程。