akka远程调用有两种形式:
一种是查找远程Actors,一种是创建远程Actors。
公用的类:
import java.io.Serializable; public class Op { public interface MathOp extends Serializable { } public interface MathResult extends Serializable { } static class Add implements MathOp { private static final long serialVersionUID = 1L; private final int n1 ; private final int n2 ; public Add( int n1 , int n2) { this. n1 = n1; this. n2 = n2; } public int getN1() { return n1; } public int getN2() { return n2; } } static class AddResult implements MathResult { private static final long serialVersionUID = 1L; private final int n1 ; private final int n2 ; private final int result ; public AddResult( int n1 , int n2, int result) { this. n1 = n1; this. n2 = n2; this. result = result; } public int getN1() { return n1; } public int getN2() { return n2; } public int getResult() { return result; } } static class Subtract implements MathOp { private static final long serialVersionUID = 1L; private final int n1 ; private final int n2 ; public Subtract( int n1 , int n2) { this. n1 = n1; this. n2 = n2; } public int getN1() { return n1; } public int getN2() { return n2; } } static class SubtractResult implements MathResult { private static final long serialVersionUID = 1L; private final int n1 ; private final int n2 ; private final int result ; public SubtractResult( int n1 , int n2, int result) { this. n1 = n1; this. n2 = n2; this. result = result; } public int getN1() { return n1; } public int getN2() { return n2; } public int getResult() { return result; } } static class Multiply implements MathOp { private static final long serialVersionUID = 1L; private final int n1 ; private final int n2 ; public Multiply( int n1 , int n2) { this. n1 = n1; this. n2 = n2; } public int getN1() { return n1; } public int getN2() { return n2; } } static class MultiplicationResult implements MathResult { private static final long serialVersionUID = 1L; private final int n1 ; private final int n2 ; private final int result ; public MultiplicationResult( int n1 , int n2, int result ) { this. n1 = n1; this. n2 = n2; this. result = result; } public int getN1() { return n1; } public int getN2() { return n2; } public int getResult() { return result; } } static class Divide implements MathOp { private static final long serialVersionUID = 1L; private final double n1 ; private final int n2 ; public Divide( double n1 , int n2) { this. n1 = n1; this. n2 = n2; } public double getN1() { return n1; } public int getN2() { return n2; } } static class DivisionResult implements MathResult { private static final long serialVersionUID = 1L; private final double n1 ; private final int n2 ; private final double result ; public DivisionResult( double n1 , int n2, double result ) { this. n1 = n1; this. n2 = n2; this. result = result; } public double getN1() { return n1; } public int getN2() { return n2; } public double getResult() { return result; } } }
import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; public class CreationActor extends UntypedActor { @Override public void onReceive(Object message ) throws Exception { if ( message instanceof Op .MathOp) { ActorRef calculator = getContext().actorOf(Props.create(CalculatorActor. class )); calculator.tell( message, getSelf()); } else if (message instanceof Op.MultiplicationResult) { Op .MultiplicationResult result = (Op.MultiplicationResult) message; System. out .printf("Mul result: %d * %d = %d\n" , result .getN1(), result .getN2(), result.getResult()); getContext().stop(getSender()); } else if (message instanceof Op .DivisionResult) { Op .DivisionResult result = (Op .DivisionResult) message ; System. out .printf("Div result: %.0f / %d = %.2f\n" , result .getN1(), result .getN2(), result.getResult()); getContext().stop(getSender()); } else { unhandled( message); } } }
import akka.actor.UntypedActor; /** * 计算加减乘除的Actor getSender getSelf */ public class CalculatorActor extends UntypedActor { @Override public void onReceive(Object message ) { if ( message instanceof Op.Add) { Op.Add add = (Op.Add) message; System. out .println("Calculating " + add .getN1() + " + " + add.getN2()); Op.AddResult result = new Op.AddResult(add .getN1(), add .getN2(), add .getN1() + add.getN2()); getSender().tell( result, getSelf()); } else if (message instanceof Op.Subtract) { Op.Subtract subtract = (Op.Subtract) message; System. out .println("Calculating " + subtract .getN1() + " - " + subtract .getN2()); Op.SubtractResult result = new Op.SubtractResult(subtract .getN1(),subtract.getN2(), subtract .getN1() - subtract .getN2()); getSender().tell( result, getSelf()); } else if (message instanceof Op.Multiply) { Op.Multiply multiply = (Op.Multiply) message; System. out .println("Calculating " + multiply .getN1() + " * " + multiply .getN2()); Op.MultiplicationResult result = new Op.MultiplicationResult(multiply .getN1(), multiply .getN2(), multiply .getN1() * multiply.getN2()); getSender().tell( result, getSelf()); } else if (message instanceof Op.Divide) { Op.Divide divide = (Op.Divide) message; System. out .println("Calculating " + divide .getN1() + " / " + divide .getN2()); Op.DivisionResult result = new Op.DivisionResult(divide .getN1(), divide .getN2(), divide .getN1() / divide .getN2()); getSender().tell( result, getSelf()); } else { unhandled( message); } } }
基础的配置文件:
commom.conf文件
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" } } }
calculater.conf文件
include "common" akka { # LISTEN on tcp port 2552 remote.netty.tcp.port = 8989 }
1.创建远程Actors
import java.util.Arrays; import java.util.concurrent.Callable; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; import akka.dispatch.OnSuccess; import akka.japi.Function; import akka.pattern.Patterns; import akka.util.Timeout; import com.typesafe.config.ConfigFactory; public class RemoteActorDemo { public static void main(String args[]) { // 不使用默认的配置,而是选择加载选定的remote actor配置 final ActorSystem system = ActorSystem.create( "CalculatorWorkerSystem", ConfigFactory.load(( "calculator"))); // remote actor的ref final ActorRef calculatorActor = system .actorOf(Props.create(CalculatorActor. class ), "CalculatorActor" ); System. out.println( "Started CalculatorWorkerSystem" ); final Timeout timeout = new Timeout(Duration.create(5, "seconds")); Future<Object> addFuture = Patterns.ask( calculatorActor, new Op.Add(1, 2), timeout ); Future<Object> subtractFuture = Patterns.ask( calculatorActor, newOp.Subtract(1, 2), timeout ); Future<Object> multiplyFuture = Patterns.ask( calculatorActor, newOp.Multiply(1, 2), timeout ); Future<Object> divideFuture = Patterns.ask( calculatorActor, newOp.Divide(1, 2), timeout ); Iterable<Future<Object>> futureArray = Arrays.asList (addFuture , subtractFuture , multiplyFuture, divideFuture ); Future<Iterable<Op.MathResult>> futureResult = Futures.traverse( futureArray, new Function<Future<Object>, Future<Op.MathResult>>() { public Future<Op.MathResult> apply( final Future<Object> param ) throws Exception { return Futures.future( new Callable<Op.MathResult>() { public Op.MathResult call() throws Exception { return (Op.MathResult) Await.result( param, timeout .duration()); } }, system.dispatcher()); } }, system.dispatcher()); futureResult.onSuccess( new OnSuccess<Iterable<Op.MathResult>>() { @Override public void onSuccess(Iterable<Op.MathResult> result ) throws Throwable { for (Op.MathResult r : result ) { if (r instanceof Op.AddResult) { System. out .println("add result=" + ((Op.AddResult) r ).getResult()); } else if (r instanceof Op.SubtractResult) { System. out .println("subtract result=" + ((Op.SubtractResult) r ).getResult()); } else if (r instanceof Op.MultiplicationResult) { System. out .println("multiply result=" + ((Op.MultiplicationResult) r ).getResult()); } else if (r instanceof Op.DivisionResult) { System. out .println("divide result=" + ((Op.DivisionResult) r ).getResult()); } } } }, system.dispatcher()); } }
输出结果:
[INFO] [05/25/2015 23:48:23.062] [main] [Remoting] Starting remoting [INFO] [05/25/2015 23:48:23.225] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:8989] [INFO] [05/25/2015 23:48:23.227] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:8989] Started CalculatorWorkerSystem Calculating 1 + 2 Calculating 1 - 2 Calculating 1 * 2 Calculating 1.0 / 2 add result=3 subtract result=-1 multiply result=2 divide result=0.5
2.查找远程Actors
remotelookup.conf文件
include "common" akka { remote.netty.tcp.port = 2553 }
import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import com.typesafe.config.ConfigFactory; public class RemoteActorSelectionDemo { public static class HandlerResult extends UntypedActor { @Override public void preStart() throws Exception { ActorSelection selection = this .getContext().actorSelection("akka.tcp://[email protected]:8989/user/CalculatorActor" ); System. out .println("selection : " + selection ); selection.tell( new Op.Add(1, 2), this.getSelf()); } @Override public void onReceive(Object message ) throws Exception { if ( message instanceof Op.AddResult) { System. out .println("add result=" + ((Op.AddResult) message ).getResult()); } else if (message instanceof Op.SubtractResult) { System. out .println("subtract result=" + ((Op.SubtractResult) message ).getResult()); } else if (message instanceof Op.MultiplicationResult) { System. out .println("multiply result=" + ((Op.MultiplicationResult) message ).getResult()); } else if (message instanceof Op.DivisionResult) { System. out .println("divide result=" + ((Op.DivisionResult) message ).getResult()); } } } public static void main(String args[]) { // 不使用默认的配置,而是选择加载选定的remote actor配置 final ActorSystem system = ActorSystem.create( "CalculatorWorkerSystem", ConfigFactory.load(( "calculator"))); // 初始化远程actor ActorRef actref = system .actorOf(Props.create(CalculatorActor. class ),"CalculatorActor" ); System. out.println( "Started CalculatorWorkerSystem" ); // 初始化本地的Actor final ActorSystem localSystem = ActorSystem.create( "localSystem", ConfigFactory.load(( "remotelookup"))); localSystem .actorOf(Props.create(HandlerResult. class ), "handlerResult" ); } }
输出结果:
[INFO] [05/25/2015 23:50:17.523] [main] [Remoting] Starting remoting [INFO] [05/25/2015 23:50:17.689] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:8989] [INFO] [05/25/2015 23:50:17.691] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:8989] Started CalculatorWorkerSystem [INFO] [05/25/2015 23:50:17.714] [main] [Remoting] Starting remoting [INFO] [05/25/2015 23:50:17.724] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2553] [INFO] [05/25/2015 23:50:17.725] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2553] selection : ActorSelection[Anchor(akka.tcp://[email protected]:8989/), Path(/user/CalculatorActor)] Calculating 1 + 2 add result=3
时间: 2024-10-15 02:31:50