Enterprise Search Engine Development: The Connector (Part 25)

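This installment looks at how the connector interacts with a connector instance, focusing on how the connector obtains data from the instance. Earlier articles covered the XML-format exchange with the connector over HTTP. The connector configures its instances through configuration files; the file operations themselves (the com.google.enterprise.connector.persist.FileStore class) have not yet been analyzed in detail.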

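This article uses a database connector instance as the example. A database connector works with the database through the MyBatis component (a SQL-mapping framework). The connector instance form submitted from the front end is ultimately stored in a configuration file (file storage is the default; database storage can also be used). When the connector starts, it loads that configuration file and maps it onto the context object of the database connector instance, which is similar in concept to deserialization.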

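The fields of the context class, DBContext, hold the configuration values together with the data-access client object. Its init() method also sets the context itself as a property of that client object: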

  private final MimeTypeDetector mimeTypeDetector = new MimeTypeDetector();

  private DBClient client;
  private String connectionUrl;
  private String connectorName;
  private String login;
  private String password;
  private String sqlQuery;
  private String authZQuery;
  private String googleConnectorWorkDir;
  private String primaryKeys;
  private String xslt;
  private String driverClassName;
  private String documentURLField;
  private String documentIdField;
  private String baseURL;
  private String lobField;
  private String fetchURLField;
  private String lastModifiedDate;
  private String extMetadataType;
  private int numberOfRows = 500;
  private Integer minValue = -1;
  private boolean publicFeed = true;
  private boolean parameterizedQueryFlag = false;
  private Boolean nullsSortLow = null;
  private Collator collator;

  public DBContext() {
  }

  public void init() throws DBException {
    client.setDBContext(this);

    // If the NULL value sort behaviour has not been explicitly overridden
    // in the configuration, fetch it from the DatabaseMetadata.
    if (nullsSortLow == null) {
      nullsSortLow = client.nullsAreSortedLow();
      if (nullsSortLow == null) {
        throw new DBException("nullsSortLowFlag must be set in configuration.");
      }
    }
  }
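
The two-way wiring performed by init() can be illustrated in isolation. The following is only a minimal sketch under stated assumptions, not the connector's code: SketchContext and SketchClient are hypothetical stand-ins for DBContext and DBClient, and the value returned by nullsAreSortedLow() is hard-coded, whereas the real client reads it from the database metadata.

```java
class ContextWiringSketch {
  /** Hypothetical stand-in for DBClient. */
  static class SketchClient {
    private SketchContext context;

    void setContext(SketchContext context) {
      this.context = context;
    }

    SketchContext getContext() {
      return context;
    }

    /** Assumption: hard-coded here; a real client would ask the database. */
    Boolean nullsAreSortedLow() {
      return Boolean.TRUE;
    }
  }

  /** Hypothetical stand-in for DBContext. */
  static class SketchContext {
    private SketchClient client;
    private Boolean nullsSortLow; // null means "not set in the configuration"

    void setClient(SketchClient client) {
      this.client = client;
    }

    void setNullsSortLow(Boolean nullsSortLow) {
      this.nullsSortLow = nullsSortLow;
    }

    Boolean getNullsSortLow() {
      return nullsSortLow;
    }

    void init() {
      // Hand the context to the client so the client can read the settings.
      client.setContext(this);
      // Fall back to the client's answer when the setting is missing.
      if (nullsSortLow == null) {
        nullsSortLow = client.nullsAreSortedLow();
        if (nullsSortLow == null) {
          throw new IllegalStateException("nullsSortLow must be configured");
        }
      }
    }
  }

  public static void main(String[] args) {
    SketchClient client = new SketchClient();
    SketchContext context = new SketchContext();
    context.setClient(client); // normally done while the configuration is loaded
    context.init();
    System.out.println(client.getContext() == context); // prints true
    System.out.println(context.getNullsSortLow());      // prints true
  }
}
```

An explicitly configured value wins: calling setNullsSortLow(false) before init() leaves the setting at false and the client is not consulted.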

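The DBClient object encapsulates the data-reading methods. Reading goes through the MyBatis API: the client loads the configuration, builds a SqlSessionFactory from it, and opens the SqlSession objects used for data access.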

  private boolean hasCustomCollationQuery = false;
  protected DBContext dbContext;
  protected SqlSessionFactory sqlSessionFactory;
  protected DatabaseType databaseType;

  static {
    org.apache.ibatis.logging.LogFactory.useJdkLogging();
  }

  public DBClient() {
  }

  public void setDBContext(DBContext dbContext) throws DBException {
    this.dbContext = dbContext;
    generateSqlMap();
    this.sqlSessionFactory = getSqlSessionFactory(generateMyBatisConfig());
    LOG.info("DBClient for database " + getDatabaseInfo() + " is instantiated");
    this.databaseType = getDatabaseType();
  }

  /**
   * Constructor used for testing purpose. DBClient initialized with sqlMap
   * having crawl query without CDATA section.
   */
  @VisibleForTesting
  DBClient(DBContext dbContext) throws DBException {
    this.dbContext = dbContext;
    this.sqlSessionFactory = getSqlSessionFactory(generateMyBatisConfig());
    this.databaseType = getDatabaseType();
  }

  private SqlSessionFactory getSqlSessionFactory(String config) {
    try {
      SqlSessionFactoryBuilder builder = new SqlSessionFactoryBuilder();
      return builder.build(new StringReader(config));
    } catch (RuntimeException e) {
      throw new RuntimeException("XML is not well formed", e);
    }
  }

  /**
   * @return a SqlSession
   */
  @VisibleForTesting
  SqlSession getSqlSession()
      throws SnapshotRepositoryRuntimeException {
    try {
      return sqlSessionFactory.openSession();
    } catch (RuntimeException e) {
      Throwable cause = (e.getCause() != null &&
          e.getCause() instanceof SQLException) ? e.getCause() : e;
      LOG.log(Level.WARNING, "Unable to connect to the database.", cause);
      throw new SnapshotRepositoryRuntimeException(
          "Unable to connect to the database.", cause);
    }
  }

The method that actually reads the data is as follows:

  /**
   * @param skipRows number of rows to skip in the database.
   * @param maxRows max number of rows to return.
   * @return rows - subset of the result of executing the SQL query. E.g.,
   *         result table with columns id and lastName and two rows will be
   *         returned as
   *
   *         <pre>
   *         [{id=1, lastName=last_01}, {id=2, lastName=last_02}]
   * </pre>
   * @throws DBException
   */
  public List<Map<String, Object>> executePartialQuery(int skipRows, int maxRows)
      throws SnapshotRepositoryRuntimeException {
    // TODO(meghna): Think about a better way to scroll through the result set.
    List<Map<String, Object>> rows;
    LOG.info("Executing partial query with skipRows = " + skipRows + " and "
        + "maxRows = " + maxRows);
    SqlSession session = getSqlSession();
    try {
      rows = session.selectList("IbatisDBClient.getAll", null,
                                new RowBounds(skipRows, maxRows));
      LOG.info("Sucessfully executed partial query with skipRows = "
          + skipRows + " and maxRows = " + maxRows);
    } catch (RuntimeException e) {
      checkDBConnection(session, e);
      rows = new ArrayList<Map<String, Object>>();
    } finally {
      session.close();
    }
    LOG.info("Number of rows returned " + rows.size());
    return rows;
  }
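
The effect of RowBounds(skipRows, maxRows) is an offset-and-limit window over the query result. The sketch below emulates that window on an in-memory list so the semantics can be checked without a database. The window() and sampleTable() helpers and the sample rows are hypothetical; nothing here calls MyBatis. As far as I understand, MyBatis by default applies RowBounds by skipping rows in the result set rather than rewriting the SQL, which would explain the TODO about finding a better way to scroll.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RowWindowSketch {
  /** Hypothetical helper: skip skipRows rows, then return at most maxRows rows. */
  static List<Map<String, Object>> window(
      List<Map<String, Object>> all, int skipRows, int maxRows) {
    int from = Math.min(Math.max(skipRows, 0), all.size());
    // Long arithmetic so that a very large maxRows cannot overflow.
    int to = (int) Math.min((long) from + Math.max(maxRows, 0), (long) all.size());
    return new ArrayList<>(all.subList(from, to));
  }

  /** Builds an in-memory "table" with ids 1..n (made-up sample data). */
  static List<Map<String, Object>> sampleTable(int n) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (int i = 1; i <= n; i++) {
      Map<String, Object> row = new LinkedHashMap<>();
      row.put("id", i);
      row.put("lastName", "last_0" + i);
      rows.add(row);
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> table = sampleTable(5);
    // prints [{id=1, lastName=last_01}, {id=2, lastName=last_02}]
    System.out.println(window(table, 0, 2));
    // prints [{id=5, lastName=last_05}] because only one row is left
    System.out.println(window(table, 4, 2));
    // prints [] because the offset is past the end of the table
    System.out.println(window(table, 5, 2));
  }
}
```

The empty list in the last call is the signal the calling code relies on to detect that a crawl cycle is complete.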

The RepositoryHandler class reads the data through its references to DBContext dbContext and DBClient dbClient.

It also wraps an inner class, PartialQueryStrategy, which controls the row offset:

  /**
   * This is the implementation that is actually used by default.
   * @author Administrator
   *
   */
  private class PartialQueryStrategy implements QueryStrategy {
    private int skipRows = 0;

    @Override
    public List<Map<String, Object>> executeQuery() {
      return dbClient.executePartialQuery(skipRows,
            dbContext.getNumberOfRows());
    }

    @Override
    public void resetCursor() {
      skipRows = 0;
    }

    @Override
    public void updateCursor(List<Map<String, Object>> rows) {
      skipRows += rows.size();
    }

    @Override
    public void logComplete() {
      LOG.info("Total " + skipRows
          + " records are crawled during this crawl cycle");
    }
  }

The executeQueryAndAddDocs() method then calls the instance of this inner class:

  /**
   * After a restart, fetching always starts over from the beginning; no batch
   * position is recorded.
   * Function for fetching database rows and providing a collection of
   * snapshots.
   */
  public List<DocumentSnapshot> executeQueryAndAddDocs()
      throws SnapshotRepositoryRuntimeException {
    List<Map<String, Object>> rows = null;

    try {
      rows = queryStrategy.executeQuery();
    } catch (SnapshotRepositoryRuntimeException e) {
      LOG.info("Repository Unreachable. Resetting DB cursor to "
          + "start traversal from begining after recovery.");
      queryStrategy.resetCursor();
      LOG.warning("Unable to connect to the database\n" + e.toString());
      throw new SnapshotRepositoryRuntimeException(
          "Unable to connect to the database.", e);
    }
    if (rows.size() == 0) {
      queryStrategy.logComplete();
      LOG.info("Crawl cycle of database is complete. Resetting DB cursor to "
          + "start traversal from begining");
      queryStrategy.resetCursor();
    } else {
      queryStrategy.updateCursor(rows);
    }

    if (traversalContext == null) {
      LOG.info("Setting Traversal Context");
      traversalContext = traversalContextManager.getTraversalContext();
      JsonDocument.setTraversalContext(traversalContext);
    }

    return getDocList(rows);
  }
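
The cursor handling in executeQueryAndAddDocs() amounts to a small state machine: advance the offset after every non-empty batch, and reset it to zero when a batch comes back empty. The following sketch shows only that part, under stated assumptions: OffsetCrawler is a hypothetical class that merges the handler and its PartialQueryStrategy, the "table" is a plain list, and the failure path that resets the cursor when the database is unreachable is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CrawlCycleSketch {
  /** Hypothetical stand-in for the handler plus its offset strategy. */
  static class OffsetCrawler {
    private final List<String> table;
    private final int batchSize;
    private int skipRows = 0; // kept only in memory, so a restart begins at 0

    OffsetCrawler(List<String> table, int batchSize) {
      this.table = table;
      this.batchSize = batchSize;
    }

    List<String> nextBatch() {
      int from = Math.min(skipRows, table.size());
      int to = Math.min(from + batchSize, table.size());
      List<String> rows = new ArrayList<>(table.subList(from, to));
      if (rows.isEmpty()) {
        skipRows = 0;            // cycle complete: the next call starts over
      } else {
        skipRows += rows.size(); // move the cursor past the rows just read
      }
      return rows;
    }

    int getSkipRows() {
      return skipRows;
    }
  }

  public static void main(String[] args) {
    OffsetCrawler crawler =
        new OffsetCrawler(Arrays.asList("a", "b", "c", "d", "e"), 2);
    // prints [a, b], [c, d], [e], [], [a, b] on five separate lines
    for (int i = 0; i < 5; i++) {
      System.out.println(crawler.nextBatch());
    }
  }
}
```

Because the offset is a plain field, nothing survives a process restart, which matches the note in the Javadoc above that no batch position is recorded.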

The getDocList(rows) method wraps the data records in a List<DocumentSnapshot>:

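  /**
   * Wraps the rows in a List<DocumentSnapshot>.
   * @param rows the database rows to convert
   * @return the snapshots built from the rows that could be converted
   */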
  private List<DocumentSnapshot> getDocList(List<Map<String, Object>> rows) {
    LOG.log(Level.FINE, "Building document snapshots for {0} rows.",
        rows.size());
    List<DocumentSnapshot> docList = Lists.newArrayList();
    for (Map<String, Object> row : rows) {
      try {
        DocumentSnapshot snapshot = docBuilder.getDocumentSnapshot(row);
        if (snapshot != null) {
          if (LOG.isLoggable(Level.FINER)) {
            LOG.finer("DBSnapshotRepository returns document with docID "
                + snapshot.getDocumentId());
          }
          docList.add(snapshot);
        }
      } catch (DBException e) {
        // See the similar log message in DBSnapshot.getDocumentHandle.
        LOG.log(Level.WARNING, "Cannot convert database record to snapshot "
            + "for record " + row, e);
      }
    }
    LOG.info(docList.size() + " document(s) to be fed to GSA");
    return docList;
  }

The RepositoryHandlerIterator class wraps repositoryHandler one level further and exposes the data as an iterator:

/**
 * Iterates over the collections of {@link DocumentSnapshot} objects
 * produced by a {@code RepositoryHandler}.
 */
public class RepositoryHandlerIterator
    extends AbstractIterator<DocumentSnapshot> {
  private final RepositoryHandler repositoryHandler;
  private Iterator<DocumentSnapshot> current;

  /**
   * @param repositoryHandler RepositoryHandler object for fetching DB rows in
   *        DocumentSnapshot form.
   */
  public RepositoryHandlerIterator(RepositoryHandler repositoryHandler) {
    this.repositoryHandler = repositoryHandler;
    this.current = Iterators.emptyIterator();
  }

  @Override
  protected DocumentSnapshot computeNext() {
    if (current.hasNext()) {
      return current.next();
    } else {
      current = repositoryHandler.executeQueryAndAddDocs().iterator();
      if (current.hasNext()) {
        return current.next();
      } else {
        return endOfData();
      }
    }
  }
}
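
The same batch-draining idea can be written without Guava. This is a minimal sketch rather than the connector's implementation: BatchIterator is a hypothetical class built on java.util.Iterator, and a Supplier stands in for repositoryHandler.executeQueryAndAddDocs(). As in computeNext(), the first empty batch ends the iteration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

class BatchIteratorSketch {
  /** Hypothetical iterator that flattens batches until an empty batch arrives. */
  static class BatchIterator<T> implements Iterator<T> {
    private final Supplier<List<T>> batchSource;
    private Iterator<T> current = Collections.emptyIterator();
    private boolean done = false;

    BatchIterator(Supplier<List<T>> batchSource) {
      this.batchSource = batchSource;
    }

    @Override
    public boolean hasNext() {
      if (done) {
        return false;
      }
      if (current.hasNext()) {
        return true;
      }
      // Current batch is used up: fetch the next one.
      current = batchSource.get().iterator();
      if (!current.hasNext()) {
        done = true; // an empty batch marks the end of the data
        return false;
      }
      return true;
    }

    @Override
    public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return current.next();
    }
  }

  /** Collects everything the iterator yields into a list. */
  static <T> List<T> drain(Iterator<T> it) {
    List<T> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    return out;
  }

  public static void main(String[] args) {
    Deque<List<Integer>> batches = new ArrayDeque<>();
    batches.add(Arrays.asList(1, 2));
    batches.add(Arrays.asList(3));
    Iterator<Integer> it = new BatchIterator<Integer>(
        () -> batches.isEmpty() ? Collections.<Integer>emptyList() : batches.poll());
    System.out.println(drain(it)); // prints [1, 2, 3]
  }
}
```

Guava's AbstractIterator removes the need for the done flag and the hasNext()/next() bookkeeping, which is presumably why the connector code extends it.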

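Finally, the iterator is handed to the DBSnapshotRepository repository. This class implements the connector's SnapshotRepository interface and so bridges the database code to the interface the connector expects (the adapter pattern):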

/**
 * An iterable over the database rows. The main building block for
 * interacting with the diffing package.
 */
public class DBSnapshotRepository
    implements SnapshotRepository<DocumentSnapshot> {
  private final RepositoryHandler repositoryHandler;

  public DBSnapshotRepository(RepositoryHandler repositoryHandler) {
    this.repositoryHandler = repositoryHandler;
  }

  @Override
  public Iterator<DocumentSnapshot> iterator()
      throws SnapshotRepositoryRuntimeException {
    return new RepositoryHandlerIterator(repositoryHandler);
  }

  @Override
  public String getName() {
    return DBSnapshotRepository.class.getName();
  }
}

---------------------------------------------------------------------------

This series, Enterprise Search Engine Development: The Connector, is my own original work.

When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯

My email: [email protected]#com (replace # with .)

Link to this article: http://www.cnblogs.com/chenying99/p/3789054.html

Time: 2024-08-02 02:49:20