Akka MessageDispatcher是维持Akka Actor “运作”的部分, 可以说它是整个机器的引擎。
在没有为 Actor作配置的情况下,一个 ActorSystem 将有一个缺省的派发器。 缺省派发器是可配置的,缺省情况下是一个使用“fork-join-executor”的 Dispatcher , 在大多数情况下拥有非常好的性能。
1.为 Actor 指定派发器
在application.conf文件中配置
my-dispatcher { # Dispatcher 是基于事件的派发器的名称 type = Dispatcher # 使用何种ExecutionService executor = "fork-join-executor" # 配置 fork join 池 fork-join-executor { # 容纳基于倍数的并行数量的线程数下限 parallelism- min = 2 #并行数(线程) ... ceil (可用CPU数*倍数) parallelism-factor = 2.0 #容纳基于倍数的并行数量的线程数上限 parallelism-max = 10 } # Throughput 定义了线程切换到另一个actor之前处理的消息数上限 # 设置成1表示尽可能公平. throughput = 100 }
测试类:
import com.center.akka.simple.actor.SimpleActor; import com.center.akka.simple.command.Command; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class DispatchersTest { public static void main(String[] args) throws InterruptedException { ActorSystem system = ActorSystem.create( "MySystem"); //为 Actor 指定派发器 ActorRef myActor = system.actorOf(Props.create(SimpleActor. class ).withDispatcher("my-dispatcher" ), "myactor" ); myActor.tell( new Command("CMD 1" ), ActorRef.noSender()); Thread. sleep(2000); system.shutdown(); } }
2.创建自定义带优先级的信箱
编写自定义信箱类
import com.typesafe.config.Config; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.dispatch.PriorityGenerator; import akka.dispatch.UnboundedPriorityMailbox; public class MyPrioMailbox extends UnboundedPriorityMailbox { public MyPrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation // Create a new PriorityGenerator, lower prio means more important super (new PriorityGenerator() { @Override public int gen(Object message) { if (message.equals("highpriority" )) return 0; // ' highpriority messages should be treated first if possible else if (message.equals( "lowpriority")) return 2; // ' lowpriority messages should be treated last if possible else if (message.equals(PoisonPill.getInstance())) return 3; // PoisonPill when no other left else return 1; // By default they go between high and low prio } }); } }
在配置文件中配置:
prio -dispatcher { mailbox-type = "com.center.akka.dispatchers.MyPrioMailbox" }
测试类:
import com.center.akka.simple.actor.SimpleActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; public class PrioMailboxTest { public static void main(String[] args) throws InterruptedException { ActorSystem system = ActorSystem.create( "MySystem"); ActorRef myActor = system.actorOf(Props.create(SimpleActor. class ).withDispatcher("prio-dispatcher" )); myActor.tell( "lowpriority" , null ); myActor.tell( "lowpriority" , null ); myActor.tell( "highpriority" , null ); myActor.tell( "pigdog" , null ); myActor.tell( "pigdog2" , null ); myActor.tell( "pigdog3" , null ); myActor.tell( "highpriority" , null ); myActor.tell(PoisonPill. getInstance(), null); Thread. sleep(2000); system.shutdown(); } }
输出结果:
[INFO] [05/20/2015 17:07:37.459] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] SimpleActor constructor [INFO] [05/20/2015 17:07:37.459] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: highpriority [INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: highpriority [INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: pigdog [INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: pigdog2 [INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: pigdog3 [INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: lowpriority [INFO] [05/20/2015 17:07:37.460] [MySystem-prio-dispatcher-5] [akka://MySystem/user/$a] Received Command: lowpriority
时间: 2024-10-12 23:47:28