cdh环境下,spark streaming与flume的集成问题总结

如何做集成,其实特别简单,网上其实就是教程。

http://blog.csdn.net/fighting_one_piece/article/details/40667035  看这里就成。 我用的是第一种集成。。

做的时候,出现了各种问题。    大概从从2014.12.17 早晨5点搞到2014.12.17晚上18点30

总结起来其实很简单,但做的时候搞了许久啊啊啊!!!!   这样的事情,吃一堑长一智吧

问题1、  需要引用各种包,这些包要打入你的JAR中, 因为用的是spark on yarn模式,所以如果不打进去,在集群中是找不到依赖包的!!!  去哪找呢?  直接去search.maven.org找。。

问题2:因为搭建的spark on yarn集群,所以监听时只能监听localhost,不然如果你指定了ip,那么非该IP下的结点,就会因为监听不到而出现了问题

问题3:cdh中的flume的启动,你要去find / -name flume.conf ,找一下,然后找到最新的,与cloudera manager配置文件一样的那么,flume启动时就用这个配置文件

问题4:不要直接用集群,先用单点测试一下。。因为单点测试一下后会发现各种问题。 解决后再去集群测试

问题5:一定要注意版本!  cdh5.2中spark的版本是1.1.0,而我用的插件一直是1.1.1版本的!!! 啊, 为这事儿,我从中午搞到现在。   这个要吃一堑长一智啦!!!

spark代码如下

package com.hark

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

/**
 * Created by Administrator on 2014-12-16.
 */
object SparkStreamingFlumeTest {
  def main(args: Array[String]) {
    //println("harkhark")

    val path = new File(".").getCanonicalPath()
    //File workaround = new File(".");
    System.getProperties().put("hadoop.home.dir", path);
    new File("./bin").mkdirs();
    new File("./bin/winutils.exe").createNewFile();

    //val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")

    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    //val hostname = "127.0.0.1"
   val hostname = "localhost"
    val port = 2345
    val storageLevel = StorageLevel.MEMORY_ONLY
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port, storageLevel)

    flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()

    ssc.start()
    ssc.awaitTermination()

  }
}

  

flume配置文件如下

# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case ‘tier1‘.
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

# For each source, channel, and sink, set
# standard properties.
tier1.sources.source1.type     = exec
tier1.sources.source1.command     = tail -F /opt/data/test3/123
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type   = memory
#tier1.sinks.sink1.type         = logger
tier1.sinks.sink1.type         = avro
tier1.sinks.sink1.hostname        = localhost
tier1.sinks.sink1.port        = 2345
tier1.sinks.sink1.channel      = channel1

# Other properties are specific to each type of yhx.hadoop.dn01
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
tier1.channels.channel1.capacity = 100

spark启动命令如下:

spark-submit --driver-memory 512m --executor-memory 512m --executor-cores 1  --num-executors 3 --class com.hark.SparkStreamingFlumeTest --deploy-mode cluster --master yarn /opt/spark/SparkTest.jar

flume启动命令如下:

flume-ng agent --conf /opt/cloudera-manager/run/cloudera-scm-agent/process/585-flume-AGENT --conf-file /opt/cloudera-manager/run/cloudera-scm-agent/process/585-flume-AGENT/flume.conf --name tier1 -Dflume.root.logger=INFO,console
时间: 2024-12-27 23:59:48

cdh环境下,spark streaming与flume的集成问题总结的相关文章

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

王家林老师的课程:2016年大数据Spark"蘑菇云"行动之spark streaming消费flume采集的kafka数据Directf方式作业.     一.基本背景 Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式.具体的流程是这样的: 1.Direct方式是直接连接到kafka的节点上获取数据了. 2.基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offs

Spark 系列(十五)—— Spark Streaming 整合 Flume

一.简介 Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中.Spark Straming 提供了以下两种方式用于 Flume 的整合. 二.推送式方法 在推送式方法 (Flume-style Push-based Approach) 中,Spark Streaming 程序需要对某台服务器的某个端口进行监听,Flume 通过 avro Sink 将数据源源不断推送到该端口.这里以监听日志文件为例,具体整合方式如

Spark学习七:spark streaming与flume集成

Spark学习七:spark streaming与flume集成 标签(空格分隔): Spark 一,启动flume flume-conf.properties文件 agent002.sources = sources002 agent002.channels = channels002 agent002.sinks = sinks002 ## define sources agent002.sources.sources002.type = exec agent002.sources.sour

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

Spark Streaming整合Flume

1 目的 Spark Streaming整合Flume.参考官方整合文档(http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html) 2 整合方式一:基于推 2.1 基本要求 flume和spark一个work节点要在同一台机器上,flume会在本机器上通过配置的端口推送数据 streaming应用必须先启动,receive必须要先监听推送数据的端口后,flume才能推送数据 添加如下依赖 groupId = org.

ueditor1.4.3在.net环境下的vs开发工具中集成经验

Ueditor是个很不错的在线富文本编辑器,几个项目一直使用它.最近想更新版本,发现新版1.4.3与旧版的部署方式完全不一样了,官网文档介绍的是直接放在iis下的部署说明,没有提到在vs开发工具中如何集成,自己新建了一个测试项目琢磨了一会,测试没啥问题,记录下给大家分享. 项目结构如下图: 因为我创建的是web项目类型,所以把controller.ashx以项目形式的一般处理程序迁移过去,并重命名成ueditor.ashx(记得在ueditor.config.js修改服务器统一请求接口路径).另

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming

第88讲:Spark Streaming从Flume Poll数据

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦