现在的位置: 首页 > 综合 > 正文

Lucene源代码之DocumentWriter

2013年03月20日 ⁄ 综合 ⁄ 共 14170字 ⁄ 字号 评论关闭

首先我们先概念性地了解一下索引如何写入的。

Lucene的索引文件是按照Document、Field、Term三个层面存了document,field,term的信息。一个文件系统必定存在着一组互逆的过程:写入,读取。检索系统也是一种文件系统,也存在互逆过程,有相关的类与之对应:

1)写入→*Writer

2)读取→*Reader

标引过程我们着重描述*Writer类的功能,Lucene有着如下的几个主要的写入类:IndexWriter,DocumentWriter,FieldsWriter,TermInfoWriter。他们的主要方法是addDocument方法,通过这个方法他们相互调用,添加document,field,term各个层面索引信息,以下图所示。

 

先是调用了IndexWriter的addDocument方法:

  1.   public void addDocument(Document doc, Analyzer analyzer) throws IOException {
  2.     /**写入内存*/
  3.     SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer);
  4.     synchronized (this) {
  5.       ramSegmentInfos.addElement(newSegmentInfo);
  6.       maybeFlushRamSegments();    /**将内存中的索引合并后保存到硬盘*/
  7.     }
  8.   }

 首先在内存中建立索引,代码如下:

  1.   /**
  2.    * 构建DocumentWriter,将索引写入到内存中,
  3.    * 从'此 '将开始DocumentWriter的时代。
  4.    */
  5.   SegmentInfo buildSingleDocSegment(Document doc, Analyzer analyzer)
  6.       throws IOException {
  7.     DocumentWriter dw = new DocumentWriter(ramDirectory, analyzer, this);
  8.     dw.setInfoStream(infoStream);
  9.     String segmentName = newRamSegmentName();
  10.     dw.addDocument(segmentName, doc);
  11.     return new SegmentInfo(segmentName, 1, ramDirectory, falsefalse);
  12.   }

默认词条索引区间为128,即this.termIndexInterval = writer.getTermIndexInterval();,也可以在IndexWriter类中找到定义。

另外,this.similarity = writer.getSimilarity();,其实DocumentWriter的这个成员similarity=new DefaultSimilarity();。DefaultSimilarity类继承自Similarity抽象类,该类是用来处理有关“相似性”的,与检索密切相关,其实就是对一些数据在运算过程中可能涉及到数据位数的舍入与进位。具体地,Similarity类的定义可查看org.apache.lucene.search.Similarity。

这样,一个DocumentWriter就构造完成了。

DocumentWriter类的addDocument()方法

  1. final void addDocument(String segment, Document doc)
  2.           throws CorruptIndexException, IOException {
  3.     // 创建一个FieldInfos对象,用来存储加入到索引的Document中的各个Field的信息
  4.     fieldInfos = new FieldInfos();
  5.     fieldInfos.add(doc);   // 将Document加入到FieldInfos中
  6.     
  7.     // postingTable是用于存储所有词条的HashTable
  8.     postingTable.clear();     // clear postingTable
  9.     fieldLengths = new int[fieldInfos.size()];    // 初始化int[]数组fieldLengths,用来记录当前Document中所有Field的长度
  10.     fieldPositions = new int[fieldInfos.size()]; // 初始化int[]数组fieldPositions,用来记录当前Document中所有Field在分析完成后所处位置
  11.     fieldOffsets = new int[fieldInfos.size()];    // 初始化int[]数组fieldOffsets,用来记录当前Document中所有Field的offset
  12.     fieldStoresPayloads = new BitSet(fieldInfos.size());
  13.     
  14.     fieldBoosts = new float[fieldInfos.size()];   // 初始化int[]数组fieldBoosts,用来记录当前Document中所有Field的boost值
  15.     Arrays.fill(fieldBoosts, doc.getBoost());    // 为fieldBoosts数组中的每个元素赋值,根据Document中记录的boost值
  16.     try {
  17.     
  18.       // 在将FieldInfos写入之前,要对Document中的各个Field进行“倒排”
  19.       invertDocument(doc); 
  20.     
  21.       // 对postingTable中的词条进行排序,返回一个排序的Posting[]数组
  22.       Posting[] postings = sortPostingTable();
  23.     
  24.       // 将FieldInfos写入到索引目录directory中,即写入到文件segments.fnm中
  25.       fieldInfos.write(directory, segment + ".fnm");
  26.       // 构造一个FieldInfos的输出流FieldsWriter,将Field的详细信息(包括上面提到的各个数组中的值)写入到索引目录中
  27.       FieldsWriter fieldsWriter =
  28.         new FieldsWriter(directory, segment, fieldInfos);
  29.       try {
  30.         fieldsWriter.addDocument(doc);    // 将Document加入到FieldsWriter
  31.       } finally {
  32.         fieldsWriter.close();    // 关闭FieldsWriter输出流
  33.       }
  34.      // 将经过排序的Posting[]数组写入到索引段文件中(segmentsv.frq文件和segments.prx文件)
  35.       writePostings(postings, segment);
  36.       // 写入被索引的Field的norm信息
  37.       writeNorms(segment);
  38.     } finally {
  39.       // 关闭TokenStreams
  40.       IOException ex = null;
  41.       
  42.       Iterator it = openTokenStreams.iterator();    // openTokenStreams是DocumentWriter类定义的一个链表成员,即:private List openTokenStreams = new LinkedList();
  43.       while (it.hasNext()) {
  44.         try {
  45.           ((TokenStream) it.next()).close();
  46.         } catch (IOException e) {
  47.           if (ex != null) {
  48.             ex = e;
  49.           }
  50.         }
  51.       }
  52.       openTokenStreams.clear();    // 清空openTokenStreams
  53.       
  54.       if (ex != null) {
  55.         throw ex;
  56.       }
  57.     }
  58. }

文件倒排的实现过程

DocumentWriter实现对Document的“倒排”

  1. // 调用底层分析器接口,遍历Document中的Field,对数据源进行分析
  2. private final void invertDocument(Document doc)
  3.           throws IOException {
  4.     Iterator fieldIterator = doc.getFields().iterator();    // 通过Document获取Field的List列表doc.getFields()
  5.     while (fieldIterator.hasNext()) {
  6.       Fieldable field = (Fieldable) fieldIterator.next();
  7.       String fieldName = field.name();
  8.       int fieldNumber = fieldInfos.fieldNumber(fieldName);    // 根据一个Field的fieldName得到该Field的编号number(number是FieldInfo类的一个成员)
  9.       int length = fieldLengths[fieldNumber];     // 根据每个Field的编号,设置每个Field的长度
  10.       int position = fieldPositions[fieldNumber]; // 根据每个Field的编号,设置每个Field的位置
  11.       if (length>0) position+=analyzer.getPositionIncrementGap(fieldName);
  12.       int offset = fieldOffsets[fieldNumber];       // 根据每个Field的编号,设置每个Field的offset
  13.       if (field.isIndexed()) {    // 如果Field被索引
  14.         if (!field.isTokenized()) {    // 如果Field没有进行分词
  15.           String stringValue = field.stringValue();    // 获取Field的String数据值
  16.           if(field.isStoreOffsetWithTermVector())    // 是否把整个Field的数据作为一个词条存储到postingTable中
  17.         // 把整个Field的数据作为一个词条存储到postingTable中
  18.             addPosition(fieldName, stringValue, position++, nullnew TermVectorOffsetInfo(offset, offset + stringValue.length()));
  19.           else    // 否则,不把整个Field的数据作为一个词条存储到postingTable中
  20.             addPosition(fieldName, stringValue, position++, nullnull);
  21.           offset += stringValue.length();
  22.           length++;
  23.         } else 
  24.         { // 需要对Field进行分词
  25.           TokenStream stream = field.tokenStreamValue();
  26.           if (stream == null) {     // 如果一个TokenStream不存在,即为null,则必须从一个Analyzer中获取一个TokenStream流
  27.             Reader reader;     
  28.             if (field.readerValue() != null)    // 如果从Field获取的Reader数据不为null
  29.               reader = field.readerValue();    // 一个Reader流存在
  30.             else if (field.stringValue() != null)
  31.               reader = new StringReader(field.stringValue());    //   根据从Field获取的字符串数据构造一个Reader输入流
  32.             else
  33.               throw new IllegalArgumentException
  34.                       ("field must have either String or Reader value");
  35.             // 把经过分词处理的Field加入到postingTable中
  36.             stream = analyzer.tokenStream(fieldName, reader);
  37.           }
  38.           
  39.           // 将每个Field对应的TokenStream加入到链表openTokenStreams中,等待整个Document中的所有Field都分析处理完毕后,对链表openTokenStreams中的每个链表TokenStream进行统一关闭
  40.           openTokenStreams.add(stream);
  41.           
  42.           // 对第一个Token,重置一个TokenStream
  43.           stream.reset();
  44.          
  45.           Token lastToken = null;
  46.           for (Token t = stream.next(); t != null; t = stream.next()) {
  47.             position += (t.getPositionIncrement() - 1);    // 每次切出一个词,就将position加上这个词的长度
  48.               
  49.             Payload payload = t.getPayload();    // 每个词都对应一个Payload,它是关于一个词存储到postingTable中的元数据(metadata)
  50.             if (payload != null) {
  51.               fieldStoresPayloads.set(fieldNumber);    // private BitSet fieldStoresPayloads;,BitSet是一个bits的向量,调用BitSet类的set方法,设置该Field的在索引fieldNumber处的bit值
  52.             }
  53.               
  54.             TermVectorOffsetInfo termVectorOffsetInfo;
  55.             if (field.isStoreOffsetWithTermVector()) {    // 如果指定了Field的词条向量的偏移量,则存储该此条向量
  56.               termVectorOffsetInfo = new TermVectorOffsetInfo(offset + t.startOffset(), offset + t.endOffset());
  57.             } else {
  58.               termVectorOffsetInfo = null;
  59.             }
  60.         // 把该Field的切出的词条存储到postingTable中
  61.             addPosition(fieldName, t.termText(), position++, payload, termVectorOffsetInfo);
  62.               
  63.             lastToken = t;
  64.             if (++length >= maxFieldLength) {// 如果当前切出的词条数已经达到了该Field的最大长度
  65.               if (infoStream != null)
  66.                 infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens");
  67.               break;
  68.             }
  69.           }
  70.             
  71.           if(lastToken != null)    // 如果最后一个切出的词不为null,设置offset的值
  72.             offset += lastToken.endOffset() + 1;
  73.         }
  74.         fieldLengths[fieldNumber] = length;   // 存储Field的长度
  75.         fieldPositions[fieldNumber] = position;   // 存储Field的位置
  76.         fieldBoosts[fieldNumber] *= field.getBoost();    // 存储Field的boost值
  77.         fieldOffsets[fieldNumber] = offset;    //   存储Field的offset值
  78.       }
  79.     }
  80.     
  81.     // 所有的Field都有经过分词处理的具有Payload描述的词条,更新FieldInfos
  82.     for (int i = fieldStoresPayloads.nextSetBit(0); i >= 0; i = fieldStoresPayloads.nextSetBit(i+1)) { 
  83.     fieldInfos.fieldInfo(i).storePayloads = true;
  84.     }
  85. }

【小议一下】

1、该invertDocument()方法遍历了FieldInfos的每个Field,根据每个Field的属性进行分析,如果需要分词,则调用底层分析器接口,执行分词处理。

2、在invertDocument()方法中,对Field的信息进行加工处理,尤其是每个Field的切出的词条,这些词条最后将添加到postingTable中。

【迷惑】文件倒排到底是如何进行的呢?

 fieldLengths[fieldNumber] = length;   // save field length
        fieldPositions[fieldNumber] = position;   // save field position
        fieldBoosts[fieldNumber] *= field.getBoost();
        fieldOffsets[fieldNumber] = offset;

起作用还是

  addPosition(fieldName, t.termText(), position++, new TermVectorOffsetInfo(offset + t.startOffset(), offset + t.endOffset()));

【别人谈】文件的倒排就是对term位置信息记录的过程。非常正确。

【总结】两部分都是倒排过程的一部分,前者是结果,后者是过程。

下面的addPosition()方法填写PostingTable的过程比较简单,但不易理解;

  1.  private final void addPosition(String field, String text, int position, TermVectorOffsetInfo offset) {
  2.     termBuffer.set(field, text);
  3.     Posting ti = (Posting) postingTable.get(termBuffer);
  4.     if (ti != null) {                 // word seen before
  5.       int freq = ti.freq;
  6.       if (ti.positions.length == freq) {      // positions array is full
  7.         int[] newPositions = new int[freq * 2];   // double size
  8.         int[] positions = ti.positions;
  9.         System.arraycopy(positions, 0, newPositions, 0, freq);
  10.         ti.positions = newPositions;
  11.       }
  12.       ti.positions[freq] = position;          // add new position
  13.       if (offset != null) {
  14.         if (ti.offsets.length == freq){
  15.           TermVectorOffsetInfo [] newOffsets = new TermVectorOffsetInfo[freq*2];
  16.           TermVectorOffsetInfo [] offsets = ti.offsets;
  17.           System.arraycopy(offsets, 0, newOffsets, 0, freq);
  18.           ti.offsets = newOffsets;
  19.         }
  20.         ti.offsets[freq] = offset;
  21.       }
  22.       ti.freq = freq + 1;             // update frequency
  23.     } else {                      // word not seen before
  24.       Term term = new Term(field, text, false);
  25.       postingTable.put(term, new Posting(term, position, offset));
  26.     }
  27.   }

解读上述代码:

首先要了解postingTable的组成<term,posting>;而term的组成(fieldname,text)(field的名称,field的内容),以及posting的组成(freg,postions[])(词的频率信息,位置信息)。首先设置term的对象,然后到HashTable表中寻找此term的value(也就是posting)值,如果该Posting的对象不为空,也就是说HashTable具有相同的Term信息(相同的field名,相同的Term的内容),表明该term出现了至少一次。此后,开始修改位置信息。

举例说明:假设field编号为1,fieldData为“feild,field,field,two,text”,field名称为textField。那么,当第三个token,也就是第三个field添加后,扩展position缓存大小为4个int,执行@后position值为

"ti.postions"=int[4]  (id=100)

[0]=0

[1]=1

[2]=2

[3]=0

 

PostingTable的排序过程

当FieldInfos中的每个Field进行分词以后,所有切出的词条都放到了一个HashTable postingTable中,这时所有的词条在postingTable中是无序的。在DocumentWriter的addDocument()方法中调用了sortPostingTable()方法,对词条进行了排序,排序使用“快速排序”方式,“快速排序”的时间复杂度O(N*logN),排序速度很快。

sortPostingTable()方法的实现如下所示:

  1. private final Posting[] sortPostingTable() {
  2.     // 将postingTable转换成Posting[]数组,便于快速排序
  3.     Posting[] array = new Posting[postingTable.size()];
  4.     Enumeration postings = postingTable.elements();
  5.     for (int i = 0; postings.hasMoreElements(); i++)
  6.       array[i] = (Posting) postings.nextElement();
  7.     // 调用quickSort()方法,使用快速排序对Posting[]数组进行排序
  8.     quickSort(array, 0, array.length - 1);
  9.     return array;
  10. }

采用快速排序,效率更高。代码如下:

  1.  private static final void quickSort(Posting[] postings, int lo, int hi) {
  2.     if (lo >= hi)
  3.       return;
  4.     int mid = (lo + hi) / 2;
  5.     if (postings[lo].term.compareTo(postings[mid].term) > 0) {
  6.       Posting tmp = postings[lo];
  7.       postings[lo] = postings[mid];
  8.       postings[mid] = tmp;
  9.     }
  10.     if (postings[mid].term.compareTo(postings[hi].term) > 0) {
  11.       Posting tmp = postings[mid];
  12.       postings[mid] = postings[hi];
  13.       postings[hi] = tmp;
  14.       if (postings[lo].term.compareTo(postings[mid].term) > 0) {
  15.         Posting tmp2 = postings[lo];
  16.         postings[lo] = postings[mid];
  17.         postings[mid] = tmp2;
  18.       }
  19.     }
  20.     int left = lo + 1;
  21.     int right = hi - 1;
  22.     if (left >= right)
  23.       return;
  24.     Term partition = postings[mid].term;
  25.     for (; ;) {
  26.       while (postings[right].term.compareTo(partition) > 0)
  27.         --right;
  28.       while (left < right && postings[left].term.compareTo(partition) <= 0)
  29.         ++left;
  30.       if (left < right) {
  31.         Posting tmp = postings[left];
  32.         postings[left] = postings[right];
  33.         postings[right] = tmp;
  34.         --right;
  35.       } else {
  36.         break;
  37.       }
  38.     }
  39.     quickSort(postings, lo, left);
  40.     quickSort(postings, left + 1, hi);
  41.   }

Posting类

Posting类定义在DocumentWriter类的内部。

在DocumentWriter类的addPosition方法中的最后两行可以看到:

      Term term = new Term(field, text, false);
      postingTable.put(term, new Posting(term, position, payload, offset));

一个postingTable是一个HashMap,存储的是一个个的<键,值>对。它的键是Term对象,值是Posting对象。

该类是为排序服务的,提取了与词条信息有关的一些使用频率较高的属性,定义成了该Posting类,实现非常简单,如下所示:

  1. final class Posting {      // 在一个Document中与词条有关的信息
  2. Term term;       // 一个词条
  3. int freq;       // 词条Term term在该Document中的频率
  4. int[] positions;      // 位置
  5. Payload[] payloads; // Payloads信息
  6. TermVectorOffsetInfo [] offsets;    // 词条向量的offset(偏移量)信息
  7. Posting(Term t, int position, Payload payload, TermVectorOffsetInfo offset) {    // Posting构造器
  8.     term = t;
  9.     freq = 1;
  10.     positions = new int[1];
  11.     positions[0] = position;
  12.     
  13.     if (payload != null) {
  14.       payloads = new Payload[1];
  15.       payloads[0] = payload;
  16.     } else 
  17.       payloads = null;    
  18.    
  19.     if(offset != null){
  20.       offsets = new TermVectorOffsetInfo[1];
  21.       offsets[0] = offset;
  22.     } else
  23.       offsets = null;
  24. }
  25. }

DocumentWriter实现中还有writePosting方法,该方法主要是写入.frq和.frx文件的。在后续篇章中继续……

最后还有有一个写入规格化文件writeNorms,代码如下:

  1. private final void writeNorms(String segment) throws IOException { 
  2.     for(int n = 0; n < fieldInfos.size(); n++){
  3.       FieldInfo fi = fieldInfos.fieldInfo(n);
  4.       if(fi.isIndexed && !fi.omitNorms){
  5.         float norm = fieldBoosts[n] * similarity.lengthNorm(fi.name, fieldLengths[n]);
  6.         IndexOutput norms = directory.createOutput(segment + ".f" + n);
  7.         try {
  8.           norms.writeByte(Similarity.encodeNorm(norm));
  9.         } finally {
  10.           norms.close();
  11.         }
  12.       }
  13.     }
  14.   }

创建.f*规格化文件,每个field都有自己的规格化之。规格化值是通过DefaultSimilarity按照field的名字与该field包含的term个数计算出来的。具体内容,后续查询篇章详解之。

 

最后的总结:

在学习DocumentWriter类的addDocument()方法的过程中,涉及到了该类的很多方法,其中关于文档的倒排的方法是非常重要的。

此外,还涉及到了FieldInfos类和FieldInfo类,他们的关系很像SegmentInfos类和SegmentInfo类。FieldInfos类主要是对Document添加到中的Field进行管理的,可以通过FieldInfos类来访问Document中所有Field的信息。每个索引段(索引段即Segment)都拥有一个单独的FieldInfos。

抱歉!评论已关闭.