Enterprise Search Engine Development: The Connector (Part 28)

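Typically, each SnapshotRepository object corresponds to one DocumentSnapshotRepositoryMonitor object and also to one snapshot store object. The monitor manager, DocumentSnapshotRepositoryMonitorManagerImpl, is what ties them together.

To see which behaviors DocumentSnapshotRepositoryMonitorManagerImpl has to implement, first look at the methods specified by the interface it implements, DocumentSnapshotRepositoryMonitorManager: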

/**
 * Management interface to {@link DocumentSnapshotRepositoryMonitor} threads.
 *
 * @since 2.8
 */
public interface DocumentSnapshotRepositoryMonitorManager {
  /**
   * Ensures all monitor threads are running.
   *
   * @param checkpoint for the last completed document or null if none have
   *        been completed.
   * @throws RepositoryException
   */
  void start(String checkpoint) throws RepositoryException;

  /**
   * Stops all the configured {@link DocumentSnapshotRepositoryMonitor} threads.
   */
  void stop();

  /**
   * Removes persisted state for {@link DocumentSnapshotRepositoryMonitor}
   * threads. After calling this {@link DocumentSnapshotRepositoryMonitor}
   * threads will no longer be able to resume from where they left off last
   * time.
   */
  void clean();

  /**
   * Returns the number of {@link DocumentSnapshotRepositoryMonitor} threads
   * that are alive. This method is for testing purposes.
   */
  int getThreadCount();

  /**
   * Returns the {@link CheckpointAndChangeQueue} for this
   * {@link DocumentSnapshotRepositoryMonitorManager}
   */
  CheckpointAndChangeQueue getCheckpointAndChangeQueue();

  /** Returns whether we are after a start() call and before a stop(). */
  boolean isRunning();

  /**
   * Receives information specifying what is guaranteed to be delivered to GSA.
   * Every entry in passed in Map is a monitor name and MonitorCheckpoint.
   * The monitor of that name can expect that all documents before and including
   * document related with MonitorCheckpoint will be delivered to GSA.
   * This information is for the convenience and efficiency of the Monitor so
   * that it knows how many changes it has to resend.  It's valid for a monitor
   * to ignore these updates if it feels like it for some good reason.
   * FileConnectorSystemMonitor instances use this information to trim their
   * file system snapshots.
   */
  void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees);

  /**
   * Receives {@link TraversalSchedule} from TraversalManager which is
   * {@link TraversalScheduleAware}.
   */
  void setTraversalSchedule(TraversalSchedule traversalSchedule);
}
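
The contract of acceptGuarantees can be illustrated with a small sketch. This is not the connector's implementation, which is not shown in this article. It assumes a hypothetical SketchMonitor stand-in, represents a MonitorCheckpoint as a plain long offset, and assumes that guarantees never move backwards. Entries whose name matches no registered monitor are simply skipped.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a monitor; remembers the last guaranteed position. */
final class SketchMonitor {
  private long guaranteedOffset = -1;  // -1 means nothing has been guaranteed yet

  void acceptGuarantee(long offset) {
    // Assumption: an older guarantee carries no new information, so ignore it.
    if (offset > guaranteedOffset) {
      guaranteedOffset = offset;
    }
  }

  long getGuaranteedOffset() {
    return guaranteedOffset;
  }
}

/** Hypothetical manager fragment: routes each guarantee to the monitor of that name. */
final class GuaranteeDispatchSketch {
  private final Map<String, SketchMonitor> monitorsByName =
      new HashMap<String, SketchMonitor>();

  void register(String name, SketchMonitor monitor) {
    monitorsByName.put(name, monitor);
  }

  /** Returns how many entries were delivered to a registered monitor. */
  int acceptGuarantees(Map<String, Long> guarantees) {
    int delivered = 0;
    for (Map.Entry<String, Long> entry : guarantees.entrySet()) {
      SketchMonitor monitor = monitorsByName.get(entry.getKey());
      if (monitor != null) {  // skip names that have no registered monitor
        monitor.acceptGuarantee(entry.getValue());
        delivered++;
      }
    }
    return delivered;
  }

  public static void main(String[] args) {
    GuaranteeDispatchSketch manager = new GuaranteeDispatchSketch();
    SketchMonitor repoA = new SketchMonitor();
    manager.register("repoA", repoA);

    Map<String, Long> guarantees = new HashMap<String, Long>();
    guarantees.put("repoA", 42L);
    guarantees.put("unknownRepo", 7L);

    System.out.println("delivered=" + manager.acceptGuarantees(guarantees));
    System.out.println("repoA guaranteed up to " + repoA.getGuaranteedOffset());
  }
}
```

Keying everything by monitor name means the caller never needs a reference to a monitor object, only the name that was derived from the repository.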

Next, let's see how DocumentSnapshotRepositoryMonitorManagerImpl implements the behaviors defined in that interface.

First, the relevant fields and how they are initialized:

private volatile TraversalSchedule traversalSchedule;

// Monitor threads
  private final List<Thread> threads =
      Collections.synchronizedList(new ArrayList<Thread>());
  // Map from monitor name to monitor
  private final Map<String, DocumentSnapshotRepositoryMonitor> fileSystemMonitorsByName =
      Collections.synchronizedMap(new HashMap<String, DocumentSnapshotRepositoryMonitor>());
  private boolean isRunning = false;  // Monitor threads start in off state.
  private final List<? extends SnapshotRepository<? extends DocumentSnapshot>>
      repositories;

  private final File snapshotDir;
  private final ChecksumGenerator checksumGenerator;
  // Container of CheckpointAndChange objects (a List)
  private final CheckpointAndChangeQueue checkpointAndChangeQueue;
  // Container of Change objects (a blocking queue)
  private final ChangeQueue changeQueue;

  private final DocumentSnapshotFactory documentSnapshotFactory;

/**
   * Constructs {@link DocumentSnapshotRepositoryMonitorManagerImpl}
   * for the {@link DiffingConnector}.
   *
   * @param repositories a {@code List} of {@link SnapshotRepository
   *        SnapshotRepositorys}
   * @param documentSnapshotFactory a {@link DocumentSnapshotFactory}
   * @param snapshotDir directory to store {@link SnapshotRepository}
   * @param checksumGenerator a {@link ChecksumGenerator} used to
   *        detect changes in a document's content
   * @param changeQueue a {@link ChangeQueue}
   * @param checkpointAndChangeQueue a
   *        {@link CheckpointAndChangeQueue}
   */
  public DocumentSnapshotRepositoryMonitorManagerImpl(
      List<? extends SnapshotRepository<
          ? extends DocumentSnapshot>> repositories,
      DocumentSnapshotFactory documentSnapshotFactory,
      File snapshotDir, ChecksumGenerator checksumGenerator,
      ChangeQueue changeQueue,
      CheckpointAndChangeQueue checkpointAndChangeQueue) {
    this.repositories = repositories;
    this.documentSnapshotFactory = documentSnapshotFactory;
    this.snapshotDir = snapshotDir;
    this.checksumGenerator = checksumGenerator;
    this.changeQueue = changeQueue;
    this.checkpointAndChangeQueue = checkpointAndChangeQueue;
  }

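Now for its start method. It does three main things: it calls the start method of the checkpointAndChangeQueue object, initializes the SnapshotStore associated with each repository object, and finally starts the monitor instance for each repository.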

  /**
   * Start method: go from "cold" to "warm", including CheckpointAndChangeQueue.
   */
  public void start(String connectorManagerCheckpoint)
      throws RepositoryException {

    try {
      // Start the queue. Main action: load monitorPoints and the
      // checkpointAndChangeList from the JSON queue file.
      checkpointAndChangeQueue.start(connectorManagerCheckpoint);
    } catch (IOException e) {
      throw new RepositoryException("Failed starting CheckpointAndChangeQueue.",
          e);
    }
    // MonitorCheckpoint container
    Map<String, MonitorCheckpoint> monitorPoints
        = checkpointAndChangeQueue.getMonitorRestartPoints();

    Map<String, SnapshotStore> snapshotStores = null;

    // Build the map from monitorName to SnapshotStore
    try {
      snapshotStores =
          recoverSnapshotStores(connectorManagerCheckpoint, monitorPoints);     

    } catch (SnapshotStoreException e) {
      throw new RepositoryException("Snapshot recovery failed.", e);
    } catch (IOException e) {
      throw new RepositoryException("Snapshot recovery failed.", e);
    } catch (InterruptedException e) {
      throw new RepositoryException("Snapshot recovery interrupted.", e);
    }   

    // Start the monitor threads
    startMonitorThreads(snapshotStores, monitorPoints);    

    isRunning = true;
  }

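When the SnapshotStore for each repository object is initialized, the associated MonitorCheckpoint instance is passed in as well, and the snapshot files are repaired if necessary.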

  /**
   * For each start path, gets its monitor recovery files into a state where
   * the monitor can be started, and builds the map from monitorName to
   * {@link SnapshotStore}.
   */
  private Map<String, SnapshotStore> recoverSnapshotStores(
      String connectorManagerCheckpoint, Map<String,
      MonitorCheckpoint> monitorPoints)
      throws IOException, SnapshotStoreException, InterruptedException {
    Map<String, SnapshotStore> snapshotStores =
        new HashMap<String, SnapshotStore>();
    for (SnapshotRepository<? extends DocumentSnapshot> repository
        : repositories) {
      String monitorName = makeMonitorNameFromStartPath(repository.getName());
      File dir = new File(snapshotDir,  monitorName);

      boolean startEmpty = (connectorManagerCheckpoint == null)
          || (!monitorPoints.containsKey(monitorName));
      if (startEmpty) {
        LOG.info("Deleting " + repository.getName()
            + " global checkpoint=" + connectorManagerCheckpoint
            + " monitor checkpoint=" + monitorPoints.get(monitorName));
        // Delete this snapshot directory
        delete(dir);
      } else {
        // Repair (stitch) this snapshot directory
        SnapshotStore.stitch(dir, monitorPoints.get(monitorName),
            documentSnapshotFactory);
      }

      SnapshotStore snapshotStore = new SnapshotStore(dir,
          documentSnapshotFactory);

      snapshotStores.put(monitorName, snapshotStore);
    }
    return snapshotStores;
  }
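
The branch that decides between deleting a snapshot directory and stitching it can be isolated into a pure function. The sketch below copies only the startEmpty condition from the method above. The class name, the Action enum, and the sample checkpoint strings are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper isolating the startEmpty decision of recoverSnapshotStores. */
final class SnapshotRecoveryDecisionSketch {
  enum Action { DELETE_AND_START_EMPTY, STITCH_AND_RESUME }

  static Action decide(String globalCheckpoint, Map<String, ?> monitorPoints,
      String monitorName) {
    // Same condition as in the listing: with no global checkpoint, or no
    // restart point recorded for this monitor, nothing can be resumed.
    boolean startEmpty = (globalCheckpoint == null)
        || !monitorPoints.containsKey(monitorName);
    return startEmpty ? Action.DELETE_AND_START_EMPTY : Action.STITCH_AND_RESUME;
  }

  public static void main(String[] args) {
    Map<String, String> points = new HashMap<String, String>();
    points.put("repoA", "cp-17");

    System.out.println(decide(null, points, "repoA"));       // DELETE_AND_START_EMPTY
    System.out.println(decide("global-3", points, "repoB")); // DELETE_AND_START_EMPTY
    System.out.println(decide("global-3", points, "repoA")); // STITCH_AND_RESUME
  }
}
```

A monitor therefore resumes only when both pieces of state exist: the connector manager's checkpoint and a restart point for that monitor's name.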

Next, follow the method that starts the monitor threads:

 /**
   * Starts the monitor threads. (MonitorCheckpoint, SnapshotStore and monitor
   * appear to be mapped to one another through the monitor name.)
   * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for each
   * startPath.
   *
   * @throws RepositoryDocumentException if any of the threads cannot be
   *         started.
   */
  private void startMonitorThreads(Map<String, SnapshotStore> snapshotStores,
      Map<String, MonitorCheckpoint> monitorPoints)
      throws RepositoryDocumentException {

    for (SnapshotRepository<? extends DocumentSnapshot> repository
            : repositories) {
      String monitorName = makeMonitorNameFromStartPath(repository.getName());
      // Look up the snapshot store (reader/writer) by monitorName
      SnapshotStore snapshotStore = snapshotStores.get(monitorName);
      // Create the monitor thread
      Thread monitorThread = newMonitorThread(repository, snapshotStore,
          monitorPoints.get(monitorName));
      threads.add(monitorThread);

      LOG.info("starting monitor for <" + repository.getName() + ">");
      monitorThread.setName(repository.getName());
      monitorThread.setDaemon(true);
      monitorThread.start();
    }
  }
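
The thread handling in startMonitorThreads follows a common pattern: one named daemon thread per repository, tracked in a synchronized list. The sketch below reproduces that pattern in isolation. The class name is hypothetical, and a CountDownLatch stands in for the monitor's real work loop, which is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch: one named daemon thread per repository name. */
final class MonitorThreadStartupSketch {
  private final List<Thread> threads =
      Collections.synchronizedList(new ArrayList<Thread>());

  void startAll(List<String> repositoryNames, final CountDownLatch stopSignal) {
    for (String name : repositoryNames) {
      Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            stopSignal.await();  // stand-in for the monitor's work loop
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // exit when interrupted
          }
        }
      });
      thread.setName(name);
      thread.setDaemon(true);  // must be called before start()
      threads.add(thread);
      thread.start();
    }
  }

  /** Counts live threads, in the spirit of the interface's getThreadCount(). */
  int getThreadCount() {
    int alive = 0;
    synchronized (threads) {  // iterating a synchronizedList needs a manual lock
      for (Thread thread : threads) {
        if (thread.isAlive()) {
          alive++;
        }
      }
    }
    return alive;
  }

  public static void main(String[] args) {
    MonitorThreadStartupSketch sketch = new MonitorThreadStartupSketch();
    CountDownLatch stopSignal = new CountDownLatch(1);
    sketch.startAll(Arrays.asList("repoA", "repoB", "repoC"), stopSignal);
    System.out.println("alive threads: " + sketch.getThreadCount());
    stopSignal.countDown();  // let the workers finish
  }
}
```

Marking the threads as daemons means a forgotten monitor cannot keep the JVM alive on its own, and naming each thread after its repository makes log lines and thread dumps easier to read.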

The monitor objects themselves are created in the following method:

/**
   * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for the provided
   * folder.
   *
   * @throws RepositoryDocumentException if {@code startPath} is not readable,
   *         or if there is any problem reading or writing snapshots.
   */
  private Thread newMonitorThread(
      SnapshotRepository<? extends DocumentSnapshot> repository,
      SnapshotStore snapshotStore, MonitorCheckpoint startCp)
      throws RepositoryDocumentException {
    // Note the monitorName: it is the key used for this monitor's lookups
    String monitorName = makeMonitorNameFromStartPath(repository.getName());
    // Documents are processed inside the monitor thread
    DocumentSnapshotRepositoryMonitor monitor =
        new DocumentSnapshotRepositoryMonitor(monitorName, repository,
            snapshotStore, changeQueue.newCallback(), DOCUMENT_SINK, startCp,
            documentSnapshotFactory);
    monitor.setTraversalSchedule(traversalSchedule);
    LOG.fine("Adding a new monitor for " + monitorName + ": " + monitor);
    fileSystemMonitorsByName.put(monitorName, monitor);
    return new Thread(monitor);
  }

The stop method stops the monitor threads:

/**
   * Flags all monitors to stop
   */
  private void flagAllMonitorsToStop() {
    for (SnapshotRepository<? extends DocumentSnapshot> repository
        : repositories) {
      String monitorName = makeMonitorNameFromStartPath(repository.getName());
      DocumentSnapshotRepositoryMonitor
          monitor = fileSystemMonitorsByName.get(monitorName);
      if (null != monitor) {
        monitor.shutdown();
      }
      else {
        LOG.fine("Unable to stop non existent monitor thread for "
            + monitorName);
      }
    }
  }
  /**
   * Stops the monitor threads
   */
  /* @Override */
  public synchronized void stop() {
    for (Thread thread : threads) {
      thread.interrupt();
    }
    for (Thread thread : threads) {
      try {
        thread.join(MAX_SHUTDOWN_MS);
        if (thread.isAlive()) {
          LOG.warning("failed to stop background thread: " + thread.getName());
        }
      } catch (InterruptedException e) {
        // Mark this thread as interrupted so it can be dealt with later.
        Thread.currentThread().interrupt();
      }
    }
    threads.clear();

    /* in case thread.interrupt doesn't stop monitors */
    flagAllMonitorsToStop();

    fileSystemMonitorsByName.clear();
    changeQueue.clear();
    this.isRunning = false;
  }
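
The first half of stop() is an interrupt-then-bounded-join sequence. The sketch below shows that sequence on its own, with hypothetical sleeper threads instead of real monitors. MAX_SHUTDOWN_MS is given an assumed value of 500 here because the connector's actual constant is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of interrupting threads and waiting a bounded time for each. */
final class BoundedShutdownSketch {
  static final long MAX_SHUTDOWN_MS = 500;  // assumed value, for illustration only

  /** Interrupts every thread, then joins each one; returns names still alive. */
  static List<String> stopAll(List<Thread> threads) {
    for (Thread thread : threads) {
      thread.interrupt();  // ask all threads to stop before waiting on any
    }
    List<String> stragglers = new ArrayList<String>();
    for (Thread thread : threads) {
      try {
        thread.join(MAX_SHUTDOWN_MS);  // wait, but never forever
        if (thread.isAlive()) {
          stragglers.add(thread.getName());
        }
      } catch (InterruptedException e) {
        // Restore the caller's interrupt status so it can be handled later.
        Thread.currentThread().interrupt();
      }
    }
    return stragglers;
  }

  /** Starts a daemon thread that sleeps until it is interrupted. */
  static Thread startSleeper(String name) {
    Thread thread = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(60000);
        } catch (InterruptedException e) {
          // Interrupted: fall through and let the thread end.
        }
      }
    }, name);
    thread.setDaemon(true);
    thread.start();
    return thread;
  }

  public static void main(String[] args) {
    List<Thread> threads = new ArrayList<Thread>();
    threads.add(startSleeper("repoA"));
    threads.add(startSleeper("repoB"));
    System.out.println("still alive: " + stopAll(threads));
  }
}
```

Interrupting every thread before joining any of them lets the threads wind down in parallel, so the total wait is closer to the slowest thread than to the sum of all of them.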

The flagAllMonitorsToStop() method calls monitor.shutdown() on each monitor object, which sets the monitor's flag field:

 /* The monitor should exit voluntarily if set to false */
  private volatile boolean isRunning = true;
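
Why a flag is useful in addition to interrupt() can be seen with a worker that loses its interrupt. The sketch below is a hypothetical worker, not the connector's monitor class. It deliberately swallows InterruptedException, so only the volatile flag can end its loop.

```java
/** Hypothetical worker that ignores interrupts and stops only via a volatile flag. */
final class FlaggedWorkerSketch implements Runnable {
  /* The worker should exit voluntarily once this is set to false. */
  private volatile boolean isRunning = true;

  @Override
  public void run() {
    while (isRunning) {
      try {
        Thread.sleep(10);  // stand-in for one unit of work
      } catch (InterruptedException e) {
        // Deliberately swallowed to simulate code that loses the interrupt.
      }
    }
  }

  void shutdown() {
    isRunning = false;  // volatile write, visible to the worker's next check
  }

  /** Returns true if the worker stopped after interrupt() plus shutdown(). */
  static boolean stopsViaFlag() {
    FlaggedWorkerSketch worker = new FlaggedWorkerSketch();
    Thread thread = new Thread(worker);
    thread.setDaemon(true);
    thread.start();

    thread.interrupt();  // has no lasting effect on this worker
    worker.shutdown();   // this is what actually ends the loop
    try {
      thread.join(1000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !thread.isAlive();
  }

  public static void main(String[] args) {
    System.out.println("stopped via flag: " + stopsViaFlag());
  }
}
```

Declaring the field volatile matters because the flag is written by the stopping thread and read by the worker thread. Without it, the worker is not guaranteed to observe the change.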

---------------------------------------------------------------------------

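This series, Enterprise Search Engine Development: The Connector, is my own original work.

Please credit the source when reposting: cnblogs (博客园), 刺猬的温驯

My email: [email protected]#com (replace # with .)

Link to this article: http://www.cnblogs.com/chenying99/p/3789613.html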


Time: 2024-12-20 11:08:47
