在Lucene中，添加文檔是通過IndexWriter.addDocument方法完成的。我們先給出添加文檔的示例代碼：
// Analyzer that splits text on whitespace only.
IndexWriterConfig config = new IndexWriterConfig(new WhitespaceAnalyzer());
// Write each index file separately instead of packing a segment into a compound (.cfs) file,
// so the individual files produced by indexing can be inspected.
config.setUseCompoundFile(false);
// Flush the buffered documents as a new segment once 2 documents are buffered (demo-sized value).
config.setMaxBufferedDocs(2);
// `dir` is an already-opened Directory (e.g. FSDirectory.open(path)); it is not shown in this snippet.
// try-with-resources guarantees IndexWriter.close(), which releases the index write lock and,
// with the default commitOnClose=true, commits the added document. Without close() the write
// lock leaks and the buffered document may never become visible to readers.
try (IndexWriter writer = new IndexWriter(dir, config)) {
  // Field type: stored, tokenized, indexed with docs/freqs/positions/offsets,
  // and storing term vectors including positions and offsets.
  FieldType type = new FieldType();
  type.setStored(true);
  type.setTokenized(true);
  type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
  type.setStoreTermVectors(true);
  type.setStoreTermVectorPositions(true);
  type.setStoreTermVectorOffsets(true);
  // Make the field type immutable before it is used by any field.
  type.freeze();

  // Build a one-field document and index it.
  Document doc = new Document();
  doc.add(new Field("content", "one", type));
  writer.addDocument(doc);
}
上一篇文章介紹了Lucene添加、修改文檔之前的流程，本文介紹處理文檔的流程。
Lucene添加文檔的流程
1. 擷取ThreadState源碼解析
Lucene在處理文檔前，需要先擷取一個ThreadState：
/**
 * Obtains a locked ThreadState from the per-thread pool for an indexing operation.
 * <p>
 * If the state's DWPT was created against an older delete queue than the DocumentsWriter's
 * current one, a flush-all is in progress and that DWPT is stale, so it is enrolled for
 * flushing via addFlushableState. Per the original notes, that method queues a DWPT with
 * buffered docs in fullFlushBuffer and otherwise resets it. The state is returned locked
 * either way; the lock is released here only if something throws before returning.
 *
 * @return the locked ThreadState to index with
 */
ThreadState obtainAndLock() {
  final ThreadState state = perThreadPool.getAndLock(Thread.currentThread(), documentsWriter);
  boolean handedOff = false;
  try {
    final boolean staleDuringFullFlush =
        state.isInitialized() && state.dwpt.deleteQueue != documentsWriter.deleteQueue;
    if (staleDuringFullFlush) {
      // Enroll the stale DWPT for flushing; we keep holding the lock on this state regardless.
      addFlushableState(state);
    }
    handedOff = true;
    return state;
  } finally {
    if (!handedOff) {
      // Never leak the lock when enrolling the state failed.
      perThreadPool.release(state);
    }
  }
}
/**
 * This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument).
 * <p>
 * Takes the most recently freed ThreadState from {@code freeList} (LIFO), or creates a new one
 * via {@code newThreadState()} when the free list is empty. If the LIFO pick has no DWPT
 * ({@code dwpt == null}), the free list is scanned from the front for a state that already has
 * a DWPT. That state is used instead, and the uninitialized one is put back into its slot.
 * The returned ThreadState is always locked.
 *
 * @param requestingThread the thread requesting a state (not referenced in this body)
 * @param documentsWriter the owning DocumentsWriter (not referenced in this body)
 * @return a locked ThreadState
 */
ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
  ThreadState threadState = null;
  synchronized (this) {
    // No free ThreadState available: create a new one. Per the original notes,
    // newThreadState() also registers it in threadStates.
    if (freeList.isEmpty()) {
      // ThreadState is already locked before return by this method:
      return newThreadState();
    } else {
      // Important that we are LIFO here! This way if number of concurrent indexing
      // threads was once high, but has now reduced, we only use a
      // limited number of thread states:
      // LIFO: take the last (most recently released) ThreadState.
      threadState = freeList.remove(freeList.size()-1);
      // The picked ThreadState has no DWPT (not initialized).
      if (threadState.dwpt == null) {
        // This thread-state is not initialized, e.g. it
        // was just flushed. See if we can instead find
        // another free thread state that already has docs
        // indexed. This way if incoming thread concurrency
        // has decreased, we don't leave docs
        // indefinitely buffered, tying up RAM. This
        // will instead get those thread states flushed,
        // freeing up RAM for larger segment flushes:
        // Scan freeList from the front for a ThreadState whose DWPT is non-null. If found,
        // put the uninitialized state into that slot and use the found state instead.
        for(int i=0;i<freeList.size();i++) {
          ThreadState ts = freeList.get(i);
          if (ts.dwpt != null) {
            // Use this one instead, and swap it with
            // the un-initialized one:
            freeList.set(i, threadState);
            threadState = ts;
            break;
          }
        }
      }
    }
  }
  // This could take time, e.g. if the threadState is [briefly] checked for flushing:
  // Lock the chosen ThreadState outside the pool monitor.
  threadState.lock();
  return threadState;
}
2. DocumentsWriterPerThread.updateDocument更新文檔源碼解析
/**
 * Indexes a single document into this DocumentsWriterPerThread.
 * <p>
 * The document is assigned docID {@code numDocsInRAM} and processed by
 * {@code consumer.processDocument()}. If processing throws, the docID is still consumed:
 * it is marked deleted and {@code numDocsInRAM} is advanced, then the exception propagates.
 * On the normal path the result comes from {@code finishDocument(delTerm)}.
 *
 * @param doc the fields of the document to index
 * @param analyzer the analyzer recorded on {@code docState} for this document
 * @param delTerm forwarded to {@code finishDocument}; presumably the term identifying
 *                documents this update replaces (null for a plain add?) -- TODO confirm
 * @return whatever {@code finishDocument(delTerm)} returns
 * @throws IOException if processing or finishing the document fails
 * @throws AbortingException propagated from processing; per the comment below, an aborting
 *         exception can lose more than one buffered document
 */
public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
  testPoint("DocumentsWriterPerThread addDocument start");
  assert deleteQueue != null;
  // Reserve a slot for this document (per the original notes, this increments the pendingNumDocs counter).
  reserveOneDoc();
  docState.doc = doc;
  docState.analyzer = analyzer;
  // The new document's docID is the current number of documents buffered in RAM.
  docState.docID = numDocsInRAM;
  if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
    infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
  }
  // Even on exception, the document is still added (but marked
  // deleted), so we don't need to un-reserve at that point.
  // Aborting exceptions will actually "lose" more than one
  // document, so the counter will be "wrong" in that case, but
  // it's very hard to fix (we can't easily distinguish aborting
  // vs non-aborting exceptions):
  boolean success = false;
  try {
    try {
      // Process every field of the document (see processDocument below).
      consumer.processDocument();
    } finally {
      // Always reset the per-document state, whether or not processing succeeded.
      docState.clear();
    }
    success = true;
  } finally {
    if (!success) {
      // Processing failed: the docID was already handed out, so keep it but mark it deleted.
      // Per the original notes, deleteDocID records it in pendingUpdates to be applied at flush time.
      // mark document as deleted
      deleteDocID(docState.docID);
      // Advance numDocsInRAM so the next document receives the next docID.
      numDocsInRAM++;
    }
  }
  // Finish the document. Per the original notes, this also updates the deleteSlice using delTerm.
  // On success, numDocsInRAM is presumably advanced inside finishDocument -- TODO confirm.
  return finishDocument(delTerm);
}
3. DocConsumer.processDocument源碼解析
該方法會依次處理文檔中的每一個field，生成存儲域、反向索引、DocValues以及Point類型的數據。
/**
 * Processes every field of the current document ({@code docState.doc}).
 * <p>
 * Order of work:
 * <ol>
 *   <li>{@code termsHash.startDocument()}</li>
 *   <li>{@code startStoredFields(docState.docID)}</li>
 *   <li>one {@code processField} call per field</li>
 *   <li>unless an AbortingException was thrown, {@code finish()} on each distinct indexed field
 *       name seen, then {@code finishStoredFields()}. This step also runs when a non-aborting
 *       exception is propagating.</li>
 *   <li>on normal completion of the loop, {@code termsHash.finishDocument()}</li>
 * </ol>
 * Per the article, this is where stored fields, the inverted index, doc values and points are produced.
 *
 * @throws IOException on I/O failure while processing fields
 * @throws AbortingException if field processing aborted. Any Throwable from
 *         {@code termsHash.finishDocument()} is also wrapped as an AbortingException, because
 *         on-disk term vectors may be corrupt.
 */
public void processDocument() throws IOException, AbortingException {
  // How many indexed field names we've seen (collapses
  // multiple field instances by the same name):
  int fieldCount = 0;
  // A fresh generation number for this document, passed to every processField call
  // (presumably used to recognize repeated field names within one document -- confirm).
  long fieldGen = nextFieldGen++;
  // NOTE: we need two passes here, in case there are
  // multi-valued fields, because we must process all
  // instances of a given field at once, since the
  // analyzer is free to reuse TokenStream across fields
  // (i.e., we cannot have more than one TokenStream
  // running "at once"):
  // Prepare the inverted-index (terms hash) consumers for a new document.
  termsHash.startDocument();
  // Begin the stored-fields entry for this docID.
  startStoredFields(docState.docID);
  boolean aborting = false;
  try {
    // Process the document's fields one at a time.
    for (IndexableField field : docState.doc) {
      // Per the article, this handles inversion, term vectors, stored fields, doc values and
      // points for the field. It returns the updated count of distinct indexed field names.
      fieldCount = processField(field, fieldGen, fieldCount);
    }
  } catch (AbortingException ae) {
    aborting = true;
    throw ae;
  } finally {
    // Skip the per-field finish work only when aborting; it still runs if a
    // non-aborting exception is propagating.
    if (aborting == false) {
      // Finish each indexed field name seen in the document:
      for (int i=0;i<fieldCount;i++) {
        fields[i].finish();
      }
      // Close this document's stored-fields entry.
      finishStoredFields();
    }
  }
  try {
    // Finish the inverted-index work for this document.
    termsHash.finishDocument();
  } catch (Throwable th) {
    // Must abort, on the possibility that on-disk term
    // vectors are now corrupt:
    throw AbortingException.wrap(th);
  }
}
4. DocumentsWriterFlushControl.doAfterDocument源碼解析
處理完文檔後,會調用DocumentsWriterFlushControl.doAfterDocument進行後置處理。
/**
 * Post-processing after a ThreadState's DWPT has indexed a document.
 * <p>
 * The method runs these steps in order:
 * <ul>
 *   <li>It refreshes the memory accounting for {@code perThread}.</li>
 *   <li>If the state is not already flush-pending, it lets {@code flushPolicy} react via
 *       onUpdate or onInsert.</li>
 *   <li>It then forces flush-pending when the state's RAM exceeds {@code hardMaxBytesPerDWPT}.</li>
 *   <li>Finally, it checks out a DWPT to flush, if any.</li>
 * </ul>
 * The stall state is refreshed in a finally block, so it happens even if the body throws.
 *
 * @param perThread the ThreadState that just indexed a document
 * @param isUpdate true if the operation was an update, false for a plain insert
 * @return the DWPT returned by {@code checkout(perThread, false)}, possibly null
 */
synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) {
  try {
    // Refresh the bytes accounted to this ThreadState. Per the original notes, they count toward
    // flushBytes if it is flush-pending, otherwise toward activeBytes.
    commitPerThreadBytes(perThread);
    // Only consult the flush policy if this ThreadState is not already flush-pending.
    if (!perThread.flushPending) {
      // Updates go to FlushPolicy.onUpdate, inserts to FlushPolicy.onInsert.
      if (isUpdate) {
        flushPolicy.onUpdate(this, perThread);
      } else {
        flushPolicy.onInsert(this, perThread);
      }
      // Re-check flushPending (the policy presumably may have set it). If it is still not
      // pending and the state uses more than hardMaxBytesPerDWPT, force it to flush-pending.
      if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
        // Safety check to prevent a single DWPT exceeding its RAM limit. This
        // is super important since we can not address more than 2048 MB per DWPT
        setFlushPending(perThread);
      }
    }
    // Check out this ThreadState for flushing, if applicable (see checkout below).
    return checkout(perThread, false);
  } finally {
    // Always refresh the stall state, even if the code above threw.
    boolean stalled = updateStallState();
    assert assertNumDocsSinceStalled(stalled) && assertMemory();
  }
}
/**
 * Returns a DWPT to flush for the given ThreadState, or null.
 * <p>
 * Outside a full flush, the state is first marked flush-pending when {@code markPending} is set,
 * and {@code tryCheckoutForFlush} makes the decision. During a full flush, a state that is not
 * flush-pending yields null. A flush-pending state is instead moved to the blocked flushes
 * (blockedFlushes), and the result of {@code nextPendingFlush()} is returned.
 *
 * @param perThread the ThreadState to check out
 * @param markPending whether to mark the state flush-pending first (only outside a full flush)
 * @return the DWPT to flush, or null
 */
private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) {
  if (!fullFlush) {
    if (markPending) {
      assert perThread.isFlushPending() == false;
      setFlushPending(perThread);
    }
    return tryCheckoutForFlush(perThread);
  }
  // Full flush in progress: only flush-pending states take part.
  if (!perThread.flushPending) {
    return null;
  }
  checkoutAndBlock(perThread);
  return nextPendingFlush();
}
本文描述了處理文檔的流程，並沒有展開介紹field的具體處理邏輯，我們會在後續文章中介紹不同類型field的具體檔案格式。在下一篇文章 https://cloud.tencent.com/developer/article/1579684 中，我們會介紹處理文檔之後的流程。