Spark Study Notes 8: Integrating Spark Streaming with Flume and Kafka

Tags (space-separated): Spark


  • Spark Study Notes 8: Integrating Spark Streaming with Flume and Kafka

    • 1. Kafka
    • 2. Integrating Flume with Kafka
    • 3. Kafka and Spark Streaming integration, method 1 (receiver-based: Kafka pushes data)
    • 4. Kafka and Spark Streaming integration, method 2 (direct approach: Spark Streaming pulls data)
    • 5. Advanced Spark Streaming: cumulative counts with updateStateByKey
    • 6. Advanced Spark Streaming: window functions

1. Kafka

1.1 Overview

Kafka is a distributed publish/subscribe messaging system. In this walkthrough it sits between Flume, which collects the log data, and Spark Streaming, which processes it.

1.2 Installing Kafka

Extract the installation archive.

Edit the server.properties file:

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/app/kafka_2.10-0.8.2.1/ka-logs

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=study.com.cn:2181

Start ZooKeeper:

bin/zkServer.sh start

Start Kafka:

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh -daemon config/server.properties   # run in the background

Create a topic and verify it with --list:

bin/kafka-topics.sh --create --zookeeper study.com.cn:2181 --replication-factor 1 --partitions 1 --topic topicTest

bin/kafka-topics.sh --list --zookeeper study.com.cn:2181

Send messages (producer):

bin/kafka-console-producer.sh --broker-list study.com.cn:9092 --topic topicTest
eeeeeeeeeeeeeeeeeeeeeeeeeeee

Receive messages (consumer):

bin/kafka-console-consumer.sh --zookeeper study.com.cn:2181 --topic topicTest --from-beginning

2. Integrating Flume with Kafka

Create the topicFlume topic:

bin/kafka-topics.sh --create --zookeeper study.com.cn:2181 --replication-factor 1 --partitions 1 --topic topicFlume

bin/kafka-topics.sh --list --zookeeper study.com.cn:2181

Edit the Flume agent configuration (conf/flume-kafka001.properties, the file used when starting Flume below):

agent002.sources = sources002
agent002.channels = channels002
agent002.sinks = sinks002

## define sources
agent002.sources.sources002.type = exec
agent002.sources.sources002.command = tail -F /opt/app/apache-flume-1.5.0-bin/monitor/log.input

## define channels
agent002.channels.channels002.type = memory
agent002.channels.channels002.capacity = 10000
agent002.channels.channels002.transactionCapacity = 10000
agent002.channels.channels002.byteCapacityBufferPercentage = 20
agent002.channels.channels002.byteCapacity = 800000

##define sinks
agent002.sinks.sinks002.type =org.apache.flume.sink.kafka.KafkaSink
agent002.sinks.sinks002.brokerList=study.com.cn:9092
agent002.sinks.sinks002.topic=topicFlume

##relationship
agent002.sources.sources002.channels = channels002
agent002.sinks.sinks002.channel = channels002

Start Flume:

bin/flume-ng agent --conf conf --name agent002 --conf-file conf/flume-kafka001.properties -Dflume.root.logger=INFO,console

Start a Kafka consumer:

bin/kafka-console-consumer.sh --zookeeper study.com.cn:2181 --topic topicFlume --from-beginning

Test by appending a line to the file that Flume is tailing:

echo "ddddddddddddddddddddd" >> /opt/app/apache-flume-1.5.0-bin/monitor/log.input

3. Kafka and Spark Streaming Integration, Method 1 (receiver-based: Kafka pushes data to a receiver)

Create the topicSpark topic:

bin/kafka-topics.sh --create --zookeeper study.com.cn:2181 --replication-factor 1 --partitions 1 --topic topicSpark

bin/kafka-topics.sh --list --zookeeper study.com.cn:2181

Prepare the required JAR files:

Start a local Spark shell, passing the Kafka integration JARs with --jars:

bin/spark-shell \
--jars /opt/app/spark-1.3.0-bin-2.5.0/externaljars/kafka_2.10-0.8.2.1.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/kafka-clients-0.8.2.1.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/metrics-core-2.2.0.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/spark-streaming-kafka_2.10-1.3.0.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/zkclient-0.3.jar \
--master local[2]

Spark code (run in the shell):

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

val ssc = new StreamingContext(sc, Seconds(30))

val topicMap = Map("topicSpark" -> 1)
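// createStream arguments: StreamingContext, ZooKeeper quorum, consumer group id,
// and a Map of topic -> number of receiver threads (the 1 above is a thread count, not a partition count);
// .map(_._2) keeps only the message payload and drops the message key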
val kafkaStream = KafkaUtils.createStream(ssc, "study.com.cn:2181", "topicGroup", topicMap).map(_._2)

val wordCountDStream = kafkaStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

wordCountDStream.print()

ssc.start()
ssc.awaitTermination()
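
The four-argument createStream call above stores the received data with the receiver's default storage level. If you want to control how the receiver stores incoming blocks, an explicit storage level can be passed as a fifth argument; a minimal sketch under the same assumptions (same ZooKeeper address, group id, and topic map as above, defined before ssc.start()):

import org.apache.spark.storage.StorageLevel

// same receiver-based stream, but with the storage level spelled out
val replicatedKafkaStream = KafkaUtils.createStream(
  ssc, "study.com.cn:2181", "topicGroup", topicMap,
  StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)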

Test:

bin/kafka-console-producer.sh --broker-list study.com.cn:9092 --topic topicSpark
eeeeee eeeeeeeeeeeeeeeeeeeee hadoop

4. Kafka and Spark Streaming Integration, Method 2 (direct approach: Spark Streaming pulls data from Kafka)

Create the topicSpark topic (skip this if it already exists):

bin/kafka-topics.sh --create --zookeeper study.com.cn:2181 --replication-factor 1 --partitions 1 --topic topicSpark

bin/kafka-topics.sh --list --zookeeper study.com.cn:2181

Prepare the required JAR files:

Start a local Spark shell, passing the Kafka integration JARs with --jars:

bin/spark-shell \
--jars /opt/app/spark-1.3.0-bin-2.5.0/externaljars/kafka_2.10-0.8.2.1.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/kafka-clients-0.8.2.1.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/metrics-core-2.2.0.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/spark-streaming-kafka_2.10-1.3.0.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/zkclient-0.3.jar \
--master local[2]

Spark code:

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

val ssc = new StreamingContext(sc, Seconds(30))

// kafkaParams: Map[String, String]
val kafkaParams = Map("metadata.broker.list" -> "study.com.cn:9092")
// topics: Set[String]
val topics = Set("topicSpark")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

val wordCountDStream = kafkaStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

wordCountDStream.print()

ssc.start()
ssc.awaitTermination()
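
Unlike the receiver-based approach, the direct approach tracks Kafka offsets itself rather than committing them to ZooKeeper, and the offsets read in each batch can be inspected on the stream returned by createDirectStream (before the .map(_._2)). A minimal sketch under the same kafkaParams and topics, defined before ssc.start():

import org.apache.spark.streaming.kafka.HasOffsetRanges

val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
directStream.foreachRDD { rdd =>
  // every RDD produced by the direct stream carries the offset ranges it was built from
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(o => println(s"${o.topic} partition ${o.partition}: offsets ${o.fromOffset} to ${o.untilOffset}"))
}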

Test:

bin/kafka-console-producer.sh --broker-list study.com.cn:9092 --topic topicSpark
eeeeee eeeeeeeeeeeeeeeeeeeee hadoop

5. Advanced Spark Streaming: cumulative counts with updateStateByKey

Create the topicSpark topic (skip this if it already exists):

bin/kafka-topics.sh --create --zookeeper study.com.cn:2181 --replication-factor 1 --partitions 1 --topic topicSpark

bin/kafka-topics.sh --list --zookeeper study.com.cn:2181

Prepare the required JAR files:

Start a local Spark shell, passing the Kafka integration JARs with --jars:

bin/spark-shell \
--jars /opt/app/spark-1.3.0-bin-2.5.0/externaljars/kafka_2.10-0.8.2.1.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/kafka-clients-0.8.2.1.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/metrics-core-2.2.0.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/spark-streaming-kafka_2.10-1.3.0.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/zkclient-0.3.jar \
--master local[2]

Spark code:

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

val ssc = new StreamingContext(sc, Seconds(30))

// updateStateByKey keeps state across batches, so a checkpoint directory is required;
// "." is the current working directory (an HDFS path would normally be used in production)
ssc.checkpoint(".")
val kafkaParams = Map("metadata.broker.list" -> "study.com.cn:9092")
// topics: Set[String]
val topics = Set("topicSpark")

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

val wordDStream = kafkaStream.flatMap(_.split(" ")).map((_, 1))

// updateFunc: (values for a key in the current batch, previous state for that key) => new state
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
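// For example (hypothetical batch): updateFunc(Seq(1, 1, 1), Some(5)) returns Some(8),
// i.e. three new occurrences are added to the previous running total of 5.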

// updateStateByKey is available on DStreams of (K, V) pairs, here (word, count)
val wordCountDStream = wordDStream.updateStateByKey(updateFunc)

wordCountDStream.print()

ssc.start()
ssc.awaitTermination()

Test:

bin/kafka-console-producer.sh --broker-list study.com.cn:9092 --topic topicSpark
eeeeee eeeeeeeeeeeeeeeeeeeee hadoop

6. Advanced Spark Streaming: window functions

Create the topicSpark topic (skip this if it already exists):

bin/kafka-topics.sh --create --zookeeper study.com.cn:2181 --replication-factor 1 --partitions 1 --topic topicSpark

bin/kafka-topics.sh --list --zookeeper study.com.cn:2181

Prepare the required JAR files:

Start a local Spark shell, passing the Kafka integration JARs with --jars:

bin/spark-shell \
--jars /opt/app/spark-1.3.0-bin-2.5.0/externaljars/kafka_2.10-0.8.2.1.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/kafka-clients-0.8.2.1.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/metrics-core-2.2.0.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/spark-streaming-kafka_2.10-1.3.0.jar,\
/opt/app/spark-1.3.0-bin-2.5.0/externaljars/zkclient-0.3.jar \
--master local[2]

Spark code:

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

val ssc = new StreamingContext(sc, Seconds(5))

// kafkaParams: Map[String, String]
val kafkaParams = Map("metadata.broker.list" -> "study.com.cn:9092")
// topics: Set[String]
val topics = Set("topicSpark")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

// non-windowed version, for comparison:
// val wordCountDStream = kafkaStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// Every 15 seconds, count the words seen during the last 5 minutes.
val wordDStream = kafkaStream.flatMap(_.split(" ")).map((_, 1))

// associative function used to reduce the values inside each window
val reduceFunc = (v1: Int, v2: Int) => {
  v1 + v2
}
// window length = 5 * 12 * 5 = 300 seconds (5 minutes), slide interval = 3 * 5 = 15 seconds;
// both must be multiples of the 5-second batch interval
val wordCountDStream = wordDStream.reduceByKeyAndWindow(reduceFunc, Seconds(5 * 12 * 5), Seconds(3 * 5))

wordCountDStream.print()

ssc.start()
ssc.awaitTermination()
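
reduceByKeyAndWindow also has an overload that takes an inverse reduce function, so each window is computed incrementally by adding the counts from the newest slide interval and subtracting the counts that fell out of the window, instead of re-reducing the whole 5 minutes. A minimal sketch under the same assumptions (the incremental variant requires a checkpoint directory, and, like everything above, must be defined before ssc.start()):

ssc.checkpoint(".")

// inverse of reduceFunc: removes the counts that slid out of the window
val invReduceFunc = (v1: Int, v2: Int) => v1 - v2

val incrementalWordCountDStream = wordDStream.reduceByKeyAndWindow(
  reduceFunc, invReduceFunc, Seconds(5 * 12 * 5), Seconds(3 * 5))

incrementalWordCountDStream.print()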