Spark机器学习· 实时机器学习

Spark机器学习

1 在线学习

模型随着接收的新消息,不断更新自己;而不是像离线训练一次次重新训练。

2 Spark Streaming

3 MLib+Streaming应用

3.0 build.sbt

依赖Spark MLlib和Spark Streaming

name := "scala-spark-streaming-app"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1"
使用国内镜像仓库

~/.sbt/repositories

[repositories]
local
osc: http://maven.oschina.net/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots

3.1 生产消息

object StreamingProducer {

  def main(args: Array[String]) {

    val random = new Random()

    // Maximum number of events per second
    val MaxEvents = 6

    // Read the list of possible names
    val namesResource = this.getClass.getResourceAsStream("/names.csv")
    val names = scala.io.Source.fromInputStream(namesResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq

    // Generate a sequence of possible products
    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    /** Generate a number of random product events */
    def generateProductEvents(n: Int) = {
      (1 to n).map { i =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }

    // create a network producer
    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")

    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach{ event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close()
        }
      }.start()
    }
  }
}
sbt run

Multiple main classes detected, select one to run:

 [1] MonitoringStreamingModel
 [2] SimpleStreamingApp
 [3] SimpleStreamingModel
 [4] StreamingAnalyticsApp
 [5] StreamingModelProducer
 [6] StreamingProducer
 [7] StreamingStateApp

Enter number: 6

3.2 打印消息

阅读全文请点击:http://click.aliyun.com/m/8713/

时间: 2024-10-10 14:43:08

Spark机器学习· 实时机器学习的相关文章

离线轻量级大数据平台Spark之MLib机器学习库概念学习

Mlib机器学习库 1.1机器学习概念 机器学习有很多定义,倾向于下面这个定义.机器学习是对能通过经验自动改进的计算机算法的研究.机器学习依赖数据经验并评估和优化算法所运行出的模型.机器学习算法尝试根据训练数据使得表示算法行为的数学目标最大化,并以此来进行预测或作出决定.机器学习问题分类为几种,包括分类.回归.聚类.所有的机器学习算法都经过一条流水线:提取训练数据的特征->基于特征向量训练模型->评估模型选择最佳.特征提取主要是提取训练数据中的数值特征,用于数学建模.机器学习一般有如下分类:

Spark MLBase分布式机器学习系统入门:以MLlib实现Kmeans聚类算法

1.什么是MLBaseMLBase是Spark生态圈的一部分,专注于机器学习,包含三个组件:MLlib.MLI.ML Optimizer. ML Optimizer: This layer aims to automating the task of ML pipeline construction. The optimizer solves a search problem over feature extractors and ML algorithms included inMLI and

实时机器学习是什么,面临哪些挑战?

最近能够随数据获取实时调整模型的实时机器学习,正在成为媒体技术领域的新"网红".曾经连续两年,都被FTI评为传媒业的重要技术趋势之一,与自然语言理解NLU.机器阅读理解MRC.音视频算法等共享金字塔顶端的荣光.        那实时机器学习到底是什么呢? 在开启扒皮模式之前,我们先来了解一下,实时机器学习究竟在哪些地方比传统的机器学习更强? 传统的机器学习(ML)正在媒体领域得到越来越多的应用,利用算法实现内容的"个性化推荐",已经成为主流媒体的标配. 但过去的算法

【Streaming】30分钟概览Spark Streaming 实时计算

本文主要介绍四个问题: 什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark Streaming相对其他实时计算框架该如何技术选型? 本文主要针对初学者,如果有不明白的概念可了解之前的博客内容. 1.什么是Spark Streaming? 与其他大数据框架Storm.Flink一样,Spark Streaming是基于Spark Core基础之上用于处理实时计算业务的框架.其实

Spark Streaming实时计算框架介绍

http://www.cnblogs.com/Leo_wl/p/3530464.html 随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在Spark上的实时计算框架,通过它提供的丰富的API.基于内存的高速执行引擎,用户可以结合流式.批处理和交互试查询应用.本文将详细介绍Spark Streaming实时计算框架的原理与特点.适用场景. Spar

【作业四】林轩田机器学习技法 + 机器学习公开新课学习个人体会

这次作业的coding任务量比较大,总的来说需要实现neural network, knn, kmeans三种模型. Q11~Q14为Neural Network的题目,我用单线程实现的,运行的时间比较长,因此把这几道题的正确答案记录如下: Q11: 6 Q12: 0.001 Q13: 0.01 Q14: 0.02 ≤ Eout ≤ 0.04 其中Q11和Q14的答案比较明显,Q12和Q13有两个答案比较接近(参考了讨论区的内容,最终也调出来了) neural network的代码实现思路如下:

【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streaming 消费 Kafka 中的消息,同时消费记录由 Zookeeper 集群统一管理,这样即使 Kafka 宕机重启后也能找到上次的消费记录继而进行消费.在这里 Spark Streaming 首先从 MySQL 读取规则然后进行 ETL 清洗并计算多个聚合指标,最后将结果的一部分存储到 Hbase

利用Spark mllab进行机器学习的基本操作(聚类,分类,回归分析)

Spark作为一种开源集群计算环境,具有分布式的快速数据处理能力.而Spark中的Mllib定义了各种各样用于机器学习的数据结构以及算法.Python具有Spark的API.需要注意的是,Spark中,所有数据的处理都是基于RDD的. 首先举一个聚类方面的详细应用例子Kmeans: 下面代码是一些基本步骤,包括外部数据,RDD预处理,训练模型,预测. #coding:utf-8 from numpy import array from math import sqrt from pyspark

spark 与 scikit-learn 机器学习流程组件设计哲学比较

概述:估算器,变换器和管道 - spark.ml 该spark.ml软件包旨在提供基于DataFrame构建的一组统一的高级API ,帮助用户创建和调整实用的机器学习流程.有关子包的指南,请参阅下面的算法指南部分 spark.ml,包括Pipelines API特有的功能转换器,集合等. 管道中的主要概念 Spark ML标准化了用于机器学习算法的API,使得将多种算法组合到单个管道或工作流中变得更加容易.本节介绍Spark ML API引入的关键概念,其中管道概念主要受scikit-learn