Structured Streaming 实战案例 读取Scoker

1.1.1.读取Socket数据

●准备工作

nc -lk 9999

hadoop spark sqoop hadoop spark hive hadoop

●代码演示:

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.接收数据
    val dataDF: DataFrame = spark.readStream
      .option("host", "node01")
      .option("port", 9999)
      .format("socket")
      .load()
    //3.处理数据
    import spark.implicits._
    val dataDS: Dataset[String] = dataDF.as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
    //result.show()
    //Queries with streaming sources must be executed with writeStream.start();
    result.writeStream
      .format("console")//往控制台写
      .outputMode("complete")//每次将所有的数据写出
      .trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快
      .option("checkpointLocation","./810")//设置checkpoint目录,用来做合并
      .start()//开启
      .awaitTermination()//等待停止
  }
}

32

1

import org.apache.spark.SparkContext

2

import org.apache.spark.sql.streaming.Trigger

3

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

4


5

object WordCount {

6

  def main(args: Array[String]): Unit = {

7

    //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet

8

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()

9

    val sc: SparkContext = spark.sparkContext

10

    sc.setLogLevel("WARN")

11

    //2.接收数据

12

    val dataDF: DataFrame = spark.readStream

13

      .option("host", "node01")

14

      .option("port", 9999)

15

      .format("socket")

16

      .load()

17

    //3.处理数据

18

    import spark.implicits._

19

    val dataDS: Dataset[String] = dataDF.as[String]

20

    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))

21

    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)

22

    //result.show()

23

    //Queries with streaming sources must be executed with writeStream.start();

24

    result.writeStream

25

      .format("console")//往控制台写

26

      .outputMode("complete")//每次将所有的数据写出

27

      .trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快

28

      .option("checkpointLocation","./810")//设置checkpoint目录,用来做合并

29

      .start()//开启

30

      .awaitTermination()//等待停止

31

  }

32

}

代码截图:

原文地址:https://www.cnblogs.com/TiePiHeTao/p/aae3b5d9885e8730da014172165606e1.html

时间: 2024-10-04 07:05:33

Structured Streaming 实战案例 读取Scoker的相关文章

Structured Streaming 实战案例 读取文本数据

1.1.1.读取文本数据 spark应用可以监听某一个目录,而web服务在这个目录上实时产生日志文件,这样对于spark应用来说,日志文件就是实时数据 Structured Streaming支持的文件类型有text,csv,json,parquet ●准备工作 在people.json文件输入如下数据: {"name":"json","age":23,"hobby":"running"} {"n

第93课:Spark Streaming updateStateByKey案例实战和内幕源码解密

本节课程主要分二个部分: 一.Spark Streaming updateStateByKey案例实战二.Spark Streaming updateStateByKey源码解密 第一部分: updateStateByKey的主要功能是随着时间的流逝,在Spark Streaming中可以为每一个可以通过CheckPoint来维护一份state状态,通过更新函数对该key的状态不断更新:对每一个新批次的数据(batch)而言,Spark Streaming通过使用updateStateByKey

第93讲:Spark Streaming updateStateByKey案例实战和内幕源码

本节课程主要分二个部分: 一.Spark Streaming updateStateByKey案例实战 二.Spark Streaming updateStateByKey源码解密 第一部分: updateStateByKey它的主要功能是随着时间的流逝,在Spark Streaming中可以为每一个key可以通过CheckPoint来维护一份state状态,通过更新函数对该key的状态不断更新:在更新的时候,对每一个新批次的数据(batch)而言,Spark Streaming通过使用upda

《Web渗透技术及实战案例解析》pdf

下载地址:网盘下载 内容简介 编辑 本书从Web渗透的专业角度,结合网络安全中的实际案例,图文并茂地再现Web渗透的精彩过程.本书共分7章,由浅入深地介绍和分析了目前网络流行的Web渗透攻击方法和手段,并结合作者多年的网络安全实践经验给出了相对应的安全防范措施,对一些经典案例还给出了经验总结和技巧,通过阅读本书可以快速掌握目前Web渗透的主流技术.本书最大的特色就是实用和实战性强,思维灵活.内容主要包括Web渗透必备技术.Google黑客技术.文件上传渗透技术.SQL注入.高级渗透技术.0day

Spark Structured Streaming框架(2)之数据输入源详解

Spark Structured Streaming目前的2.1.0版本只支持输入源:File.kafka和socket. 1. Socket Socket方式是最简单的数据输入源,如Quick example所示的程序,就是使用的这种方式.用户只需要指定"socket"形式并配置监听的IP和Port即可. val scoketDF = spark.readStream .format("socket") .option("host","

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Event-time 和延迟数据 容错语义 API 使用 Datasets 和 DataFrames 创建 streaming DataFrames 和 streaming Datasets Input Sources (输入源) streaming DataFrames/Datasets 的模式接口和分区 streaming DataFrames/Dataset

Structured Streaming教程(3) —— 与Kafka的集成

Structured Streaming最主要的生产环境应用场景就是配合kafka做实时处理,不过在Strucured Streaming中kafka的版本要求相对搞一些,只支持0.10及以上的版本.就在前一个月,我们才从0.9升级到0.10,终于可以尝试structured streaming的很多用法,很开心~ 引入 如果是maven工程,直接添加对应的kafka的jar包即可: <dependency> <groupId>org.apache.spark</groupI

Spark2.x(六十):在Structured Streaming流处理中是如何查找kafka的DataSourceProvider?

本章节根据源代码分析Spark Structured Streaming(Spark2.4)在进行DataSourceProvider查找的流程,首先,我们看下读取流数据源kafka的代码: SparkSession sparkSession = SparkSession.builder().getOrCreate(); Dataset<Row> sourceDataset = sparkSession.readStream().format("kafka").option

运维实战案例之“Argument list too long”错误与解决方法

作为一名运维人员来说,这个错误并不陌生,在执行rm.cp.mv等命令时,如果要操作的文件数很多,可能会使用通配符批量处理大量文件,这时就可能会出现"Argument list too long"这个问题了. 1.错误现象 这是一台Mysql数据库服务器,在系统中运行了很多定时任务,今天通过crontab命令又添加了一个计划任务,退出时发生了如下报错: #crontab -e 编辑完成后,保存退出,就出现下面如下图所示错误: 2.解决思路 根据上面报错的提示信息,基本判定是磁盘空间满了,