Scala编程实战

2018-12-11 08:01:32

1. 课程目标

1.1. 目标:熟练使用Scala编写程序

              

2. 项目概述

2.1. 需求

目前大多数的分布式架构底层通信都是通过RPC实现的,RPC框架非常多,比如前我们学过的Hadoop项目的RPC通信框架,但是Hadoop在设计之初就是为了运行长达数小时的批量分析而设计的,在某些极端的情况下,任务提交的延迟很高,所以Hadoop的RPC显得有些笨重。

Spark 的RPC是通过Akka类库实现的,Akka用Scala语言开发,基于Actor并发模型实现,Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松实现分布式RPC功能。

2.2. Akka简介

Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。

Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。

Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:

1.提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发

2.提供了异步非阻塞的、高性能的事件驱动编程模型

3.超级轻量级事件处理(每GB堆内存几百万Actor)

3. 项目实现

3.1. 架构图

                    

3.2. 重要类介绍

3.2.1. ActorSystem

在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。

3.2.2. Actor

在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。

1.preStart()方法:该方法在Actor对象构造器执行后执行,整个Actor生命周期中仅执行一次。

2.receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收发送消息,会被反复执行。

3.3. Master类

package cn.akkaTest.spark
import scala.concurrent.duration._
import akka.actor.{Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory

import scala.collection.mutable

/**
  * Master为整个集群中的主节点
  * Master继承了Actor
  */
class Master extends Actor{

  //保存WorkerID和Work信息的map
  val idToWorker = new mutable.HashMap[String, WorkerInfo]
  //保存所有Worker信息的Set
  val workers = new mutable.HashSet[WorkerInfo]
  //Worker超时时间
  val WORKER_TIMEOUT = 10 * 1000
  //重新receive方法

  //导入隐式转换,用于启动定时器
  import context.dispatcher

  //构造方法执行完执行一次
  override def preStart(): Unit = {
    //启动定时器,定时执行
    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
  }

  //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
  override def receive: Receive = {
    //Worker向Master发送的注册消息
    case RegisterWorker(id, workerHost, memory, cores) => {
      if(!idToWorker.contains(id)) {
        val worker = new WorkerInfo(id, workerHost, memory, cores)
        workers.add(worker)
        idToWorker(id) = worker
        sender ! RegisteredWorker("192.168.10.1")
      }
    }

    //Worker向Master发送的心跳消息
    case HeartBeat(workerId) => {
      val workerInfo = idToWorker(workerId)
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    }

    //Master自己向自己发送的定期检查超时Worker的消息
    case CheckOfTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
      for(worker <- toRemove){
        workers -= worker
        idToWorker.remove(worker.id)
      }
      println("worker size: " + workers.size)
    }
  }
}

object Master {
  //程序执行入口
  def main(args: Array[String]) {

    val host = "192.168.10.1"
    val port = 8888
    //创建ActorSystem的必要参数
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem是单例的,用来创建Actor
    val actorSystem = ActorSystem.create("MasterActorSystem", config)
    //启动Actor,Master会被实例化,生命周期方法会被调用
    actorSystem.actorOf(Props[Master], "Master")
  }
}

3.4. Worker类

package cn.akkaTest.spark

import java.util.UUID
import scala.concurrent.duration._
import akka.actor.{ActorSelection, Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory

/**
  * Worker为整个集群的从节点
  * Worker继承了Actor
  */
class Worker extends Actor{

  //Worker端持有Master端的引用(代理对象)
  var master: ActorSelection = null
  //生成一个UUID,作为Worker的标识
  val id = UUID.randomUUID().toString

  //构造方法执行完执行一次
  override def preStart(): Unit = {
    //Worker向MasterActorSystem发送建立连接请求
    master = context.system.actorSelection("akka.tcp://[email protected]:8888/user/Master")
    //Worker向Master发送注册消息
    master ! RegisterWorker(id, "192.168.10.1", 10240, 8)
  }

  //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
  override def receive: Receive = {
    //Master向Worker的反馈信息
    case RegisteredWorker(masterUrl) => {
      import context.dispatcher
      //启动定时任务,向Master发送心跳
      context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
    }

    case SendHeartBeat => {
      println("worker send heartbeat")
      master ! HeartBeat(id)
    }
  }
}

object Worker {
  def main(args: Array[String]) {
    val clientPort = 2552
    //创建WorkerActorSystem的必要参数
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.port = $clientPort
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("WorkerActorSystem", config)
    //启动Actor,Master会被实例化,生命周期方法会被调用
    actorSystem.actorOf(Props[Worker], "Worker")
  }
}

  

原文地址:https://www.cnblogs.com/wanfeng1937/p/10100261.html

时间: 2024-08-03 14:24:20

Scala编程实战的相关文章

03.Scala编程实战

Scala编程实战 1.    课程目标 1.1.  目标:使用Akka实现一个简易版的spark通信框架 2.    项目概述 2.1.   需求 Hivesql----------> select count(*) from user----->整个表只有1条数据 Map 0%     reduce 0% Map 10%    reduce 0% Map 20%    reduce 0% 目前大多数的分布式架构底层通信都是通过RPC实现的,RPC框架非常多,比如前我们学过的Hadoop项目

Scala编程实战pdf

下载地址:网盘下载 学习Scala语言,不仅仅意味着熟悉新的API,更重要的是一种思维方式的转变.从原有的面向对象编程(OO)到函数式编程(FP)的思想.本书面向实际的使用场景,提供了大量的Scala实例,同时,也给出底层的原理和相关的参考.对于Scala新手来说这是一本**不错的入门书,对于老手来说也是一本夯实基础,检视自己所学知识的好书. Alvin Alexander走上软件开发之路比较曲折.虽然他从得克萨斯州的A&M大学拿到了航空工程学学位,但他真正想做的却是打棒球.成为见习工程师时,他

Scala 深入浅出实战经典 第67讲:Scala并发编程匿名Actor、消息传递、偏函数解析

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-87讲)完整视频.PPT.代码下载:百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/LwsfuGIsWEk/优酷:http://v.youku.com/v_show/id_

大数据Spark蘑菇云前传第15课:Scala类型参数编程实战及Spark源码鉴赏(学习笔记)

前传第15课:Scala类型参数编程实战及Spark源码鉴赏 本課課程: Spark源码中的Scala类型系統的使用 Scala类型系統编程操作实战 Spark源码中的Scala类型系統的使用 classOf[RDD[_]] 這個也是类型系統 這里的意思是說 B 這種類型必需至少是 A 這樣類型 Ordering Scala类型系統编程操作实战 作為類型系統最大的就可以對類型進行限制,在Scala 中的類型系統,他本身也作為對象.e.g. 我們可以建立 Person 這個類,現在可以建立一個什麼

(升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)

本课程主要讲解目前大数据领域最热门.最火爆.最有前景的技术——Spark.在本课程中,会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战.课程会涵盖Scala编程详解.Spark核心编程.Spark SQL和Spark Streaming.Spark内核以及源码剖析.性能调优.企业级案例实战等部分.完全从零起步,让学员可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程

第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析

王家林亲授<DT大数据梦工厂>大数据实战视频“Scala深入浅出实战经典”视频.音频和PPT下载!第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析百度云:http://pan.baidu.com/s/1pJ5jzHx腾讯微云:http://url.cn/aSawrm360云盘:http://yunpan.cn/cctL3QYACaVNa  访问密码 c0fb 信息来源于 DT大数据梦工厂微信公众账号:DT_Spark

Scala 深入浅出实战经典 第68讲:Scala并发编程原生线程Actor、Cass Class下的消息传递和偏函数实战解析

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-87讲)完整视频.PPT.代码下载: 百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/mm3eDHk3T5o/优酷:http://v.youku.com/v_show/id

Spark2.0从入门到精通:Scala编程、大数据开发、上百个实战案例、内核源码深度剖析视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

scala界面GUI编程实战初步了解

示例代码: import scala.swing._ //SimpleSwingApplication继承自SwingApplication类(此类中有main方法,因此可以运行显示界面) object Hello_GUI extends SimpleSwingApplication { def top = new MainFrame{ //顶级容器 title = "Hello GUI" contents = new Button{ text = "Scala =>