一,Akka简介
二,Akka中的Actor模型
三,Akka实战案例之HelloActor
四,Akka实战案例之PingPong
五,案例基于 Actor 的聊天模型
正文
一,Akka简介
写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时。Akka 用 Scala 语言写成,同时提供了 Scala 和 JAVA 的开发接口
二,Akka中的Actor模型
Akka 处理并发的方法基于 Actor 模型。在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是 Actor 模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor 与 Actor 之间只能通过消息通信。
1)对并发模型进行了更高的抽象
2)异步、非阻塞、高性能的事件驱动编程模型
3)轻量级事件处理(1GB 内存可容纳百万级别个 Actor)
为什么 Actor 模型是一种处理并发问题的解决方案?
处理并发问题就是如何保证共享数据的一致性和正确性,为什么会有保持共享数据正确性这个问题呢?无非是我们的程序是多线程的,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。那么我们是不是可以转换一下思维,用单线程去处理相应的请求,但是又有人会问了,若是用单线程处理,那系统的性能又如何保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。
三,Akka实战案例之HelloActor
Akka的几个重要的角色:
MailBox:用来存储Actor之间收发的消息,是一个队列,所以收发到的消息是按顺序解析的。
Dispatcher Message:消息分发器,Actor之间发送的消息先发送到分发器,再发送到MailBox
Actor的Ref:用来发送消息。
实例:
package helloActor import akka.actor.{Actor, ActorRef, ActorSystem, Props} class HelloActor extends Actor { // 接收消息并处理 override def receive: Receive = { case "hellow" => print("en ,hellow") case _ =>{ context.stop(self) //停止自己的actorRef context.system.terminate()// 关闭ActorSystem } } } object HelloActor { private val nBfactory = ActorSystem("NBfactory") // 工厂 // 创建自己的ActorRef:通过Ref进行数据发送 private val helloActor: ActorRef = nBfactory.actorOf(Props[HelloActor], "helloActor") def main(args: Array[String]): Unit = { helloActor ! "hellow" helloActor ! "老王八" } }
四,Akka实战案例之PingPong
两个人进行pingPong进行数据发送。
第一个人的Actor:
package pingPangActor import akka.actor.Actor class FengActor extends Actor{ override def receive: Receive = { case "start" => print("峰峰说:I am ok") case "啪" =>{ println("峰峰:那必须滴!") Thread.sleep(1000) sender() ! "啪啪" // 向发送者发送数据 } } }
第二个人的Actor:
package pingPangActor import akka.actor.{Actor, ActorRef} class LongActor(fg: ActorRef) extends Actor{ override def receive: Receive = { case "start" =>{ print("龙龙:I am ok!") fg ! "啪" // 向ff发送数据 } case "啪啪" =>{ print("你真猛") Thread.sleep(1000) fg ! "啪" } } }
启动:
package pingPangActor import akka.actor.{ActorRef, ActorSystem, Props} object PingPangApp extends App{ // Actor工厂用来参数Ref private val pingPangActorSystem = ActorSystem("pingPangActorSystem") // 产生FF 的Ref private val ff: ActorRef = pingPangActorSystem.actorOf(Props[FengActor], "ff") // 参数ll 的Ref private val ll: ActorRef = pingPangActorSystem.actorOf(Props(new LongActor(ff)), "ll") ff ! "start" // 向自己的Mail发送start ll ! "start" // 向自己的Mail发送start }
五,案例基于 Actor 的聊天模型
如下实例:
智能机器人回复系统实现:
1) 创建一个 Server 端用于服务客户端发送过来的问题,并作处理并返回信息给客户
端!
2) 创建一个 Client 端,用于向服务端发送问题,并接收服务端发送过来的消息
server端:
package robot import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory class RobotServer extends Actor{ override def receive: Receive = { case "start" =>print("开始。。。") case ClientMessage(msg) => { println(s"收到客户端消息: $msg") msg match { case "你叫啥" => sender() ! ServerMessage("铁扇公主") case "你是男是女" => sender() ! ServerMessage("女") case "你又男票吗" => sender() ! ServerMessage("没有") case _ => sender() ! ServerMessage("What do you say") } } } } object RobotServer{ def main(args: Array[String]): Unit = { var host = "127.0.0.1" var port = 8808 val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin ) val server = ActorSystem("Server", config) val shanshan = server.actorOf(Props[RobotServer], "shanshan") shanshan ! "start" } }
client端:
package robot import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.io.StdIn class PeopleClient(host: String, port: Int) extends Actor{ var serverActorRef: ActorSelection = _ // 服务端的代理对象 override def preStart(): Unit = { serverActorRef = context.actorSelection(s"akka.tcp://[email protected]${host}:${port}/user/shanshan") } override def receive: Receive = { case "start" =>print("老王系列启动....") case msg: String=>{ // 把客户端输入的内容发送给 服务端(actorRef)--》服务端的mailbox中 -> 服务端的receive serverActorRef ! ClientMessage(msg) } case ServerMessage(msg) =>println(s"搜到服务端消息:$msg") } } object PeopleClient{ def main(args: Array[String]): Unit = { val host = "127.0.0.1" val port = 8809 val serverHost = "127.0.0.1" val serverPort = 8808 var config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin ) val client = ActorSystem("client", config) // 创建dispatch | mailbox val actorRef = client.actorOf(Props(new PeopleClient(serverHost, serverPort.toInt)), "001") // 自己给自己发送了一条消息 到自己的mailbox => receive actorRef ! "start" while (true){ val question = StdIn.readLine() // 同步阻塞的, shit actorRef ! question // mailbox -> receive } } }
参数序列化:
package robot // 服务端发送客户端的消息格式 case class ServerMessage(msg: String) // 客户端发送到服务单的消息格式 实现了serilize接口,可以用于网络传输 case class ClientMessage(msg: String)
原文地址:https://www.cnblogs.com/tashanzhishi/p/10961633.html