Akka框架使用注意点

1.mailbox

Akka的每个actor默认有一个mailbox,按照FIFO顺序单线程处理。在抛出异常导致父actor根据设置的监管策略执行重启或恢复操作时,会从触发异常的消息的后续消息开始处理,邮箱并不会被清空。如果你想重新处理那个触发异常的消息,可以通过重写preRestart方法来访问该消息,java 中的preRestart参数为(Throwable reason, Option<Object> message),message.get()可以获得该消息(因为是从Option对象中get,所以可能为空),可以将该消息再次发给自己或做其它处理。

默认邮箱的大小没有限制,也就是内存的上限。可以设置bounded邮箱来限定大小,还可以设置邮箱以文件形式持久存储。

2.监管策略设置

  1)在actor类中重写supervisorStrategy()

  2)创建父actor时在Props参数中使用FromConfig.getInstance().withSupervisorStrategy(strategy).props(XXX)

可以使用下面的类来方便设置:

import akka.actor.AllForOneStrategy;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.japi.Function;
import scala.concurrent.duration.Duration;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static akka.actor.SupervisorStrategy.escalate;

/**
 * Created by fyk on 16-4-2.
 */
public class StrategySetter {
    private Map<Class<? extends Throwable>, SupervisorStrategy.Directive> map;
    private boolean oneForOne;
    private int maxNrOfRetries=5;
    private Duration withinTimeRange=Duration.create(1, TimeUnit.MINUTES);//Duration.create("1 minute")
    public StrategySetter(boolean oneForOne) {
        this.oneForOne=oneForOne;
        map=new HashMap<Class<? extends Throwable>, SupervisorStrategy.Directive>();
    }
    public void setOptParam(int maxNrOfRetries,Duration withinTimeRange){
        this.maxNrOfRetries=maxNrOfRetries;
        this.withinTimeRange=withinTimeRange;
    }
    public void put(Class<? extends Throwable> t, SupervisorStrategy.Directive action){
        map.put(t,action);
    }
    /**
     * 设定监管策略并返回
     * cls.isInstance(yourObject)
     * instead of using the instanceof operator, which can only be used if you know the class at compile time.
     */
    public SupervisorStrategy getSupervisorStrategy(){
        SupervisorStrategy strategy=null;
        if(oneForOne){
            strategy=new OneForOneStrategy(maxNrOfRetries, withinTimeRange,
                    new Function<Throwable, SupervisorStrategy.Directive>() {
                        @Override
                        public SupervisorStrategy.Directive apply(Throwable t) {
                            for(Class c:map.keySet()){
                                if(c.isInstance(t)) return map.get(c);
                            }
                            return escalate();//提交给上一级监管
                        }
                    });
        }else{
            strategy=new AllForOneStrategy(maxNrOfRetries, withinTimeRange,
                    new Function<Throwable, SupervisorStrategy.Directive>() {
                        @Override
                        public SupervisorStrategy.Directive apply(Throwable t) {
                            for(Class c:map.keySet()){
                                if(c.isInstance(t)) return map.get(c);
                            }
                            return escalate();//提交给上一级监管
                        }
                    });
        }

        return strategy;
    }
}

3.continue...

时间: 2024-12-25 18:10:00

Akka框架使用注意点的相关文章

分布式应用框架Akka快速入门

本文结合网上一些资料,对他们进行整理,摘选和翻译而成,对Akka进行简要的说明.引用资料在最后列出. 1.什么是Akka Akka 是一个用 Scala 编写的库,用于简化编写容错的.高可伸缩性的 Java 和 Scala 的 Actor 模型应用. 官方网站 (http://akka.io/)的介绍是: Akka is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant ev

akka概述

概述 并发分布式框架:作者号称可以在jvm用来构建高并发,分布式,消息驱动式的程序.依据当前我使用的范围还只局限于并发与任务调度的使用层面上, 在这两方面的表现还是很令我满意的. 当前自己所熟悉的互联网开发模式有 ------------------------------------------------------------ 借组于队列,异步处理问题,处理不需要及时作出响应的任务 使用缓存,减少对常用数据的查询 使用内存数据库 加快查询某些需要快速查询响应的速度 数据库,主从模式,读写分

Akka简单的性能测试

因为最近工作的关系,要把异步任务从应用服务器中拆分到专门的异步处理服务器中. 方案一 是采用MQ的方式将任务消息发出,在服务端进行处理,如下图所示: 这种方案是采用MQ作为中间的媒介,在服务端采用线程池异步处理任务,处理完成之后将结果发送到MQ中,客户端采用侦听的方式得到结果继续进行处理. 这种方案的不足是,可能在某些需求的情况下,需要将结果存放到共享的HashMap或者Threadlocal中进行存放结果,客户端会一直阻塞,直到得到结果,从多线程的角度来说,还是用了共享变量,虽然共享变量可能是

AKKA文档(java版)—角色

角色 角色模型对编写并发.分布式系统进行了高度抽象.它减轻了开发者必须对互斥锁与线程管理的负担,更容易编写出正确的并发与并行系统.早在1973 年 Carl Hewitt 发表的论文中定义了角色,但一直流行于Erlang 语言中,随后被爱立信公司应用于建立高并发.可靠通信系统,取得了巨大成功. Akka 框架里面角色的API 跟Scala 框架里面角色相似,后者一些语法曾经模仿Erlang语言. 创建角色 注意:由于Akka强迫父级监管者监督每一个角色和(潜在的子级)监管者,建议你熟悉角色系统.

Akka源码分析-Actor&amp;ActorContext&amp;ActorRef&amp;ActorCell

分析源码的过程中我们发现,Akka出现了Actor.ActorRef.ActorCell.ActorContext等几个相似的概念,它们之间究竟有什么区别和联系呢? /** * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': * <a href="http://en.wikipedia.org/wiki/Actor

Akka 简介

Akka简介 一.知识补充 耦合与解耦: 耦合 -->两个或两个以上体系或两种运动形式间通过相互作用而彼此影响以至联合起来的现象 ? 对于软件工程,对象之间的耦合度是对象之间的依赖度,耦合程度越高,维护成本越高 解耦 -->解除耦合关系 ? 模块间有依赖关系必然存在耦合,理论上绝对零耦合做不到.要使数据模型,业务逻辑,视图显示三层之间彼此降低耦合,把关系依赖降低 RPC(Remote Procedure Call) RPC 用于同一组件中各个不同模块之间的通信(面向过程) RESTful 用于

Spark Streaming源代码学习总结(一)

1.Spark Streaming 代码分析: 1.1 演示样例代码DEMO: 实时计算的WorldCount: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def mai

Spark Streaming源码学习总结(一)

1.Spark Streaming 代码分析: 1.1 示例代码DEMO: 实时计算的WorldCount: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def main(

ENode 2.0 - 介绍一下关于ENode中对Command的调度设计

CQRS架构,C端的职责是处理从上层发送过来的command.对于单台机器来说,我们如何尽快的处理command呢?本文想通过不断提问和回答的方式,把我的思考写出来. 首先,我们最容易想到的是使用多线程.那当我们要处理一个command时,能直接丢到线程池中,直接交给线程池去调度吗?不行.因为假如多个command修改同一个聚合根时,会导致db的并发冲突,从而会导致command的不断重试,大大降低了command的处理速度. 那该怎么解决呢?既然直接多线程处理会有并发冲突的问题,那就对comm