一步一步跟我学习lucene(19)---lucene增量更新和NRT(near-real-time)Query近实时查询

这两天加班,不能兼顾博客的更新,请大家见谅。

有时候我们创建完索引之后,数据源可能有更新的内容,而我们又想像数据库那样能直接体现在查询中,这里就是我们所说的增量索引。对于这样的需求我们怎么来实现呢?lucene内部是没有提供这种增量索引的实现的;

这里我们一般可能会想到,将之前的索引全部删除,然后进行索引的重建。对于这种做法,如果数据源的条数不是特别大的情况下倒还可以,如果数据源的条数特别大的话,势必会造成查询数据耗时,同时索引的构建也是比较耗时的,几相叠加,势必可能造成查询的时候数据缺失的情况,这势必严重影响用户的体验;

比较常见的增量索引的实现是:

  • 设置一个定时器,定时从数据源中读取比现有索引文件中新的内容或是数据源中带有更新标示的数据。
  • 对数据转换成需要的document并进行索引

这样做较以上的那种全删除索引然后重建的好处在于:

  • 数据源查询扫描的数据量小
  • 相应的更新索引的条数也少,减少了大量的IndexWriter的commit和close这些耗时操作

以上解决了增量的问题,但是实时性的问题还是存在的:

  • 索引的变更只有在IndexWriter的commit执行之后才可以体现出来

那么我们怎样对实时性有个提升呢,大家都知道lucene索引可以以文件索引和内存索引两种方式存在,相较于文件索引,内存索引的执行效率要高于文件索引的构建,因为文件索引是要频繁的IO操作的;结合以上的考虑,我们采用文件索引+内存索引的形式来进行lucene的增量更新;其实现机制如下:

  • 定时任务扫描数据源的变更
  • 对获得的数据源列表放在内存中
  • 内存中的document达到数量限制的时候,以队列的方式删除内存中的索引,并将之添加到文件索引
  • 查询的时候采用文件+内存索引联合查询的方式以达到NRT效果

定时任务调度器

java内置了TimerTask,此类是可以提供定时任务的,但是有一点就是TimerTask的任务是无状态的,我们还需要对任务进行并行的设置;了解到quartz任务调度框架提供了有状态的任务StatefulJob,即在本次调度任务没有执行完毕时,下次任务不会执行;

常见的我们启动一个quartz任务的方式如下:

Date runTime = DateBuilder.evenSecondDate(new Date());
   StdSchedulerFactory sf = new StdSchedulerFactory();
      Scheduler scheduler = sf.getScheduler();
   JobDetail job = JobBuilder.newJob(XXX.class).build();
      Trigger trigger = TriggerBuilder.newTrigger().startAt(runTime).withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).repeatForever()).forJob(job).build();
      scheduler.scheduleJob(job, trigger);
     
      scheduler.start();</span>

以上我们是设置了每三秒执行一次定时任务,而任务类是XXX

任务类通用方法

这里我定义了一个XXX的父类,其定义如下:

package com.chechong.lucene.indexcreasement;

import java.util.List;
import java.util.TimerTask;

import org.apache.lucene.store.RAMDirectory;
import org.quartz.Job;
import org.quartz.StatefulJob;

/**有状态的任务:串行执行,即不允许上次执行没有完成即开始本次如果需要并行给接口改为Job即可
 * @author lenovo
 *
 */
public abstract class BaseInCreasementIndex implements StatefulJob {
	/**
	 * 内存索引
	 */
	private RAMDirectory ramDirectory;
	public BaseInCreasementIndex() {
	}
	public BaseInCreasementIndex(RAMDirectory ramDirectory) {
		super();
		this.ramDirectory = ramDirectory;
	}

	/**更新索引
	 * @throws Exception
	 */
	public abstract void updateIndexData() throws Exception;
	/**消费数据
	 * @param list
	 */
	public abstract void consume(List list) throws Exception;
}

任务类相关实现,以下方法是获取待添加索引的数据源XXXInCreasementIndex

@Override
	public void execute(JobExecutionContext context) throws JobExecutionException {
		try {
			XXXInCreasementIndex index = new XXXInCreasementIndex(Constants.XXX_INDEX_PATH, XXXDao.getInstance(), RamDirectoryControl.getRAMDireactory());
			index.updateIndexData();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
@Override
	public void updateIndexData() throws Exception {
		int maxBeanID = SearchUtil.getLastIndexBeanID();
		System.out.println(maxBeanID);
		List<XXX> sources = XXXDao.getListInfoBefore(maxBeanID);、、
		if (sources != null && sources.size() > 0) {
			this.consume(sources);
		}
	}

这里,XXX代表我们要获取数据的实体类对象

consume方法主要是做两件事:

  • 数据存放到内存索引
  • 判断内存索引数量,超出限制的话以队列方式取出超出的数量,并将之存放到文件索引
@Override
	public void consume(List list) throws Exception {
		IndexWriter writer = RamDirectoryControl.getRAMIndexWriter();
		RamDirectoryControl.consume(writer,list);
	}

上边我们将内存索引和队列的实现放在了RamDirectoryControl中

内存索引控制器

首先我们对内存索引的IndexWriter进行初始化,在初始化的时候需要注意先执行一次commit,否则会提示no segments的异常

private static IndexWriter ramIndexWriter;
	private static RAMDirectory directory;
	static{
		directory = new RAMDirectory();
		try {
			ramIndexWriter = getRAMIndexWriter();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	public static RAMDirectory getRAMDireactory(){
		return directory;
	}
	public static IndexSearcher getIndexSearcher() throws IOException{
		IndexReader reader = null;
		IndexSearcher searcher = null;
		try {
			reader = DirectoryReader.open(directory);
		} catch (IOException e) {
			e.printStackTrace();
		}
		searcher =  new IndexSearcher(reader);
		return searcher;
	}
	/**单例模式获取ramIndexWriter
	 * @return
	 * @throws Exception
	 */
	public static IndexWriter getRAMIndexWriter() throws Exception{
			if(ramIndexWriter == null){
				synchronized (IndexWriter.class) {
					Analyzer analyzer = new IKAnalyzer();
					IndexWriterConfig iwConfig = new IndexWriterConfig(analyzer);
				    iwConfig.setOpenMode(OpenMode.CREATE_OR_APPEND);
					try {
						ramIndexWriter = new IndexWriter(directory, iwConfig);
						ramIndexWriter.commit();
						ramIndexWriter.close();
						iwConfig = new IndexWriterConfig(analyzer);
					    iwConfig.setOpenMode(OpenMode.CREATE_OR_APPEND);
						ramIndexWriter = new IndexWriter(directory, iwConfig);
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}

		return ramIndexWriter;
	}

定义一个获取内存索引中数据条数的方法

/**根据查询器、查询条件、每页数、排序条件进行查询
	 * @param query 查询条件
	 * @param first 起始值
	 * @param max 最大值
	 * @param sort 排序条件
	 * @return
	 */
	public static TopDocs getScoreDocsByPerPageAndSortField(IndexSearcher searcher,Query query, int first,int max, Sort sort){
		try {
			if(query == null){
				System.out.println(" Query is null return null ");
				return null;
			}
			TopFieldCollector collector = null;
			if(sort != null){
				collector = TopFieldCollector.create(sort, first+max, false, false, false);
			}else{
				SortField[] sortField = new SortField[1];
				sortField[0] = new SortField("createTime",SortField.Type.STRING,true);
				Sort defaultSort = new Sort(sortField);
				collector = TopFieldCollector.create(defaultSort,first+max, false, false, false);
			}
			searcher.search(query, collector);
			return collector.topDocs(first, max);
		} catch (IOException e) {
			// TODO Auto-generated catch block
		}
		return null;
	}

此方法返回结果为TopDocs,我们根据TopDocs的totalHits来获取内存索引中的数据条数,以此来鉴别内存占用,防止内存溢出。

consume方法的实现如下:

/**消费数据
	 * @param docs
	 * @param listSize
	 * @param writer
	 * @param list
	 * @throws Exception
	 */
	public static void consume(IndexWriter writer, List list) throws Exception {
		Query query = new MatchAllDocsQuery();
		IndexSearcher searcher = getIndexSearcher();
		System.out.println(directory);
		TopDocs topDocs = getScoreDocsByPerPageAndSortField(searcher,query, 1, 1, null);
		int currentTotal = topDocs.totalHits;
		if(currentTotal+list.size() > Constants.XXX_RAM_LIMIT){
			//超出内存限制
			int pulCount = Constants.XXX_RAM_LIMIT - currentTotal;
			List<Document> docs = new LinkedList<Document>();

			if(pulCount <= 0){
				//直接处理集合的内容
				TopDocs allDocs = SearchUtil.getScoreDocsByPerPageAndSortField(searcher, query, 0,currentTotal, null);
				ScoreDoc[] scores = allDocs.scoreDocs;
				for(int i = 0 ;i < scores.length ; i ++){
					//取出内存中的数据
					Document doc1 = searcher.doc(scores[i].doc);
					Integer pollId = Integer.parseInt(doc1.get("id"));
					Document doc = delDocumentFromRAMDirectory(pollId);
					if(doc != null){
						XXX carSource = (XXX) BeanTransferUtil.doc2Bean(doc, XXX.class);
						Document doc2 = carSource2Document(carSource);
						if(doc2 != null){
							docs.add(doc2);
						}
					}
				}
				addDocumentToFSDirectory(docs);
				writer = getRAMIndexWriter();
				consume(writer, list);
			}else{
				//先取出未达到内存的部分
				List subProcessList = list.subList(0, pulCount);
				consume(writer, subProcessList);
				List leaveList = list.subList(pulCount, list.size());
				consume(writer, leaveList);
			}
		}else{//未超出限制,直接存放到内存
			int listSize = list.size();
			if(listSize > 0){
				//存放到内存

			}
		}

	}

上边的逻辑为:

  1. 根据getScoreDocsByPerPageAndSortField获取当前内存中的数据条数
  2. 根据内存中数据数量A和本次获取的数据源的总数B和内存中限制的数量C进行比较
  3. 如果A+B<=C则未超出内存索引的限制,所有数据均存放到内存
  4. 反之,判断当前内存中的数据是否已经达到限制,如果已经超出,则直接处理取出内存中的内容,然后回调此方法。
  5. 如果未达到限制,先取出未达到限制的部分,然后对剩余的进行回调。

这里我们的BeanTransferUtil是根据document转换成对应的bean的方法,此处用到了反射和commons-beanutils.jar

package com.chechong.util;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.lucene.document.Document;

public class BeanTransferUtil {

	public static Object doc2Bean(Document doc, Class clazz) {
		try {
			Object obj = clazz.newInstance();
			Field[] fields = clazz.getDeclaredFields();
			for (Field field : fields) {
				field.setAccessible(true);
				String fieldName = field.getName();
				BeanUtils.setProperty(obj, fieldName, doc.get(fieldName));
			}
			return obj;
		} catch (InstantiationException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IllegalAccessException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InvocationTargetException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return null;
	}
}

从内存索引中读取索引的方法如下:

/**从内存索引中删除指定的doc
	 * @param pollId
	 * @throws IOException
	 */
	private static Document delDocumentFromRAMDirectory(Integer pollId) throws IOException {
		Document doc = null;
		Query query = SearchUtil.getQuery("id", "int", pollId+"", false);
		IndexSearcher searcher = getIndexSearcher();
		try {
			TopDocs queryDoc = SearchUtil.getScoreDocsByPerPageAndSortField(searcher, query, 0, 1, null);
			ScoreDoc[] docs = queryDoc.scoreDocs;
			System.out.println(docs.length);
			if(docs.length > 0){
				doc = searcher.doc(docs[0].doc);
				System.out.println(doc);
				ramIndexWriter.deleteDocuments(query);
				ramIndexWriter.commit();
			}
			return doc;
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return null;
	}

此处是根据id来读取内存索引中的内容,然后将它转换成document同时删除内存中的对应记录。

NRT近实时查询的实现

对于上边的索引我们要采用适当的查询方法,这里查询时候为了达到近实时的效果,需要将内存索引添加到查询的范围中,即IndexReader中。

这里的IndexSearcher的获取方法如下:

/**多目录多线程查询
	 * @param parentPath 父级索引目录
	 * @param service 多线程查询
	 * @param isAddRamDirectory 是否增加内存索引查询
	 * @return
	 * @throws IOException
	 */
	public static IndexSearcher getMultiSearcher(String parentPath,ExecutorService service, boolean isAddRamDirectory) throws IOException{
		File file = new File(parentPath);
		File[] files = file.listFiles();

		IndexReader[] readers = null;
		if(!isAddRamDirectory){
			readers = new IndexReader[files.length];
		}else{
			readers = new IndexReader[files.length+1];
		}
		for (int i = 0 ; i < files.length ; i ++) {
			readers[i] = DirectoryReader.open(FSDirectory.open(Paths.get(files[i].getPath(), new String[0])));
		}
		if(isAddRamDirectory){
			readers[files.length] = DirectoryReader.open(RamDirectoryControl.getRAMDireactory());
		}

		MultiReader multiReader = new MultiReader(readers);
		IndexSearcher searcher = new IndexSearcher(multiReader,service);
		return searcher;
	}

如此,我们就可以在查询的时候既从文件索引中读取,也从内存索引中检索数据了;

一步一步跟我学习lucene是对近期做lucene索引的总结,大家有问题的话联系本人的Q-Q:  891922381,同时本人新建Q-Q群:106570134(lucene,solr,netty,hadoop),如蒙加入,不胜感激,大家共同探讨,本人争取每日一博,希望大家持续关注,会带给大家惊喜的



时间: 2024-12-14 18:49:18

一步一步跟我学习lucene(19)---lucene增量更新和NRT(near-real-time)Query近实时查询的相关文章

一步一步跟我学习lucene(9)---lucene搜索之拼写检查和相似度查询提示(spellcheck)

suggest应用场景 用户的输入行为是不确定的,而我们在写程序的时候总是想让用户按照指定的内容或指定格式的内容进行搜索,这里就要进行人工干预用户输入的搜索条件了:我们在用百度谷歌等搜索引擎的时候经常会看到按键放下的时候直接会提示用户是否想搜索某些相关的内容,恰好lucene在开发的时候想到了这一点,lucene提供的suggest包正是用来解决上述问题的. suggest包联想词相关介绍 suggest包提供了lucene的自动补全或者拼写检查的支持: 拼写检查相关的类在org.apache.

一步一步跟我学习lucene(13)---lucene搜索之自定义排序的实现原理和编写自己的自定义排序工具

自定义排序说明 我们在做lucene搜索的时候,可能会需要排序功能,虽然lucene内置了多种类型的排序,但是如果在需要先进行某些值的运算然后在排序的时候就有点显得无能为力了: 要做自定义查询,我们就要研究lucene已经实现的排序功能,lucene的所有排序都是要继承FieldComparator,然后重写内部实现,这里以IntComparator为例子来查看其实现: IntComparator相关实现 其类的声明为 public static class IntComparator exte

一步一步跟我学习lucene(16)---lucene搜索之facet查询查询示例(2)

本篇是接一步一步跟我学习lucene(14)---lucene搜索之facet索引原理和facet查询实例(http://blog.csdn.net/wuyinggui10000/article/details/45973769),上篇主要是统计facet的dim和每个种类对应的数量,个人感觉这个跟lucene的group不同的在于facet的存储类似于hash(key-field-value)形式的,而group则是单一的map(key-value)形式的,虽然都可以统计某一品类的数量,显然f

一步一步跟我学习lucene(14)---lucene搜索之facet查询原理和facet查询实例

Facet说明 我们在浏览网站的时候,经常会遇到按某一类条件查询的情况,这种情况尤以电商网站最多,以天猫商城为例,我们选择某一个品牌,系统会将该品牌对应的商品展示出来,效果图如下: 如上图,我们关注的是品牌,选购热点等方面,对于类似的功能我们用lucene的term查询当然可以,但是在数据量特别大的情况下还用普通查询来实现显然会因为FSDirectory.open等耗时的操作造成查询效率的低下,同时普通查询是全部document都扫描一遍,这样显然造成了查询效率低: lucene提供了facet

一步一步跟我学习lucene(12)---lucene搜索之分组处理group查询

grouping介绍 我们在做lucene搜索的时候,可能会用到对某个条件的数据进行统计,比如统计有多少个省份,在sql查询中我们可以用distinct来完成类似的功能,也可以用group by来对查询的列进行分组查询.在lucene中我们实现类似的功能怎么做呢,比较费时的做法时我们查询出所有的结果,然后对结果里边的省份对应的field查询出来,往set里边放,显然这种做法效率低,不可取:lucene为了解决上述问题,提供了用于分组操作的模块group,group主要用户处理不同lucene中含

一步一步跟我学习lucene(11)---lucene搜索之高亮显示highlighter

highlighter介绍 这几天一直加班,博客有三天没有更新了,望见谅:我们在做查询的时候,希望对我们自己的搜索结果与搜索内容相近的地方进行着重显示,就如下面的效果 这里我们搜索的内容是"一步一步跟我学习lucene",搜索引擎展示的结果中对用户的输入信息进行了配色方面的处理,这种区分正常文本和输入内容的效果即是高亮显示: 这样做的好处: 视觉上让人便于查找有搜索对应的文本块: 界面展示更友好: lucene提供了highlighter插件来体现类似的效果: highlighter对

一步一步跟我学习lucene(10)---lucene搜索之联想词提示之suggest原理和应用

昨天了解了suggest包中的spell相关的内容,主要是拼写检查和相似度查询提示: 今天准备了解下关于联想词的内容,lucene的联想词是在org.apache.lucene.search.suggest包下边,提供了自动补全或者联想提示功能的支持: InputIterator说明 InputIterator是一个支持枚举term,weight,payload三元组的供suggester使用的接口,目前仅支持AnalyzingSuggester,FuzzySuggester andAnalyz

一步一步跟我学习lucene(8)---lucene搜索之索引的查询原理和查询工具类示例

昨天我们了解了lucene搜索之IndexSearcher构建过程(http://blog.csdn.net/wuyinggui10000/article/details/45698667),对lucene的IndexSearcher有一个大体的了解,知道了怎么创建IndexSearcher,就要开始学会使用IndexSearcher进行索引的搜索,本节我们学习索引的查询原理和根据其相关原理写索引查询的工具类的编写: IndexSearcher提供了几个常用的方法: IndexSearcher.

一步一步跟我学习lucene(7)---lucene搜索之IndexSearcher构建过程

最近一直在写一步一步跟我学习lucene系列(http://blog.csdn.net/wuyinggui10000/article/category/3173543),个人的博客也收到了很多的访问量,谢谢大家的关注,这也是对我个人的一个激励,O(∩_∩)O哈哈~,个人感觉在博客的编写过程中自己收获了很多,我会一直继续下去,在工作的过程中自己也会写出更多类似系列的博客,也算是对自己只是的一种积累: IndexSearcher 搜索引擎的构建分为索引内容和查询索引两个大方面,这里要介绍的是luce