Flume结合Spark测试

近日,在测试Flume结合Kafka结合Spark Streaming的实验。今天把Flume与Spark的简单结合做出来了,这里记录一下,避免网友走弯路。有不周到的地方还希望路过的大神多多指教。

实验比较简单,分为两部分:一、使用avro-client发送数据 二、使用netcat发送数据

首先Spark程序需要Flume的两个jar包:

flume-ng-sdk-1.4.0、spark-streaming-flume_2.11-1.2.0

一、使用avro-client发送数据

1、  编写Spark程序,该程序的功能是接收Flume事件

import org.apache.log4j.{Level, Logger}

import org.apache.spark.SparkConf

importorg.apache.spark.storage.StorageLevel

import org.apache.spark.streaming._

import org.apache.spark.streaming.flume._

object FlumeEventTest{

defmain(args:Array[String]) {

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

val hostname = args(0)

val port = args(1).toInt

val batchInterval = args(2)

val sparkConf = newSparkConf().setAppName("FlumeEventCount").setMaster("local[2]")

val ssc = new StreamingContext(sparkConf,batchInterval)

valstream = FlumeUtils.createStream(ssc,hostname,port,StorageLevel.MEMORY_ONLY)

stream.count().map(cnt => "Received " + cnt + " flumeevents." ).print()

ssc.start()

ssc.awaitTermination()

}

}

2、  Flume配置文件参数

a1.channels = c1

a1.sinks = k1

a1.sources = r1

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = localhost

a1.sinks.k1.port = 9999

a1.sources.r1.type = avro

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

这里,使用avro向flume的44444端口发送数据;然后flume通过9999向Spark发送数据。

3、  运行Spark程序:

4、  通过Flume配置文件启动Flumeagent

../bin/flume-ng agent --conf conf--conf-file ./flume-conf.conf --name a1

-Dflume.root.logger=INFO,console

Spark运行效果:

5、  使用avro来发送文件:

./flume-ng avro-client --conf conf -Hlocalhost -p 44444 -F/opt/servicesClient/Spark/spark/conf/spark-env.sh.template-Dflume.root.logger=DEBUG,console

Flume agent效果:

Spark效果:

二、使用netcat发送数据

1、  Spark程序同上

2、  配置Flume参数

a1.channels = c1

a1.sinks = k1

a1.sources = r1

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = localhost

a1.sinks.k1.port = 9999

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

这里,使用telnet作为Flume的数据源

3、  运行Spark程序同上

4、  通过Flume配置文件启动Flumeagent

../bin/flume-ng agent --conf conf--conf-file ./flume-conf.conf --name a1

-Dflume.root.logger=INFO,console

注意:这里使用netcat作为Flume的数据源,注意与avro作为源的效果区别

5、  使用telnet发送数据

Spark效果:

这是两个比较简单的demo,如果真正在项目中使用Flume来收集数据,使用Kafka作为分布式消息队列,使用Spark Streaming实时计算,还需要详细研究Flume和Spark流计算。

前段时间给部门做培训,演示了Spark Streaming的几个例子:文本处理、网络数据处理、stateful操作和window操作,这几天有时间整理整理,分享给大家。包括Spark MLlib的两个简单demo:基于K-Means的用户分类和基于协同过滤的电影推荐系统。

今天看了斯坦福Andrew Ng教授的ML课程,讲的很棒,这里把链接分享给大家:

http://open.163.com/special/opencourse/machinelearning.html

原文地址:http://snglw.blog.51cto.com/5832405/1652508

时间: 2024-10-20 14:07:47

Flume结合Spark测试的相关文章

hadoop-ha+zookeeper+hbase+hive+sqoop+flume+kafka+spark集群安装

创建3台虚拟机 主机为桌面版 其他为迷你版本 ******************************常用命令.进程名称****************************启动集群命令: start-all.sh启动zookeeper: zkServer.sh start 启动journalnode: hadoop-daemon.sh start journalnode启动namenode: hadoop-daemon.sh --script hdfs start namenode启动z

2016年大数据Spark“蘑菇云”行动之flume整合spark streaming

近期,听了王家林老师的2016年大数据Spark"蘑菇云"行动,需要将flume,kafka和Spark streaming进行整合. 感觉一时难以上手,还是先从简单着手吧:我的思路是这样的,flume产生数据,然后输出到spark streaming,flume的源数据是netcat(地址:localhost,端口22222),输出是avro(地址:localhost,端口是11111).Spark streaming的处理是直接输出有几个events. 一.配置文件 Flume 配

Spark Streaming和Flume集成指南V1.4.1

Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务.这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据.这里有两个方法. Python API:Flume现在还不支持PythonAPI 方法1:Flume风格的推方法 Flume被设计用来在Flume代理之间推送数据.在这种方法中,Spark Streaming本质上设置了一个接收器作为Flume的一个Avro代理,Flume把数据推送到接收器上.下面是配置的步骤. 一

spark on alluxio和MR on alluxio测试(改进版)【转】

转自:http://kaimingwan.com/post/alluxio/spark-on-alluxiohe-mr-on-alluxioce-shi-gai-jin-ban 1. 介绍 2. 准备数据 2.1 清空系统缓存 3. MR测试 3.1 MR without alluxio 3.2 MR with alluxio 3.3 问题补充 4. spark测试 4.1 spark without alluxio 4.2 spark with alluxio 5. 第一阶段实验总结 6. I

Linux搭建XMPP服务器Tigase(Spark客户端测试)

Tigase是一个基于Java开发的XMPP服务器,类似于Openfire,可用于搭建一个即时通讯(Instant Messaging,简称IM)的平台. 1.准备 在安装Tigase之前,首先需要准备Java环境以及数据库(本文使用MySQL). Tigase下载:https://projects.tigase.org/projects/tigase-server/files 下载:tigase-server-5.2.2-b3463-dist-max.tar.gz,并解压缩文件: wget h

Spark Streaming整合Flume

1 目的 Spark Streaming整合Flume.参考官方整合文档(http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html) 2 整合方式一:基于推 2.1 基本要求 flume和spark一个work节点要在同一台机器上,flume会在本机器上通过配置的端口推送数据 streaming应用必须先启动,receive必须要先监听推送数据的端口后,flume才能推送数据 添加如下依赖 groupId = org.

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

决胜大数据时代:Hadoop&Yarn&Spark企业级最佳实践(8天完整版脱产式培训版本)

Hadoop.Yarn.Spark是企业构建生产环境下大数据中心的关键技术,也是大数据处理的核心技术,是每个云计算大数据工程师必修课. 课程简介 大数据时代的精髓技术在于Hadoop.Yarn.Spark,是大数据时代公司和个人必须掌握和使用的核心内容. Hadoop.Yarn.Spark是Yahoo!.阿里淘宝等公司公认的大数据时代的三大核心技术,是大数据处理的灵魂,是云计算大数据时代的技术命脉之所在,以Hadoop.Yarn.Spark为基石构建起来云计算大数据中心广泛运行于Yahoo!.阿

Flume 学习笔记之 Flume NG+Kafka整合

Flume NG集群+Kafka集群整合: 修改Flume配置文件(flume-kafka-server.conf),让Sink连上Kafka hadoop1: #set Agent name a1.sources = r1 a1.channels = c1 a1.sinks = k1 #set channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacit