大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目

第十五章 客户信息管理系统15.1 项目的开发流程15.2 项目的需求分析15.3 项目的界面15.4 项目的设计-程序框架图15.5 项目的功能实现15.5.1 完成 Customer 类15.5.2 完成显示主菜单和退出软件功能15.5.3 完成显示客户列表的功能15.5.4 完成添加客户的功能15.5.5 完成删除客户的功能15.5.6 完善退出确认功能15.5.7 完善删除确认功能15.5.8 完成修改客户的功能第十六章 并发编程模型 Akka16.1 Akka 的介绍16.2 Actor 模型用于解决什么问题16.3 Akka 中 Actor 模型详解16.4 Actor 模型工作机制说明16.5 Actor 模型应用实例16.5.1 Actor 自我通讯16.5.2 Actor 之间通讯16.7 Akka 网络编程16.7.1 Akka 网络编程基本介绍16.7.2 协议(tcp/ip)16.7.3 OSI 与 Tcp/ip 参考模型16.7.4 ip 地址16.7.5 端口(port)16.8 Akka 网络编程-小黄鸡客服案例16.8.1 需求分析 + 界面设计16.8.2 程序框架图16.8.3 功能实现16.9 Akka 网络编程-Spark Master Worker 进程通讯项目16.9.1 项目意义16.9.2 项目需求分析16.9.3 项目界面设计16.9.4 实现功能 1-Worker 完成注册16.9.5 实现功能 2-Worker 定时发送心跳16.9.6 实现功能 3-Master 启动定时任务,定时检测注册的 Worker16.9.7 实现功能 4-Master,Worker 的启动参数运行时指定


第十五章 客户信息管理系统

15.1 项目的开发流程

15.2 项目的需求分析

  模拟实现基于文本界面的《客户信息管理软件》。
  该软件 scala 能够实现对客户对象的插入、修改、删除、显示、查询(用 ArrayBuffer 或者 ListBuffer 实现),并能够打印客户明细表。

15.3 项目的界面

主界面


添加客户

修改客户

删除客户

客户列表

15.4 项目的设计-程序框架图

程序框架图:设计系统有多少个文件,以及文件之间的调用关系,可以帮助程序员实现模块的设计(清晰),便于程序员之间对项目交流分析。【业务优化,设计方案】

15.5 项目的功能实现

15.5.1 完成 Customer 类

根据需求文档或者页面,写出 Customer 类
Customer.scala

package com.atguigu.chapter15.customercrm.bean

class Customer {  // 属性  var id: Int = _  var name: String = _  var gender: Char = _  var age: Short = _  var tel: String = _  var email: String = _

  // 辅助构造器  def this(id: Int, name: String, gender: Char, age: Short, tel: String, email: String) {    this    this.id = id    this.name = name    this.gender = gender    this.age = age    this.tel = tel    this.email = email  }}

15.5.2 完成显示主菜单和退出软件功能

CustomerView.scala 功能分析:
  1. 将主菜单的显示放入到 while
  2. 用户可以根据输入,选择自己的操作
  3. 如果输入5退出
CustomerView.scala

package com.atguigu.chapter15.customercrm.view

import scala.io.StdIn

class CustomerView {  // 定义一个循环变量,控制是否退出  var loop = true  // 定义一个 key 用于接收用户输入的选项  var key = ‘ ‘

  def mainMenu(): Unit = {    do {      println("-----------------客户信息管理软件-----------------")      println("                  1 添 加 客 户                  ")      println("                  2 修 改 客 户                  ")      println("                  3 删 除 客 户                  ")      println("                  4 客 户 列 表                  ")      println("                  5 退       出                  ")      println("                  请选择(1-5):                   ")      key = StdIn.readChar()      key match {        case ‘1‘ => println("添 加 客 户")        case ‘2‘ => println("修 改 客 户")        case ‘3‘ => println("删 除 客 户")        case ‘4‘ => println("客 户 列 表")        case ‘5‘ => this.loop = false      }    } while (loop)    println("你退出了系统...")  }}

示例代码如下:

package com.atguigu.chapter15.customercrm.app

import com.atguigu.chapter15.customercrm.view.CustomerView

object CustomerCrm {  def main(args: Array[String]): Unit = {    new CustomerView().mainMenu()  }}

15.5.3 完成显示客户列表的功能

CustomerView.scala 功能分析:
  1. 接收4,显示客户列表
  2. 调用 CustomerService 的方法 list
  3. 需要一个 CustomerService 对象(属性)
CustomerService.sacla 功能分析:
  1. 编写一个方法 list,返回当前系统有哪些客户
  2. 客户放在哪?--> 内存 --> 可变集合 --> ArrayBuffer
1、在 Customer.sacla 中重写 toString 方法

  override def toString: String = {    this.id + "\t\t" + this.name + "\t\t" + this.gender + "\t\t" + this.age + "\t\t" + this.tel + "\t\t" + this.email  }

2、在 CustomerService.scala 中编写一个方法 list,返回当前系统有哪些客户

class CustomerService {  // customers 是存放客户用的,为了方便测试,我们先进行初始化  val customers = ArrayBuffer(new Customer(1, "tom", ‘男‘, 20, "110", "[email protected]"))

  // 查询客户列表的方法  def list(): ArrayBuffer[Customer] = {    this.customers  }}

3、在 CustomerView.scala 中 调用 CustomerService 的方法 list

  val customerService = new CustomerService()

  /*---------------------------客户列表---------------------------编号  姓名       性别    年龄   电话            邮箱 1    张三       男      30     010-56253825   [email protected] 2    李四       女      23     010-56253825    [email protected] 3    王芳       女      26     010-56253825   [email protected]-------------------------客户列表完成-------------------------   */  def list(): Unit = {    println()    println("---------------------------客户列表---------------------------")    println("编号\t\t姓名\t\t性别\t\t年龄\t\t电话\t\t邮箱")    // 遍历    // 调用 CustomerService 的方法 list    val customers = customerService.list()    for (customer <- customers) {      // 方式一:输出      // println(customer.id + "\t\t" + ...)      // 方式二:重写 Customer 的 toString 方法,返回信息,并且格式化      println(customer)    }    println("-------------------------客户列表完成-------------------------")  }

15.5.4 完成添加客户的功能

CustomerView.scala 功能分析:
  1. 接收客户的信息,并封装成对应的 Customer 对象
  2. 调用 CustomerService 的方法 add
CustomerService.sacla 功能分析:
  1. 编写一个方法 add,接收一个 Customer 对象
  2. 加入到 ArrayBuffer 中
  3. 规定:以添加客户是第几个作为它的 id
1、在 Customer.sacla 中添加一个新的 辅助构造器(没有id属性)

  // 辅助构造器(没有id属性)  def this(name: String, gender: Char, age: Short, tel: String, email: String) {    this    this.name = name    this.gender = gender    this.age = age    this.tel = tel    this.email = email  }

2、在 CustomerService.scala 中编写一个方法 add,接收一个 Customer 对象,并设置 id 后再加入到 ArrayBuffer 中

  // 用于设置用户 id  var customerNum = 1

  // 添加客户的方法  def add(customer: Customer): Boolean = {    // 设置 id    customerNum += 1    customer.id = customerNum    // 加入到 ArrayBuffer 中    customers.append(customer)    true  }

3、在 CustomerView.scala 中 调用 CustomerService 的方法 add

  /*---------------------添加客户---------------------姓名:张三性别:男年龄:30电话:010-56253825邮箱:[email protected]---------------------添加完成---------------------   */  def add(): Unit = {    println()    println("---------------------添加客户---------------------")    println("姓名:")    val name = StdIn.readLine()    println("性别:")    val gender = StdIn.readChar()    println("年龄:")    val age = StdIn.readShort()    println("电话:")    val tel = StdIn.readLine()    println("邮箱:")    val email = StdIn.readLine()    // 封装对象    val customer = new Customer(name, gender, age, tel, email)    // 调用 CustomerService 的方法 add    customerService.add(customer)    println("---------------------添加完成---------------------")  }

15.5.5 完成删除客户的功能

CustomerView.scala 功能分析:
  1. 接收客户 id,准备删除
  2. 调用 CustomerService 的 del(id)
CustomerService.sacla 功能分析:
  1. 编写一个方法 del,接收一个 id,先去调用另一个方法 findIndexById,判断
  2. 编写一个方法 findIndexById(因为我们的 ArrayBuffer 索引和 id 并不是对应的)
  3. 如果发现有,则删除,如果没有就返回 false
1、在 CustomerService.scala 中编写一个方法 del,接收一个 id,先去调用另一个方法 findIndexById,判断

  // 先根据 id 查找 用户的 index  def findIndexById(id: Int): Int = {    // 先假定一个索引,默认 -1,如果找到就改成对应的,如果没有找到就返回 -1    var index = -1    // 遍历 ArrayBuffer    breakable {      for (i <- 0 until customers.length) {        if (customers(i).id == id) {          index = i          break()        }      }    }    index  }

  // 再根据 id 删除用户  def del(id: Int): Boolean = {    val index = findIndexById(id)    if (index != -1) {      customers.remove(index)      true    } else {      false    }  }

2、在 CustomerView.scala 中接收客户 id,调用 CustomerService 的 del(id)

  /*---------------------删除客户---------------------请选择待删除客户编号(-1退出):1确认是否删除(Y/N):y---------------------删除完成---------------------   */  def del(): Unit = {    println()    println("---------------------删除客户---------------------")    println("请选择待删除客户编号(-1退出):")    val id = StdIn.readInt()    if (id == -1) {      println("---------------------删除没有完成---------------------")      return    }    println("确认是否删除(Y/N):")    val choice = StdIn.readChar().toLower    if (choice == ‘y‘) {      if (customerService.del(id)) {        println("---------------------删除完成---------------------")        return      }    }    println("---------------------删除没有完成---------------------")  }

15.5.6 完善退出确认功能

功能说明:
  要求用户在退出时提示 "确认是否退出(Y/N):",用户必须输入y/n,否则循环提示。且输入为y时,退出系统;输入为n时,不退出系统。
1、在 CustomerView.scala 中定义一个方法 isOut,并修改 key 所对应的函数。

  // 要求用户在退出时提示"确认是否退出(Y/N):",用户必须输入y/n,否则循环提示。且输入为y时,退出系统;输入为n时,不退出系统。  def isOut(): Unit = {    println()    println("确认是否退出(Y/N):")    key = StdIn.readChar().toLower    key match {      case ‘y‘ => this.loop = false      case ‘n‘ => this.loop = true      case _ => isOut()    }  }

15.5.7 完善删除确认功能

功能说明:
  要求用户在删除确认时提示 "确认是否删除(Y/N):",用户必须输入y/n,否则循环提示。
1、在 CustomerView.scala 中,修改 del() 方法即可

  /*---------------------删除客户---------------------请选择待删除客户编号(-1退出):1确认是否删除(Y/N):y---------------------删除完成---------------------   */  def del(): Unit = {    println()    println("---------------------删除客户---------------------")    println("请选择待删除客户编号(-1退出):")    val id = StdIn.readInt()    if (id == -1) {      println("---------------------删除没有完成---------------------")      return    }    println("确认是否删除(Y/N):")    var choice = ‘ ‘

    // 要求用户在删除确认时提示 "确认是否删除(Y/N):",用户必须输入y/n,否则循环提示。    breakable {      do {        choice = StdIn.readChar().toLower        if (choice == ‘y‘ || choice == ‘n‘) {          break()        }        println("确认是否删除(Y/N):")      } while (true)    }

    if (choice == ‘y‘) {      if (customerService.del(id)) {        println("---------------------删除完成---------------------")        return      }    }    println("---------------------删除没有完成---------------------")  }

15.5.8 完成修改客户的功能

1、在 CustomerService.scala 中定义一个方法根据 id 修改用户(更新用户)的方法 和 // 根据 id 查找用户信息 的方法

  // 根据 id 查找用户信息  def findCustomerById(id: Int): Customer = {    val index = findIndexById(id)    if (index != -1) {      customers(index)    } else {      null    }  }

  // 根据 id 修改用户(更新用户)  def update(id: Int, customer: Customer): Boolean = {    val index = findIndexById(id)    customers.update(index, customer)    true  }

2、在 CustomerView.scala 中定义一个方法 update

  /*---------------------修改客户---------------------请选择待修改客户编号(-1退出):1姓名(张三):<直接回车表示不修改>性别(男):年龄(30):电话(010-56253825):邮箱([email protected]):[email protected]---------------------修改完成---------------------   */  def update(): Unit = {    println()    println("---------------------修改客户---------------------")    println("请选择待修改客户编号(-1退出):")    var id = StdIn.readInt()    if (id == -1) {      println("---------------------修改没有完成---------------------")      return    }    val customer = customerService.findCustomerById(id)    if (customer == null) {      println("---------------------修改没有完成---------------------")      return    }

    var name = customer.name    print(s"姓名(${name}):")    name = StdIn.readLine()    if (name.length == 0) name = customer.name

    var gender = customer.gender    print(s"性别(${gender}):")    gender = StdIn.readChar()

    var age = customer.age    print(s"年龄(${age}):")    age = StdIn.readShort()

    var tel = customer.tel    print(s"电话(${tel}):")    tel = StdIn.readLine()    if (tel.length == 0) tel = customer.tel

    var email = customer.email    print(s"邮箱(${email}):")    email = StdIn.readLine()    if (email.length == 0) email = customer.email

    // 封装对象    val newCustomer = new Customer(id, name, gender, age, tel, email)    // 调用 CustomerService 的方法 update    customerService.update(id, newCustomer)    println("---------------------修改完成---------------------")  }

第十六章 并发编程模型 Akka

16.1 Akka 的介绍

16.2 Actor 模型用于解决什么问题

16.3 Akka 中 Actor 模型详解

Actor 模型及其说明


对上图的详解如下:

16.4 Actor 模型工作机制说明


Actor模型工作机制说明(对照工作机制示意图理解):

Actor 间传递消息机制(对照工作机制示意图理解)

16.5 Actor 模型应用实例

16.5.1 Actor 自我通讯

应用实例需求

代码实现
SayHelloActor 项目步骤:
1) 创建项目 Mew -> New Project -> 选择 Maven
2) 给项目命名


3) 下一步 -> Finish
4) 会生成 pom.xml 文件(maven 文件, 项目包的依赖)

5) 将下面的 maven 配置模板拷贝到 pom.xml 文件中,新的 pom.xml 文件文件内容如下:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.akka</groupId>    <artifactId>SayHelloActor</artifactId>    <version>1.0-SNAPSHOT</version>    <!-- 定义一下常量 -->    <properties>        <encoding>UTF-8</encoding>        <scala.version>2.11.8</scala.version>        <scala.compat.version>2.11</scala.compat.version>        <akka.version>2.4.17</akka.version>    </properties>

    <dependencies>        <!-- 添加scala的依赖 -->        <dependency>            <groupId>org.scala-lang</groupId>            <artifactId>scala-library</artifactId>            <version>${scala.version}</version>        </dependency>

        <!-- 添加akka的actor依赖 -->        <dependency>            <groupId>com.typesafe.akka</groupId>            <artifactId>akka-actor_${scala.compat.version}</artifactId>            <version>${akka.version}</version>        </dependency>

        <!-- 多进程之间的Actor通信 -->        <dependency>            <groupId>com.typesafe.akka</groupId>            <artifactId>akka-remote_${scala.compat.version}</artifactId>            <version>${akka.version}</version>        </dependency>    </dependencies>

    <!-- 指定插件-->    <build>        <!-- 指定源码包和测试包的位置 -->        <sourceDirectory>src/main/scala</sourceDirectory>        <testSourceDirectory>src/test/scala</testSourceDirectory>        <plugins>            <!-- 指定编译scala的插件 -->            <plugin>                <groupId>net.alchim31.maven</groupId>                <artifactId>scala-maven-plugin</artifactId>                <version>3.2.2</version>                <executions>                    <execution>                        <goals>                            <goal>compile</goal>                            <goal>testCompile</goal>                        </goals>                        <configuration>                            <args>                                <arg>-dependencyfile</arg>                                <arg>${project.build.directory}/.scala_dependencies</arg>                            </args>                        </configuration>                    </execution>                </executions>            </plugin>

            <!-- maven打包的插件 -->            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-shade-plugin</artifactId>                <version>2.4.3</version>                <executions>                    <execution>                        <phase>package</phase>                        <goals>                            <goal>shade</goal>                        </goals>                        <configuration>                            <filters>                                <filter>                                    <artifact>*:*</artifact>                                    <excludes>                                        <exclude>META-INF/*.SF</exclude>                                        <exclude>META-INF/*.DSA</exclude>                                        <exclude>META-INF/*.RSA</exclude>                                    </excludes>                                </filter>                            </filters>                            <transformers>                                <transformer                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">                                    <resource>reference.conf</resource>                                </transformer>                                <!-- 指定main方法 -->                                <transformer                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">                                    <mainClass>xxx</mainClass>                                </transformer>                            </transformers>                        </configuration>                    </execution>                </executions>            </plugin>        </plugins>    </build></project>

6) 因为按照配置模板的内容 "指定源码包和测试包的位置" 的部分:

    <sourceDirectory>src/main/scala</sourceDirectory>    <testSourceDirectory>src/test/scala</testSourceDirectory>

我们需要创建对应的 scala 目录,并 mark 为 Sources Root
7) 当修改后,第一次速度比较慢,因为 maven 需要 resolve 包的依赖,要下载相关的包。
注意:需要如图勾选,update snapshots,而且不需要联网,如果使用 maven 解决依赖后,仍然 pom.xml 有误,则只需要重启下 idea, 或者动一下 pom.xml 文件(不用改),重新保存即可。


8) 代码实现:

package com.atguigu.akka.actor

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// 1. 当我们继承 Actor 后,就是一个 Actor,需要重写该 Actor 的核心方法 receiveclass SayHelloActor extends Actor {  // 循环的接收消息  // 1. receive方法,会被该 Actor 的 MailBox(实现了 Runnable 接口)调用  // 2. 当该 Actor 的 MailBox 接收到消息,就会调用 receive 方法  // 3. Receive 的底层:type Receive = PartialFunction[Any, Unit]  override def receive: Receive = {    // 接受消息并处理,如果接收到 exit,就退出    case "hello" => println("发送:hello\t\t回应:hello too:)")    case "ok" => println("发送:ok\t\t\t回应:ok too:)")    case "exit" => {      println("接收到exit~指令,退出系统...")      context.stop(self) // 停止自己的 ActorRef      context.system.terminate() // 关闭 ActorSystem    }  }}

object SayHelloActor {  // 1. 先创建一个 ActorSystem,专门用于创建 Actor  private val actoryFactory = ActorSystem("actoryFactory")

  // 2. 创建一个 Actor 的同时,返回 Actor 的 ActorRef  private val sayHelloActorRef: ActorRef = actoryFactory.actorOf(Props[SayHelloActor], "sayHelloActor")  // (1) Props[SayHelloActor] 创建了一个 SayHelloActor 实例,这里使用到了反射  // (2) "sayHelloActor" 是 Actor 的名字  // (3) sayHelloActorRef: ActorRef  =>是 Props[SayHelloActor] 的引用  // (4) 创建的 SayHelloActor 实例被 ActorSystme 接管

  def main(args: Array[String]): Unit = {    // 给 SayHelloActor 发消息(邮箱)    sayHelloActorRef ! "hello"    sayHelloActorRef ! "ok"    sayHelloActorRef ! "ok~"    // 研究异步如何退出 ActorSystem    sayHelloActorRef ! "exit"  }

}

输出结果如下:

发送:hello        回应:hello too:)发送:ok            回应:ok too:)接收到exit~指令,退出系统...

9) 运行的效果

代码的示意图和小结


小结:
当程序执行 private val sayHelloActorRef: ActorRef = actoryFactory.actorOf(Props[SayHelloActor], "sayHelloActor") 会完成如下任务: [这是非常重要的方法]

  • 1、actorFactory 是 ActorSystem("actorFactory") 创建的。
  • 2、这里的 Props[SayHelloActor] 会使用反射机制,创建一个 SayHelloActor 对象,如果是 actorFactory.actorOf(Props(new SayHelloActor(其他代理对象的引用)), "sayHelloActor") 形式,就是使用 new 的方式创建一个 SayHelloActor 对象。注意:Props() 是小括号。
  • 3、会创建一个 SayHelloActor 对象的代理对象 sayHelloActorRef,使用 sayHelloActorRef 才能发送消息。
  • 4、会在底层创建 Dispather Message,是一个线程池,用于分发消息,消息是发送到对应的 Actor 的 MailBox。
  • 5、会在底层创建 SayHelloActor 的 MailBox 对象,该对象是一个队列,可接收 Dispatcher Message 发送的消息。
  • 6、MailBox 实现了 Runnable 接口,是一个线程,一直运行并调用 Actor 的 receive 方法,因此当Dispather 发送消息到 MailBox 时,Actor 在r eceive 方法就可以得到信息。
  • 7、SayHelloActorRef ! "hello" ,表示把 hello 消息发送到 SayHello Actor 的 Mailbox (通过Dispatcher Message 转发)。

16.5.2 Actor 之间通讯

应用实例需求

两个 Actor 的通讯机制原理图

代码实现
AActor.scala

package com.atguigu.akka.actors

import akka.actor.{Actor, ActorRef}

class AActor(bActorRef: ActorRef) extends Actor {  var count = 0  override def receive: Receive = {    case "start" => {      println("AActor 出招了,start ok")      bActorRef ! "我打"    }    case "我打" => {      count += 1      // 给 BActor 发出消息      // 这里需要持有 BActor 的引用(BActorRef)才可以      println(s"AActor(黄飞鸿) 厉害!看我佛山无影脚 第${count}脚")      Thread.sleep(1000)      bActorRef ! "我打" // 给 BActor 发出消息    }  }}

BActor.scala

package com.atguigu.akka.actors

import akka.actor.Actor

class BActor extends Actor {  var count = 0  override def receive: Receive = {    case "我打" => {      count += 1      println(s"BActor(乔峰) 挺猛 看我降龙十八掌 第${count}掌")      Thread.sleep(1000)      // 通过 sender() 方法,可以获取到发送消息的 Actor 的 ActorRef      sender() ! "我打"    }  }}

ActorApp.scala

package com.atguigu.akka.actors

import akka.actor.{ActorRef, ActorSystem, Props}

// 100招后,就退出object ActorApp extends App {  // 创建 ActorSystem  val actorfactory = ActorSystem("actorfactory")  // 先创建 BActor 的引用/代理  val bActorRef: ActorRef = actorfactory.actorOf(Props[BActor], "bActor")  // 创建 AActor 的引用时需要持有 BActor 的引用  val aActorRef: ActorRef = actorfactory.actorOf(Props(new AActor(bActorRef)), "aActor")

  // aActor 先出招  aActorRef ! "start"}

输出结果如下:

AActor 出招了,start okBActor(乔峰) 挺猛 看我降龙十八掌 第1掌AActor(黄飞鸿) 厉害!看我佛山无影脚 第1脚BActor(乔峰) 挺猛 看我降龙十八掌 第2掌AActor(黄飞鸿) 厉害!看我佛山无影脚 第2脚BActor(乔峰) 挺猛 看我降龙十八掌 第3掌AActor(黄飞鸿) 厉害!看我佛山无影脚 第3脚BActor(乔峰) 挺猛 看我降龙十八掌 第4掌AActor(黄飞鸿) 厉害!看我佛山无影脚 第4脚BActor(乔峰) 挺猛 看我降龙十八掌 第5掌AActor(黄飞鸿) 厉害!看我佛山无影脚 第5脚BActor(乔峰) 挺猛 看我降龙十八掌 第6掌AActor(黄飞鸿) 厉害!看我佛山无影脚 第6脚......

代码的小结

  • 1、两个 Actor 通讯机制和 Actor 自身发消息机制基本一样,只是要注意如下:
  • 2、如果 A Actor 在需要给 B Actor 发消息,则需要持有 B Actor 的 ActorRef,可以通过创建 A Actor 时,传入 B Actor 的代理对象(ActorRef)。
  • 3、当 B Actor 在 receive 方法中接收到消息,需要回复时,可以通过 sender() 获取到发送 Actor 的代理对象。

如何理解 Actor 的 receive 方法被调用?

  • 1、每个 Actor 对应 MailBox。
  • 2、MailBox 实现了 Runnable 接口,处于运行的状态。
  • 3、当有消息到达 MailBox,就会去调用 Actor 的 receive 方法,即将消息推送给 receive 方法。

16.7 Akka 网络编程

看两个实际应用(socket/tcp/ip)
  QQ、迅雷、百度网盘客户端、新浪网站、京东商城、淘宝

16.7.1 Akka 网络编程基本介绍

16.7.2 协议(tcp/ip)

  TCP/IP(Transmission Control Protocol/Internet Protocol)的简写,中文译名为传输控制协议/因特网互联协议,又叫网络通讯协议,这个协议是Internet 最基本的协议、是 Internet 国际互联网络的基础,简单地说,就是由网络层的IP协议和传输层的TCP协议组成的。
  TCP/IP 3本圣经级别书籍:xxx

16.7.3 OSI 与 Tcp/ip 参考模型

16.7.4 ip 地址

  概述:每个 internet 上的主机和路由器都有一个 ip 地址,它包括网络号和主机号,ip 地址有 ipv4(32位) 或者 ipv6(128位),可以通过 ipconfig(ifconfig) 来查看。
  一个小技巧:网络不通时,如何确定是哪一个路由(ip地址)出现问题?答:使用 tracert 指令。演示如下:

16.7.5 端口(port)

  我们这里所指的端口不是指物理意义上的端口,而是特指TCP/IP协议中的端口,是逻辑意义上的端口。如果把 IP 地址比作一间房子,端口就是出入这间房子的门。真正的房子只有几个门,但是一个 IP 地址的端口 可以有65535(即:256×256-1)个之多!端口是通过端口号来标记的。
端口(port)-分类


端口(port)-使用注意

socket 编程中客户端和服务器的网络分布

16.8 Akka 网络编程-小黄鸡客服案例

16.8.1 需求分析 + 界面设计

需求分析
  1、服务端进行监听(9999)
  2、客户端可以通过键盘输入,发送咨询问题给小黄鸡客服(服务端)
  3、小黄鸡(服务端)回答客户的问题

界面设计
服务端:


客户端:

16.8.2 程序框架图

16.8.3 功能实现

代码结构:


示例代码如下:
YellowChickenServer.scala

package com.atguigu.akka.yellowchicken.server

import akka.actor.{Actor, ActorRef, ActorSystem, Props}import com.atguigu.akka.yellowchicken.common.{ClientMessage, ServerMessage}import com.typesafe.config.ConfigFactory

class YellowChickenServer extends Actor {  override def receive: Receive = {    case "start" => println("start 小黄鸡客服开始工作了...")    // 如果接收到了服务端的发来的消息,即 ClientMessage    case ClientMessage(mes) => {      println("客户咨询的问题是:" + mes)      mes match {        // 使用 match case 匹配(模糊匹配)        case "大数据学费" => sender() ! ServerMessage("20000 RMB")        case "学校地址" => sender() ! ServerMessage("北京市朝阳区青年路大悦城")        case "学习什么技术" => sender() ! ServerMessage("大数据 前端 Python")        case _ => sender() ! ServerMessage("你说的啥子:)")      }    }  }}

// 主程序入口object YellowChickenServerApp extends App {  val host = "127.0.0.1" // 服务端ip地址  val port = 9999 // 端口  // 创建 config 对象,指定协议类型、监听的ip和端口  val config = ConfigFactory.parseString(    s"""       |akka.actor.provider="akka.remote.RemoteActorRefProvider"       |akka.remote.netty.tcp.hostname=$host       |akka.remote.netty.tcp.port=$port        """.stripMargin)  // 创建 ActorSystem  val serverActorSystem = ActorSystem("Server", config)  // 创建 YellowChickenServer 的 Actor 和 ActorRef  val yellowChickenServerActorRef: ActorRef = serverActorSystem.actorOf(Props[YellowChickenServer], "YellowChickenServer-01")

  // 启动服务端  yellowChickenServerActorRef ! "start"}

CustomerActor.scala

package com.atguigu.akka.yellowchicken.client

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}import com.atguigu.akka.yellowchicken.common.{ClientMessage, ServerMessage}import com.typesafe.config.ConfigFactory

import scala.io.StdIn

class CustomerActor(serverHost: String, serverPort: Int) extends Actor {

  // 定义一个 YellowChickenServerRef  var serverActorRef: ActorSelection = _

  // 在 Actor 中有一个方法 preStart 方法,它会在 Actor 运行前执行  // 在 Akka 开发中,通常将初始化的工作,放在 preStart 方法中  override def preStart(): Unit = {    this.serverActorRef = context.actorSelection(s"akka.tcp://[email protected]${serverHost}:${serverPort}/user/YellowChickenServer-01")    println("this.serverActorRefer=" + this.serverActorRef)  }

  override def receive: Receive = {    case "start" => println("start 客户端运行,可以咨询问题")    case mes: String => {      // 发给服务端      // serverActorRef ! mes // 不应该发送字符串,应该包装一把,应该发送一个(样例)对象(即协议)      serverActorRef ! ClientMessage(mes) // 此时发送的是一个对象,该样例类默认实现了序列化 和 apply 方法    }    // 如果接受到了服务器端的消息    case ServerMessage(mes) => {      println(s"收到小黄鸡客服(Server)消息:$mes")    }  }}

// 主程序入口object CustomerActorApp extends App {  val (host, port, serverHost, serverPort) = ("127.0.0.1", 9990, "127.0.0.1", 9999)  val config = ConfigFactory.parseString(    s"""       |akka.actor.provider="akka.remote.RemoteActorRefProvider"       |akka.remote.netty.tcp.hostname=$host       |akka.remote.netty.tcp.port=$port        """.stripMargin)

  // 创建 ActorSystem  val clientActorSystem = ActorSystem("Client", config)  // 创建 CustomerActor 的 Actor 和 ActorRef  val clientActorRef: ActorRef = clientActorSystem.actorOf(Props(new CustomerActor(serverHost, serverPort)), "CustomerActor-01")

  // 启动客户端  clientActorRef ! "start"

  // 客户端发送消息  while (true) {    val mes = StdIn.readLine()    clientActorRef ! mes  }}

MessageProtocol.scala

package com.atguigu.akka.yellowchicken.common

// 使用样例类来构建协议

// 1、客户端发送服务端的协议(序列化对象)case class ClientMessage(mes: String)  // 回顾:样例类的构造器中的每一个参数都默认为 val ,即只可读。

// 2、服务器端发送给客户端的协议case class ServerMessage(mes: String)

16.9 Akka 网络编程-Spark Master Worker 进程通讯项目

16.9.1 项目意义

  1、深入理解 Spark 的 Master 和 Worker 的通讯机制。
  2、为了方便同学们看 Spark 的底层源码,命名的方式和源码保持一致(如:通讯消息类命名就是一样的)。
  3、加深对主从服务心跳检测机制(HeartBeat)的理解,方便以后 spark 源码二次开发。

16.9.2 项目需求分析

16.9.3 项目界面设计

  我们主要是通过应用实例,来剖析 Spark 的 Master 和 Worker 的通讯机制,因此功能比较简洁,设计的界面如下。看后面演示即可。

16.9.4 实现功能 1-Worker 完成注册

功能要求: Worker 注册到 Master,Master 完成注册,并回复 Worker 注册成功。


代码结构:

示例代码如下:
MasterActor.scala

package com.atguigu.akka.sparkmasterworker.master

import akka.actor.{Actor, ActorSystem, Props}import com.atguigu.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo, WorkerInfo}import com.typesafe.config.ConfigFactory

import scala.collection.mutable

class MasterActor extends Actor {  // 定义一个 mutable.HashMap 属性,用于管理 Worker  val workers = mutable.HashMap[String, WorkerInfo]()

  override def receive: Receive = {    case "start" => println("Master服务器启动了...")    // 接收到 Worker 客户端注册的信息,保存进 HashMap    case RegisterWorkerInfo(id, cpu, ram) => {      if (!workers.contains(id)) {        // 创建 WorkerInfo        val workerInfo = new WorkerInfo(id, cpu, ram)        // 加入到 HashMap        workers += (id -> workerInfo)        println("服务器的Workers= " + workers)        // 回复客户端注册成功        sender() ! RegisteredWorkerInfo      }    }  }}

object MasterActorApp {  def main(args: Array[String]): Unit = {    val host = "127.0.0.1" // 服务端ip地址    val port = 10005 // 端口    // 创建 config 对象,指定协议类型、监听的ip和端口    val config = ConfigFactory.parseString(      s"""         |akka.actor.provider="akka.remote.RemoteActorRefProvider"         |akka.remote.netty.tcp.hostname=$host         |akka.remote.netty.tcp.port=$port        """.stripMargin)    // 先创建 ActorSystem    val masterActorSystem = ActorSystem("Master", config)    // 再创建 Master 的 Actor 和 ActorRef    val masterActorRef = masterActorSystem.actorOf(Props[MasterActor], "MasterActor-01")

    // 启动 Master    masterActorRef ! "start"  }}

WorkerActor.scala

package com.atguigu.akka.sparkmasterworker.worker

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}import com.atguigu.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo}import com.typesafe.config.ConfigFactory

class WorkerActor(serverHost: String, serverPort: Int) extends Actor {  // 定义一个 MasterActorRef  var masterActorProxy: ActorSelection = _

  // 定义 Worker 的编号  var id = java.util.UUID.randomUUID().toString

  // 在 Actor 中有一个方法 preStart 方法,它会在 Actor 运行前执行  // 在 Akka 开发中,通常将初始化的工作,放在 preStart 方法中  override def preStart(): Unit = {    this.masterActorProxy = context.actorSelection(s"akka.tcp://[email protected]${serverHost}:${serverPort}/user/MasterActor-01")    println("this.masterActorProxy=" + this.masterActorProxy)  }

  override def receive = {    case "start" => {      println("Worker客户端启动运行")      // 给服务器发送一个注册信息      masterActorProxy ! RegisterWorkerInfo(id, 16, 16 * 1024)    }    case RegisteredWorkerInfo => {      println("WorkedId= " + id + " 注册成功!")    }  }}

object WorkerActorApp {  def main(args: Array[String]): Unit = {    val (host, port, serverHost, serverPort) = ("127.0.0.1", 10001, "127.0.0.1", 10005)    val config = ConfigFactory.parseString(      s"""         |akka.actor.provider="akka.remote.RemoteActorRefProvider"         |akka.remote.netty.tcp.hostname=$host         |akka.remote.netty.tcp.port=$port        """.stripMargin)

    // 创建 ActorSystem    val workerActorSystem = ActorSystem("Worker", config)    // 创建 WorkerActor 的 Actor 和 ActorRef    val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new WorkerActor(serverHost, serverPort)), "WorkerActor-01")

    // 启动客户端    workerActorRef ! "start"  }}

MessageProtocol.scala

package com.atguigu.akka.sparkmasterworker.common

// 使用样例类来构建协议

// Worker 注册信息case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

// 这个是 WorkerInfo,是保存在 Master 的 HashMap 中的,该 HashMap 用于管理 Worker// 将来这个 WorkerInfo 会扩展,比如 增加 Worker 上一次的心跳时间class WorkerInfo(val id: String, val cpu: Int, val ram: Int)

// 当 Worker 注册成功,服务器返回一个 RegisteredWorkerInfo 对象case object RegisteredWorkerInfo

16.9.5 实现功能 2-Worker 定时发送心跳

功能要求:Worker 定时发送心跳给 Master,Master 能够接收到,并更新 Worker 上一次心跳时间。


示例代码如下:
MessageProtocol.scala 中增加代码

package com.atguigu.akka.sparkmasterworker.common

// 使用样例类来构建协议

// Worker 注册信息case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

// 这个是 WorkerInfo,是保存在 Master 的 HashMap 中的,该 HashMap 用于管理 Worker// 将来这个 WorkerInfo 会扩展,比如 增加 Worker 上一次的心跳时间class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {  // 新增属性:心跳时间  var lastHeartBeatTime: Long = _}

// 当 Worker 注册成功,服务器返回一个 RegisteredWorkerInfo 对象case object RegisteredWorkerInfo

// 每隔一定时间定时器发送给 Master 一个心跳case class HeartBeat(id: String)

// Worker 每隔一定时间定时器发送给 自己 一个消息case object SendHeartBeat

MasterActor.scala 中增加代码

    case HeartBeat(id) => {      // 更新对应的 Worker 的心跳时间      // 1、先从 Worker 中取出 WorkerInfo      val workerInfo = workers(id)      workerInfo.lastHeartBeatTime = System.currentTimeMillis()      println("Master更新了 " + id + " 的心跳时间 ")    }

WorkerActor.scala 中增加代码

      // 当客户端注册成功后,就定义一个定时器,每隔一定时间,发送 SendHeartBeat 给自己      import context.dispatcher      context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)    case SendHeartBeat => {      println("WorkedId= " + id + " 给Master发送心跳")      masterActorProxy ! HeartBeat(id)    }

16.9.6 实现功能 3-Master 启动定时任务,定时检测注册的 Worker

功能要求:Master 启动定时任务,定时检测注册的 Worker 有哪些没有更新心跳,已经超时的 Worker,将其从 HashMap 中删除掉。


示例代码如下:
MessageProtocol.scala 中增加代码

// Master 给自己发送一个触发检查超时 Worker 的信息case object StartTimeOutWorker

// Master 给自己发消息,检测 Worker,对于心跳超时的case object RemoveTimeOutWorker

MasterActor.scala 中增加代码

    case "start" => {      println("Master服务器启动了...")      // Master 启动定时任务,定时检测注册的 Worker 有哪些没有更新心跳,已经超时的 Worker,将其从 HashMap 中删除掉。      self ! StartTimeOutWorker    }

    // 开启定时器,每隔一定时间检测是否有 Worker 的心跳超时    case StartTimeOutWorker => {      println("开启了定时检测Worker心跳的任务")      import context.dispatcher // 使用调度器时候必须导入dispatcher      context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)    }

    // 判断哪些 Worker 心跳超时(nowTime - lastHeartBeatTime),对已经超时的 Worker,将其从 HashMap 中删除掉。    case RemoveTimeOutWorker => {      // 首先获取所有 Workers 的所有 WorkerInfo      val workerInfos = workers.values      val nowTime = System.currentTimeMillis()      // 过滤出所有超时的 workerInfo 并删除即可      workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeatTime) > 6000)        .foreach(workerInfo => workers.remove(workerInfo.id))      println("当前有 " + workers.size + " 个Worker存活")    }

16.9.7 实现功能 4-Master,Worker 的启动参数运行时指定

功能要求:Master,Worker 的启动参数运行时指定,而不是固定写在程序中的。


MasterActor.scala 中修改代码

    if (args.length != 3) {      println("请输入参数 host port MasterActor的名字")      sys.exit()    }    val host = args(0)  // 服务端ip地址    val port = args(1)  // 端口    val masterName = args(2)  // MasterActor的名字    ......    // 再创建 Master 的 Actor 和 ActorRef    val masterActorRef = masterActorSystem.actorOf(Props[MasterActor], s"${masterName}")

WorkerActor.scala 中增修改代码

    if (args != 6) {      println("请输入参数 host port WorkerActor的名字 serverHost serverPort MasterActor的名字")    }

    val host = args(0)    val port = args(1)    val workerName = args(2)

    val serverHost = args(3)    val serverPort = args(4)    val masterName = args(5)    ......    // 创建 WorkerActor 的 Actor 和 ActorRef    val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new WorkerActor(serverHost, serverPort.toInt, masterName)), s"${workerName}")

Master 配置参数截图:


Worker 配置参数截图:

原文地址:https://www.cnblogs.com/chenmingjun/p/10660643.html

时间: 2024-10-13 01:52:39

大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目的相关文章

大数据技术之_16_Scala学习_08_数据结构(下)-集合操作+模式匹配

第十一章 数据结构(下)-集合操作11.1 集合元素的映射-map11.1.1 map 映射函数的操作11.1.2 高阶函数基本使用案例1+案例211.1.3 使用 map 映射函数来解决11.1.4 模拟实现 map 映射函数的机制11.1.5 课堂练习11.2 集合元素的扁平-flatMap11.3 集合元素的过滤-filter11.4 集合元素的化简-reduce11.5 集合元素的折叠-fold11.6 集合元素的扫描-scan11.7 集合的综合应用案例11.8 集合的合并-zip11

大数据技术之_09_Hive学习_复习与总结

一.知识梳理1.1.背景表结构1.1.1.order by1.1.2.sort by1.1.3.distribute by1.1.4.cluster by1.2.行转列.列转行(UDAF 与 UDTF)1.2.1.行转列1.2.2.列转行1.3.建表时的数组操作1.4.orc 存储1.5.Hive 分桶1.5.1.直接分桶1.5.2.在分区中分桶二.总结2.1 启动/停止hadoop集群.zookeeper集群.历史服务器2.2 访问hive的两种方式2.3 CentOS6x与Cenos7x命令

大数据技术之_20_Elasticsearch学习_01_概述 + 快速入门 + Java API 操作 + 创建、删除索引 + 新建、搜索、更新删除文档 + 条件查询 + 映射操作

一 概述1.1 什么是搜索?1.2 如果用数据库做搜索会怎么样?1.3 什么是全文检索和 Lucene?1.4 什么是 Elasticsearch?1.5 Elasticsearch 的适用场景1.6 Elasticsearch 的特点1.7 Elasticsearch 的核心概念1.7.1 近实时1.7.2 Cluster(集群)1.7.3 Node(节点)1.7.4 Index(索引 --> 数据库)1.7.5 Type(类型 --> 表)1.7.6 Document(文档 -->

大数据技术之_03_Hadoop学习_02_入门_Hadoop运行模式+【本地运行模式+伪分布式运行模式+完全分布式运行模式(开发重点)】+Hadoop编译源码(面试重点)+常见错误及解决方案

第4章 Hadoop运行模式4.1 本地运行模式4.1.1 官方Grep案例4.1.2 官方WordCount案例4.2 伪分布式运行模式4.2.1 启动HDFS并运行MapReduce程序4.2.2 启动YARN并运行MapReduce程序4.2.3 配置历史服务器4.2.4 配置日志的聚集4.2.5 配置文件说明4.3 完全分布式运行模式(开发重点)4.3.1 虚拟机准备4.3.2 编写集群分发脚本xsync4.3.3 集群配置4.3.4 集群单点启动4.3.5 SSH无密登录配置4.3.6

大数据技术之_08_Hive学习_01_Hive入门+Hive安装、配置和使用+Hive数据类型

第1章 Hive入门1.1 什么是Hive1.2 Hive的优缺点1.2.1 优点1.2.2 缺点1.3 Hive架构原理1.4 Hive和数据库比较1.4.1 查询语言1.4.2 数据存储位置1.4.3 数据更新1.4.4 索引1.4.5 执行1.4.6 执行延迟1.4.7 可扩展性1.4.8 数据规模第2章 Hive安装.配置和使用2.1 Hive安装地址2.2 Hive安装部署2.3 将本地文件导入Hive案例2.4 MySql安装2.4.1 安装包准备2.4.2 安装MySql服务器2.

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

大数据技术之_19_Spark学习_05_Spark GraphX 应用解析 + Spark GraphX 概述、解析 + 计算模式 + Pregel API + 图算法参考代码 + PageRank 实例

第1章 Spark GraphX 概述1.1 什么是 Spark GraphX1.2 弹性分布式属性图1.3 运行图计算程序第2章 Spark GraphX 解析2.1 存储模式2.1.1 图存储模式2.1.2 GraphX 存储模式2.2 vertices.edges 以及 triplets2.2.1 vertices2.2.2 edges2.2.3 triplets2.3 图的构建2.3.1 构建图的方法2.3.2 构建图的过程2.4 计算模式2.4.1 BSP 计算模式2.4.2 图操作一

大数据技术之_08_Hive学习_04_压缩和存储(Hive高级)+ 企业级调优(Hive优化)

第8章 压缩和存储(Hive高级)8.1 Hadoop源码编译支持Snappy压缩8.1.1 资源准备8.1.2 jar包安装8.1.3 编译源码8.2 Hadoop压缩配置8.2.1 MR支持的压缩编码8.2.2 压缩参数配置8.3 开启Map输出阶段压缩8.4 开启Reduce输出阶段压缩8.5 文件存储格式8.5.1 列式存储和行式存储8.5.2 TextFile格式8.5.3 Orc格式8.5.4 Parquet格式8.5.5 主流文件存储格式对比实验8.6 存储和压缩结合8.6.1 修

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio