akka入门-消息派发器

Akka MessageDispatcher是维持Akka Actor “运作”的部分, 可以说它是整个机器的引擎。

在没有为 Actor作配置的情况下,一个 ActorSystem 将有一个缺省的派发器。 缺省派发器是可配置的,缺省情况下是一个使用“fork-join-executor”的 Dispatcher , 在大多数情况下拥有非常好的性能。

1.为 Actor 指定派发器

在application.conf文件中配置

my-dispatcher {
  # Dispatcher 是基于事件的派发器的名称
  type = Dispatcher
  # 使用何种ExecutionService
  executor = "fork-join-executor"
  # 配置 fork join 池
  fork-join-executor {
    # 容纳基于倍数的并行数量的线程数下限
    parallelism- min = 2
    #并行数(线程) ... ceil (可用CPU数*倍数)
    parallelism-factor = 2.0
    #容纳基于倍数的并行数量的线程数上限
    parallelism-max = 10
  }
  # Throughput 定义了线程切换到另一个actor之前处理的消息数上限
  # 设置成1表示尽可能公平.
  throughput = 100
}

测试类:

import com.center.akka.simple.actor.SimpleActor;
import com.center.akka.simple.command.Command;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class DispatchersTest {
      public static void main(String[] args) throws InterruptedException {

          ActorSystem system = ActorSystem.create( "MySystem");
           //为 Actor 指定派发器
          ActorRef myActor = system.actorOf(Props.create(SimpleActor. class ).withDispatcher("my-dispatcher" ), "myactor" );
          myActor.tell( new Command("CMD 1" ), ActorRef.noSender());
          Thread. sleep(2000);
          system.shutdown();

     }

}

2.创建自定义带优先级的信箱

编写自定义信箱类

import com.typesafe.config.Config;

import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedPriorityMailbox;

public class MyPrioMailbox extends UnboundedPriorityMailbox {
       public MyPrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation
         // Create a new PriorityGenerator, lower prio means more important
         super (new PriorityGenerator() {
           @Override
           public int gen(Object message) {
             if (message.equals("highpriority" ))
               return 0; // ' highpriority messages should be treated first if possible
             else if (message.equals( "lowpriority"))
               return 2; // ' lowpriority messages should be treated last if possible
             else if (message.equals(PoisonPill.getInstance()))
               return 3; // PoisonPill when no other left
             else
               return 1; // By default they go between high and low prio
           }
         });
       }
     }

在配置文件中配置:

prio -dispatcher {
  mailbox-type = "com.center.akka.dispatchers.MyPrioMailbox"
}

测试类:

import com.center.akka.simple.actor.SimpleActor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;

public class PrioMailboxTest {
      public static void main(String[] args) throws InterruptedException {
          ActorSystem system = ActorSystem.create( "MySystem");
          ActorRef myActor = system.actorOf(Props.create(SimpleActor. class ).withDispatcher("prio-dispatcher" ));
          myActor.tell( "lowpriority" , null );
          myActor.tell( "lowpriority" , null );
          myActor.tell( "highpriority" , null );
          myActor.tell( "pigdog" , null );
          myActor.tell( "pigdog2" , null );
          myActor.tell( "pigdog3" , null );
          myActor.tell( "highpriority" , null );
          myActor.tell(PoisonPill. getInstance(), null);

          Thread. sleep(2000);

          system.shutdown();
     }

}

输出结果:

[INFO] [05/20/2015 17:07:37.459] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] SimpleActor constructor
[INFO] [05/20/2015 17:07:37.459] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: highpriority
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: highpriority
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: pigdog
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: pigdog2
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: pigdog3
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: lowpriority
[INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: lowpriority
时间: 2024-10-12 23:47:28

akka入门-消息派发器的相关文章

Android开发学习之路-Handler消息派发机制源码分析

注:这里只是说一下sendmessage的一个过程,post就类似的 如果我们需要发送消息,会调用sendMessage方法 public final boolean sendMessage(Message msg) { return sendMessageDelayed(msg, 0); } 这个方法会调用如下的这个方法 public final boolean sendMessageDelayed(Message msg, long delayMillis) { if (delayMilli

openfire开发(四)消息拦截器

大家好,我是LD,今天给大家介绍openfire的消息拦截器.通常,我们在开发插件的过程中会有一种需求,需要对客户端发送的消息来做一些我们自己的处理,比如保存数据等等.这里我们就会使用到拦截器, 在openfire中,自定义拦截器需要实现PacketInterceptor接口.下面我们写一个简单的拦截器来介绍一下. import org.jivesoftware.openfire.interceptor.PacketInterceptor; import org.jivesoftware.ope

6)Linux程序设计入门--消息管理

6)Linux程序设计入门--消息管理 前言:Linux下的进程通信(IPC) Linux下的进程通信(IPC) POSIX无名信号量 System V信号量 System V消息队列 System V共享内存 1.POSIX无名信号量 如果你学习过操作系统,那么肯定熟悉PV操作了.PV操作是原子 操作.也就是操作是不可以中断的,在一定的时间内,只能够有一个进程的代码在CPU上面 执行.在系统当中,有时候为了顺利的使用和保护共享资源,大家提出了信号的概念. 假设 我们要使用一台打印机,如果在同一

解决WCF调用时出现错误:“创建MTOM消息读取器时出错”

如题,查询一个数据集, 存储过程返回如:select * from B 中间层定义  public DataSet GetTable(string 查询条件); 客户端定义  DataSet ds = wcfClient.GetTable("") 以前一直正常着,查询也很快速,这两天不知修改到哪了,所有的查询如果返回记录较大时(100条左右),客户端就会出现服务端返回的异常错误"创建MTOM消息读取器时出错" 客户端 app.config 配置如下 <syst

C# 之 读取Word时发生 “拒绝访问” 及 “消息筛选器显示应用程序正在使用中” 异常的处理

1.Asp.net中建立Microsoft.Office.Interop.Word.Application时出现 “ 拒绝访问 ” 错误 项目中要实现在服务器端打开一个Word模版文件,修改其内容后再下载到客户端使用,在Asp.net页面中建立Microsoft.Office.Interop.Word.Application对象时出现“拒绝访问”的错误,提示信息如下: 拒绝访问. 说明: 执行当前 Web 请求期间,出现未处理的异常.请检查堆栈跟踪信息,以了解有关该错误以及代码中导致错误的出处的

解决“消息筛选器显示应用程序正在使用中。 ((错误来自 HRESULT:0x8001010A (RPC_E_SERVERCALL_RETRYLATER)) ”的报错问题

最近写了一个小程序,用的是C#调用了一个应用程序的COM接口.在自己的机上测试都没有问题,可是发给客户进行测试的时候,就出现了问题.报错提示为“消息筛选器显示应用程序正在使用中. ((错误来自 HRESULT:0x8001010A (RPC_E_SERVERCALL_RETRYLATER))”. 在网上查了一下,发现了有前辈的解决方法,具体步骤如下: STEP1.  在客户的机器上 运行“dcomcnfg”, 会出现“组件服务”的窗体. STEP2. 打开“组件服务->计算机->我的电脑-&g

Android中的观察者模式:消息分发器(MessageDispatcher)

这个功能是在公司项目需求的时候写出来,本来是基础命令字模式的,但是个人喜欢对象,所有后来在一个小项目中使用时,改成了基于对象模式. 首先,是一个接口,我们称之为监听器: [html] view plaincopyprint? /** * * @author poet * */ public interface MessageObserver<T> { void onMessage(T t); } 这里使用的是泛型,泛型<T>除了作为实际监听的对象类型,也作为监听器管理的key,届时

使用lua实现一个简单的事件派发器

设计一个简单的事件派发器,个人觉得最重要的一点就是如何保证事件派发过程中,添加或删除同类事件,不影响事件迭代顺序和结果,只要解决这一点,其它都好办. 为了使用pairs遍历函数,重写了pairs(lua 5.2以上版本不需要): stdext.lua local _ipairs = ipairs function ipairs(t) local mt = getmetatable(t) if mt and mt.__ipairs then return mt.__ipairs(t) end re

C#制作一个消息拦截器(intercept)1

首先,我们先要制作一个自定义Attribute,让他可以具有上下文读取功能,所以我们这个Attribute类要同时继承Attribute和IContextAttribute. 接口IContextAttribute中有两个方法需要实现 1.bool   IsContextOK(Context ctx, IConstructionCallMessage msg); 2.void  GetPropertiesForNewContext(IConstructionCallMessage msg); 简