sparkStreaming入门

1.环境

jdk : 1.8

scala : 2.11.7

hadoop:2.7

spark : 2.2.0

2. 开发工具

idea 2017.2

3.maven的pom文件

<dependencies>
<!-- https://mvnrepository.com/artifact/com.sun/tools -->
<!-- https://mvnrepository.com/artifact/org.apache.maven/maven-core -->
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-core</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>

4.sparkStreaming通过本地的socket端口解析日志

package test02

import org.apache.spark.SparkConfimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.StreamingContext

object demo5 {  def main (args : Array[String]) {    val conf = new SparkConf().setMaster("local[4]").setAppName("SaprkApp")    val ssc = new StreamingContext(conf, Seconds(10))    val lines = ssc.socketTextStream("localhost", 9999)

//测试数据 : 2017-07-18 11:02:52.032 INFO   172.31.20.232:56965 [email protected]@172.31.33.81 PUBLIC_USER_LOGIN(0202)-0 create session for 18583551/[email protected]@172.31.33.81    val loginRDD = lines.map(line =>{      val pattern = """^(\S+\s\d+\:\d+\:\d+\.\d{3})\s+\S+\s+(\d+\.\d+.\d+\.\d+):\d+\S+\s+(\d+)@(\S+)@\S+\s+PUBLIC_USER_LOGIN\(0202\)-0\s+create session for.*""".r      var login_time = ""      var ip_address = ""      var passport_id = ""      var session_id = ""      var scene_name = "login"      pattern.findAllIn(line).matchData foreach{m =>{        login_time = m.group(1)        ip_address = m.group(2)        passport_id = m.group(3)        session_id  = m.group(4)

println(login_time)        println(ip_address)        println(passport_id)        println(session_id)

}}      (login_time,ip_address,passport_id,session_id,scene_name)    })    val loginRes = loginRDD.filter(_._1 != "").filter(_._2 != "")    loginRes.print()

//测试数据 : 2017-07-18 11:03:44.312 INFO   0.0.0.0:18402 [email protected]@172.31.32.135 ADMIN_SYSTEM_SUCCESS(00FE)-84581  -> USER_DISCONNECTED Time cost 9.09ms    val logoutRDD = lines.map(line =>{      val pattern = """^(\S+\s\d+\:\d+\:\d+\.\d{3})\s+\S+\s+(\d+\.\d+\.\d+\.\d+):\d+\S+\s+(\d+)@(\S+)@.*(disconnected|DISCONNECTED).*""".r      var login_id = ""      var ip = ""      var passport_id = ""      var str = ""      var scene_name = "logout"      pattern.findAllIn(line).matchData foreach{m =>{        login_id = m.group(1)        ip = m.group(2)        passport_id = m.group(3)        str = m.group(4)

println(login_id)        println(ip)        println(passport_id)        println(str)

}}      (login_id,ip,passport_id,str)    })    val logoutRes = logoutRDD.filter(_._1 != "").filter(_._2 != "")    logoutRes.print()    logoutRes.saveAsTextFiles("/Users/huiliyang/streaming/aa")

//测试数据 : 2017-08-27 06:04:38.420 [info] <0.3471.83> 172.31.2.201:59154 70281275 PUBLIC_SERVER_CLIENT_LOG(258)-0 LovelyStreet:1228    val eventRDD = lines.map(line =>{      val pattern = """^(\S+\s\d+\:\d+\:\d+\.\d{3})\s+\[info\]\s<[\d\.]*>\s?(\d+\.\d+\.\d+.\d+):\d+\S+\s+\S?([1-9]\d{7})(@\[email protected]\d+\.\d+\.\d+\.\d+)?\S?\s+PUBLIC_(SERVER|SYSTEM)_CLIENT\S+\s(\S+):(\d+)""".r      var login_time = ""      var ip_address = ""      var passport_id = ""      var session_id = ""      var str1 = ""      var str2 = ""      var str3 = ""      pattern.findAllIn(line).matchData foreach{m =>{        login_time = m.group(1)        ip_address = m.group(2)        passport_id = m.group(3)        session_id  = m.group(4)        str1  = m.group(5)        str2  = m.group(6)        str3  = m.group(7)

println(login_time)        println(ip_address)        println(passport_id)        println(session_id)        println(str1)        println(str2)        println(str3)

}}      (login_time,ip_address,passport_id,session_id,str1,str2,str3)    })    val eventRes = eventRDD.filter(_._1 != "").filter(_._2 != "")    eventRes.print()

ssc.start()    ssc.awaitTermination()

}}
时间: 2024-07-29 12:17:20

sparkStreaming入门的相关文章

SparkStreaming入门及例子

看书大概了解了下Streaming的原理,但是木有动过手啊...万事开头难啊,一个wordcount 2小时怎么都运行不出结果.是我太蠢了,好了言归正传. SparkStreaming是一个批处理的流式计算框架,适合处理实时数据与历史数据混合处理的场景(比如,你用streaming将实时数据读入处理,再使用sparkSQL提取历史数据,与之关联处理).Spark Streaming将数据流以时间片为单位分割形成RDD,使用RDD操作处理每一块数据,没块数据都会生成一个spark JOB进行处理,

spark-streaming入门(二)

Input DStreams and Receivers Input DStreams are DStreams representing the stream of input data received from streaming sources. In the quick example, lines was an input DStream as it represented the stream of data received from the netcat server. Eve

spark案例

  其实小编写这篇文章的目的就是,前两天突然对spark心血来潮,想重温一下scala编写spark,知识嘛要温故知新,虽然现在写的比较少,但是平时花一小时复习复习,看看官网,对知识的巩固和深入有莫大的好处,于是乎小编就从网上搜了搜关于spark的一些入门案例,搜了半小时发现,基本上都是Wordcount,或者一些简单的调用API,让小编实在有些无从下手,于是乎小编就突发奇想,不如自己写一写,把代码上传到gitlab上,一方面有助于自己以后的复习,一方面也可以给大家提供一个正面或者反面教材,岂不

大数据入门第二十四天——SparkStreaming(2)与flume、kafka整合

前一篇中数据源采用的是从一个socket中拿数据,有点属于“旁门左道”,正经的是从kafka等消息队列中拿数据! 主要支持的source,由官网得知如下: 获取数据的形式包括推送push和拉取pull 一.spark streaming整合flume 1.push的方式 更推荐的是pull的拉取方式 引入依赖: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streami

Spark从入门到上手实战

Spark从入门到上手实战 课程学习地址:http://www.xuetuwuyou.com/course/186 课程出自学途无忧网:http://www.xuetuwuyou.com 讲师:轩宇老师 课程简介: Spark属于新起的基于内存处理海量数据的框架,由于其快速被众公司所青睐.Spark 生态栈框架,非常的强大,可以对数据进行批处理.流式处理.SQL 交互式处理及机器学习和Graphx 图像计算.目前绝大数公司都使用,主要在于 Spark SQL 结构化数据的处理,非常的快速,高性能

python3+spark2.1+kafka0.8+sparkStreaming

python代码: import time from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils from operator import add sc = SparkContext(master="local[1]",appName="PythonSparkStreamingR

使用scala开发spark入门总结

使用scala开发spark入门总结 一.spark简单介绍 关于spark的介绍网上有很多,可以自行百度和google,这里只做简单介绍.推荐简单介绍连接:http://blog.jobbole.com/89446/ 1.    spark是什么? Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架.一般配合hadoop使用,可以增强hadoop的计算性能. 2.    Spark的优点有哪些? Sp

Spark机器学习:Spark 编程模型及快速入门

http://blog.csdn.net/pipisorry/article/details/52366356 Spark编程模型 SparkContext类和SparkConf类 我们可通过如下方式调用 SparkContext 的简单构造函数,以默认的参数值来创建相应的对象.val sc = new SparkContext("local[4]", "Test Spark App") 这段代码会创建一个4线程的 SparkContext 对象,并将其相应的任务命

Spark 入门(Python、Scala 版)

本文中,我们将首先讨论如何在本地机器上利用Spark进行简单分析.然后,将在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索).最后两节将开始通过命令行与Spark进行交互,然后演示如何用Python写Spark应用,并作为Spark作业提交到集群上.同时也会提供相应的 Scala 版本. 1.设置Spark环境 在本机设置和运行Spark非常简单.你只需要下载一个预构建的包,只要你安装了Java 6+和Python 2.6+,就可以在Windows.Mac O