使用Akka持久化——持久化与快照

前言

对于java web而言,一个用户的HTTP请求最终会转换为一条java线程处理。HTTP本身是无状态的,具体的请求逻辑一般也是无状态的。如果进程奔溃或者系统宕机,用户会发觉当前网页不可用之类的错误。虽然会影响一些用户体验,但是只要服务重启了,用户依然可以完成他的请求并满足其需要。但是有些情况下则势必会造成混乱甚至恐慌,例如跨行转账。用户从自己A银行的账户转账1万元至自己在B银行的账户,如果转出的动作成功了,但是转入却失败了,用户的心情是可想而知的,自己的财产不翼而飞了!一种解决的方式是引入事务,在此场景下还必须是分布式事务。如果只是银行内部实现分布式事务多少还是可行的,但是不同银行之间要实现的成本是可想而知的,甚至不可行的。如果A银行转出时对用户的状态作持久化,B银行对收到的转入请求也进行持久化,那么恢复用户的损失才有可能。

以上啰里啰嗦说了这么多,无非就是抛出个引子,进而介绍Akka提供的持久化功能。

Akka的持久化架构

  • UntypedPersistentActor: 是一个持久化、有状态的Actor。它能够将将事件持久化到日志并且以线程安全的方式重新执行这些事件。它能够被用于实现命令和事件源的Actor。当一个集成了UntypedPersistentActor的Actor启动或者重启时,日志化的消息被重新发送给此Actor,以便于通过这些消息回复内部状态。
  • UntypedPersistentView: 视图是一个持久化、有状态的Actor,它接收由另一个持久化Actor写入日志的消息。视图本身不会日志化新的消息,它仅仅从一个持久化Actor的复制消息流来更新内部状态。
  • UntypedPersistentActorAtLeastOnceDelivery: 用于实现向目标至少发送一次消息的语义,也适用于发送者和接收者的JVM进程奔溃。
  • AsyncWriteJournal: 异步存储发送给持久化Actor的消息序列的日志。应用程序能够通过持久化Actor控制哪个消息被日志化,哪个消息不用被日志化。日志为每条消息维护一个不断增加的序列号。日志存储的底层实现是可插拔的。Akka的持久化扩展自带一个叫做"leveldb",向本地文件系统写入的日志插件。Akka社区里还有更多日志存储插件提供。
  • Snapshot store: 快照存储对持久化Actor或持久化视图的内部状态的快照进行持久化。快照用于优化回复的时间。快照存储的底层是可插拔的。Akka持久化扩展自带一个向本地文件系统写入的“本地”快照存储插件。Akka社区里还有更多快照存储插件提供。

本文基于Akka官网提供的持久化例子,并对其进行一些适应性改造,将着重介绍UntypedPersistentActor、AsyncWriteJournal及Snapshot store的应用。

配置

有关Akka的日志持久化和快照持久化的配置如下:

  persistence {
    journal {
      plugin = "akka.persistence.journal.leveldb"
      leveldb.dir = "target/example/journal"
      leveldb.native = false
    }
    snapshot-store {
      plugin = "akka.persistence.snapshot-store.local"
      local.dir = "target/example/snapshots"
    }
  }

根据配置,我们知道日志插件使用了leveldb,leveldb的存储目录为当前项目编译路径下的example/journal路径下。快照插件使用了local,存储路径与前者相同。

持久化Actor的例子

消息与状态

本例子中需要用到Cmd和Evt两种消息,Cmd代表命令,Evt代表事件。ExampleState代表我们例子中的状态。以上三个类的定义如下:

public interface Persistence {

	public static class Cmd implements Serializable {
		private static final long serialVersionUID = 1L;
		private final String data;

		public Cmd(String data) {
			this.data = data;
		}

		public String getData() {
			return data;
		}
	}

	public static class Evt implements Serializable {
		private static final long serialVersionUID = 1L;
		private final String data;

		public Evt(String data) {
			this.data = data;
		}

		public String getData() {
			return data;
		}
	}

	public static class ExampleState implements Serializable {
		private static final long serialVersionUID = 1L;
		private final ArrayList<String> events;

		public ExampleState() {
			this(new ArrayList<String>());
		}

		public ExampleState(ArrayList<String> events) {
			this.events = events;
		}

		public ExampleState copy() {
			return new ExampleState(new ArrayList<String>(events));
		}

		public void update(Evt evt) {
			events.add(evt.getData());
		}

		public int size() {
			return events.size();
		}

		@Override
		public String toString() {
			return events.toString();
		}
	}
}

上面代码展示的Cmd和Evt都很简单,它们有一样的data字段作为内容。ExampleState中维护了一个列表,次列表用于缓存所有的事件内容。

持久化Actor的实现

在具体介绍本例中持久化Actor之前,先看看其实现,其代码清单如下:

@Named("ExamplePersistentActor")
@Scope("prototype")
public class ExamplePersistentActor extends UntypedPersistentActor {
	LoggingAdapter log = Logging.getLogger(getContext().system(), this);
	@Override
	public String persistenceId() {
		return "sample-id-1";
	}

	private ExampleState state = new ExampleState();

	public int getNumEvents() {
		return state.size();
	}

	@Override
	public void onReceiveRecover(Object msg) {
		if (msg instanceof Evt) {
			state.update((Evt) msg);
		} else if (msg instanceof SnapshotOffer) {
			state = (ExampleState) ((SnapshotOffer) msg).snapshot();
		} else {
			unhandled(msg);
		}
	}

	@Override
	public void onReceiveCommand(Object msg) {
		if (msg instanceof Cmd) {
			final String data = ((Cmd) msg).getData();
			final Evt evt1 = new Evt(data + "-" + getNumEvents());
			final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
			persistAll(Arrays.asList(evt1, evt2), new Procedure<Evt>() {
				public void apply(Evt evt) throws Exception {
					state.update(evt);
					if (evt.equals(evt2)) {
						getContext().system().eventStream().publish(evt);
					}
				}
			});
		} else if (msg.equals("snap")) {
			// IMPORTANT: create a copy of snapshot
			// because ExampleState is mutable !!!
			saveSnapshot(state.copy());
		} else if (msg.equals("print")) {
			log.info(state.toString());
		} else {
			unhandled(msg);
		}
	}
}

ExamplePersistentActor继承了UntypedPersistentActor,并覆盖实现了三个方法:

  • persistenceId:持久化Actor必须有一个标识符,此标识符必须通过persistenceId方法定义;
  • onReceiveRecover:此方法将在恢复期间被调用,并交由用户处理那些持久化的消息或者快照;
  • onReceiveCommand:此方法用于处理正常的消息;

根据ExamplePersistentActor的实现,我们知道onReceiveCommand方法的作用如下:

  • 如果接收到的消息是Cmd时,则根据Cmd的data,构造两个新的Evt。例如:Cmd的data是test,那么两个Evt的内容分别是test-0,test-1。之后会调用UntypedPersistentActor提供的持久化方法persistAll,对两个生成的Evt持久化。最后,当持久化完成时会回调匿名内部类Procedure的apply方法将连个Evt按序更新到ExampleState中。
  • 如果接收到的消息是snap,那么复制ExampleState中的缓存并调用UntypedPersistentActor提供的方法saveSnapshot生成快照。
  • 如果接收到的消息是print,那么打印输出ExampleState中的缓存内容。

运行与验证

消息持久化验证

我们先使用一段代码向持久化Actor连续发送三个Cmd,内容分别是foo、baz、bar,最后再发送一条print消息,代码如下:

        final ActorRef persistentActor =
        		actorSystem.actorOf(springExt.props("ExamplePersistentActor"), "examplePersistentActor");

        persistentActor.tell(new Cmd("foo"), null);
        persistentActor.tell(new Cmd("baz"), null);
        persistentActor.tell(new Cmd("bar"), null);
        persistentActor.tell("print", null);

运行以上代码的结果如图1所示:

图1

这说明当前的程序逻辑准确无误。下面我们开始验证消息持久化的功效了。

我们将上述代码修改为代码清单1所示。

代码清单1

        final ActorRef persistentActor =
        		actorSystem.actorOf(springExt.props("ExamplePersistentActor"), "examplePersistentActor");

        persistentActor.tell("print", null);

也就是只打印状态,再次运行,输出与图1一模一样。这是因为默认情况下,持久化Actor在启动或者重启的时候会“重播”日志化的消息。这些“重播”的消息被onReceiveRecover处理后,重新更新到ExampleState的缓存中了。

最后,将上述代码修改为代码清单2所示。

代码清单2

        final ActorRef persistentActor =
        		actorSystem.actorOf(springExt.props("ExamplePersistentActor"), "examplePersistentActor");

        persistentActor.tell(new Cmd("buzz"), null);
        persistentActor.tell("print", null);

再次运行,输出如图2所示。


图2

可以看到持久化的消息依然被“重播”,并且新打印出了我们最新发送的内容为buzz的Cmd。这些消息依然被有序的放入ExampleState的缓存。这是由于在恢复期间,新发送给持久化Actor的不会干扰到“重播”的消息,新的消息将被缓存直到恢复阶段完成之后再由持久化Actor接收。

持久化时间考量与快照

上面的例子我们只发送了4个Cmd消息,并对其恢复。一般而言这不会有什么问题,但是当系统中的消息频率很高时,那么通过一条一条消息“重播”的方式显然是低效的,假如应用本身能够每隔一段时间利用快照存储,会极大地缩短恢复过程所需要的时间。

我们对上面的例子先进行一些修改,加入快照的生成:

        final ActorRef persistentActor =
        		actorSystem.actorOf(springExt.props("ExamplePersistentActor"), "examplePersistentActor");

        persistentActor.tell(new Cmd("foo"), null);
        persistentActor.tell(new Cmd("baz"), null);
        persistentActor.tell(new Cmd("bar"), null);
        persistentActor.tell("snap", null);
        persistentActor.tell("print", null);

输出与图1一致。我们此时如果再次执行代码清单1,效果依然与图1一致。此时如果你通过debug方式,你会发现其中的不同:之前的恢复过程会“重播”6条消息,这次只会收到一条SnapshotOffer消息,并直接从快照恢复。

我们再次执行代码清单2,其输出也仍然与图2一致。

总结

通过本文的介绍,发现使用Akka的持久化功能,类似于使用java多线程中的wait/notify,lock/unlock,将功能从语法层面解决,对程序员而言能更多地关注于自身业务。

其它Akka应用的博文如下:

  1. Spring与Akka的集成》;
  2. 使用Akka的远程调用》;
  3. 使用Akka构建集群(一)》;
  4. 使用Akka构建集群(二)》;
  5. 使用Akka持久化——持久化与快照》;
  6. 使用Akka持久化——消息发送与接收》;

后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。

京东:http://item.jd.com/11846120.html

当当:http://product.dangdang.com/23838168.html

时间: 2024-08-28 03:45:07

使用Akka持久化——持久化与快照的相关文章

redis 持久化之 rdb 快照持久化

解释1: 虽然redis是单进程,但是它有一个单独的子进程进行rdb操作,为了保证的数据的一致性,当进行rdb操作失败的时候,主进程就停止写入 所以才有了stop-write-on-bgsave-error 这条命令 如何停止rdb? 答案: 把下面三个参数屏蔽掉就行 开始使用rdb 第一步: 为了方便 修改配置文件 让60秒内写入3000秒就进行快照保存 第二步: 杀掉redis 用最新的配置文件重启redis 第三步: 使用redis自带的测试工具进行3000多条的写入 第四步: 查看执行目

SparkRDD未持久化——持久化

RDD的持久化策略: cache.persist.checkpoint三种策略(持久化的单位是partition) 1.cache是persist的一个简化版,会将rdd中的数据持久化到内存中 cache = persists(StorageLevel.MEMORY_ONLY) 不进行序列化特点: cache的返回值 必须赋值给一个新的RDD变量, 在其他的job中直接使用这个RDD变量就可以 cache是一个懒执行(其他两个也是懒执行),必须有action类的算子触发(也就是说,实现缓存要先触

持久化机制(快照和aof)

数据持久化通俗讲就是把数据保存到磁盘上,保证不会因为断电等因素丢失数据.redis需要经常将内存中的数据同步到磁盘来保证持久化.redis支持两种持久化方式,一种是 Snapshotting(快照)也是默认方式,另一种是Append-only file(缩写aof)的方式snapshotting(快照)方式:这种方式是将内存中数据以快照的方式写入到二进制文件中,默认的文件名为dump.rdb(redis的bin目录下).可以通过配置设置自动做快照持久化的方式.我们可以配置redis在n秒内如果超

Redis持久化

Redis持久化功能简介: Redis为了内部数据的安全考虑,会把本身的数据以文件形式保存到硬盘中一份,在服务器重启之后会自动把硬盘的数据恢复到内存(redis)的里边. 数据保存到硬盘的过程就称为“持久化”效果. Redis持久化的两种方式:snap shotting  快照持久化  /  append only file   AOF持久化 snap shotting快照持久化 该持久化默认开启,一次性把redis中全部的数据保存一份存储在硬盘中,如果数据非常多(10-20G)就不适合频繁进行

redis 数据持久化

转:redis 数据持久化 1.快照(snapshots) 缺省情况情况下,Redis把数据快照存放在磁盘上的二进制文件中,文件名为dump.rdb.你可以配置Redis的持久化策略,例如数据集中每N秒钟有超过M次更新,就将数据写入磁盘:或者你可以手工调用命令SAVE或BGSAVE. 数据保存的目录: 工作原理 Redis forks. 子进程开始将数据写到临时RDB文件中. 当子进程完成写RDB文件,用新文件替换老文件. 这种方式可以使Redis使用copy-on-write技术. 2.APP

Redis基础学习(四)&mdash;Redis的持久化

一.概述      Redis的强大性能很大程度上都是因为数据时存在内存中的,然而当Redis重启时,所有存储在内存中的数据将会丢失,所以我们要将内存中的数据持久化. Redis支持两种数据持久化的方式: RDB方式和AOF方式. (1)RDB方式会根据配置的规则定时的将内存中的数据持久化到硬盘上. (2)AOF则是在每次执行写命令之后将命令记录下来.   1.RDB方式      RDB方式的持久化是通过快照的方式完成的.当符合某种规则时,会将内存中的数据全部生成一个副本存储在硬盘上,Redi

Redis实战(八)Redis的持久化

Redis相比Memcached的很大一个优势是支持数据的持久化, 通常持久化的场景一个是做数据库使用,另一个是Redis在做缓存服务器时,防止缓存失效. Redis的持久化主要有快照Snapshotting和AOF日志文件两种方式. 前者会根据配置的规则定时将内存中的数据持久化到硬盘上, 后者则是在每次执行写命令之后将命令记录下来. >>RDB方式 Redis是会以快照的形式将数据持久化到磁盘上. 默认会将快照文件存储在Redis当前进程的工作目录的dump.rdb文件中, 可以通过配置文件

细说Redis持久化机制

概述 Redis不仅能够作为缓存来使用,也能够作为内存数据库. Redis作为内存数据库使用时.必需要解决一个问题:数据的持久性.有些将Redis作为缓存使用的场景也需要将缓存的数据持久化到存储介质上,这样在Redis重新启动后仍然能对热点数据提供缓存服务.不会由于缓存数据的缺失而对整个系统造成冲击. 本文就Redis内置的持久化机制进行说明. Redis持久化方式 Redis内置的持久化方式有两种:快照方式和AOF方式. 快照方式是将某一时点的内存数据写到硬盘上.AOF方式是将写入的命令记录在

redis持久化详解

redis是一个支持持久化的内存型数据库, 由于是在内存中,即使有主从,数据冗余备份,也难保数据丢失, redis持久化就是解决这个问题. redis持久化,是通过把内存里的数据同步到磁盘上来保证持久化. redis有两种持久化方式 一种是快照,snapshotting,也是默认方式,还有一种是只追加文件,缩写aof(apppend-only-file). 快照(snapshotting):将某一时刻的所有数据都写入硬盘. 只追加文件(AOF):在执行写命令时,将被执行的写命令复制到硬盘里. s