kafka flumn sparkstreaming java实现监听文件夹内容保存到Phoenix中

ps:具体Kafka Flumn SparkStreaming的使用  参考前几篇博客

2.4.6.4.1 配置启动Kafka

(1) 在slave机器上配置broker

1) 点击CDH上的kafka进入kafka功能界面,切换到实例页签,点击下方的“添加角色实例”进入添加角色实例界面。

2) 进入添加角色界面,点击Kafka Broker下面的选择主机

3) 进入选择主机界面,将主机全选,之后点击确定

(2) 启动Kafka

(3) 创建主题,主题名为井名,有几个井创建几个对应的主题。

进入kafka的安装目录,公司环境安装地址为/opt/cloudera/parcels/KAFKA-3.1.1-1.3.1.1.p0.2/lib/kafka ,运行下面语句:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name

2.4.6.4.2 配置启动Flumn
2.4.6.4.2.1 配置Flumn

(1) 编写配置

Kafka配置了主题,每个主题配置一组sources、sinks、channels。每组配置中,tier1.sources.r1.spoolDir 设置为对应井深数据存放的文件夹、tier1.sinks.k1.topic设置为对应的主题名。下面是两个主题对应的设置,将上述提到的配置做相应修改就可用。


#配置一个agent,agent的名称可以自定义(如a1)

#指定agent的sources(如r1、r2)、sinks(如k1、k2)、channels(如c1、c2)

tier1.sources=r1 r2

tier1.sinks=k1 k2

tier1.channels=c1 c2

#描述source r1

#配置目录scource

#配置监控的目录,当目录出现新文件时会进行写入

tier1.sources.r1.type = spooldir

tier1.sources.r1.spoolDir =/var/flumefile

tier1.sources.r1.channels=c1

#配置数据源输出

#设置Kafka接收器,此处最坑,注意版本,此处为Flume 1.6.0的输出槽类型

tier1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink

#设置Kafka的broker地址和端口号

tier1.sinks.k1.brokerList=master:9092

#设置Kafka的Topic

tier1.sinks.k1.topic=topic-test

#设置序列化方式

tier1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

tier1.sinks.k1.channel=c1

#配置channels类型为 File

tier1.channels.c1.type=memory

tier1.channels.c1.capacity=10000

tier1.channels.c1.transactionCapacity=100

#描述source r1

#配置目录scource

#配置监控的目录,当目录出现新文件时会进行写入

tier1.sources.r2.type = spooldir

tier1.sources.r2.spoolDir =/var/flumefile2

tier1.sources.r2.channels=c2

#配置数据源输出

#设置Kafka接收器,此处最坑,注意版本,此处为Flume 1.6.0的输出槽类型

tier1.sinks.k2.type= org.apache.flume.sink.kafka.KafkaSink

#设置Kafka的broker地址和端口号

tier1.sinks.k2.brokerList=master:9092

#设置Kafka的Topic

tier1.sinks.k2.topic=test

#设置序列化方式

tier1.sinks.k2.serializer.class=kafka.serializer.StringEncoder

tier1.sinks.k2.channel=c2

#配置channels类型为 File

tier1.channels.c2.type=memory

tier1.channels.c2.capacity=10000

tier1.channels.c2.transactionCapacity=100

(2) 设置Flumn参数

1) 进入Flumn配置界面。

2) 将步骤1的配置粘贴到配置文件(Agent Default Group)

3) 配置代理名称,与配置文件中的命名相同

4) 启动Flumn

2.4.6.4.2.2 解决Flumn整合Kafka jar包版本问题

(1) 将Kafka主目录lib下的如下jar拷贝至Flume的lib目录下,并将Flume原本对应其 他版本jar删除。

kafka_2.10-0.8.2.1.jar、kafka-clients-0.8.2.1.jar、jopt-simple-3.2.jar、 metrics-core-2.2.0.jar、   scala-library-2.10.4.jar、zkclient-0.3.jar等

(2) 下载flume、kafka插件包,flumeng-kafka-plugin.jar并将其放到Flume的lib目录下。

2.4.6.4.3 编写sparkStearing

用sparkStearing作为消费者对Kafka的消息进行消费。

2.4.6.4.3.1 引入jar包

在pom.xml文件中添加下面代码,引入jar包


<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>

<version>2.3.1</version>

</dependency>

</dependencies>

2.4.6.4.3.2 代码编写

代码编写参考Spark集成Kafka API

Url:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html


// broker地址,可写多个“,”分隔

String brokers = "master:9092";

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("streamingKafka");

JavaSparkContext sc = new JavaSparkContext(conf);

sc.setLogLevel("WARN");

JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(10));

// kafka相关参数,必要!缺了会报错

Map<StringObject> kafkaParams = new HashMap<>();

kafkaParams.put("bootstrap.servers", brokers);

kafkaParams.put("key.deserializer", StringDeserializer.class);

kafkaParams.put("value.deserializer", StringDeserializer.class);

kafkaParams.put("group.id", "cloudera_mirrormaker");

// Topic分区 也可以通过配置项实现

// 如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性

// earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

// latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

// none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

kafkaParams.put("auto.offset.reset", "latest");

// 如果是使用spark-streaming-kafka-0-10,那么我们建议将enable.auto.commit设为false。

// 这个配置只是在这个版本生效,enable.auto.commit如果设为true的话,那么意味着offsets会按照auto.commit.interval.ms中所配置的间隔来周期性自动提交到Kafka中。

// 在Spark Streaming中,将这个选项设置为true的话会使得Spark应用从kafka中读取数据之后就自动提交,

// 而不是数据处理之后提交,这不是我们想要的。所以为了更好地控制offsets的提交,我们建议将enable.auto.commit设为false。

kafkaParams.put("enable.auto.commit", false);

// 设置消费的topic

Collection<String> topicsColl = Arrays.asList("topic-test", "test");

// 获取DStream

JavaInputDStream<ConsumerRecord<StringString>> lines = KafkaUtils.createDirectStream(ssc,

LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsColl, kafkaParams));

// 遍历JavaInputDStream的到RDD

lines.foreachRDD(rdd -> {

// 遍历RDD得到每一列的信息

rdd.foreach(x -> {

// 获取行

String value = x.value();

// 获取需要保存的每一行的信息

String[] split = value.split(";");

// 判断是否为保存数据需要的格式

// 保存offset,保证每次获取新数据

//获取偏移量

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

 //保存偏移量到Kafka

((CanCommitOffsets) lines.inputDStream()).commitAsync(offsetRanges);

});

ssc.start();

try {

ssc.awaitTermination();

catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

2.4.6.4.3.3 获取数据的格式

会将得到批数据封装成一个RDD,RDD里面存放着ConsumerRecord<String, String>,每一个ConsumerRecord<String, String>对应数据文件中的一行。数据格式为:


ConsumerRecord(

topic = test,

partition = 0,

offset = 547,

CreateTime = 1558419603357,

checksum = 890333719,

serialized key size = -1,

serialized value size = 92,

key = null,

value = 对应该行的数据

2.4.6.4.4 数据获取和持久化遇到的问题

(1) 一个文件夹对应一个Topic,SparkSsteaming获取多个topic数据的时候,是并发获取的,每个topic一个线程。

(2) 一个topic对应的文件夹放入多个文件,SparkSteaming会按照文件放入顺序,单线程获取数据。

(3) SparkSteaming会将一批数据封装成RDD,调用RDD.foreach()遍历时,数据处理的代码不能包含外部变量,并且不能调用外部方法。否则会报不可序列化的异常。

原文地址:https://www.cnblogs.com/Mr-yl/p/11063852.html

时间: 2024-10-10 17:54:39

kafka flumn sparkstreaming java实现监听文件夹内容保存到Phoenix中的相关文章

使用Node.JS监听文件夹变化

使用Node.JS监听文件夹改变有许多应用场合,比如: 构建自动编绎工具 当源文件改变时,自动运行build过程,比如当你写CoffeeScript文件或SASS CSS文件时,保存之后可即时生成对应的JS或CSS. 构建自动布署工具 通过侦听源文件夹的改变,你可以自动即时将改后的文件布署到测试服务器,加快你的开发测试速度. 这些工具其实都需要侦听文件夹的改变,基于Node.JS的侦听文件夹改变的模块有很多. fs.watch 其中Node.JS的文件系统也可侦听某个目录的改变, 如fs.wat

Android 监听文件夹

在一次Android和pc端的通讯过程中,我们放弃了adb forward来实现socket通讯.而是使用adb push文件,我监听文件夹... 都学习一下很有必要 本篇简单Android监听文件夹的方式FileObserver. FileObserver简介 Android.os包下的FileObserver类是一个用于监听文件访问.创建.修改.删除.移动等操作的监听器,基于Linux的INotify. FileObserver是个抽象类,必须继承它才能使用.每个FileObserver对象

如何使用NodeJs来监听文件变化

1.前言 在我们调试修改代码的时候,每修改一次代码,哪怕只是很小的修改,我们都需要手动重新build文件,然后再运行代码,看修改的效果,这样的效率特别低,对于开发者来说简直不能忍. 2.构建自动编译工具 如何使用nodeJs来监听文件变化,一旦源文件修改保存时,自动运行build过程.比如当你写CoffeeScript文件或SASS文件时,保存之后可即时生成对应的JS或CSS. 基于Node.JS的侦听文件夹改变的模块有很多. a .  fs.watch.Node.JS的文件系统也可侦听某个目录

Windows平台下Oracle 11g R2监听文件日志过大,造成客户端无法连接的问题处理

近期部署在生产环境的应用突然无法访问,查看应用日志发现无法获取数据库连接. SystemErr R Caused by: oracle.net.ns.NetException: The Network Adapter could not establish the connection SystemErr R at oracle.net.nt.ConnStrategy.execute(ConnStrategy.java:359) SystemErr R at oracle.net.resolve

win7 安装oracle 10g 未生成监听文件 导致配置监听时无法保存

最近这两天一直在为安装 的oracle 配置监听无法保存 再找各种解决方案,最后自己居然自己配置出来了. 因为缺少监听文件,拷贝别人的放到自己的目录下C:\oracle\product\10.2.0\client_1\NETWORK\ADMIN   listener.ora.tnsnames.ora和 sqlnet.ora 1.修改 tnsnames.ora  中的 # tnsnames.ora Network Configuration File:  c:\oracle\product\10.

使用oracle10g官方文档找到监听文件(listener.ora)的模板

***********************************************声明***********************************************************************  原创作品,出自 "深蓝的blog" 博客,欢迎转载,转载时请务必注明出处,否则追究版权法律责任. 深蓝的blog:http://blog.csdn.net/huangyanlong/article/details/39780739 *******

java 事件监听 - 控件

java 事件监听 //事件监听 //事件监听,写了一个小案例,点击按钮改变面板的颜色. import java.awt.*; import javax.swing.*; import java.awt.event.*; public class Index extends JFrame implements ActionListener{ //设置面板 Wdmb wdmb = new Wdmb(); //设置按钮 JButton anniu1 = new JButton("黄色");

FileSystemWatcher监听文件是否有被修改

作用:监听文件系统更改通知,并在目录或目录中的文件更改时引发事件. 需求:监听特定文件是否修改,然后做出相应的操作. 方法: ①利用一个线程,一直去查找该指定的文件是否有被修改,如果修改则操作特定步骤,否则继续查询. 缺点:占用CPU,要一直循环查找. ②利用.net里面的FileSystemWatcher来监听文件是否有被修改,如果有,则操作特定步骤. 代码: ①定义一个全局变量Watch FileSystemWatcher Watch; ②初始化该全局变量 Watch = new FileS

利用Node的chokidar 监听文件改变的文件。

最近维护一个项目.每次改完东西,都要上传到服务器.然后有时候就忘记一些东西,于是就想有没有可以方法能监听文件的改变.然后我再利用程序把更改的文件一键上传到服务器. 于是就找到了nodejs 的chokidar模块. 然后利用redis的set集合.因为我们频繁更改.如果用普通的字符 会增加比较.set是一个集合,里面的元素都是不重复的.正好可以利用这个特性.帮我们记录更改的记录文件.删除的时候,然后删掉set中的文件.目前只做了增加或修改的文件提交,如果本地删除的 不会同步到服务器. 监听本地文