akka入门-远程调用

akka远程调用有两种形式:

一种是查找远程Actors,一种是创建远程Actors。

公用的类:

import java.io.Serializable;

public class Op {

  public interface MathOp extends Serializable {
  }

  public interface MathResult extends Serializable {
  }

  static class Add implements MathOp {
    private static final long serialVersionUID = 1L;
    private final int n1 ;
    private final int n2 ;

    public Add( int n1 , int n2) {
      this. n1 = n1;
      this. n2 = n2;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }
  }

  static class AddResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final int n1 ;
    private final int n2 ;
    private final int result ;

    public AddResult( int n1 , int n2, int result) {
      this. n1 = n1;
      this. n2 = n2;
      this. result = result;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }

    public int getResult() {
      return result;
    }
  }

  static class Subtract implements MathOp {
    private static final long serialVersionUID = 1L;
    private final int n1 ;
    private final int n2 ;

    public Subtract( int n1 , int n2) {
      this. n1 = n1;
      this. n2 = n2;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }
  }

  static class SubtractResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final int n1 ;
    private final int n2 ;
    private final int result ;

    public SubtractResult( int n1 , int n2, int result) {
      this. n1 = n1;
      this. n2 = n2;
      this. result = result;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }

    public int getResult() {
      return result;
    }
  }

  static class Multiply implements MathOp {
    private static final long serialVersionUID = 1L;
    private final int n1 ;
    private final int n2 ;

    public Multiply( int n1 , int n2) {
      this. n1 = n1;
      this. n2 = n2;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }
  }

  static class MultiplicationResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final int n1 ;
    private final int n2 ;
    private final int result ;

    public MultiplicationResult( int n1 , int n2, int result ) {
      this. n1 = n1;
      this. n2 = n2;
      this. result = result;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }

    public int getResult() {
      return result;
    }
  }

  static class Divide implements MathOp {
    private static final long serialVersionUID = 1L;
    private final double n1 ;
    private final int n2 ;

    public Divide( double n1 , int n2) {
      this. n1 = n1;
      this. n2 = n2;
    }

    public double getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }
  }

  static class DivisionResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final double n1 ;
    private final int n2 ;
    private final double result ;

    public DivisionResult( double n1 , int n2, double result ) {
      this. n1 = n1;
      this. n2 = n2;
      this. result = result;
    }

    public double getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }

    public double getResult() {
      return result;
    }
  }
}
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class CreationActor extends UntypedActor {

  @Override
  public void onReceive(Object message ) throws Exception {

    if ( message instanceof Op .MathOp) {
      ActorRef calculator = getContext().actorOf(Props.create(CalculatorActor. class ));
      calculator.tell( message, getSelf());

    } else if (message instanceof Op.MultiplicationResult) {
      Op .MultiplicationResult result = (Op.MultiplicationResult) message;
      System. out .printf("Mul result: %d * %d = %d\n" , result .getN1(), result .getN2(), result.getResult());
      getContext().stop(getSender());

    } else if (message instanceof Op .DivisionResult) {
      Op .DivisionResult result = (Op .DivisionResult) message ;
      System. out .printf("Div result: %.0f / %d = %.2f\n" , result .getN1(), result .getN2(), result.getResult());
      getContext().stop(getSender());

    } else {
      unhandled( message);
    }
  }
}
import akka.actor.UntypedActor;

/**
 * 计算加减乘除的Actor getSender getSelf
 */
public class CalculatorActor extends UntypedActor {
  @Override
  public void onReceive(Object message ) {

    if ( message instanceof Op.Add) {
      Op.Add add = (Op.Add) message;
      System. out .println("Calculating " + add .getN1() + " + " + add.getN2());
      Op.AddResult result = new Op.AddResult(add .getN1(), add .getN2(), add .getN1() + add.getN2());
      getSender().tell( result, getSelf());

    } else if (message instanceof Op.Subtract) {
      Op.Subtract subtract = (Op.Subtract) message;
      System. out .println("Calculating " + subtract .getN1() + " - " + subtract .getN2());
      Op.SubtractResult result = new Op.SubtractResult(subtract .getN1(),subtract.getN2(), subtract .getN1() - subtract .getN2());
      getSender().tell( result, getSelf());

    } else if (message instanceof Op.Multiply) {
      Op.Multiply multiply = (Op.Multiply) message;
      System. out .println("Calculating " + multiply .getN1() + " * " + multiply .getN2());
      Op.MultiplicationResult result = new Op.MultiplicationResult(multiply .getN1(), multiply .getN2(), multiply .getN1() * multiply.getN2());
      getSender().tell( result, getSelf());

    } else if (message instanceof Op.Divide) {
      Op.Divide divide = (Op.Divide) message;
      System. out .println("Calculating " + divide .getN1() + " / " + divide .getN2());
      Op.DivisionResult result = new Op.DivisionResult(divide .getN1(), divide .getN2(), divide .getN1() / divide .getN2());
      getSender().tell( result, getSelf());

    } else {
      unhandled( message);
    }
  }
}

基础的配置文件:

commom.conf文件

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
     enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
    }
  }
}

calculater.conf文件

include "common"

akka {
  # LISTEN on tcp port 2552
  remote.netty.tcp.port = 8989
}

1.创建远程Actors

import java.util.Arrays;
import java.util.concurrent.Callable;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;

import com.typesafe.config.ConfigFactory;

public class RemoteActorDemo {

  public static void main(String args[]) {

    // 不使用默认的配置,而是选择加载选定的remote actor配置
    final ActorSystem system = ActorSystem.create( "CalculatorWorkerSystem", ConfigFactory.load(( "calculator")));

    // remote actor的ref
    final ActorRef calculatorActor = system .actorOf(Props.create(CalculatorActor. class ), "CalculatorActor" );

    System. out.println( "Started CalculatorWorkerSystem" );

    final Timeout timeout = new Timeout(Duration.create(5, "seconds"));
    Future<Object> addFuture = Patterns.ask( calculatorActor, new Op.Add(1, 2), timeout );
    Future<Object> subtractFuture = Patterns.ask( calculatorActor, newOp.Subtract(1, 2), timeout );
    Future<Object> multiplyFuture = Patterns.ask( calculatorActor, newOp.Multiply(1, 2), timeout );
    Future<Object> divideFuture = Patterns.ask( calculatorActor, newOp.Divide(1, 2), timeout );

    Iterable<Future<Object>> futureArray = Arrays.asList (addFuture , subtractFuture , multiplyFuture, divideFuture );
    Future<Iterable<Op.MathResult>> futureResult = Futures.traverse( futureArray, new Function<Future<Object>, Future<Op.MathResult>>() {
      public Future<Op.MathResult> apply( final Future<Object> param ) throws Exception {
        return Futures.future( new Callable<Op.MathResult>() {
          public Op.MathResult call() throws Exception {
            return (Op.MathResult) Await.result( param, timeout .duration());
          }
        }, system.dispatcher());
      }
    }, system.dispatcher());

    futureResult.onSuccess( new OnSuccess<Iterable<Op.MathResult>>() {
      @Override
      public void onSuccess(Iterable<Op.MathResult> result ) throws Throwable {
        for (Op.MathResult r : result ) {
          if (r instanceof Op.AddResult) {
            System. out .println("add result=" + ((Op.AddResult) r ).getResult());
          } else if (r instanceof Op.SubtractResult) {
            System. out .println("subtract result=" + ((Op.SubtractResult) r ).getResult());
          } else if (r instanceof Op.MultiplicationResult) {
            System. out .println("multiply result=" + ((Op.MultiplicationResult) r ).getResult());
          } else if (r instanceof Op.DivisionResult) {
            System. out .println("divide result=" + ((Op.DivisionResult) r ).getResult());
          }
        }
      }
    }, system.dispatcher());
  }
}

输出结果:

[INFO] [05/25/2015 23:48:23.062] [main] [Remoting] Starting remoting
[INFO] [05/25/2015 23:48:23.225] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:8989]
[INFO] [05/25/2015 23:48:23.227] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:8989]
Started CalculatorWorkerSystem
Calculating 1 + 2
Calculating 1 - 2
Calculating 1 * 2
Calculating 1.0 / 2
add result=3
subtract result=-1
multiply result=2
divide result=0.5

2.查找远程Actors

remotelookup.conf文件

include "common"

akka {
  remote.netty.tcp.port = 2553
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

import com.typesafe.config.ConfigFactory;

public class RemoteActorSelectionDemo {

  public static class HandlerResult extends UntypedActor {

    @Override
    public void preStart() throws Exception {
      ActorSelection selection = this .getContext().actorSelection("akka.tcp://[email protected]:8989/user/CalculatorActor" );
      System. out .println("selection : " + selection );
      selection.tell( new Op.Add(1, 2), this.getSelf());
    }

    @Override
    public void onReceive(Object message ) throws Exception {
      if ( message instanceof Op.AddResult) {
        System. out .println("add result=" + ((Op.AddResult) message ).getResult());
      } else if (message instanceof Op.SubtractResult) {
        System. out .println("subtract result=" + ((Op.SubtractResult) message ).getResult());
      } else if (message instanceof Op.MultiplicationResult) {
        System. out .println("multiply result=" + ((Op.MultiplicationResult) message ).getResult());
      } else if (message instanceof Op.DivisionResult) {
        System. out .println("divide result=" + ((Op.DivisionResult) message ).getResult());
      }
    }
  }

  public static void main(String args[]) {
    // 不使用默认的配置,而是选择加载选定的remote actor配置
    final ActorSystem system = ActorSystem.create( "CalculatorWorkerSystem", ConfigFactory.load(( "calculator")));
    // 初始化远程actor
    ActorRef actref = system .actorOf(Props.create(CalculatorActor. class ),"CalculatorActor" );
    System. out.println( "Started CalculatorWorkerSystem" );

    // 初始化本地的Actor
    final ActorSystem localSystem = ActorSystem.create( "localSystem", ConfigFactory.load(( "remotelookup")));
    localSystem .actorOf(Props.create(HandlerResult. class ), "handlerResult" );

  }
}

输出结果:

[INFO] [05/25/2015 23:50:17.523] [main] [Remoting] Starting remoting
[INFO] [05/25/2015 23:50:17.689] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:8989]
[INFO] [05/25/2015 23:50:17.691] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:8989]
Started CalculatorWorkerSystem
[INFO] [05/25/2015 23:50:17.714] [main] [Remoting] Starting remoting
[INFO] [05/25/2015 23:50:17.724] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2553]
[INFO] [05/25/2015 23:50:17.725] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2553]
selection : ActorSelection[Anchor(akka.tcp://[email protected]:8989/), Path(/user/CalculatorActor)]
Calculating 1 + 2
add result=3
时间: 2024-10-15 02:31:50

akka入门-远程调用的相关文章

bos 第5天(定区的添加、定区的分页查询、hessian远程调用实现获取客户信息)

BOS项目笔记 第5天 今天内容安排: 1.添加定区功能 2.定区分页查询 3.hessian入门----远程调用技术 4.基于hessian实现定区关联客户 1. 添加定区 定区可以将取派员.分区.客户信息关联到一起. 页面:WEB-INF/pages/base/decidedzone.jsp 第一步:使用下拉框展示取派员数据,修改combobox的URL地址,发送请求 第二步:在StaffAction中提供listajax方法,查询没有作废的取派员,返回json数据 第三步:在StaffSe

远程调用——hessian使用入门

Hessian是一个轻量级的remoting onhttp工具,使用简单的方法提供了RMI的功能. 相比WebService,Hessian更简单.快捷.采用的是二进制RPC协议,因为采用的是二进制协议,所以它很适合于发送二进制数据. 常见的远程调用的技术: 1.webservice(CXF.axis)soap 2.httpclient 3.hessian---http协议.二进制数据 4.dubbo---阿里巴巴 hessian有两种发布服务的方式: 1.使用hessian框架自己提供的Ser

C#远程调用技术WebService葵花宝典

一.课程介绍 直接开门见山吧,在学习之前阿笨想问大家一句,关于WebService远程过程调用技术(RPC) 你真的会了吗?不要跟老夫扯什么WebService技术已经过时,如果你的内心有在偷偷告诉你其实我是真的不会WebService的话,那么恭喜你,因为你在这茫茫的IT编程世界里找到了这本<C#远程调用技术WebService葵花宝典>!曾经有一位不知名的讲师说过这么一句名言: 一门RPC技术不会,那么千万万门RPC技术将都不会! 本次阿笨的分享课包含以下知识点. 1.C# WebServ

【Rest】在Dubbo中开发REST风格的远程调用(RESTful Remoting)

目录 概述 REST的优点 应用场景 快速入门 标准Java REST API:JAX-RS简介 REST服务提供端详解 HTTP POST/GET的实现 Annotation放在接口类还是实现类 JSON.XML等多数据格式的支持 中文字符支持 XML数据格式的额外要求 定制序列化 配置REST Server的实现 获取上下文(Context)信息 配置端口号和Context Path 配置线程数和IO线程数 配置长连接 配置最大的HTTP连接数 配置每个消费端的超时时间和HTTP连接数 GZ

Dubbo搭建HelloWorld-搭建服务提供者与服务消费者并完成远程调用(附代码下载)

场景 Dubbo简介与基本概念: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/103555224 Dubbo环境搭建-ZooKeeper注册中心: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/103555470 Dubbo环境搭建-管理控制台dubbo-admin实现服务监控: https://blog.csdn.net/BADAO_LIUMANG_QI

Sql Server远程调用失败

这篇文章主要为大家介绍了SQL server服务显示远程过程调用失败的解决方法,还为大家提供了解决SQL SERVER 2008 R2配置管理器出现"远程过程调用失败"(0x800706be)错误提示的方案,感兴趣的小伙伴们可以参考一下 刚刚打开SQL Server 2008,想要新建一个数据库,却发现出现了一个问题,这个问题由于之前没有遇到过,所以下面整理解决SQL server服务远程调用失败的几个方法,供大家参考,具体内容如下 先看看出现的问题: 出现上面这个错误的原因可能是由于

RHEL6-X Window System-8.图形桌面的本地调用与远程调用解析

Linux的X或X11 (X Window System)是一个基础的图形框架接口,拥有基本的图形显示.在此框架基础之上,有诸如GNOME/KDE之类的图形桌面窗口管理应用软件(Window manager).而且图形框架X11包括两大部分X Client与X Server,采用C/S主从架构.所以它是一个system而不单单只是一个组件. Linux图形桌面的本地调用与远程调用解析如图 说明: 通过图形还是文本控制台传递命令,经过内核处理后,返回相应的数据给对方 本地情况: 1.如果是图形,则

远程调用方式概述

在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI.MINA.ESB. Burlap.Hessian.SOAP.EJB和JMS 等,这些名词之间到底是些什么关系呢,它们背后到底是基于什么原理实现的呢,了解这些是实现分布式服务框架的基础知识,而如果在性能上有高的要求的话,那 深入了解这些技术背后的机制就是必须的了,本文中我们将来一探究竟,抛砖引玉,欢迎大家提供更多的实现远程通讯的技术和原理的介绍. 基本原理 要实现网络机器间的通讯

如何基于Vert.x实现远程调用?

Vert.x的微服务 最近关于微服务的概念到处都在宣传,而Vert.x的verticle本身就是很好的一种服务定义,你可以把verticle看成一个service,也可以把verticle看成一个actor.这样你的视角会切到Actor模型里.本文我们将讨论如何基于Vert.x实现远程调用. 传统Java开发人员受EJB以及Spring的影响比较深,所以对面向接口编程了解的比较多.哪怕跨JVM也可以通过接口来调用对方提供的方法.这是非常友好方便的开发方式,因为框架层面做了服务发现以及服务生命周期