大数据入门第二十四天——SparkStreaming(2)与flume、kafka整合

前一篇中数据源采用的是从一个socket中拿数据,有点属于“旁门左道”,正经的是从kafka等消息队列中拿数据!

主要支持的source,由官网得知如下:

  获取数据的形式包括推送push和拉取pull

一、spark streaming整合flume

  1.push的方式

    更推荐的是pull的拉取方式

    引入依赖:

     <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

    编写代码:

package com.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by ZX on 2015/6/22.
  */
object FlumePushWordCount {

  def main(args: Array[String]) {
    val host = args(0)
    val port = args(1).toInt
    val conf = new SparkConf().setAppName("FlumeWordCount")//.setMaster("local[2]")
    // 使用此构造器将可以省略sc,由构造器构建
    val ssc = new StreamingContext(conf, Seconds(5))
    // 推送方式: flume向spark发送数据(注意这里的host和Port是streaming的地址和端口,让别人发送到这个地址)
    val flumeStream = FlumeUtils.createStream(ssc, host, port)
    // flume中的数据通过event.getBody()才能拿到真正的内容
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))

    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

    flume-push.conf——flume端配置文件:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = avro
#这是接收方
a1.sinks.k1.hostname = 192.168.31.172
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume-push.conf

  2.pull的方式

    属于推荐的方式,通过streaming来主动拉取flume产生的数据

    编写代码:(依赖同上)

package com.streaming

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePollWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    //从flume中拉取数据(flume的地址),通过Seq序列,里面可以new多个地址,从多个flume地址拉取
    val address = Seq(new InetSocketAddress("172.16.0.11", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_,1))
    val results = words.reduceByKey(_+_)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

      配置flume

  通过拉取的方式需要flume的lib目录中有相关的JAR(要通过spark程序来调flume拉取),通过官网可以得知具体的JAR信息:

  

    配置flume:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink(配置的是flume的地址,等待拉取)
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = mini1
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume-poll.conf

    启动flume,然后启动IDEA中的spark streaming:

bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -Dflume.root.logger=INFO,console
// -D后参数可选

原文地址:https://www.cnblogs.com/jiangbei/p/8856750.html

时间: 2024-10-19 12:02:14

大数据入门第二十四天——SparkStreaming(2)与flume、kafka整合的相关文章

大数据入门第二十五天——logstash入门

一.概述 1.logstash是什么 根据官网介绍: Logstash 是开源的服务器端数据处理管道,能够同时 从多个来源采集数据.转换数据,然后将数据发送到您最喜欢的 “存储库” 中.(我们的存储库当然是 Elasticsearch.) //属于elasticsearch旗下产品(JRuby开发,开发者曾说如果他知道有scala,就不会用jruby了..) 也就是说,它是flume的“后浪”,它解决了“前浪”flume的数据丢失等问题! 2.基础结构  输入:采集各种来源数据 过滤:实时解析转

大数据入门第十四天——Hbase详解(一)入门与安装配置

一.概述 1.什么是Hbase 根据官网:https://hbase.apache.org/ Apache HBase™ is the Hadoop database, a distributed, scalable, big data store. HBASE是一个高可靠性.高性能.面向列.可伸缩的分布式存储系统 中文简明介绍: Hbase是分布式.面向列的开源数据库(其实准确的说是面向列族).HDFS为Hbase提供可靠的底层数据存储服务,MapReduce为Hbase提供高性能的计算能力,

python入门第二十四天----成员修饰符 类的特殊成员

1 #成员修饰符 修饰符可以规定内部的字段.属性.方法等 是共有的成员,私有的成员 2 class Foo: 3 def __init__(self,name,age): 4 self.name=name 5 self.age=age #可以在外部直接访问 6 7 obj=Foo('Jack',22) 8 print(obj.name) 9 print(obj.age) 共有字段 1 #成员修饰符 修饰符可以规定内部的字段.属性.方法等 是共有的成员,私有的成员 2 class Foo: 3 d

大数据入门第二十二天——spark(三)自定义分区、排序与查找

一.自定义分区 1.概述 默认的是Hash的分区策略,这点和Hadoop是类似的,具体的分区介绍,参见:https://blog.csdn.net/high2011/article/details/68491115 2.实现 package cn.itcast.spark.day3 import java.net.URL import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext} import s

大数据笔记(十四)——HBase的过滤器与Mapreduce

一. HBase过滤器 package demo; import javax.swing.RowFilter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; impo

Spring入门第二十四课

Spring对JDBC的支持 直接看代码: db.properties jdbc.user=root jdbc.password=logan123 jdbc.driverClass=com.mysql.jdbc.Driver jdbc.jdbcUrl=jdbc:mysql://localhost:3306/selective-courses-system jdbc.initPoolSize=5 jdbc.maxPoolSize=10 applicationContext.xml <?xml ve

大数据入门第二天——基础部分之zookeeper(下)

一.集群自启动脚本 1.关闭zk [[email protected] bin]# jps 3104 Jps 2805 QuorumPeerMain [[email protected] bin]# kill -9 2805 //kill或者stop都是可以的 2.远程执行命令 [[email protected] bin]# ssh 192.168.137.138 /opt/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start [email prote

Egret入门学习日记 --- 第二十四篇(书中 9.12~9.15 节 内容)

第二十四篇(书中 9.12~9.15 节 内容) 开始 9.12节 内容. 重点: 1.TextInput的使用,以及如何设置加密属性. 操作: 1.TextInput的使用,以及如何设置加密属性. 创建exml文件,拖入组件,设置好id. 这是显示密码星号处理的属性. 创建绑定类. 实例化,并运行. 但是焦点在密码输入框时,密码是显示的. 暂时不知道怎么设置 “焦点在密码框上时,还是显示为 * 号” 的方法. 至此,9.12节 内容结束. 开始 9.13节 . 这个,和TextInput的使用

RabbitMQ入门教程(十四):RabbitMQ单机集群搭建

原文:RabbitMQ入门教程(十四):RabbitMQ单机集群搭建 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78723467 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 集群简介 理解集群先理解一下元数据 队列元数据:队列的名称和声明队列时设置的属性(是否持久化.是否自动删除.队列所属的节点)