Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三)

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三)

本文是SolrCloud的Recovery策略系列的第三篇文章,前面两篇主要介绍了Recovery的总体流程,以及PeerSync策略。本文以及后续的文章将重点介绍Replication策略。Replication策略不但可以在SolrCloud中起到leader到replica的数据同步,也可以在用多个单独的Solr来实现主从同步。本文先介绍在SolrCloud的leader到replica的数据同步,下一篇文章将介绍通过配置SolrConfig.xml来实现多个Solr节点间的主从同步。

一. Replication策略介绍

Replication的作用在前文已经介绍过了,当需要同步的数据较多时候,Solr会放弃按document为单位的同步模式(PeerSync)而采用以文件为最小单位的同步模式。在Replication的过程中,重点使用了SnapPuller类,它封装了对leader数据快照以及通过快照来实现同步的代码。Replication流程原理如下图所示。接下来根据源码来介绍每一步骤。

  • 开始Replication的时候,首先进行一次commitOnLeader操作,即发送commit命令到leader。它的作用就是将leader的update中的数据刷入到索引文件中,使得快照snap完整。
 1   private void commitOnLeader(String leaderUrl) throws SolrServerException,
 2       IOException {
 3     HttpSolrServer server = new HttpSolrServer(leaderUrl);
 4     try {
 5       server.setConnectionTimeout(30000);
 6       UpdateRequest ureq = new UpdateRequest();
 7       ureq.setParams(new ModifiableSolrParams());
 8       ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
 9       ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
10       ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
11           server);
12     } finally {
13       server.shutdown();
14     }
15   }
  • 获取leader的lastVersion与lastGeneration,同本分片的进行比较来确定是否需要进行同步。
 1       //get the current ‘replicateable‘ index version in the master
 2       NamedList response = null;
 3       try {
 4         response = getLatestVersion();
 5       } catch (Exception e) {
 6         LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
 7         return false;
 8       }
 9       long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
10       long latestGeneration = (Long) response.get(GENERATION);
  • 检查本分片是否打开IndexWriter,如果没有则Recovery失败。这是因为没有打开indexWriter就无法获取索引的generation以及version信息,replication无法进行下去。
 1       // TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
 2       IndexCommit commit = core.getDeletionPolicy().getLatestCommit();
 3       if (commit == null) {
 4         // Presumably the IndexWriter hasn‘t been opened yet, and hence the deletion policy hasn‘t been updated with commit points
 5         RefCounted<SolrIndexSearcher> searcherRefCounted = null;
 6         try {
 7           searcherRefCounted = core.getNewestSearcher(false);
 8           if (searcherRefCounted == null) {
 9             LOG.warn("No open searcher found - fetch aborted");
10             return false;
11           }
12           commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
13         } finally {
14           if (searcherRefCounted != null)
15             searcherRefCounted.decref();
16         }
17       }
  • 如果获取的leader的lastestVersion为0,则表示leader没有索引数据,那么根本就不需要进行replication。所以返回true结果。
 1       if (latestVersion == 0L) {
 2         if (forceReplication && commit.getGeneration() != 0) {
 3           // since we won‘t get the files for an empty index,
 4           // we just clear ours and commit
 5           RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
 6           try {
 7             iw.get().deleteAll();
 8           } finally {
 9             iw.decref();
10           }
11           SolrQueryRequest req = new LocalSolrQueryRequest(core,
12               new ModifiableSolrParams());
13           core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
14         }
15
16         //there is nothing to be replicated
17         successfulInstall = true;
18         return true;
19       }
  • 我们还需要通过比较分片的lastestVersion和leader的lastestVersion来确定是否需要继续进行replication,因为两者相等同样没必要进行replication,除非进行的时forceReplication
1       if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
2         //master and slave are already in sync just return
3         LOG.info("Slave in sync with master.");
4         successfulInstall = true;
5         return true;
6       }
  • 获取leader节点的lastestGeneration的索引文件列表以及相关文件信息,以及配置文件列表以及信息。如果文件列表为空,则退出replication。
 1       // get the list of files first
 2       fetchFileList(latestGeneration);
 3       // this can happen if the commit point is deleted before we fetch the file list.
 4       if(filesToDownload.isEmpty()) return false;
 5
 6  private void fetchFileList(long gen) throws IOException {
 7     ModifiableSolrParams params = new ModifiableSolrParams();
 8     params.set(COMMAND,  CMD_GET_FILE_LIST);
 9     params.set(GENERATION, String.valueOf(gen));
10     params.set(CommonParams.WT, "javabin");
11     params.set(CommonParams.QT, "/replication");
12     QueryRequest req = new QueryRequest(params);
13     HttpSolrServer server = new HttpSolrServer(masterUrl, myHttpClient);  //XXX modify to use shardhandler
14     try {
15       server.setSoTimeout(60000);
16       server.setConnectionTimeout(15000);
17       NamedList response = server.request(req);
18
19       List<Map<String, Object>> files = (List<Map<String,Object>>) response.get(CMD_GET_FILE_LIST);
20       if (files != null)
21         filesToDownload = Collections.synchronizedList(files);
22       else {
23         filesToDownload = Collections.emptyList();
24         LOG.error("No files to download for index generation: "+ gen);
25       }
26
27       files = (List<Map<String,Object>>) response.get(CONF_FILES);
28       if (files != null)
29         confFilesToDownload = Collections.synchronizedList(files);
30
31     } catch (SolrServerException e) {
32       throw new IOException(e);
33     } finally {
34       server.shutdown();
35     }
36   }
  • 建立临时的index目录来存放同步过来的数据,临时index目录的格式是index.timestamp。它存放在data目录下。
1 String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
2       tmpIndex = createTempindexDir(core, tmpIdxDirName);
3
4       tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
5
6       // cindex dir...
7       indexDirPath = core.getIndexDir();
8       indexDir = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
  • 判断isFullCopyNeeded是否为true来决定是否需要关闭IndexWriter。如果本分片(slave)的数据的version或者generation新于master(leader)或者是forceReplication,那么必须进行所有数据的完整同步。
 1 // if the generation of master is older than that of the slave , it means they are not compatible to be copied
 2       // then a new index directory to be created and all the files need to be copied
 3       boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
 4           .getCommitTimestamp(commit) >= latestVersion
 5           || commit.getGeneration() >= latestGeneration || forceReplication;
 6
 7         if (isIndexStale(indexDir)) {
 8           isFullCopyNeeded = true;
 9         }
10
11         if (!isFullCopyNeeded) {
12           // rollback - and do it before we download any files
13           // so we don‘t remove files we thought we didn‘t need
14           // to download later
15           solrCore.getUpdateHandler().getSolrCoreState()
16           .closeIndexWriter(core, true);
17         }
  • 现在才开始真正的下载不同的索引文件,Replication是根据索引文件的大小来判断是否发生过变化.下载文件时候,Replication是以packet的大小为单位进行下载的,这可以在SolrConfig.xml中设置,下一篇文章将具体介绍这个。
 1   private void downloadIndexFiles(boolean downloadCompleteIndex,
 2       Directory indexDir, Directory tmpIndexDir, long latestGeneration)
 3       throws Exception {
 4     if (LOG.isDebugEnabled()) {
 5       LOG.debug("Download files to dir: " + Arrays.asList(indexDir.listAll()));
 6     }
 7     for (Map<String,Object> file : filesToDownload) {
 8       if (!slowFileExists(indexDir, (String) file.get(NAME))
 9           || downloadCompleteIndex) {
10         dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
11             (String) file.get(NAME), false, latestGeneration);
12         currentFile = file;
13         dirFileFetcher.fetchFile();
14         filesDownloaded.add(new HashMap<>(file));
15       } else {
16         LOG.info("Skipping download for " + file.get(NAME)
17             + " because it already exists");
18       }
19     }
20   }
21
22  /**
23      * The main method which downloads file
24      */
25     void fetchFile() throws Exception {
26       try {
27         while (true) {
28           final FastInputStream is = getStream();
29           int result;
30           try {
31             //fetch packets one by one in a single request
32             result = fetchPackets(is);
33             if (result == 0 || result == NO_CONTENT) {
34
35               return;
36             }
37             //if there is an error continue. But continue from the point where it got broken
38           } finally {
39             IOUtils.closeQuietly(is);
40           }
41         }
42       } finally {
43         cleanup();
44         //if cleanup suceeds . The file is downloaded fully. do an fsync
45         fsyncService.submit(new Runnable(){
46           @Override
47           public void run() {
48             try {
49               copy2Dir.sync(Collections.singleton(saveAs));
50             } catch (IOException e) {
51               fsyncException = e;
52             }
53           }
54         });
55       }
56     }
  • 到这里已经完成了索引文件的同步,但是整一个同步过程才进行了一半。接下来要获取已经发生过修改的配置文件,如果没有修改过的配置文件则不需要下载配置文件。而比较配置文件是否发生变化则是比较文件的checksum信息。下载配置文件的过程downloadConfFiles()与下载索引文件的过程类似,就不具体介绍了。
 1     //get the details of the local conf files with the same alias/name
 2     List<Map<String, Object>> localFilesInfo = replicationHandler.getConfFileInfoFromCache(names, confFileInfoCache);
 3     //compare their size/checksum to see if
 4     for (Map<String, Object> fileInfo : localFilesInfo) {
 5       String name = (String) fileInfo.get(NAME);
 6       Map<String, Object> m = nameVsFile.get(name);
 7       if (m == null) continue; // the file is not even present locally (so must be downloaded)
 8       if (m.get(CHECKSUM).equals(fileInfo.get(CHECKSUM))) {
 9         nameVsFile.remove(name); //checksums are same so the file need not be downloaded
10       }
11     }
 1   private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestGeneration) throws Exception {
 2     LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
 3     confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
 4     File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
 5     try {
 6       boolean status = tmpconfDir.mkdirs();
 7       if (!status) {
 8         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
 9                 "Failed to create temporary config folder: " + tmpconfDir.getName());
10       }
11       for (Map<String, Object> file : confFilesToDownload) {
12         String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
13         localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, true, latestGeneration);
14         currentFile = file;
15         localFileFetcher.fetchFile();
16         confFilesDownloaded.add(new HashMap<>(file));
17       }
18       // this is called before copying the files to the original conf dir
19       // so that if there is an exception avoid corrupting the original files.
20       terminateAndWaitFsyncService();
21       copyTmpConfFiles2Conf(tmpconfDir);
22     } finally {
23       delTree(tmpconfDir);
24     }
25   }
  • 下载完索引数据以及配置文件后,现在需要处理临时的索引数据了。不同于索引文件的下载,配置文件在下载的过程中就已经替换了原先的配置文件了,这是在copyTmpConfFiles2Conf过程中。而索引数据的替换则需要根据isFullCopyNeeded这个参数,如果该值为true,则临时的索引文件将全部替换旧的索引文件,否则只是部分的文件的替换,他们的实现分别为:modifyIndexProps和moveIndexFiles。
1             if (isFullCopyNeeded) {
2               successfulInstall = modifyIndexProps(tmpIdxDirName);
3               deleteTmpIdxDir = false;
4             } else {
5               successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
6             }

接下来要重点介绍下modifyIndexProps和moveIndexFiles的实现。前文讲到,同步索引文件时候,下载下来的数据会存放在data目录下,以index. 加上同步开始时间的时间戳结构的目录下。当下载数据完成后,Replication会在同级目录下新建index.property文件。该文件内只会放入一句内容,index= index.2014XXXXXXXXXX,这样做的目的就是将索引目录index重定向到index.2014XXXXXXXXXX上,这个时候相当于index.2014XXXXXXXXXX成为了index目录。然后就可以删除原来的index目录了。

而moveIndexFiles则比较简单,即将临时文件下的索引文件都拷贝到正在用的index目录上,其中segment_N文件最后复制。

  • 将Replication的统计信息存于Replication.properties文件当中。统计信息较多,这里就不介绍了。
  • 如果配置文件发生变化,需要进行reloadcore操作才能使得配置生效。
  • 最后进行一次openNewSearcherAndUpdateCommitPoint,重新打开searcher以及更新commit信息。

Replication的一次同步过程就这么结束了,但是有个问题需要搞清楚,那就是在进行Replication的时候即shard的状态recoverying时候,分片是可以建索引的但是不能进行查询。在同步的时候,新进来的数据会进入到ulog中,但是这些数据是否会进入索引文件中?源码上我还没有发现可以证明新进来的数据的只会进入ulog中,但是不会进入索引文件。

目前我认为,当shard变为recoverying时候,新进来的请求只会进入ulog中,而不会进入索引文件中,原因有3:

1. 因为一旦有新数据写入旧索引文件中,索引文件发送变化了,那么下载好后的数据(索引文件)就不好替换旧的索引文件。

2. 在同步过程中,如果isFullCopyNeeded是false,那么就会close indexwriter,既然关闭了indexwriter就无法写入新的数据。而如果isFullCopyNeeded是true的话,因为整个index目录替换,所以是否关闭indexeriter也没啥意义。

3. 在recovery过程中,当同步replication结束后,会进行replay过程,该过程就是将ulog中的请求重新进行一遍。

以上是我目前的猜测,待我搞明白了再来修改这部分内容,或者是否有网友能指导下。

二. Replay过程

在整个recovery过程中,当replication结束后,会调用replay的来将ulog的请求重新刷入索引文件中。replay过程的本质是调用ulog的LogReplayer线程。

  • LogReplayer是以transactionlog为单位的。
1         for(;;) {
2           TransactionLog translog = translogs.pollFirst();
3           if (translog == null) break;
4           doReplay(translog);
5         }
  • doReplay会重新获取索引链,读取transctionlog的update命令然后重新走一遍索引链三步骤,这些内容在<Solr4.8.0源码分析(14)之SolrCloud索引深入(1)>已经介绍过了,这里就不再介绍了。需要指出的是在进行doReplay时候会设置updatecmd为replay,而一旦cmd=UpdateCmd.Replay,因为无法获取到nodes所以不会分发给其他分片包括leader,所以doReplay只会对本分片有效,且不会记录ulog中。
1         tlogReader = translog.getReader(recoveryInfo.positionOfStart);
2
3         // NOTE: we don‘t currently handle a core reload during recovery.  This would cause the core
4         // to change underneath us.
5
6         UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
7         UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp);
      if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
        isLeader = false;     // we actually might be the leader, but we don‘t want leader-logic for these types of updates anyway.
        forwardToLeader = false;
        return nodes;
      }
  • LogReplayer主要用于applyBufferedUpdates(replication策略中)以及recoverFromLog(单机模式下的recovery,即从ulog进行recovery)。

总结:

本文主要介绍了SolrCloud中Replication的原理以及过程,同时简要讲述LogReplayer的doReplay线程对ulog的日志进行recovery。下文将要重点介绍主从模式下的Replication的配置以及使用。

时间: 2024-10-06 00:49:40

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三)的相关文章

Solr4.8.0源码分析(24)之SolrCloud的Recovery策略(五)

Solr4.8.0源码分析(24)之SolrCloud的Recovery策略(五) 题记:关于SolrCloud的Recovery策略已经写了四篇了,这篇应该是系统介绍Recovery策略的最后一篇了.本文主要介绍Solr的主从同步复制.它与前文<Solr4.8.0源码分析(22)之SolrCloud的Recovery策略(三)>略有不同,前文讲到的是SolrCloud的leader与replica之间的同步,不需要通过配置solrconfig.xml来实现.而本文主要介绍单机模式下,利用so

Solr4.8.0源码分析(23)之SolrCloud的Recovery策略(四)

Solr4.8.0源码分析(23)之SolrCloud的Recovery策略(四) 题记:本来计划的SolrCloud的Recovery策略的文章是3篇的,但是没想到Recovery的内容蛮多的,前面三章分别介绍了Recovery的原理和总体流程,PeerSync策略,Replication策略.本章主要介绍我在实际生产环境中碰到的recovery的几个问题,以及前面漏下的几个点. 一. 日志中多次出现"Stopping recovery for zkNodeName= ..." 我在

Solr4.8.0源码分析(20)之SolrCloud的Recovery策略(一)

Solr4.8.0源码分析(20)之SolrCloud的Recovery策略(一) 题记: 我们在使用SolrCloud中会经常发现会有备份的shard出现状态Recoverying,这就表明SolrCloud的数据存在着不一致性,需要进行Recovery,这个时候的SolrCloud建索引是不会写入索引文件中的(每个shard接受到update后写入自己的ulog中).关于Recovery的内容包含三篇,本文是第一篇介绍Recovery的原因以及总体流程. 1. Recovery的起因 Rec

Solr4.8.0源码分析(21)之SolrCloud的Recovery策略(二)

Solr4.8.0源码分析(21)之SolrCloud的Recovery策略(二) 题记:  前文<Solr4.8.0源码分析(20)之SolrCloud的Recovery策略(一)>中提到Recovery有两种策略,一是PeerSync和Replication.本节将具体介绍下PeerSync策略. PeeySync是Solr的优先选择策略,每当需要进行recovery了,Solr总是会先去判断是否需要进入PeerSync,只有当PeerSync被设置为跳过或者PeerSync时候发现没符合

Solr4.8.0源码分析(17)之SolrCloud索引深入(4)

Solr4.8.0源码分析(17)之SolrCloud索引深入(4) 前面几节以add为例已经介绍了solrcloud索引链建索引的三步过程,delete以及deletebyquery跟add过程大同小异,这里暂时就不介绍了.由于commit流程较为特殊,那么本节主要简要介绍下commit的流程. 1. SolrCloud的commit流程 SolrCloud的commit流程同样分为三步,本节主要简单介绍下三步过程. 1.1 LogUpdateProcessor LogUpdateProces

Solr4.8.0源码分析(14)之SolrCloud索引深入(1)

Solr4.8.0源码分析(14) 之 SolrCloud索引深入(1) 上一章节<Solr In Action 笔记(4) 之 SolrCloud分布式索引基础>简要学习了SolrCloud的索引过程,本节开始将通过阅读源码来深入学习下SolrCloud的索引过程. 1. SolrCloud的索引过程流程图 这里借用下<solrCloud Update Request Handling 更新索引流程>流程图: 由上图可以看出,SolrCloud的索引过程主要通过一个索引链过程来实

Solr4.8.0源码分析(16)之SolrCloud索引深入(3)

Solr4.8.0源码分析(16)之SolrCloud索引深入(3) 前面两节学习了SolrCloud索引过程以及索引链的前两步,LogUpdateProcessorFactory和DistributedUpdateProcessor.本节将详细介绍了索引链的第三步DirectUpdateHandler2和UpdateLog. 1. DirectUpdateHandler2.ADD DirectUpdateHandler2过程包含了Solr到Lucene的索引过程,在整个索引链中是最复杂也最重要

Solr4.8.0源码分析(25)之SolrCloud的Split流程

Solr4.8.0源码分析(25)之SolrCloud的Split流程(一) 题记:昨天有位网友问我SolrCloud的split的机制是如何的,这个还真不知道,所以今天抽空去看了Split的原理,大致也了解split的原理了,所以也就有了这篇文章.本系列有两篇文章,第一篇为core split,第二篇为collection split. 1. 简介 这里首先需要介绍一个比较容易混淆的概念,其实Solr的HTTP API 和 SolrCloud的HTTP API是不一样,如果接受到的是Solr的

Solr4.8.0源码分析(10)之Lucene的索引文件(3)

Solr4.8.0源码分析(10)之Lucene的索引文件(3) 1. .si文件 .si文件存储了段的元数据,主要涉及SegmentInfoFormat.java和Segmentinfo.java这两个文件.由于本文介绍的Solr4.8.0,所以对应的是SegmentInfoFormat的子类Lucene46SegmentInfoFormat. 首先来看下.si文件的格式 头部(header) 版本(SegVersion) doc个数(SegSize) 是否符合文档格式(IsCompoundF