怎样在 Akka Persistence 中实现分页查询

  在 Akka Persistence 中,数据都缓存在服务内存(状态),后端存储的都是一些持久化的事件日志,没法使用类似 SQL 一样的 DSL 来进行分页查询。利用 Akka Streams 和 Actor 我们可以通过编码的方式来实现分页查询的效果,而且这个分页查询还是分步式并行的……
  
  EventSourcedBehavior
  
  Akka Persistence的EventSourcedBehavior里实现了CQRS模型,通过commandHandler与eventHandler解耦了命令处理与事件处理。commandHandler处理传入的命令并返回一个事件,并可选择将这个事件持久化;若事件需要持久化,则事件将被传给eventHandler处理,eventHandler处理完事件后将返回一个“新的”状态(也可以不更新,直接返回原状态)。
  
  def apply[Command, Event, State](
  
  persistenceId: PersistenceId,
  
  emptyState: State,
  
  commandHandler: (State, Command) => Effect[Event, State],
  
  eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State]
  
  建模
  
  以我们习惯的数据库表建模来说,我们会有以下一张表:
  
  create table t_config
  
  (
  
  data_id varchar(64),
  
  namespace varchar(64) not null,
  
  config_type varchar(32) not null,
  
  content text not null,
  
  constraint t_config_pk primary key (namespace, data_id)
  
  );
  
  create index t_config_idx_data_id on t_config (data_id);
  
  ConfigManager actor 可以看作 t_config 表,它的 entityId 就是 namespace, State 里保存了所有记录的主键值(ConfigManagerState),这就相当于 t_config 表的 t_config_idx_data_id 索引。
  
  而 ConfigEntity actor 可看作 t_config 表里存储的记录,每个 actor 实例就是一行记录。它的 entityId 由 namespace + data_id 组成,这就相当于 t_config 表的 t_config_pk 复合主键。 这里我们定义两个 EventSourcedBehavior:
  
  ConfigManager:拥有所有配置ID列表,并作为 State 保存在 EventSourcedBehavior
  
  ConfigEntity: 拥有每个配置数据,并作为 State 保存在 EventSourcedBehavior
  
  实现
  
  这里先贴出 ConfigManager 和 ConfigEntity 的部分代码,接下来再详解怎样实现分页查询。
  
  ConfigManager
  
  object ConfigManager {
  
  sealed trait Command extends CborSerializable
  
  sealed trait Event extends CborSerializable
  
  sealed trait Response extends CborSerializable
  
  final case class Query(dataId: Option[String], configType: Option[String], page: Int, size: Int) extends Command
  
  final case class ReplyCommand(in: AnyRef, replyTo: ActorRef[Response]) extends Command
  
  private final case class InternalResponse(replyTo: ActorRef[Response], response: Response) extends Command
  
  case class ConfigResponse(status: Int, message: String = "", data: Option[AnyRef] = None) extends Response
  
  final case class ConfigManagerState(dataIds: Vector[String] = Vector()) extends CborSerializable
  
  val TypeKey: EntityTypeKey[Command] = EntityTypeKey("ConfigManager")
  
  }
  
  import ConfigManager._
  
  class ConfigManager private (namespace: String, context: ActorContext[Command]) {
  
  private implicit val system =www.shentuylzc.cn context.system
  
  private implicit val timeout: Timeout = 5.seconds
  
  import context.executionContext
  
  private val configEntity = ConfigEntity.init(context.system)
  
  def eventSourcedBehavior(www.tengyao3zc.cn): EventSourcedBehavior[Command, Event, ConfigManagerState] =
  
  EventSourcedBehavior(
  
  PersistenceId.of(TypeKey.name, namespace),
  
  ConfigManagerState(), {
  
  case (state, ReplyCommand(in, replyTo)) =>
  
  replyCommandHandler(state, replyTo, in)
  
  case (_, InternalResponse(replyTo,www.xinyueylzc.cn response)) =>
  
  Effect.reply(replyTo)(response)
  
  eventHandler)
  
  private def processPageQuery(
  
  state: ConfigManagerState,
  
  replyTo: ActorRef[Response],
  
  in: Query): Effect[Event, ConfigManagerState] = {
  
  val offset = if (in.page > 0) (in.page - 1) * in.size else 0
  
  val responseF = if (offset < state.dataIds.size) {
  
  Source(state.dataIds)
  
  .filter(dataId => in.dataId.forall(www.huizhonggjpt.cn=> v.contains(dataId)))
  
  .mapAsync(20) { dataId =>
  
  configEntity.ask[Option[ConfigState]](replyTo =>
  
  ShardingEnvelope(dataId, ConfigEntity.Query(in.configType, replyTo)))
  
  .collect { case Some(value) => value }
  
  .drop(offset)
  
  .take(in.size)
  
  .runWith(Sink.seq)
  
  .map(items => ConfigResponse(IntStatus.OK, data www.lafei6d.cn = Some(items)))
  
  } else {
  
  Future.successful(ConfigResponse(IntStatus.NOT_FOUND))
  
  context.pipeToSelf(responseF) {
  
  case Success(value) => InternalResponse(replyTo, value)
  
  case Failure(e) => InternalResponse(replyTo, ConfigResponse(IntStatus.INTERNAL_ERROR, e.getLocalizedMessage))
  
  ConfigEntity
  
  object ConfigEntity {
  
  case class ConfigState(namespace: String, dataId: String, configType: String, content: String)
  
  sealed trait Command extends CborSerializable
  
  sealed trait Event extends CborSerializable
  
  final case class Query(configType: Option[String], replyTo: ActorRef[Option[ConfigState]]) extends Command
  
  final case class ConfigEntityState(config: Option[ConfigState] = None) extends CborSerializable
  
  val TypeKey: EntityTypeKey[Command] = EntityTypeKey("ConfigEntity"
  
  import ConfigEntity._
  
  class ConfigEntity private (namespace: String, dataId: String, context: ActorContext[Command]) {
  
  def eventSourcedBehavior(): EventSourcedBehavior[Command, Event, ConfigEntityState] =
  
  EventSourcedBehavior(PersistenceId.of(TypeKey.name, dataId), ConfigEntityState(), commandHandler, eventHandler)
  
  def commandHandler(state: ConfigEntityState, command: Command): Effect[Event, ConfigEntityState] = command match {
  
  case Query(configType, replyTo) www.jiuyueguojizc.cn=>
  
  state.config match {
  
  case None =>
  
  Effect.reply(replyTo)(None)
  
  case Some(config) =>
  
  val resp = if (configType.forall(v => config.configType.contains(v))) Some(config) else None
  
  Effect.reply(replyTo)(resp)
  
  ConfigManager#processPageQuery 函数实现了大部分的分页查询逻辑(有部分逻辑需要由 ConfigEntity 处理)。
  
  val offset = if (in.page > 0) (in.page - 1) * in.size else 0
  
  val responseF = if (offset <www.baihuaylezc.cn state.dataIds.size) {
  
  // process paging
  
  } else {
  
  Future.successful(ConfigResponse(IntStatus.OK, data = Some(Nil)))
 
  这里首先获取实际的分页数据偏移量 offset ,再于 ConfigManager 状态里保存的 dataIds 的大小进行判断,若 offset < state.dataIds.size 则我们进行分页逻辑,否则直接返回一个空列表给前端。
  
  Source(state.dataIds)
  
  .filter(dataId => in.dataId.forall(v => v.contains(dataId)))
  
  .mapAsync(20) { dataId =>
  
  configEntity.ask[Option[ConfigState]](replyTo =>
  
  ShardingEnvelope(s"[email protected]$dataId", ConfigEntity.Query(in.configType, replyTo)))
  
  }
  
  .collect { case Some(value) www.jujinyule.com=> value }
  
  .drop(offset)
  
  .take(in.size)
  
  .runWith(Sink.seq)
  
  .map(items => ConfigResponse(IntStatus.OK, www.letianhuanchao.cn data = Some(items)))
  
  这个 Akka Streams 流即是分页处理的主要实现,若是SQL的话,它类似:
  
  select * from t_config where data_id like ‘%"in.dataId"%‘ offset "offset" limit "in.size"
  
  .mapAsync 在流执行流程中起了20个并发的异步操作,将委托每个匹配的 ConfigEntity (由s"[email protected]$dataId"生成entityId)执行 config_type 字段的查询。这样,完整的SQL语句类似:
  
  select * from t_config where data_id like ‘%"in.dataId"%‘ and change_type = "in.changeType" offset "offset" limit "in.size"
  
  ConfigEntity 对 change_type 部分的查询逻辑实现如下:
  
  case Query(configType, replyTo) =>
  
  state.config match {
  
  case None =>
  
  Effect.reply(replyTo)(None)
  
  case Some(config) www.huizhongdl.cn=>
  
  val resp = if (configType.forall(v => config.configType.contains(v))) Some(config) else None
  
  Effect.reply(replyTo)(resp)
  
  }
  
  若in.configType为空,既不需要判断 change_type 这个字段,直接返回 Some(config) 即可,而这时的SQL语句类似:
  
  select * from t_config where data_id like ‘%"in.dataId"%‘ and true offset "offset" limit "in.size"
  
  Tip这里有个小技巧,对于 Option[T] 字段的判断,直接使用了 .forall 方法,它等价于:
  
  option match {
  
  case Some(x) => p(x)
  
  case None => true

原文地址:https://www.cnblogs.com/laobeipai/p/12168845.html

时间: 2024-08-02 03:00:30

怎样在 Akka Persistence 中实现分页查询的相关文章

mongo中的分页查询

/** * @param $uid * @param $app_id * @param $start_time * @param $end_time * @param $start_page * @param $limit_page * mongodb中的分页查询 */ public static function getUserRevenueInfoList($uid, $source,$app_id, $start_time, $end_time,$skip, $limit){ $match

ssh框架中的分页查询

ssh中的分页查询是比较常用的,接下来我用代码来介绍如何实现一个分页查询 首先建立一个Model用来储存查询分页的信息 package com.haiziwang.qrlogin.utils; import java.util.List; public class prospage<T> { private int page; // 当前页数 private int totalCount; // 总记录数 private int totalPage; // 总页数 private int er

SQL Server中的分页查询 select top

SQL Server中的分页查询 https://blog.csdn.net/tswc_byy/article/details/82053091 零.码仙励志 比我差的人还没放弃,比我好的人仍在努力,我就更没资格说我无能为力 一.建库和建表 create database scort use scort create table emp ( empno int primary key, ename nvarchar(10), sal int, deptno int ) insert into e

freecms中的分页查询

freecms支持分页查询,很多Service中都有用到,可以作为参考. 如果想支持分页查询,Action需要从BaseAction中派生出来,BaseAction中有order, currPage, pageSize等属性,在查询时传给Service,如 **Service.find(**, order, currPage,  pageSize).在Service的查询中,需要使用 mapper 的 selectPageByExample, 在 mapper中加入 selectPageByEx

SQL Server中的分页查询

分页查询很简单,具体代码如下: --分页查询 --查询1-3行数据 select top 3 * from emp order by sal desc; --查询4-6行数据 select top 3 * from emp where empno not in (select top 3 empno from emp order by sal desc) order by sal desc; --查询7-9行数据 select top 3 * from emp where empno not i

Hibernate支持类中的分页查询的实现

Hibernate支持类的实现 package com.myHibernateDao; import java.sql.SQLException; import java.util.List; import javax.annotation.Resource; import org.hibernate.HibernateException; import org.hibernate.Query; import org.hibernate.Session; import org.hibernate

在javaweb中使用分页查询的详细步骤

首先在原有的数据库实体bean上在建立一个分页实体bean /** * 用于展示分页数据的Javabean对象 * @author Lenovo * */public class PageenationBean { private Integer currPage;//当前页数 private Integer totalPage;//总页数 private List<UserBean> dataList;//table的数据 public Integer getCurrPage() { ret

Java中oracle分页查询01

在数据库汇中的查询语句:oracle中使用rownum进行分页,从每页在总记录数的第n条到第m条:这里的是每页10条记录,下面sql所查询的是第页的记录数:oracle的分页是从1开始的,这里写成rownum>=0,rownum<=9,查询的记录是9条. SELECT * FROM (SELECT ROW_.*, ROWNUM ROWNUM_ FROM (SELECT * FROM TABLE1 WHERE TABLE1_ID = XX ORDER BY GMT_CREATE DESC) RO

用Servlet与JSP中实现分页查询

这个题目,总共分4个部分 1.查询总页数,把页码显示在页面上 2.把每页的内容显示在页面上 3.首页,尾页,上一页,下一页 4.下拉列表中的页数,(根据页数变动,选中第几页提交到第几页) 详解如下:(一部分代码在servlet中,一部分代码在JSP中) Servlet package com.lianxi; import java.io.IOException; import java.util.ArrayList; import javax.servlet.ServletException;