The previous article gave an overview of how indexing gets from IndexWriter to DefaultIndexChain and analyzed DefaultIndexChain's basic flow,
which boils down to:
each document received by a DWPT is processed one by one -> each document is then processed field by field -> each field is handled differently depending on whether it is tokenized, whether it is stored, and whether it has docValues.
So different DWPTs work in parallel, while the work inside a single DWPT is serial.
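For context, here is a minimal sketch of how those per-field options are declared on the application side, using Lucene's standard document API; the field names, values and the in-memory directory are made up purely for illustration:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.*;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class FieldOptionsDemo {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory(); // in-memory directory, illustration only
    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));

    Document doc = new Document();
    // tokenized + indexed: goes through the invert() path discussed below
    doc.add(new TextField("body", "lucene in-memory indexing", Field.Store.NO));
    // indexed as a single token and also stored: no analysis, but still inverted
    doc.add(new StringField("id", "doc-1", Field.Store.YES));
    // stored only: never reaches the inverted index
    doc.add(new StoredField("raw", "some opaque payload"));
    // docValues only: handled by the docValues writers, not by the terms hash
    doc.add(new NumericDocValuesField("price", 42L));

    writer.addDocument(doc); // each indexing thread is bound to its own DWPT inside this call
    writer.close();
    dir.close();
  }
}

Each thread calling addDocument() is bound to its own DWPT, which is where the parallel-between / serial-within behavior described above comes from.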
Processing a field concretely means writing information through TermsHashPerField, and that information is managed centrally by TermsHash. You can think of TermsHash as a buffer shared within a DWPT: it is mainly used to build the index in memory and is flushed to disk when a flush is needed. Once a DWPT is flushed to disk it effectively becomes a segment, and depending on the configured segment sizes, segment merges may follow. That is a topic for later; flush will get its own article. In this article we continue from the previous one and look at pf.invert(). The code is as follows:
public void invert(IndexableField field, boolean first) throws IOException, AbortingException {
  if (first) {
    // First time we're seeing this field (indexed) in
    // this document:
    invertState.reset(); // first time: reset the per-field invert state
  }
  IndexableFieldType fieldType = field.fieldType();
  IndexOptions indexOptions = fieldType.indexOptions();
  fieldInfo.setIndexOptions(indexOptions);
  if (fieldType.omitNorms()) {
    fieldInfo.setOmitsNorms();
  }
  final boolean analyzed = fieldType.tokenized() && docState.analyzer != null;
  // only bother checking offsets if something will consume them.
  // TODO: after we fix analyzers, also check if termVectorOffsets will be indexed.
  final boolean checkOffsets = indexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS;
  /*
   * To assist people in tracking down problems in analysis components, we wish to write the field name to the
   * infostream when we fail. We expect some caller to eventually deal with the real exception, so we don't want any
   * 'catch' clauses, but rather a finally that takes note of the problem.
   */
  boolean succeededInProcessingField = false;
  try (TokenStream stream = tokenStream = field.tokenStream(docState.analyzer, tokenStream)) { // get the token stream for this field's content
    // reset the TokenStream to the first token
    stream.reset();
    invertState.setAttributeSource(stream);
    termsHashPerField.start(field, first);
    while (stream.incrementToken()) { // process each term produced from this field's content
      // If we hit an exception in stream.next below
      // (which is fairly common, e.g. if analyzer
      // chokes on a given document), then it's
      // non-aborting and (above) this one document
      // will be marked as deleted, but still
      // consume a docID
      int posIncr = invertState.posIncrAttribute.getPositionIncrement();
      invertState.position += posIncr;
      if (invertState.position < invertState.lastPosition) {
        if (posIncr == 0) {
          throw new IllegalArgumentException("first position increment must be > 0 (got 0) for field '"
              + field.name() + "'");
        } else {
          throw new IllegalArgumentException("position increments (and gaps) must be >= 0 (got " + posIncr
              + ") for field '" + field.name() + "'");
        }
      } else if (invertState.position > IndexWriter.MAX_POSITION) {
        throw new IllegalArgumentException("position " + invertState.position + " is too large for field '"
            + field.name() + "': max allowed position is " + IndexWriter.MAX_POSITION);
      }
      invertState.lastPosition = invertState.position;
      if (posIncr == 0) {
        invertState.numOverlap++;
      }
      if (checkOffsets) {
        int startOffset = invertState.offset + invertState.offsetAttribute.startOffset();
        int endOffset = invertState.offset + invertState.offsetAttribute.endOffset();
        if (startOffset < invertState.lastStartOffset || endOffset < startOffset) {
          throw new IllegalArgumentException(
              "startOffset must be non-negative, and endOffset must be >= startOffset, and offsets must not go backwards "
                  + "startOffset=" + startOffset + ",endOffset=" + endOffset + ",lastStartOffset="
                  + invertState.lastStartOffset + " for field '" + field.name() + "'");
        }
        invertState.lastStartOffset = startOffset;
      }
      invertState.length++;
      if (invertState.length < 0) {
        throw new IllegalArgumentException("too many tokens in field '" + field.name() + "'");
      }
      // System.out.println("  term=" + invertState.termAttribute);
      // If we hit an exception in here, we abort
      // all buffered documents since the last
      // flush, on the likelihood that the
      // internal state of the terms hash is now
      // corrupt and should not be flushed to a
      // new segment:
      try {
        termsHashPerField.add(); // add the term to the in-memory index; two consumer chains handle it: FreqProxTermsWriterPerField and TermVectorsConsumerPerField
      } catch (MaxBytesLengthExceededException e) {
        byte[] prefix = new byte[30];
        BytesRef bigTerm = invertState.termAttribute.getBytesRef();
        System.arraycopy(bigTerm.bytes, bigTerm.offset, prefix, 0, 30);
        String msg = "Document contains at least one immense term in field=\""
            + fieldInfo.name
            + "\" (whose UTF8 encoding is longer than the max length "
            + DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8
            + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '"
            + Arrays.toString(prefix) + "...', original message: " + e.getMessage();
        if (docState.infoStream.isEnabled("IW")) {
          docState.infoStream.message("IW", "ERROR: " + msg);
        }
        // Document will be deleted above:
        throw new IllegalArgumentException(msg, e);
      } catch (Throwable th) {
        throw AbortingException.wrap(th);
      }
    }
    // trigger streams to perform end-of-stream operations
    stream.end(); // the token stream is finished
    // TODO: maybe add some safety? then again, it's already checked
    // when we come back around to the field...
    invertState.position += invertState.posIncrAttribute.getPositionIncrement();
    invertState.offset += invertState.offsetAttribute.endOffset();
    /* if there is an exception coming through, we won't set this to true here: */
    succeededInProcessingField = true;
  } finally {
    if (!succeededInProcessingField && docState.infoStream.isEnabled("DW")) {
      docState.infoStream.message("DW", "An exception was thrown while processing field " + fieldInfo.name);
    }
  }
  if (analyzed) {
    invertState.position += docState.analyzer.getPositionIncrementGap(fieldInfo.name);
    invertState.offset += docState.analyzer.getOffsetGap(fieldInfo.name);
  }
  invertState.boost *= field.boost();
}
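To see concretely what this loop reads from the stream, the following small example (not part of Lucene's source; the analyzer, field name and text are chosen arbitrarily) runs an analyzer by hand and prints the same three attributes invert() consumes: the term, the position increment, and the start/end offsets.

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;

public class InvertAttributesDemo {
  public static void main(String[] args) throws Exception {
    Analyzer analyzer = new StandardAnalyzer();
    try (TokenStream stream = analyzer.tokenStream("body", "Lucene builds the index in memory")) {
      CharTermAttribute term = stream.addAttribute(CharTermAttribute.class);
      PositionIncrementAttribute posIncr = stream.addAttribute(PositionIncrementAttribute.class);
      OffsetAttribute offset = stream.addAttribute(OffsetAttribute.class);

      stream.reset(); // same contract as invert(): reset before the loop
      int position = -1; // FieldInvertState also starts at -1 and adds the increment
      while (stream.incrementToken()) {
        position += posIncr.getPositionIncrement();
        System.out.println(term.toString() + " pos=" + position
            + " start=" + offset.startOffset() + " end=" + offset.endOffset());
      }
      stream.end(); // end-of-stream bookkeeping, mirroring invert()
    }
    analyzer.close();
  }
}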
The termsHashPerField.add() method is important: it is responsible for filling the in-memory buffers from each token. The code is as follows:
void add() throws IOException {
  // We are first in the chain so we must "intern" the
  // term text into textStart address
  // Get the text & hash of this term.
  int termID = bytesHash.add(termAtt.getBytesRef()); // new term: returns its termID (>= 0); already-seen term: returns a negative value
  // System.out.println("add term=" + termBytesRef.utf8ToString() + " doc=" + docState.docID + " termID=" + termID);
  if (termID >= 0) { // New posting
    bytesHash.byteStart(termID);
    // Init stream slices
    if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
      intPool.nextBuffer(); // allocate a new buffer in intPool
    }
    if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt * ByteBlockPool.FIRST_LEVEL_SIZE) {
      bytePool.nextBuffer(); // allocate a new buffer in bytePool
    }
    intUptos = intPool.buffer;
    intUptoStart = intPool.intUpto;
    intPool.intUpto += streamCount;
    postingsArray.intStarts[termID] = intUptoStart + intPool.intOffset; // record intStart for this term
    for (int i = 0; i < streamCount; i++) {
      final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
      intUptos[intUptoStart + i] = upto + bytePool.byteOffset;
    }
    postingsArray.byteStarts[termID] = intUptos[intUptoStart]; // record byteStart for this term
    newTerm(termID); // brand-new term: ends up in FreqProxTermsWriterPerField.newTerm()
  } else {
    termID = (-termID) - 1;
    int intStart = postingsArray.intStarts[termID];
    intUptos = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
    intUptoStart = intStart & IntBlockPool.INT_BLOCK_MASK;
    addTerm(termID);
  }
  if (doNextCall) {
    nextPerField.add(postingsArray.textStarts[termID]); // pass the term to the next consumer in the chain (TermVectorsConsumerPerField) via its other add() overload
  }
}
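One detail worth a worked example is the else branch above, which turns the global intStart back into a (buffer, offset-in-buffer) pair with a shift and a mask. A small standalone sketch of that arithmetic; the constants are assumed to match IntBlockPool, where one buffer holds 8192 ints, and the offset 20000 is made up:

public class BlockPoolAddressingDemo {
  public static void main(String[] args) {
    // Constants assumed to match IntBlockPool in this Lucene version.
    final int INT_BLOCK_SHIFT = 13;
    final int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT; // 8192 ints per buffer
    final int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;   // 8191

    int intStart = 20000;                          // hypothetical value of postingsArray.intStarts[termID]
    int bufferIndex = intStart >> INT_BLOCK_SHIFT; // 20000 / 8192 = 2 -> third buffer in intPool.buffers
    int localOffset = intStart & INT_BLOCK_MASK;   // 20000 - 2 * 8192 = 3616 -> slot inside that buffer
    System.out.println("buffer=" + bufferIndex + " offset=" + localOffset);
    // In TermsHashPerField this becomes:
    //   intUptos = intPool.buffers[bufferIndex];  intUptoStart = localOffset;
  }
}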
This code involves postingsArray, intPool, and bytePool; the relationship among the three is easiest to see in a diagram.
To put it as briefly as possible: postingsArray can be thought of as a table with one row per term. textStart records the offset in bytePool where that term's text is stored, byteStart records where that term's posting byte slices start in bytePool, and intStart is the term's offset into intPool. intPool and bytePool are shared by all fields, while each field has its own postingsArray, so the data that actually ends up in a segment goes through bytePool and intPool. This is an important point: writes to these two pools must be serial, and Lucene's execution flow guarantees that.
For a more detailed explanation see http://blog.csdn.net/liweisnake/article/details/11364597
The relationships here are not easy to grasp; the diagram in the article linked above illustrates them well, and a simplified sketch follows below.
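To make those roles concrete, here is a deliberately simplified toy model, not Lucene's actual classes: one flat shared byte pool plus a per-term array that only records offsets into it, which is essentially the job textStarts does for the term text. The real ByteBlockPool is paged, uses 1-or-2-byte length prefixes, and also stores the posting slices:

import java.nio.charset.StandardCharsets;

/** Toy model of "intern the term text into a shared byte pool and remember its offset". */
public class ToyTermPool {
  private byte[] pool = new byte[1 << 15]; // one flat buffer instead of Lucene's paged ByteBlockPool
  private int poolUpto = 0;
  private int[] textStarts = new int[8];   // parallel array: textStarts[termID] -> offset of the term bytes
  private int termCount = 0;

  int addTerm(String term) {
    byte[] bytes = term.getBytes(StandardCharsets.UTF_8); // assumes short terms (< 128 bytes)
    if (termCount == textStarts.length) {
      textStarts = java.util.Arrays.copyOf(textStarts, textStarts.length * 2);
    }
    int termID = termCount++;
    textStarts[termID] = poolUpto;          // remember where this term's bytes start
    pool[poolUpto++] = (byte) bytes.length; // 1-byte length prefix (the real code may use 2 bytes)
    System.arraycopy(bytes, 0, pool, poolUpto, bytes.length);
    poolUpto += bytes.length;
    return termID;
  }

  String term(int termID) {
    int start = textStarts[termID];
    int len = pool[start] & 0xFF;
    return new String(pool, start + 1, len, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    ToyTermPool p = new ToyTermPool();
    int a = p.addTerm("lucene");
    int b = p.addTerm("index");
    System.out.println(a + " -> " + p.term(a) + ", " + b + " -> " + p.term(b));
  }
}

The point of the layout is that all variable-length data (term bytes, postings) lives in the shared pools, while the per-term arrays hold nothing but fixed-size offsets.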
Continuing with the code above: addTerm() and newTerm() are what actually write into bytePool and intPool. The newTerm() call here dispatches to FreqProxTermsWriterPerField.newTerm(), and the chained nextPerField call reaches TermVectorsConsumerPerField.newTerm(). Let's first look at FreqProxTermsWriterPerField.newTerm():
void newTerm(final int termID) {
  // First time we're seeing this term since the last
  // flush
  final FreqProxPostingsArray postings = freqProxPostingsArray;
  postings.lastDocIDs[termID] = docState.docID;
  if (!hasFreq) {
    assert postings.termFreqs == null;
    postings.lastDocCodes[termID] = docState.docID;
  } else {
    postings.lastDocCodes[termID] = docState.docID << 1;
    postings.termFreqs[termID] = 1;
    if (hasProx) {
      writeProx(termID, fieldState.position); // write the position info into the bytePool
      if (hasOffsets) {
        writeOffsets(termID, fieldState.offset); // write the offset info into the bytePool
      }
    } else {
      assert !hasOffsets;
    }
  }
  fieldState.maxTermFrequency = Math.max(1, fieldState.maxTermFrequency);
  fieldState.uniqueTermCount++;
}
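A short note on the docState.docID << 1 above: lastDocCodes keeps the doc id (or, for later documents, the doc-id delta) pre-shifted so that the lowest bit can later signal "frequency is 1, no frequency value follows". The sketch below only illustrates that encoding idea; the actual values are written as vInts into the bytePool later, when the term shows up in a subsequent document or at flush:

public class DocCodeDemo {
  // Conceptual version of the doc-code trick: the doc delta is shifted left by one,
  // and the low bit says "freq == 1, so no separate freq value is written".
  static int[] encode(int docDelta, int freq) {
    int docCode = docDelta << 1;
    if (freq == 1) {
      return new int[] { docCode | 1 };   // single value, low bit set
    } else {
      return new int[] { docCode, freq }; // two values: code with low bit clear, then freq
    }
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(encode(3, 1))); // [7]
    System.out.println(java.util.Arrays.toString(encode(3, 5))); // [6, 5]
  }
}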
TermVectorsConsumerPerField.newTerm() follows a similar process, also writing into intPool and bytePool, but the two newTerm() implementations write different content: the former writes term frequencies and position information, while the latter writes the data needed for term vectors (such as the term's length and content). Going back to the add() function above, it calls int termID = bytesHash.add(termAtt.getBytesRef()); if the term already exists this returns a negative number, namely -(termID + 1); if it does not exist, the term's length and bytes are written into the bytePool. The code is as follows:
public int add(BytesRef bytes) {
  assert bytesStart != null : "Bytesstart is null - not initialized";
  final int length = bytes.length;
  // final position
  final int hashPos = findHash(bytes);
  int e = ids[hashPos];
  if (e == -1) {
    // new entry
    final int len2 = 2 + bytes.length;
    if (len2 + pool.byteUpto > BYTE_BLOCK_SIZE) {
      if (len2 > BYTE_BLOCK_SIZE) {
        throw new MaxBytesLengthExceededException("bytes can be at most "
            + (BYTE_BLOCK_SIZE - 2) + " in length; got " + bytes.length);
      }
      pool.nextBuffer();
    }
    final byte[] buffer = pool.buffer;
    final int bufferUpto = pool.byteUpto;
    if (count >= bytesStart.length) {
      bytesStart = bytesStartArray.grow();
      assert count < bytesStart.length + 1 : "count: " + count + " len: "
          + bytesStart.length;
    }
    e = count++;
    bytesStart[e] = bufferUpto + pool.byteOffset; // record where this term's bytes start in the pool (this becomes textStart)
    // We first encode the length, followed by the
    // bytes. Length is encoded as vInt, but will consume
    // 1 or 2 bytes at most (we reject too-long terms,
    // above).
    if (length < 128) {
      // 1 byte to store length
      buffer[bufferUpto] = (byte) length; // the term's length
      pool.byteUpto += length + 1;
      assert length >= 0: "Length must be positive: " + length;
      System.arraycopy(bytes.bytes, bytes.offset, buffer, bufferUpto + 1,
          length); // the term's bytes
    } else {
      // 2 byte to store length
      buffer[bufferUpto] = (byte) (0x80 | (length & 0x7f));
      buffer[bufferUpto + 1] = (byte) ((length >> 7) & 0xff);
      pool.byteUpto += length + 2;
      System.arraycopy(bytes.bytes, bytes.offset, buffer, bufferUpto + 2,
          length);
    }
    assert ids[hashPos] == -1;
    ids[hashPos] = e; // store the termID in the hash slot
    if (count == hashHalfSize) {
      rehash(2 * hashSize, true);
    }
    return e; // return the new termID
  }
  return -(e + 1); // term already present: return -(termID + 1)
}
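The length prefix written above is a tiny variable-length encoding: lengths below 128 take one byte, anything longer takes two, which is also why a single term may be at most BYTE_BLOCK_SIZE - 2 bytes. Here is a standalone sketch of just that encode/decode step; the helper names are made up, but the bit layout mirrors the two branches above:

public class LengthPrefixDemo {
  // Mirrors the two branches in BytesRefHash.add(): 1 byte if length < 128, else 2 bytes.
  static int writeLength(byte[] buffer, int upto, int length) {
    if (length < 128) {
      buffer[upto] = (byte) length;
      return 1;
    } else {
      buffer[upto]     = (byte) (0x80 | (length & 0x7f)); // low 7 bits, continuation bit set
      buffer[upto + 1] = (byte) ((length >> 7) & 0xff);   // remaining bits
      return 2;
    }
  }

  static int readLength(byte[] buffer, int upto) {
    int b = buffer[upto] & 0xff;
    if ((b & 0x80) == 0) {
      return b; // 1-byte length
    }
    return (b & 0x7f) | ((buffer[upto + 1] & 0xff) << 7); // 2-byte length
  }

  public static void main(String[] args) {
    byte[] buf = new byte[4];
    for (int len : new int[] { 5, 127, 128, 300 }) {
      writeLength(buf, 0, len);
      System.out.println(len + " -> decoded " + readLength(buf, 0));
    }
  }
}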
At this point we can see that the inverted information has been written into these in-memory buffers. How it maps onto the actual index files is something I will continue to analyze in the next article.
If anything here is incorrect, corrections are welcome.