Lucene索引创建过程

本文档旨在分析Lucene如何把业务信息写到磁盘上的大致流程,并不涉及Document中每个Field如何存储(该部分放在另外一篇wiki中介绍)。

一,Lucene建索引API

Directory dire = NIOFSDirectory.open(FileSystems.getDefault().getPath(indexDirectory));
IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
iwc.setRAMBufferSizeMB(64); //兆默认刷
indexWriter = new IndexWriter(dire, iwc);

Document doc = createDocument(artiste, skuId);
indexWriter.addDocument(doc);

indexWriter.commit();
indexWriter.close();

二,创建IndexWriter

NIOFSDirectory.open()

如果是64位JRE会得到MMapDirectory(采用内存映射的方式写索引数据到File中)。

IndexWriterConfig

//properties
this.analyzer = analyzer;
ramBufferSizeMB = IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB;//默认超过16M就会触发flush磁盘操作
maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;//默认按照RAM空间大小触发flush
maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS;//
mergedSegmentWarmer = null;
delPolicy = new KeepOnlyLastCommitDeletionPolicy();//删除策略
commit = null;
useCompoundFile = IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM;
openMode = OpenMode.CREATE_OR_APPEND;//IndexWriter打开模式
similarity = IndexSearcher.getDefaultSimilarity();//相似度计算,一般初始化Searcher的时候会用(因为只有查询的时候才会用到相似度计算)
mergeScheduler = new ConcurrentMergeScheduler();//每个segement的merge交个一个线程完成
writeLockTimeout = IndexWriterConfig.WRITE_LOCK_TIMEOUT;//写操作遇到锁超时时间
indexingChain = DocumentsWriterPerThread.defaultIndexingChain;
codec = Codec.getDefault();
if (codec == null) {
  throw new NullPointerException();
}
infoStream = InfoStream.getDefault();
mergePolicy = new TieredMergePolicy();//合并策略
flushPolicy = new FlushByRamOrCountsPolicy();//flush策略
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
indexerThreadPool = new DocumentsWriterPerThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);//并发写索引线程池
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;

可以对IndexWriter做一些属性配置,IndexWriterConfig里面有非常丰富的各种配置。

三,创建Document

这个步骤比较简单,主要是将业务字段组装成一个Document。一个Document由多个Field组成的。

每个Filed一般有四个属性组成:

  • name:该字段的名称
  • value:该字段的值
  • value是否需要存储到索引文件中:如果存储到索引文件中,则search的时候可以从Document中读取到该字段的值
  • value值是否被索引:如果该字段被索引,则可以通过该字段为条件进行检索

四,添加Document

添加一个Document,其实调用的是updateDocument。而Lucene更新Document不像Mysql可以直接更新某一条记录,所以只能先删除这条记录(Document),然后再添加上这条Document。下面参数Term,是一个检索条件,满足条件的Document做更新。

public void updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
  ensureOpen();
  try {
    boolean success = false;
    try {
      if (docWriter.updateDocument(doc, analyzer, term)) {
        processEvents(true, false);
      }
      success = true;
    } finally {
      if (!success) {
        if (infoStream.isEnabled("IW")) {
          infoStream.message("IW", "hit exception updating document");
        }
      }
    }
  } catch (AbortingException | OutOfMemoryError tragedy) {
    tragicEvent(tragedy, "updateDocument");
  }
}

1  Lucene使用场景

这里从下面几个角度阐述下为什么Lucene不能直接更新一个Document?

  1. Lucene的设计本质是一个面向检索,或者面向读的系统。为了方面的检索,在建立索引的时候做了大量的读优化存储设计。简而言之,为了读的性能,牺牲了方便写、更新的操作。
  2. Lucene使用背景暗含了:Lucene适合(擅长)频繁读,不常写的场景。

所以上面添加一个Document,最后演变成了更新一个Document。并且updateDocument包含两个串行操作

(1)先检索,如果有满足条件的Document,则删除

(2)如果没有满足条件的Document,则直接添加到内存中

2  重要的几个基础类

在看docWriter.updateDocument(doc, analyzer, term)代码之前,我们先看几个Lucene子建的类,下面着重分析下:

2.1  DocumentsWriterPerThreadPool

Lucene内部实现的一个DocumentsWriterPerThread池(并不是严格意义的线程池),主要是
实现DocumentsWriterPerThread的重用(准确来说是实现ThreadState的重用)。该类可以简单理解一个线程池。

2.2 ThreadState

/*{@link ThreadState} references and guards a
* {@link DocumentsWriterPerThread} instance that is used during indexing to build a in-memory index segment.
*/
final static class ThreadState extends ReentrantLock {
  DocumentsWriterPerThread dwpt;
  // TODO this should really be part of DocumentsWriterFlushControl
  // write access guarded by DocumentsWriterFlushControl
  volatile boolean flushPending = false;
  // TODO this should really be part of DocumentsWriterFlushControl
  // write access guarded by DocumentsWriterFlushControl
  long bytesUsed = 0;
  // guarded by Reentrant lock
  private boolean isActive = true;

  ThreadState(DocumentsWriterPerThread dpwt) {
    this.dwpt = dpwt;
  }
本质是个读写锁,用来配合DocumentsWriterPerThread来完成对一个Document的写操作。

2.3  DocumentsWriterPerThread

简单理解成一个Document的写线程。线程池保证了DocumentsWriterPerThread的重用。

2.4  DocumentsWriterFlushControl

控制DocumentsWriterPerThread完成index过程中flush操作

2.5  FlushPolicy

刷新策略

理解了ThreadState这个类应该就简单了,甚至可以直接把该类看做带读写锁控制的写线程。其实是ThreadState内部引用DocumentWriterPerThread实例。在线程池初始化的时候就创建了8个ThreadState(这个时候并没有初始化,意思是DocumentWriterPerThread并没有新建起来,而是延迟初始化具体线程)。后面就尽量重用这个8个ThreadState。

DocumentsWriterPerThreadPool(int maxNumThreadStates) {//默认maxNumThreadStates=8
  if (maxNumThreadStates < 1) {
    throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates);
  }
  threadStates = new ThreadState[maxNumThreadStates];
  numThreadStatesActive = 0;
  for (int i = 0; i < threadStates.length; i++) {
    threadStates[i] = new ThreadState(null);
  }
  freeList = new ThreadState[maxNumThreadStates];
}

3  docWriter.updateDocument

好了,看完了几个基础类,回到上面updateDocument最关键的是这一行。

docWriter.updateDocument(doc, analyzer, term)

boolean updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
    final Term delTerm) throws IOException, AbortingException {

  boolean hasEvents = preUpdate();

  final ThreadState perThread = flushControl.obtainAndLock();

  final DocumentsWriterPerThread flushingDWPT;
  try {
    if (!perThread.isActive()) {
      ensureOpen();
      assert false: "perThread is not active but we are still open";
    }
    ensureInitialized(perThread);//真正初始化单个具体线程DocumentsWriterPerThread
    assert perThread.isInitialized();
    final DocumentsWriterPerThread dwpt = perThread.dwpt;
    final int dwptNumDocs = dwpt.getNumDocsInRAM();
    try {
      dwpt.updateDocument(doc, analyzer, delTerm); //DocumentsWriterPerThread线程真正更新文档
    } catch (AbortingException ae) {
      flushControl.doOnAbort(perThread);
      dwpt.abort();
      throw ae;
    } finally {
      // We don‘t know whether the document actually
      // counted as being indexed, so we must subtract here to
      // accumulate our separate counter:
      numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
    }
    final boolean isUpdate = delTerm != null;
    flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
  } finally {
    perThreadPool.release(perThread);//将该线程重新放回到线程池中,释放掉资源
  }

  return postUpdate(flushingDWPT, hasEvents);
} 

4  docWriter.updateDocument详细步骤

  1. 从线程池中获取一个ThreadState

    ThreadState obtainAndLock() {
      final ThreadState perThread = perThreadPool.getAndLock(Thread
          .currentThread(), documentsWriter);//从线程池中拿取一个ThreadState
      boolean success = false;
      try {
        if (perThread.isInitialized()
            && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
          // There is a flush-all in process and this DWPT is
          // now stale -- enroll it for flush and try for
          // another DWPT:
          addFlushableState(perThread);
        }
        success = true;
        // simply return the ThreadState even in a flush all case sine we already hold the lock
        return perThread;
      } finally {
        if (!success) { // make sure we unlock if this fails
          perThreadPool.release(perThread);
        }
      }
    }
  2. 初始化ThreadState的线程DocumentsWriterPerThread
  3. 该线程更新Document
  4. 该线程重新回到线程池中。线程池中维护了一个freeList,可重用的ThreadState都放到该freeList里面

5  DocumentsWriterPerThread.updateDocument详细步骤

该Document的更新交给一个DocumentsWriterPerThread之后,我们再往下看。

public void updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
  testPoint("DocumentsWriterPerThread addDocument start");
  assert deleteQueue != null;
  reserveOneDoc();
  docState.doc = doc;
  docState.analyzer = analyzer;
  docState.docID = numDocsInRAM;
  if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
    infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
  }
  // Even on exception, the document is still added (but marked
  // deleted), so we don‘t need to un-reserve at that point.
  // Aborting exceptions will actually "lose" more than one
  // document, so the counter will be "wrong" in that case, but
  // it‘s very hard to fix (we can‘t easily distinguish aborting
  // vs non-aborting exceptions):
  boolean success = false;
  try {
    try {
      consumer.processDocument();
    } finally {
      docState.clear();
    }
    success = true;
  } finally {
    if (!success) {
      // mark document as deleted
      deleteDocID(docState.docID);
      numDocsInRAM++;
    }
  }
  finishDocument(delTerm);
}

该线程里面我们只关心一行代码

consumer.processDocument();

从这里差不多就豁然开朗了,一切最后该Document的处理是交给了一个DocConsumer来处理。而这个DocConsumer的获取见下:

abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) throws IOException;

Lucene实现了一个默认的DocConsumer即:DefaultIndexingChain。 那接下来就看该DocConsumer是如何处理该Document的了就行了。

6  DefaultIndexingChain.processDocument详细步骤

@Override
public void processDocument() throws IOException, AbortingException {

  // How many indexed field names we‘ve seen (collapses
  // multiple field instances by the same name):
  int fieldCount = 0;

  long fieldGen = nextFieldGen++;

  // NOTE: we need two passes here, in case there are
  // multi-valued fields, because we must process all
  // instances of a given field at once, since the
  // analyzer is free to reuse TokenStream across fields
  // (i.e., we cannot have more than one TokenStream
  // running "at once"):

  termsHash.startDocument();

  fillStoredFields(docState.docID);
  startStoredFields();

  boolean aborting = false;
  try {
    for (IndexableField field : docState.doc) {//挨个遍历每个Field做处理,哈哈,终于露出可爱的尾巴了
      fieldCount = processField(field, fieldGen, fieldCount);
    }
  } catch (AbortingException ae) {
    aborting = true;
    throw ae;
  } finally {
    if (aborting == false) {
      // Finish each indexed field name seen in the document:
      for (int i=0;i<fieldCount;i++) {
        fields[i].finish();
      }
      finishStoredFields();
    }
  }

  try {
    termsHash.finishDocument();
  } catch (Throwable th) {
    // Must abort, on the possibility that on-disk term
    // vectors are now corrupt:
    throw AbortingException.wrap(th);
  }
}

看到上面代码,我笑了。哈哈,越来越清晰,有没有。对该Document的处理,无非就是演化成遍历每个Field,对Field做处理就行了。但是具体Field怎么处理,该wiki不涉及,放到另外一篇wiki中深入记录(参考:Document存储细节)。

五,Commit Document

indexWriter.commit();

提交Commit完成如下工作:

  • 凡是挂起的改变都提交到index中。包括新增加的文档,要删除的文档,segement的合并。
  • 该操作会执行Directory.sync,sync操作会将文件系统的cache都刷新到disk上面。虽然比较耗时(同步耗时),但是刷新到disk上之后,VM挂掉(或者断电)都不影响这些挂起的更新。

sync操作具体的解释可参考如下一段解释:

传统的UNIX实现在内核中设有缓冲区高速缓存或页面高速缓存,大多数磁盘I/O都通过缓冲进行。当将数据写入文件时,内核通常先将该数据复制到其中一个缓冲区中,如果该缓冲区尚未写满,则并不将其排入输出队列,而是等待其写满或者当内核需要重用该缓冲区以便存放其他磁盘块数据时,再将该缓冲排入输出队列,然后待其到达队首时,才进行实际的I/O操作。这种输出方式被称为延迟写(delayed write)(Bach [1986]第3章详细讨论了缓冲区高速缓存)。
延迟写减少了磁盘读写次数,但是却降低了文件内容的更新速度,使得欲写到文件中的数据在一段时间内并没有写到磁盘上。当系统发生故障时,这种延迟可能造成文件更新内容的丢失。为了保证磁盘上实际文件系统与缓冲区高速缓存中内容的一致性,UNIX系统提供了sync、fsync和fdatasync三个函数。
sync函数只是将所有修改过的块缓冲区排入写队列,然后就返回,它并不等待实际写磁盘操作结束。
通常称为update的系统守护进程会周期性地(一般每隔30秒)调用sync函数。这就保证了定期冲洗内核的块缓冲区。命令sync(1)也调用sync函数。
fsync函数只对由文件描述符filedes指定的单一文件起作用,并且等待写磁盘操作结束,然后返回。fsync可用于数据库这样的应用程序,这种应用程序需要确保将修改过的块立即写到磁盘上。
fdatasync函数类似于fsync,但它只影响文件的数据部分。而除数据外,fsync还会同步更新文件的属性。
对于提供事务支持的数据库,在事务提交时,都要确保事务日志(包含该事务所有的修改操作以及一个提交记录)完全写到硬盘上,才认定事务提交成功并返回给应用层。 

看完这段解释就能明白,sync操作就是将文件系统(甚至内核)中的缓存数据都刷新到disk上面,保证数据的安全性(OS挂掉,断电,数据不会丢失)。

那具体Lucene做了些什么呢?

private final void commitInternal(MergePolicy mergePolicy) throws IOException {

  if (infoStream.isEnabled("IW")) {
    infoStream.message("IW", "commit: start");
  }

  synchronized(commitLock) {
    ensureOpen(false);

    if (infoStream.isEnabled("IW")) {
      infoStream.message("IW", "commit: enter lock");
    }

    if (pendingCommit == null) {
      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "commit: now prepare");
      }
      prepareCommitInternal(mergePolicy);//最关键的一行
    } else {
      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "commit: already prepared");
      }
    }

    finishCommit();
  }
}

走到prepareCommitInternal里面就是详细的刷新操作,索引刷新操作放在另外一篇wiki中介绍。

六,关闭IndexWriter

刷新数据,关闭资源。往里走,逻辑还是很丰富的。等flush详细讲完之后,再回头看这部分。

时间: 2024-10-01 02:27:35

Lucene索引创建过程的相关文章

Lucene索引创建方法和步骤

在全文索引工具中,都是由这样的三部分组成 1.索引部分 2.分词部分 3.搜索部分 ---------------------------------- 索引创建域选项 ---------------------------------- Field.Store.YES或者NO(存储域选项) YES:表示会把这个域中的内容完全存储到文件中,方便进行还原[对于主键,标题可以是这种方式存储] NO:表示把这个域的内容不存储到文件中,但是可以被索引,此时内容无法完全还原(doc.get())[对于内容

lucene 索引创建步骤

一.步骤: 1.存储位置:1)文件: Directory dir= FSDirectory.open(new File("D:\\LuceneIndex")); 2)内存: new RAMDirectory(FSDirectory.getDirectory(file));//不建议,只会把一些搜索相关的信息放入到内存,不是全部的索引文件 2.分词器: Analyzer analyzer=new IKAnalyzer();//这个是中文分词器,并不是lucene自带的(StandardA

lucene 索引流程整理笔记

索引的原文档(Document). 为了方便说明索引创建过程,这里特意用两个文件为例: 文件一:Students should be allowed to go out with their friends, but not allowed to drink beer. 文件二:My friend Jerry went to school to see his students but found them drunk which is not allowed. 结果处的索引文件: Docume

搜索引擎系列五:Lucene索引详解(IndexWriter详解、Document详解、索引更新)

一.IndexWriter详解 问题1:索引创建过程完成什么事? 分词.存储到反向索引中 1. 回顾Lucene架构图: 介绍我们编写的应用程序要完成数据的收集,再将数据以document的形式用lucene的索引API创建索引.存储. 这里重点要强调应用代码负责做什么,lucene负责做什么. 2. Lucene索引创建API 图示 通过该图介绍lucene创建索引的核心API:Document.IndexWriter Lucene中要索引的文档.数据记录以document表示,应用程序通过I

InnoDB 快速索引创建

5.0和更早版本的MySQL中,在一个已经有很多数据的表上添加或者删除一个索引将非常耗时.CREATE INDEX和DROP INDEX通过创建一个新的空的带有要创建索引的表,然后拷贝存在的行到新表中,同时更新索引,当此时key没有排序时插入条目极慢.在所有的行都被拷贝完成以后,旧表被删除,新表被改名. 从5.1开始,MySQL允许一个存储引擎在不拷贝表数据的情况下创建或者删除一个索引.MySQL 5.1中内建的InnoDB,并没有利用这个特点,新发布的插件带有这个功能. 在InnoDB中,表的

Lucene索引过程中的内存管理与数据存储

Lucene的索引过程分两个阶段,第一阶段把文档索引到内存中:第二阶段,即内存满了,就把内存中的数据刷新到硬盘上.          倒排索引信息在内存存储方式 Lucene有各种Field,比如StringField,TextField,IntField,FloatField,DoubleField-,Lucene在处理的过程中把各种Field都处理成相应的byte[],以最本质的方式来看待各种Field的内容,统一了数据的存储形式. 在写入内存阶段,第一步就是需要理清各个类之间的关系. 在索

lucene 建立索引的过程

时间 2014-06-30 17:56:52 ? CSDN博客 原文 http://blog.csdn.net/caohaicheng/article/details/35992149 看lucene主页(http://lucene.apache.org/)上目前lucene已经到4.9.0版本了, 参考学习的书是按照2.1版本讲解的,写的代码例子是用的3.0.2版本的,版本 的不同导致有些方 法的 使用差异,但是大体还是相同的. 源代码用到的jar包(3.0.2版本)下载地址 参考资料: 1.

一步一步跟我学习lucene(6)---lucene索引优化之多线程创建索引

这两天工作有点忙,博客更新不及时,请大家见谅: 前面了解到lucene在索引创建的时候一个IndexWriter获取到一个读写锁,这样势在lucene创建大数据量的索引的时候,执行效率低下的问题: 查看前面文档一步一步跟我学习lucene(5)---lucene的索引构建原理可以看出,lucene索引的建立,跟以下几点关联很大: 磁盘空间大小,这个直接影响索引的建立,甚至会造成索引写入提示完成,但是没有同步的问题: 索引合并策略的选择,这个类似于sql里边的批量操作,批量操作的数量过多直接影响执

搜索引擎系列 ---lucene简介 创建索引和搜索初步

一.什么是Lucene? Lucene最初是由Doug Cutting开发的,2000年3月,发布第一个版本,是一个全文检索引擎的架构,提供了完整的查询引擎和索引引擎 :Lucene得名于Doug妻子的中名,同时这也她外祖母的姓;目前是Apache基金会的一个顶级项目,同时也是学习搜索引擎入门必知必会. Lucene 是一个 JAVA 搜索类库,它本身并不是一个完整的解决方案,需要额外的开发工作. 优点:成熟的解决方案,有很多的成功案例.apache 顶级项目,正在持续快速的进步.庞大而活跃的开