Enterprise Search Engine Development: The Connector (Part 26)

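The connector uses a monitor object, DocumentSnapshotRepositoryMonitor, to iterate over the data in the repository object SnapshotRepository described in the previous article (for a database repository, this is DBSnapshotRepository).

The monitor class DocumentSnapshotRepositoryMonitor initializes its member variables in its constructor. These members are all objects involved in fetching and processing the data.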

  /** This connector instance's current traversal schedule. */
  private volatile TraversalSchedule traversalSchedule;

  /** Directory that contains snapshots. */
  private final SnapshotStore snapshotStore;

  /** The root of the repository to monitor */
  private final SnapshotRepository<? extends DocumentSnapshot> query;

  /** Reader for the current snapshot. */
  private SnapshotReader snapshotReader;

  /** Callback to invoke when a change is detected. */
  private final Callback callback;

  /** Current record from the snapshot. */
  private DocumentSnapshot current;

  /** The snapshot we are currently writing */
  private OrderedSnapshotWriter snapshotWriter;

  private final String name;

  private final DocumentSnapshotFactory documentSnapshotFactory;

  private final DocumentSink documentSink;

  /* Contains a checkpoint confirmation from CM. */
  private MonitorCheckpoint guaranteeCheckpoint;

  /* The monitor should exit voluntarily if set to false */
  private volatile boolean isRunning = true;

  /**
   * Creates a DocumentSnapshotRepositoryMonitor that monitors the
   * Repository rooted at {@code root}.
   *
   * @param name the name of this monitor (a hash of the start path)
   * @param query query for files
   * @param snapshotStore where snapshots are stored
   * @param callback client callback
   * @param documentSink destination for filtered out file info
   * @param initialCp checkpoint when system initiated, could be {@code null}
   * @param documentSnapshotFactory for un-serializing
   *        {@link DocumentSnapshot} objects.
   */
  public DocumentSnapshotRepositoryMonitor(String name,
      SnapshotRepository<? extends DocumentSnapshot> query,
      SnapshotStore snapshotStore, Callback callback,
      DocumentSink documentSink, MonitorCheckpoint initialCp,
      DocumentSnapshotFactory documentSnapshotFactory) {
    this.name = name;
    this.query = query;
    this.snapshotStore = snapshotStore;
    this.callback = callback;
    this.documentSnapshotFactory = documentSnapshotFactory;
    this.documentSink = documentSink;
    guaranteeCheckpoint = initialCp;
  }

The class also implements the Runnable interface, and the data-processing logic lives in the overridden run method.

@Override
  public void run() {
    // Call NDC.push() via reflection, if possible.
    invoke(ndcPush, "Monitor " + name);
    try {
      while (true) {
        tryToRunForever();
        // TODO: Remove items from this monitor that are in queues.
        // Watch out for race conditions. The queues are potentially
        // giving docs to CM as bad things happen in monitor.
        // This TODO would be mitigated by a reconciliation with GSA.
        performExceptionRecovery();
      }
    } catch (InterruptedException ie) {
      LOG.info("Repository Monitor " + name + " received stop signal. " + this);
    } finally {
      // Call NDC.remove() via reflection, if possible.
      invoke(ndcRemove);
    }
  }
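
The shape of run() is an outer loop that restarts the work after a recoverable failure and exits only when the thread is interrupted. A minimal, self-contained sketch of that pattern follows. `Pass` and `RecoveringLoop` are hypothetical names invented for illustration and are not part of the connector code, and the sketch catches RuntimeException where the real monitor catches its snapshot exceptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one monitoring pass; not a connector API. */
interface Pass {
  void runOnce() throws InterruptedException;
}

/** Reruns a pass forever, recovering from failures, until interrupted. */
class RecoveringLoop implements Runnable {
  private final Pass pass;
  /** Records what happened, so the behaviour can be inspected afterwards. */
  final List<String> events = new ArrayList<>();

  RecoveringLoop(Pass pass) {
    this.pass = pass;
  }

  @Override
  public void run() {
    try {
      while (true) {
        try {
          pass.runOnce();
          events.add("pass ok");
        } catch (RuntimeException e) {
          // Recoverable failure: note it and let the outer loop try again.
          events.add("recovered: " + e.getMessage());
        }
      }
    } catch (InterruptedException ie) {
      // The stop signal is the only way out of the loop.
      events.add("stopped");
    }
  }
}
```

Because InterruptedException is caught outside the `while (true)`, a stop request ends the loop, while any other failure only ends the current attempt.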

The run method in turn calls the tryToRunForever() method.

private void tryToRunForever() throws InterruptedException {
    try {
      while (true) {
        if (traversalSchedule == null || traversalSchedule.shouldRun()) {
          // Start traversal
          doOnePass();
        }
        else {
          LOG.finest("Currently out of traversal window. "
              + "Sleeping for 15 minutes.");
          // TODO(nashi): Calculate when it should wake up while
          // handling TraversalScheduleAware events properly.
          // Not yet inside the traversal window, so sleep.
          callback.passPausing(15*60*1000);
        }
      }
    } catch (SnapshotWriterException e) {
      String msg = "Failed to write to snapshot file: " + snapshotWriter.getPath();
      LOG.log(Level.SEVERE, msg, e);
    } catch (SnapshotReaderException e) {
      String msg = "Failed to read snapshot file: " + snapshotReader.getPath();
      LOG.log(Level.SEVERE, msg, e);
    } catch (SnapshotStoreException e) {
      String msg = "Problem with snapshot store.";
      LOG.log(Level.SEVERE, msg, e);
    } catch (SnapshotRepositoryRuntimeException e) {
      String msg = "Failed reading repository.";
      LOG.log(Level.SEVERE, msg, e);
    }
  }
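
The TODO in tryToRunForever() notes that the monitor sleeps a fixed 15 minutes instead of calculating when the traversal window opens. The real TraversalSchedule API is not shown in this article, so the sketch below uses a hypothetical `DailyWindow` class, with whole-hour granularity, purely to illustrate the idea of a window check plus a wake-up calculation.

```java
/** Hypothetical daily traversal window in whole hours (0-23); not a connector class. */
class DailyWindow {
  private final int startHour; // inclusive
  private final int endHour;   // exclusive

  DailyWindow(int startHour, int endHour) {
    if (startHour < 0 || startHour > 23 || endHour < 0 || endHour > 23
        || startHour == endHour) {
      throw new IllegalArgumentException("hours must be 0-23 and differ");
    }
    this.startHour = startHour;
    this.endHour = endHour;
  }

  /** Returns true if a traversal may run at the given hour of the day. */
  boolean shouldRun(int hourOfDay) {
    if (startHour < endHour) {
      return hourOfDay >= startHour && hourOfDay < endHour;
    }
    // The window wraps past midnight, for example 22 -> 6.
    return hourOfDay >= startHour || hourOfDay < endHour;
  }

  /** Returns how many hours to wait until the window next opens (0 if open). */
  int hoursUntilOpen(int hourOfDay) {
    if (shouldRun(hourOfDay)) {
      return 0;
    }
    return (startHour - hourOfDay + 24) % 24;
  }
}
```

With something like this, the fixed `15*60*1000` pause could be replaced by a sleep derived from `hoursUntilOpen`, although a schedule change would then need to wake the sleeper early.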

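The doOnePass() method fetches data from the SnapshotRepository object, persists the data snapshots to a snapshot file, and applies the related processing logic (deciding whether each document is new, deleted, or updated).

The resulting changes are finally added, through the Callback interface, to the blocking queue inside the ChangeQueue object.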

/**
   * doOnePass() creates its own snapshot reader and writer for each pass.
   * Makes one pass through the repository, notifying {@code visitor} of any
   * changes.
   *
   * @throws InterruptedException
   */
  private void doOnePass() throws SnapshotStoreException,
      InterruptedException {
    callback.passBegin();
    try {
      // Snapshot reader:
      // Open the most recent snapshot and read the first record.
      this.snapshotReader = snapshotStore.openMostRecentSnapshot();
      current = snapshotReader.read();
      // Snapshot writer:
      // Create a snapshot writer for this pass.
      this.snapshotWriter =
          new OrderedSnapshotWriter(snapshotStore.openNewSnapshotWriter());
      // Fetch the data from the repository.
      for (DocumentSnapshot ss : query) {
        // Check whether the monitor has been asked to stop.
        if (!isRunning) {
          LOG.log(Level.INFO, "Exiting the monitor thread " + name
              + " " + this);
          throw new InterruptedException();
        }

        if (Thread.currentThread().isInterrupted()) {
          throw new InterruptedException();
        }
        processDeletes(ss);
        safelyProcessDocumentSnapshot(ss);
      }
      // Take care of any trailing paths in the snapshot.
      processDeletes(null);

    } finally {
      try {
        snapshotStore.close(snapshotReader, snapshotWriter);
      } catch (IOException e) {
        LOG.log(Level.WARNING, "Failed closing snapshot reader and writer.", e);
        // Try to proceed anyway.  Weird they are not closing.
      }
    }
    if (current != null) {
      throw new IllegalStateException(
          "Should not finish pass until entire read snapshot is consumed.");
    }
    // The pass is finished; the callback may sleep before the next one.
    callback.passComplete(getCheckpoint(-1));
    snapshotStore.deleteOldSnapshots();
    if (!callback.hasEnqueuedAtLeastOneChangeThisPass()) {
      // No monitor checkpoints from this pass went to queue because
      // there were no changes, so we can delete the snapshot we just wrote.
      new java.io.File(snapshotWriter.getPath()).delete();
      // TODO: Check return value; log trouble.
    }
    snapshotWriter = null;
    snapshotReader = null;
  }
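
Inside the loop, doOnePass() checks two stop conditions before touching each document: the volatile isRunning flag and the thread's interrupt status. A minimal sketch of that cooperative-stop check, assuming a hypothetical `StoppableScanner` class that only counts the items it sees:

```java
import java.util.List;

/** Hypothetical scanner illustrating the two stop checks; not a connector class. */
class StoppableScanner {
  // volatile so that a stop() issued by another thread is seen promptly.
  private volatile boolean isRunning = true;

  /** Asks the scanner to exit voluntarily at the next item boundary. */
  void stop() {
    isRunning = false;
  }

  /** Counts the documents, aborting as soon as a stop has been requested. */
  int scan(List<String> docs) throws InterruptedException {
    int processed = 0;
    for (String doc : docs) {
      if (!isRunning || Thread.currentThread().isInterrupted()) {
        throw new InterruptedException("stopped after " + processed + " documents");
      }
      processed++;
    }
    return processed;
  }
}
```

Checking once per item keeps the stop latency bounded by the time spent on a single document, at the cost of leaving the current pass half finished.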

The processDeletes method handles the deletion logic.

/**
   * Process snapshot entries as deletes until {@code current} catches up with
   * {@code documentSnapshot}. Or, if {@code documentSnapshot} is {@code null},
   * process all remaining snapshot entries as deletes.
   *
   * @param documentSnapshot where to stop
   * @throws SnapshotReaderException
   * @throws InterruptedException
   */
  private void processDeletes(DocumentSnapshot documentSnapshot)
      throws SnapshotReaderException, InterruptedException {
    // While the documentSnapshot argument sorts after current, report current
    // as deleted, then move on to the next record in the previous snapshot.
    while (current != null
        && (documentSnapshot == null
            || COMPARATOR.compare(documentSnapshot, current) > 0)) {
      callback.deletedDocument(
          new DeleteDocumentHandle(current.getDocumentId()), getCheckpoint());
      current = snapshotReader.read();
    }
  }

Next, we trace the safelyProcessDocumentSnapshot method.

private void safelyProcessDocumentSnapshot(DocumentSnapshot snapshot)
      throws InterruptedException, SnapshotReaderException,
      SnapshotWriterException {
    try {
      processDocument(snapshot);
    } catch (RepositoryException re) {
      //TODO Log the exception or its message? in document sink perhaps.
      // Record the snapshot that raised the exception.
      documentSink.add(snapshot.getDocumentId(), FilterReason.IO_EXCEPTION);
    }
  }

It in turn calls the processDocument method, which contains the logic for handling updated and new data.

/**
   * Processes a document found in the document repository.
   *
   * @param documentSnapshot
   * @throws RepositoryException
   * @throws InterruptedException
   * @throws SnapshotReaderException
   * @throws SnapshotWriterException
   */
  private void processDocument(DocumentSnapshot documentSnapshot)
      throws InterruptedException, RepositoryException, SnapshotReaderException,
          SnapshotWriterException {
    // At this point 'current' >= 'file', or possibly current == null if
    // we've processed the previous snapshot entirely.
    if (current != null
        && COMPARATOR.compare(documentSnapshot, current) == 0) {
      // The document was in the previous snapshot: check it for changes
      // and advance current.
      processPossibleChange(documentSnapshot);
    } else {
      // This file didn't exist during the previous scan,
      // so the documentSnapshot is new.
      DocumentHandle documentHandle = documentSnapshot.getUpdate(null);
      snapshotWriter.write(documentSnapshot);

      // Null if filtered due to mime-type.
      if (documentHandle != null) {
        callback.newDocument(documentHandle, getCheckpoint(-1));
      }
    }
  }

Handling the update case:

/**
   * Processes a document found in the document repository that also appeared
   * in the previous scan. Determines whether the document has changed,
   * propagates changes to the client and writes the snapshot record.
   *
   * @param documentSnapshot
   * @throws RepositoryException
   * @throws InterruptedException
   * @throws SnapshotWriterException
   * @throws SnapshotReaderException
   */
  private void processPossibleChange(DocumentSnapshot documentSnapshot)
      throws RepositoryException, InterruptedException, SnapshotWriterException,
             SnapshotReaderException {
    // Presumably this compares hash values.
    DocumentHandle documentHandle = documentSnapshot.getUpdate(current);
    // Write to the snapshot file.
    snapshotWriter.write(documentSnapshot);
    if (documentHandle == null) {
      // No change, so nothing is sent.
    } else {
      // Normal change - send the gsa an update.
      callback.changedDocument(documentHandle, getCheckpoint());
    }
    current = snapshotReader.read();
  }
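
Taken together, processDeletes, processDocument and processPossibleChange behave like a merge over two sorted sequences: the previous snapshot (read through `current`) and the documents now in the repository. The sketch below shows that merge in isolation. It assumes both sides are sorted by document ID and that a content hash stands in for whatever getUpdate() compares; `SnapshotDiff` and the string-based change records are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

/** Hypothetical diff of two sorted snapshots (document ID -> content hash). */
class SnapshotDiff {
  static List<String> diff(SortedMap<String, String> previous,
                           SortedMap<String, String> latest) {
    List<String> changes = new ArrayList<>();
    Iterator<Map.Entry<String, String>> reader = previous.entrySet().iterator();
    // Plays the role of the monitor's 'current' record.
    Map.Entry<String, String> current = reader.hasNext() ? reader.next() : null;

    for (Map.Entry<String, String> doc : latest.entrySet()) {
      // Like processDeletes(ss): old records sorting before doc are gone.
      while (current != null && doc.getKey().compareTo(current.getKey()) > 0) {
        changes.add("DELETE " + current.getKey());
        current = reader.hasNext() ? reader.next() : null;
      }
      if (current != null && doc.getKey().equals(current.getKey())) {
        // Like processPossibleChange: same ID, report only if content differs.
        if (!doc.getValue().equals(current.getValue())) {
          changes.add("CHANGE " + doc.getKey());
        }
        current = reader.hasNext() ? reader.next() : null;
      } else {
        // Like the else branch of processDocument: not in the old snapshot.
        changes.add("ADD " + doc.getKey());
      }
    }
    // Like processDeletes(null): everything left in the old snapshot is deleted.
    while (current != null) {
      changes.add("DELETE " + current.getKey());
      current = reader.hasNext() ? reader.next() : null;
    }
    return changes;
  }
}
```

Because both inputs are consumed in order, each record is visited once, which is why the pass can end with the check that `current` is null.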

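Snapshots of both updated and new data are first persisted to the newest snapshot file.

The data is submitted by calling the corresponding methods of the callback member, which ultimately places it in the ChangeQueue object.

The Callback interface defines the methods used for this data handling.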

/**
   * Callback interface.
   * The client provides an implementation of this interface to receive
   * notification of changes to the repository.
   */
  public static interface Callback {
    public void passBegin() throws InterruptedException;

    public void newDocument(DocumentHandle documentHandle,
        MonitorCheckpoint mcp) throws InterruptedException;

    public void deletedDocument(DocumentHandle documentHandle,
        MonitorCheckpoint mcp) throws InterruptedException;

    public void changedDocument(DocumentHandle documentHandle,
        MonitorCheckpoint mcp) throws InterruptedException;

    public void passComplete(MonitorCheckpoint mcp) throws InterruptedException;

    public boolean hasEnqueuedAtLeastOneChangeThisPass();

    public void passPausing(int sleepms) throws InterruptedException;
  }

The ChangeQueue class defines an inner class Callback that implements this interface. Its methods add the submitted data to the blocking queue held as a member of ChangeQueue.

/**
   * Callback implementation: adds Change elements to the blocking queue
   * pendingChanges.
   * Adds {@link Change Changes} to this queue.
   */
  private class Callback implements DocumentSnapshotRepositoryMonitor.Callback {
    private int changeCount = 0;

    public void passBegin() {
      changeCount = 0;
      activityLogger.scanBeginAt(new Timestamp(System.currentTimeMillis()));
    }

    /* @Override */
    public void changedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
        throws InterruptedException {
      ++changeCount;
      pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
      activityLogger.gotChangedDocument(dh.getDocumentId());
    }

     /* @Override */
    public void deletedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
        throws InterruptedException {
      ++changeCount;
      pendingChanges.put(new Change(Change.FactoryType.INTERNAL, dh, mcp));
      activityLogger.gotDeletedDocument(dh.getDocumentId());
    }

    /* @Override */
    public void newDocument(DocumentHandle dh, MonitorCheckpoint mcp)
        throws InterruptedException {
      ++changeCount;
      pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
      activityLogger.gotNewDocument(dh.getDocumentId());
    }

    /* @Override */
    public void passComplete(MonitorCheckpoint mcp) throws InterruptedException {
      activityLogger.scanEndAt(new Timestamp(System.currentTimeMillis()));
      if (introduceDelayAfterEveryScan || changeCount == 0) {
        Thread.sleep(sleepInterval);
      }
    }

    public boolean hasEnqueuedAtLeastOneChangeThisPass() {
      return changeCount > 0;
    }

    /* @Override */
    public void passPausing(int sleepms) throws InterruptedException {
      Thread.sleep(sleepms);
    }
  }
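
The essence of this Callback is a producer that blocks on a bounded queue while counting how many changes the current pass produced. A minimal sketch under stated assumptions: `SimpleChangeQueue` is a hypothetical class, changes are plain strings rather than Change objects, and the capacity is chosen arbitrarily for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified change queue; not the connector's ChangeQueue. */
class SimpleChangeQueue {
  private final BlockingQueue<String> pendingChanges;
  private int changeCount = 0;

  SimpleChangeQueue(int capacity) {
    this.pendingChanges = new LinkedBlockingQueue<>(capacity);
  }

  /** Resets the per-pass counter, as passBegin() does above. */
  void passBegin() {
    changeCount = 0;
  }

  /** Blocks while the queue is full, which throttles the monitor thread. */
  void newDocument(String documentId) throws InterruptedException {
    ++changeCount;
    pendingChanges.put("ADD " + documentId);
  }

  void deletedDocument(String documentId) throws InterruptedException {
    ++changeCount;
    pendingChanges.put("DELETE " + documentId);
  }

  boolean hasEnqueuedAtLeastOneChangeThisPass() {
    return changeCount > 0;
  }

  /** Returns the next change, or null if none is available. */
  String getNextChange() {
    return pendingChanges.poll();
  }
}
```

Using `put` rather than `offer` means a slow consumer applies back-pressure to the monitor instead of causing changes to be dropped.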

---------------------------------------------------------------------------

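This series, Enterprise Search Engine Development: The Connector, is my original work.

Please credit the source when reposting: 博客园 (cnblogs), 刺猬的温驯

My email: [email protected]#com (replace # with .)

Link to this article: http://www.cnblogs.com/chenying99/p/3789505.html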


Time: 2024-10-12 20:55:59
