Scala 学习 (七) 并发编程模型Akka

一,Akka简介

二,Akka中的Actor模型

三,Akka实战案例之HelloActor

四,Akka实战案例之PingPong

五,案例基于 Actor 的聊天模型

正文

一,Akka简介

  写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时。Akka 用 Scala 语言写成,同时提供了 Scala 和 JAVA 的开发接口

二,Akka中的Actor模型  

  Akka 处理并发的方法基于 Actor 模型。在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是 Actor 模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor 与 Actor 之间只能通过消息通信。

  1)对并发模型进行了更高的抽象
  2)异步、非阻塞、高性能的事件驱动编程模型
  3)轻量级事件处理(1GB 内存可容纳百万级别个 Actor)

为什么 Actor 模型是一种处理并发问题的解决方案?
  处理并发问题就是如何保证共享数据的一致性和正确性,为什么会有保持共享数据正确性这个问题呢?无非是我们的程序是多线程的,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。那么我们是不是可以转换一下思维,用单线程去处理相应的请求,但是又有人会问了,若是用单线程处理,那系统的性能又如何保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。

三,Akka实战案例之HelloActor

  Akka的几个重要的角色:

  MailBox:用来存储Actor之间收发的消息,是一个队列,所以收发到的消息是按顺序解析的。

  Dispatcher Message:消息分发器,Actor之间发送的消息先发送到分发器,再发送到MailBox

  Actor的Ref:用来发送消息。

实例:

package helloActor

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

class HelloActor extends Actor {
    // 接收消息并处理
    override def receive: Receive = {
        case "hellow" => print("en ,hellow")
        case _ =>{
            context.stop(self)  //停止自己的actorRef
            context.system.terminate()// 关闭ActorSystem
        }
    }
}

object HelloActor {
    private val nBfactory = ActorSystem("NBfactory")  // 工厂
    // 创建自己的ActorRef:通过Ref进行数据发送
    private val helloActor: ActorRef = nBfactory.actorOf(Props[HelloActor], "helloActor")
    def main(args: Array[String]): Unit = {
        helloActor ! "hellow"
        helloActor ! "老王八"
    }
}

四,Akka实战案例之PingPong

  两个人进行pingPong进行数据发送。

  第一个人的Actor:

package pingPangActor

import akka.actor.Actor

class FengActor extends Actor{
    override def receive: Receive = {
        case "start" => print("峰峰说:I am ok")
        case "啪" =>{
            println("峰峰:那必须滴!")
            Thread.sleep(1000)
            sender() ! "啪啪" // 向发送者发送数据
        }
    }
}

  第二个人的Actor:

package pingPangActor

import akka.actor.{Actor, ActorRef}

class LongActor(fg: ActorRef) extends Actor{
    override def receive: Receive = {
        case "start" =>{
            print("龙龙:I am ok!")
            fg ! "啪" // 向ff发送数据
        }
        case "啪啪" =>{
            print("你真猛")
            Thread.sleep(1000)
            fg ! "啪"
        }
    }
}

  启动:

package pingPangActor

import akka.actor.{ActorRef, ActorSystem, Props}

object PingPangApp extends App{

    // Actor工厂用来参数Ref
    private val pingPangActorSystem = ActorSystem("pingPangActorSystem")
    // 产生FF 的Ref
    private val ff: ActorRef = pingPangActorSystem.actorOf(Props[FengActor], "ff")
    // 参数ll 的Ref
    private val ll: ActorRef = pingPangActorSystem.actorOf(Props(new LongActor(ff)), "ll")

    ff ! "start" // 向自己的Mail发送start
    ll ! "start" // 向自己的Mail发送start
}

五,案例基于 Actor 的聊天模型

  如下实例:

  

  智能机器人回复系统实现:

  1) 创建一个 Server 端用于服务客户端发送过来的问题,并作处理并返回信息给客户
端!
  2) 创建一个 Client 端,用于向服务端发送问题,并接收服务端发送过来的消息

  server端:

package robot
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class RobotServer extends Actor{
    override def receive: Receive = {
        case "start" =>print("开始。。。")
        case ClientMessage(msg) => {
            println(s"收到客户端消息:  $msg")
            msg match {
                case "你叫啥" => sender() ! ServerMessage("铁扇公主")
                case "你是男是女" => sender() ! ServerMessage("女")
                case "你又男票吗" => sender() ! ServerMessage("没有")
                case _ => sender() ! ServerMessage("What do you say")
            }
        }
    }
}

object RobotServer{
    def main(args: Array[String]): Unit = {
        var host = "127.0.0.1"
        var port = 8808
        val config = ConfigFactory.parseString(
            s"""
              |akka.actor.provider="akka.remote.RemoteActorRefProvider"
              |akka.remote.netty.tcp.hostname=$host
              |akka.remote.netty.tcp.port=$port
            """.stripMargin
        )
        val server = ActorSystem("Server", config)
        val shanshan = server.actorOf(Props[RobotServer], "shanshan")
        shanshan ! "start"

    }
}

  client端:

package robot

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.io.StdIn

class PeopleClient(host: String, port: Int) extends Actor{

    var serverActorRef: ActorSelection = _ // 服务端的代理对象
    override def preStart(): Unit = {
        serverActorRef = context.actorSelection(s"akka.tcp://[email protected]${host}:${port}/user/shanshan")
    }

    override def receive: Receive = {
        case "start" =>print("老王系列启动....")
        case msg: String=>{
            // 把客户端输入的内容发送给 服务端(actorRef)--》服务端的mailbox中 -> 服务端的receive
            serverActorRef ! ClientMessage(msg)
        }
        case ServerMessage(msg) =>println(s"搜到服务端消息:$msg")
    }
}

object PeopleClient{
    def main(args: Array[String]): Unit = {
        val host = "127.0.0.1"
        val port  = 8809
        val serverHost = "127.0.0.1"
        val serverPort = 8808
        var config = ConfigFactory.parseString(
            s"""
               |akka.actor.provider="akka.remote.RemoteActorRefProvider"
               |akka.remote.netty.tcp.hostname=$host
               |akka.remote.netty.tcp.port=$port
            """.stripMargin
        )
        val client = ActorSystem("client", config)
        // 创建dispatch | mailbox
        val actorRef = client.actorOf(Props(new PeopleClient(serverHost, serverPort.toInt)), "001")
        // 自己给自己发送了一条消息 到自己的mailbox => receive
        actorRef ! "start"

        while (true){
            val question = StdIn.readLine() // 同步阻塞的, shit
            actorRef ! question // mailbox -> receive
        }
    }
}

  参数序列化:

package robot

// 服务端发送客户端的消息格式
case class ServerMessage(msg: String)

// 客户端发送到服务单的消息格式 实现了serilize接口,可以用于网络传输
case class ClientMessage(msg: String)

原文地址:https://www.cnblogs.com/tashanzhishi/p/10961633.html

时间: 2024-11-05 22:39:53

Scala 学习 (七) 并发编程模型Akka的相关文章

大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目

第十五章 客户信息管理系统15.1 项目的开发流程15.2 项目的需求分析15.3 项目的界面15.4 项目的设计-程序框架图15.5 项目的功能实现15.5.1 完成 Customer 类15.5.2 完成显示主菜单和退出软件功能15.5.3 完成显示客户列表的功能15.5.4 完成添加客户的功能15.5.5 完成删除客户的功能15.5.6 完善退出确认功能15.5.7 完善删除确认功能15.5.8 完成修改客户的功能第十六章 并发编程模型 Akka16.1 Akka 的介绍16.2 Acto

Scala-Unit7-Scala并发编程模型AKKA

一.Akka简介 Akka时spark的底层通信框架,Hadoop的底层通信框架时rpc. 并发的程序编写很难,但是Akka解决了spark的这个问题. Akka构建在JVM平台上,是一种高并发.分布式.并且容错的应用工具包: Akka使用Scala语言编写,同时它提供了Scala和Java的开发接口,Akka可以开发一些高并发的程序. 二.Akka的Acor模型 A卡卡处理并发的方法基于actor模型,在基于actor的系统中,所有事物都是actor(类似于Java的万物皆对象): actor

程序员必知的七种并发编程模型

1.线程与锁线程与锁模型有很多众所周知的不足,但仍是其他模型的技术基础,也是很多并发软件开发的首选. 2.函数式编程 函数式编程日渐重要的原因之一,是其对并发编程和并行编程提供了良好的支持.函数式编程消除了可变状态,所以从根本上是线程安全的,而且易于并行执行. 3.Clojure之道——分离标识与状态 编程语言Clojure是一种指令式编程和函数式编程的混搭方案,在两种编程方式上取得了微妙的平衡来发挥两者的优势. 4.actor actor模型是一种适用性很广的并发编程模型,适用于共享内存模型和

Java多线程-并发编程模型

以下内容转自http://ifeve.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E6%A8%A1%E5%9E%8B/: 并发系统可以采用多种并发编程模型来实现.并发模型指定了系统中的线程如何通过协作来完成分配给它们的作业.不同的并发模型采用不同的方式拆分作业,同时线程间的协作和交互方式也不相同.这篇并发模型教程将会较深入地介绍目前(2015年,本文撰写时间)比较流行的几种并发模型. 并发模型与分布式系统之间的相似性 本文所描述的并发模型类似于分布式系统中使

4.并发编程模型

并发系统可以采用多种并发编程模型来实现.并发模型指定了系统中的线程如何通过协作来完成分配给它们的作业.不同的并发模型采用不同的方式拆分作业,同时线程间的协作和交互方式也不相同.这篇并发模型教程将会较深入地介绍目前(2015年,本文撰写时间)比较流行的几种并发模型. 并发模型与分布式系统之间的相似性 本文所描述的并发模型类似于分布式系统中使用的很多体系结构.在并发系统中线程之间可以相互通信.在分布式系统中进程之间也可以相互通信(进程有可能在不同的机器中).线程和进程之间具有很多相似的特性.这也就是

【专家坐堂】四种并发编程模型简介

本文来自网易云社区 概述 并发往往和并行一起被提及,但是我们应该明确的是"并发"不等同于"并行" ?       并发 :同一时间 对待 多件事情 (逻辑层面) ?       并行 :同一时间 做(执行) 多件事情 (物理层面) 并发可以构造出一种问题解决方法,该方法能够被用于并行化,从而让原本只能串行处理的事务并行化,更好地发挥出当前多核CPU,分布式集群的能力. 但是,并发编程和人们正常的思维方式是不一样的,因此才有了各种编程模型的抽象来帮助我们更方便,更不容

IO复用、多进程和多线程三种并发编程模型

I/O复用模型 I/O复用原理:让应用程序可以同时对多个I/O端口进行监控以判断其上的操作是否可以进行,达到时间复用的目的.在书上看到一个例子来解释I/O的原理,我觉得很形象,如果用监控来自10根不同地方的水管(I/O端口)是否有水流到达(即是否可读),那么需要10个人(即10个线程或10处代码)来做这件事.如果利用某种技术(比如摄像头)把这10根水管的状态情况统一传达到某一点,那么就只需要1个人在那个点进行监控就行了,而类似与select或epoll这样的多路I/O复用机制就好比是摄像头的功能

基本的并发编程模型

基于进程的并发 基本模型 在TCP服务器编程中,多进程并发服务器通常由主进程负责连接的建立,然后fork出子进程,负责该连接剩下的行为,直到关闭. 关于多进程并发服务器有几点重要的内容: 通常服务器会运行很长时间,因此必须要包括一个SIGCHLD处理程序,来回收僵死子进程的资源.因为当SIGCHLD处理程序执行时,SIGCHLD信号是阻塞的,而Unix信号是不排队的,所以SIGCHLD处理程序必须准备好回收多个僵死子进程的资源. 父进程在fork调用后,将连接交给子进程处理,父子进程必须关闭他们

并发编程模型小结

1. 临界区加排他锁(Go sync.Mutex.Lock()) 如果并发量大,锁竞争激烈,会导致性能开销大 2. 读多写少场景,使用读写锁(Go sync.Mutex.RLock()) 支持并发读,但写锁会block住读和写,读多场景性能会好很多 3. 对计数使用CAS操作(Go sync.atomic.CompareAndSwapInt64()) CAS由CPU原子指令实现.是一种无锁结构,由于消耗的CPU指令周期少,性能要优于锁结构 4. actor并发模型 Erlang和scala ak