Data Collection with Apache Flume(二)

今天继续讨论几个agent的配置。

第一个agent是从终端捕获特定命令执行的输出结果,并将文件输出到特定目录。先看一下配置的代码:

agent2.sources = execsource      //指定为从命令获取输出的source
agent2.sinks = filesink          //输出到文件的sink
agent2.channels = filechannel     //输出到文件的channel

agent2.sources.execsource.type = exec  //类型
agent2.sources.execsource.command = cat /home/leung/message   //指定命令

agent2.sinks.filesink.type = FILE_ROLL
agent2.sinks.filesink.sink.directory = /home/leung/flume/files     //输出目录
agent2.sinks.filesink.sink.rollInterval = 0

agent2.channels.filechannel.type = file
agent2.channels.filechannel.checkpointDir = /home/leung/flume/fc/checkpoint //检查点
agent2.channels.filechannel.dataDirs = /home/leung/flume/fc/data  //channel的数据目录

agent2.sources.execsource.channels = filechannel
agent2.sinks.filesink.channel = filechannel

   OK,启动agent2,然后查看结果。

结果如下图。可以看到,执行 cat /home/leung/message命令之后,输出的结果与files目录中的文件内容是一致的,证明已经成功写入文件。

下一个agent是从网络端口 获取数据然后写到Hadoop集群的HDFS中。先看看配置代码:

agent4.sources = netsource
agent4.sinks = hdfssink //HDFS sink
agent4.channels = memorychannel

agent4.sources.netsource.type = netcat
agent4.sources.netsource.bind = localhost
agent4.sources.netsource.port = 3000

agent4.sinks.hdfssink.type = hdfs
agent4.sinks.hdfssink.hdfs.path = /flume  //写出到HDFS上的文件目录,不需要提前创建
agent4.sinks.hdfssink.hdfs.filePrefix = log  //指定写出文件的文件名前缀
agent4.sinks.hdfssink.hdfs.rollInterval = 0
agent4.sinks.hdfssink.hdfs.rollCount = 3
agent4.sinks.hdfssink.hdfs.fileType = DataStream

agent4.channels.memorychannel.type = memory
agent4.channels.memorychannel.capacity = 1000
agent4.channels.memorychannel.transactionCapacity = 100

agent4.sources.netsource.channels = memorychannel
agent4.sinks.hdfssink.channel = memorychannel

    下面启动agent4以及查看一下结果。

下面查看一下结果。发现在HDFS中已经新建了一个flume文件夹,并且已经写入了指定的内容。

接着我们为文件夹名加一个时间戳。详细看如下配置代码。

agent5.sources = netsource
agent5.sinks = hdfssink
agent5.channels = memorychannel

agent5.sources.netsource.type = netcat
agent5.sources.netsource.bind = localhost
agent5.sources.netsource.port = 3000
agent5.sources.netsource.interceptors = ts
agent5.sources.netsource.interceptors.ts.type = org.apache.flume.interceptor.TimestampInterceptor$Builder //引用这个类方法添加时间戳

agent5.sinks.hdfssink.type = hdfs
agent5.sinks.hdfssink.hdfs.path = /flume-%Y-%m-%d //定义文件夹名格式
agent5.sinks.hdfssink.hdfs.filePrefix = log-
agent5.sinks.hdfssink.hdfs.rollInterval = 0
agent5.sinks.hdfssink.hdfs.rollCount = 3
agent5.sinks.hdfssink.hdfs.fileType = DataStream

agent5.channels.memorychannel.type = memory
agent5.channels.memorychannel.capacity = 1000
agent5.channels.memorychannel.transactionCapacity = 100

agent5.sources.netsource.channels = memorychannel
agent5.sinks.hdfssink.channel = memorychannel

  OK,下面启动agent5。

下面查看一下结果。可以看到文件夹的名字被如期地加上了日期。

OK,先到这里,还有两个稍微复杂一点点的agent下次再讨论。本人水平有限,请各位不吝指正!谢谢!

时间: 2024-08-09 19:52:41

Data Collection with Apache Flume(二)的相关文章

Data Collection with Apache Flume(三)

最后提及两个agent.首先第一个是使用一个avro souce和一个avro sink向另一个agent传递event,然后再写入特定目录. 先看看配置代码. agent6.sources = avrosource //定义avrosource,可以使用avro client在网络上向其传送数据 agent6.sinks = avrosink agent6.channels = memorychannel agent6.sources.avrosource.type = avro agent6

Data Collection with Apache Flume(一)

首先介绍一下Flume是个神马东东.Flume可以实现从多种数据源获取数据,然后传递到不同的目标路径.通常是利用Flume传送logs到不同的地方,例如从web server收集logs文件然后传送到hadoop cluster进行分析之类的.Flume配置灵活简单,可以实现不同情况的日志传送,确实是一款不错的工具. OK,接下来先看看怎么安装配置Flume.大家可以从http://flume.apache.org获取Flume的最新二进制版本.下载好后,解压,然后将flume的bin目录加到P

Apache Flume 安装文档、日志收集

简介: 官网 http://flume.apache.org 文档 https://flume.apache.org/FlumeUserGuide.html hadoop 生态系统中,flume 的职责是收集数据,一般用作收集各种日志数据. Source -> Channel -> Sink 这是一个基本的工作流程. Source 定义了数据从哪里来,Channel 是一个数据暂存的位置 ( disk / mem ),Sink 定义将数据流向哪里! 一.flume 安装 shell >

使用Apache Flume抓取数据(1)

使用Apache Flume抓取数据,怎么来抓取呢?不过,在了解这个问题之前,我们必须明确ApacheFlume是什么? 一.什么是Apache Flume Apache Flume是用于数据采集的高性能系统 ,名字来源于原始的近乎实时的日志数据采集工具,现在广泛用于任何流事件数据的采集,支持从很多数据源聚合数据到HDFS. 最初由Cloudera开发 ,在2011年贡献给了Apache基金会 ,在2012年变成了Apache的顶级项目,Flume OG升级换代成了Flume NG. Flume

Flafka: Apache Flume Meets Apache Kafka for Event Processing

The new integration between Flume and Kafka offers sub-second-latency event processing without the need for dedicated infrastructure. In this previous post you learned some Apache Kafka basics and explored a scenario for using Kafka in an online appl

org.apache.flume.conf.ConfigurationException: Channel c1 not in active set.

1 错误详细信息 WARN conf.FlumeConfiguration: Could not configure sink k1 due to: Channel c1 not in active set. org.apache.flume.conf.ConfigurationException: Channel c1 not in active set. at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.valida

sharepoint 2013 开启 Usage and Health Data Collection

Usage and Health Data Collection Monitoring the status of your farm's health is a critical aspect of SharePoint administration. This service application collects the various logging information stored in SharePoint and writes it to the logging databa

那些年踏过的Apache Flume之路

Flume作为日志采集系统,有着独特的应用和优势,那么Flume在实际的应用和实践中到底是怎样的呢?让我们一起踏上Flume之路. 1.  什么是Apache Flume (1)Apache Flume简单来讲是高性能.分布式的日志采集系统,和sqoop同属于数据采集系统组件,但是sqoop用来采集关系型数据库数据,而Flume用来采集流动型数据. (2)Flume名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数据的采集,它支持从很多数据源聚合数据到HDFS. (3)Flu

关于Core Data的一些整理(二)

关于Core Data的一些整理(二) 创建NSManagedObject的子类时,有一点是在这中间要强调的一点是,要不要勾选 Use scalar properties for primitive data types. 勾选上这个选项之后就是使用的是你在定义的时候使用的原始数据类型. 如果没有勾选的话,就会存在类型的转化,转换情况如下: String maps to String Integer 16/32/64, Float, Double and Boolean map to NSNum