Scala的actor

1. Scala Actor

(1)相关介绍:

??Scala中的actor能够实现并行编程的强大功能,他是基于事件模型的并发机制,scala是运用消息(message)的发送、接收来实现多线程的(Java是使用共享内存实现的)。使用 Scala 能够更容易地实现多线程应用的开发。
??一个actor是一个容器,它包含状态,行为,信箱,子actor和监督策略。所有这些包含在一个actorReference(Actor引用)里。一个actor需要与外界隔离才能从actor模型中获益,所以actor是以actor引用的形式展现给外界的。

(2)Java并发编程模型与scala actor模型的区别:

? ?1)Java :

???? - Java 中的并发编程基本上满足了事件之间相互独立,但是事件不能够同时发生的场景的需要。
???? - Java 中的并发编程是基于共享数据和加锁的一种机制,即会有一个共享的数据,然后有 若干个线程去访问这个共享的数据(主要是对这个共享的数据进行修改),同时 Java 利用加锁 的机制(即 synchronized)来确保同一时间只有一个线程对我们的共享数据进行访问,进而保 证共享数据的一致性。
???? - Java 中的并发编程存在资源争夺和死锁等多种问题,因此程序越大问题越麻烦。

? ?2)scala actor :

???? - Scala 中的 Actor 是一种不共享数据,依赖于消息传递的一种并发编程模式,避免了死锁、资源争夺等情况。在具体实 现的过程中,Scala 中的 Actor 会不断的循环自己的邮箱,并通过 receive 偏函数进行消息的模式匹配并进行相应的处理。
???? - 如果 Actor A 和 Actor B 要相互沟通的话,首先 A 要给 B 传递一个消息,B 会有一个收件箱,然后 B 会不断的循环自己的收件箱,若看见 A 发过来的消息,B 就会解析 A 的消息并执行,处理完之后就有可能将处理的结果通过邮件的方式发送给 A。

??对于 Java,我们都知道它的多线程实现需要对共享资源(变量、对象等)使用 synchronized 关键字进行代码块同步、对象锁互斥等等。而且,常常一大块的 try…catch 语句块中加上 wait 方法、notify 方法、notifyAll 方法是让人很头疼的。原因就在于 Java 中多数使用的是可变状态的对象资源,对这些资源进行共享来实现多线程编程的话,控制好资源竞争与防止对象状态被意外修改是非常重要的,而对象状态的不变性也是较难以保证的。而在 Scala 中, 我们可以通过复制不可变状态的资源(即对象,Scala 中一切都是对象,连函数、方法也是) 的一个副本,再基于 Actor 的消息发送、接收机制进行并行编程。

(3)actor中发消息的方式:

(4)actor入门实例:

pom.xml

<properties>
      <scala.version>2.11.8</scala.version>
      <scala.actors.version>2.11.8</scala.actors.version>
</properties>
<dependencies>
      <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
      </dependency>
                  <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-actors -->
      <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-actors</artifactId>
          <version>${scala.actors.version}</version>
      </dependency>
</dependencies>
object Base_Actor {
  def main(args: Array[String]): Unit = {
    //调用start方法
    new MyActor1().start()
    new MyActor2().start()
  }
}
////继承Actor类,相当于Java中的Thread
class MyActor1 extends Actor{
  //重新 act 方法,相当于Java中的run方法
  override def act(): Unit = {
    println("hahaha")
  }
}
//继承Actor类,相当于Java中的Thread
class MyActor2 extends Actor{
  //重新 act 方法,相当于Java中的run方法
  override def act(): Unit = {
    println("hello world")
  }
}
// 注意:上面分别调用了两个单例对象的 start()方法,他们的 act()方法会被执行,相同与在 Java 中开启了两个线程,线程的 run()方法会被执行,这两个 Actor 是并行执行的。

?

2. Akka Actor

? (1) Akka Actor介绍:

??Akka 基于 Actor 模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。
????????
??Actor 模型:在计算机科学领域,Actor 模型是一个并行计算(Concurrent Computation)模型, 它把 actor 作为并行计算的基本元素来对待:为响应一个接收到的消息,一个 actor 能够自己做出一些决策,如创建更多的 actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。
??Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每一个actor都有自己的收件箱。通过Actor能够简化锁及线程管理,可以非常容易的开发出正确的并发程序和并行系统。
??Actor 特性:? 提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下 的编程开发? 提供了异步非阻塞的、高性能的事件驱动编程模型? 超级轻量级事件处理(每 GB 堆内存几百万 Actor)

? (2) Akka Actor 重要API:

??- ActorSystem:在 Akka 中,ActorSystem 是一个重量级的结构,他需要分配多个线程,所以在实际应用中, ActorSystem 通常是一个单例对象,我们可以使用这个 ActorSystem 的 actorOf 方法创建很多 Actor。
??- Actor:在 Akka 中,Actor 负责通信,在 Actor 中有一些重要的生命周期方法。
??- preStart()方法:该方法在 Actor 对象构造方法执行后执行,整个 Actor 生命周期中仅执行一次。
??- receive()方法:该方法在 Actor 的 preStart 方法执行完成后执行,用于接收消息,会被反复执行。

? (3) Akka Actor 入门案例:

??????
pom.xml

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
      <scala.version>2.11.8</scala.version>
      <scala.actors.version>2.11.8</scala.actors.version>
      <akka.version>2.4.17</akka.version>
  </properties>
    <dependencies>
      <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
      </dependency>

      <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-actor_2.11</artifactId>
          <version>${akka.version}</version>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-actors -->
      <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-actors</artifactId>
          <version>${scala.actors.version}</version>
      </dependency>

      <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-remote_2.11</artifactId>
          <version>${akka.version}</version>
      </dependency>
  </dependencies>

master:

import java.text.SimpleDateFormat
import java.util._

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class MyMaster extends  Actor{
  def doHell(): Unit ={
    println("我是master,我接受到了worker的消息!")
  }

  /**
    * 这就是一个偏函数, 用来处理当前这个actor对象接收到的所有的消息
    */
  override def receive: Receive = {
    case "hello" =>{
      doHell
      //用以发送信息到相应的worker,!表示 异步无返回值
      sender() ! "hi"
    }
    case "getNow" =>{
      doHell
      sender() ! new SimpleDateFormat("yyyy-MM-dd").format(new Date())
    }
  }
}
object MyMaster{
  def main(args: Array[String]): Unit = {
    //1.构建一个:ActorSystem
    val strConfig=
      """
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 9527
      """.stripMargin
    val config: Config = ConfigFactory.parseString(strConfig)
    val myMaster: ActorSystem = ActorSystem("ActorSystemMaster",config)
    //2.通过actorsystem创建actor
    myMaster.actorOf(Props(new MyMaster()),"MasterActor")
  }
}

worker

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class MyWorker extends Actor{

  override def preStart(): Unit = {
    val hostname="localhost"
    val serveractorsystem="ActorSystemMaster"
    val serveractor="MasterActor"
    val port="9527"
    //在创建worker actor之前向master发送一个消息
    val master=context.actorSelection(s"akka.tcp://${serveractorsystem}@${hostname}:${port}/user/${serveractor}")
    val message="getNow"
    //获得master相关对象,向master发送信息
    master ! message

  }
  //处理相应的来自master返回的信息
  override def receive: Receive = {
    case date:String => {
      println("时间日期:"+date)
    }
    case "hi" =>{
      println("我是worker,接收到master发送过来的结果: hi")
    }
  }
}
object MyWorker{
  def main(args: Array[String]): Unit = {
    //1.构建一个:ActorSystem
    val strConfig:String=
      """
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = localhost
      """.stripMargin
    val config: Config = ConfigFactory.parseString(strConfig)
    val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem",config)
    workerActorSystem.actorOf(Props(new MyWorker()),"workerActor")
  }
}

原文地址:http://blog.51cto.com/14048416/2337504

时间: 2024-10-12 02:11:54

Scala的actor的相关文章

基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验

学习了基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验,应用actor模型,位置透明,做到高并发.可伸缩.容错.单机也可以用,水平扩展.垂直扩展.容错都有很好的表现,spark中的例子如下: private def initializeEventProcessActor(){ implicat val timeout=Timeout( 30 seconds) val initEventActorReply= dagSchedulerActorSupervisor ? Prop

Scala 深入浅出实战经典 第90讲:基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验

akka提供了可伸缩的实时事务处理功能. akka基于actor,并提供了位置透明. 1GB的heap可以有2500000个actor. 水平扩展,垂直扩展,容错3个方面的解决方式. 树形结构的actor,每个actor都有状态和行为. DT大数据梦工厂微信公众账号:DT_Spark. DT大数据梦工厂的微信公众号是DT_Spark,每天都会有大数据实战视频发布,请您持续学习. 王家林DT大数据梦工厂scala的所有视频.PPT和代码在百度云盘的链接:http://pan.baidu.com/s

[scala] akka actor编程

Akka基础 Akka笔记之Actor简介  Akka中的Actor遵循Actor模型.你可以把Actor当作是人.这些人不会亲自去和别人交谈.他们只通过邮件来交流.  1. 消息传递 2. 并发 3. 异常处理 4. 多任务 5. 消息链 Akka笔记之消息传递 消息发送给actor代理: 消息是不可变对象(可带有属性的case class): 分发器dispatcher和邮箱: dispatcher从actorRef取出一条消息放在目标actor邮箱中,然后放mailbox放在一个Threa

scala中的Actor

1.介绍 2.简单示例 3.第二个程序 4.通信程序 1 package day01 2 import scala.actors.Actor 3 case class Message(content: String, sender: Actor) 4 class LeoActor extends Actor{ 5 def act(){ 6 while (true){ 7 receive{ 8 case Message(content,sender)=>{ 9 println("leo: &

Scala 深入浅出实战经典 第67讲:Scala并发编程匿名Actor、消息传递、偏函数解析

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-87讲)完整视频.PPT.代码下载:百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/LwsfuGIsWEk/优酷:http://v.youku.com/v_show/id_

Scala学习笔记--Actor和并发

未完成. SimpleActor.scala //actor是一个类似线程的实体,它有一个用来接收消息的信箱. //实现actor的方法是继承Scala.actors.Actor并完成其act方法 //通过调用actor的start方法来启动它 class SillyActor extends Actor{ def act(){ for(i<- 1 to 5){ println("Actor"+i) Thread.sleep(1000); } } } object MyActor

scala学习笔记-Actor(19)

Scala的Actor类似于Java中的多线程编程.但是不同的是,Scala的Actor提供的模型与多线程有所不同.Scala的Actor尽可能地避免锁和共享状态,从而避免多线程并发时出现资源争用的情况,进而提升多线程编程的性能.此外,Scala Actor的这种模型还可以避免死锁等一系列传统多线程编程的问题. Spark中使用的分布式多线程框架,是Akka.Akka也实现了类似Scala Actor的模型,其核心概念同样也是Actor Actor的创建.启动和消息收发 1 // Scala提供

scala并发编程Actor实战

今天在这里跟大家分享下王家林老师讲的scala编程中的actor的并发编程. 在java中,同时进行一个操作的时候,java会先把共享数据锁死,从而避免不同的操作来同时操作一个数据,这个就形成了资源调度的问题,而且如果用不好,还会造成死锁.而在scala中,通过类似消息的发送和接收的队列的方式,来访问同一个共享数据,这样一来,当轮到一个操作来访问某个数据的时候,不会发生另一个操作也同时访问的该数据的情况,这样就避免了资源争用的问题及死锁的发生.下面我们通过一个小小的实例来看看scala是怎样通过

scala Actor -03

1.对于上一篇讲解的scala的一些补充 val files = Array[String]("a.txt","b.txt","c.txt") for(f <- files){xxxx} 目标一:熟悉Scala Actor并发编程 目标二:为学习Akka做准备 注:我们现在学的Scala Actor是scala 2.10.x版本及以前版本的Actor. Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor, 老版本的Ac