日志收集框架 Flume 组件之Source使用

上一篇简单介绍了Flume几个组件,今天介绍下组件其一的source,整理这些,也是二次学习的过程,也是梳理知识的过程。

Source 中文译为来源,源
作用:采集数据,然后把数据传输到channel上。
例如:监控某个文件或者某个端口或某个目录,新增数据,新增文件的变化,然后传输到channel。

常用的的source类型,也是平常用的比较多的几种类型,如下:

source类型 说明
Avro Source 支持avro协议,内置支持
Thrift Source 支持Thirft rpc ,内置支持
Exec Source 基于Unix的command在标准输出上采集数据 ,如tail -F
JMS Source 监控JMS系统,比如Activemq,可以
Taildir Source 监听目录或文件(Flume1.8版本支持)
Spooling Directory Source 监听目录下的新增文件
Kafka Source 读取Kafka数据

下面不多少,简单实战,没安装的可以google一下,好多安装教程,本文是基于Flume 1.8

Exec Source,前面说过了,exec source 是以tail -F 形式来监听文件的变化的,
flume-exec.conf配置:

#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# # Describe/configure the source
# 配置类型为exec
a1.sources.r1.type = exec
# 路径是自己要监听的日志路径
a1.sources.r1.command = tail -F /usr/local/installed/tomcat/logs/system_app.log
a1.sources.r1.channels = c1

# # Describe the sink
# 下沉sink是以日志的形式来打印
a1.sinks.k1.type = logger

# # Use a channel which buffers events in memory
# channel采用以内存形式来存放上游source传递过来的数据
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

具体使用步骤:
1、启动
进入到flume安装目录,../bin下,命令如下:

./bin/flume-ng agent -n a1 -c ../conf/ -f ../conf/flume-exec.conf

缺点:agent挂了,则不会记录上次传递数据的位置,还是以tail -F为准,来重新传递数据。

Taildir Source 监听目录文件变化,记录上一次同步后的位置,实现断点续传,可以保证没有重复数据的读取。

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# # Describe/configure the source
a1.sources.r1.type = TAILDIR
# 保存监听文件的读取位置的文件
a1.sources.r1.positionFile = /opt/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/installed/tomcat/logs/system_app.log
a1.sources.r1.batchSize = 100
a1.sources.r1.backoffSleepIncrement = 1000
a1.sources.r1.maxBackoffSleep = 5000

#
# # Describe the sink
a1.sinks.k1.type = logger
#
# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# # Bind the source and sink to the channel
 a1.sources.r1.channels = c1
 a1.sinks.k1.channel = c1

具体测试,可以往监听的文件里写入数据,看看是否可以监听到数据。

Spooling Directory Source 监听目录文件的变化,
flume-spooling.conf 配置

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# # Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/self
a1.sources.r1.deletePolicy = immediate
a1.sources.r1.fileSuffix = completed
a1.sources.r1.batchSize = 100

# # Describe the sink
a1.sinks.k1.type = logger
#
# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# # Bind the source and sink to the channel
 a1.sources.r1.channels = c1
 a1.sinks.k1.channel = c1

备注:注意,只监听新增的文件,这个目录下有新增文件会被监听到。目录下子文件夹也不会被监听到,目录下以有的文件更新了,也不会被监听到。

Avro Source配置

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# # Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 44444
#
# # Describe the sink
a1.sinks.k1.type = logger
#
# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# # Bind the source and sink to the channel
 a1.sources.r1.channels = c1
 a1.sinks.k1.channel = c1

备注:此文件是监听127.0.0.1:44444端口的数据变化,可以telnet 127.0.0.1:44444,输入数据,看flume是否监听到数据。

其它的一些类型,可自行测试。
./bin/flume-ng agent -n a1 -c ../conf/ -f ../conf/flume-exec.conf
由于本文是在bin目录下启动的,没有更改flume产生日志的位置,所以会在/bin/logs/ 会有flume日志产生。

测试的时候,自己开一个窗口,监控日志的变化,由于本文是采用以log日志的形式输出,所以用这个命令tail -f ./bin/logs/flume.log 可以看到是否配置成功。



连接:
flume 概念以及模型简介地址:
日志收集框架 Flume 简介
http://blog.51cto.com/shangdc/2178127

原文地址:http://blog.51cto.com/shangdc/2286265

时间: 2024-07-29 16:45:56

日志收集框架 Flume 组件之Source使用的相关文章

日志采集框架Flume的安装及使用

日志采集框架Flume的安装及使用 1.Flume介绍 1.1.Flume概述 Flume是一个分布式.可靠.和高可用(旧版Flume og才有高可用)的海量日志采集.传输和聚合的系统. Flume可以采集文件,socket数据包等各种形式源数据, 又可以将采集到的数据输出到HDFS.hbase.hive.kafka等众多外部存储系统中 一般的采集需求,通过对flume的简单配置即可实现 Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景 1.2

日志收集框架 Exceptionless

日志收集框架 Exceptionless 前言 从去年就答应过Eric(Exceptionless的作者之一),在中国会帮助给 Exceptionless 做推广,但是由于各种原因一直没有做这件事情,在此对Eric表示歉意.:) Exceptionless 简介 Exceptionless 是一个开源的实时的日志收集框架,它可以应用在基于 ASP.NET,ASP.NET Core,Web Api,Web Forms,WPF,Console,MVC 等技术栈的应用程序中,并且提供了Rest接口可以

在.NET Core中使用Exceptionless分布式日志收集框架

一.Exceptionless简介 Exceptionless 是一个开源的实时的日志收集框架,它可以应用在基于 ASP.NET,ASP.NET Core,Web Api,Web Forms,WPF,Console,MVC 等技术栈的应用程序中,并且提供了Rest接口可以应用在 Javascript,Node.js 中.它将日志收集变得简单易用并且不需要了解太多的相关技术细节及配置.在以前,我们做日志收集大多使用 Log4net,Nlog 等框架,在应用程序变得复杂并且集群的时候,可能传统的方式

asp.Net Core免费开源分布式异常日志收集框架Exceptionless安装配置以及简单使用图文教程

原文:asp.Net Core免费开源分布式异常日志收集框架Exceptionless安装配置以及简单使用图文教程 最近在学习张善友老师的NanoFabric 框架的时了解到Exceptionless : https://exceptionless.com/ !因此学习了一下这个开源框架!下面对Exceptionless的学习做下笔记! Exceptionless是什么?能做什么呢? “Exceptionless”这个词的定义是:没有异常.Exceptionless可以为您的ASP.NET.We

日志收集系统Flume调研笔记第1篇 - Flume简介

用户行为数据的收集无疑是构建推荐系统的先决条件,而Apache基金会下的Flume项目正是为分布式的日志收集量身打造的,本文是Flume调研笔记的第1篇,主要介绍Flume的基本架构,下篇笔记将会以实例说明Flume的部署和使用步骤. 本文所用的Flume版本为目前最新版的ver1.5.2,它属于Flume-NG,在系统架构上与Flume-OG有所区别,二者的不同可以参考FlumeWiki文档的说明. 1. Flume是什么 Flume是Apache基金会下的一个开源项目,它实现了一套分布式的.

日志收集系统Flume及其应用

Apache Flume概述 Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统.Flume 支持定制各类数据发送方,用于收集各类型数据:同时,Flume 提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力.一般的采集需求,通过对 flume 的简单配置即可实现.针对特殊场景也具备良好的自定义扩展能力.因此,flume 可以适用于大部分的日常数据采集场景. 当前 Flume 有两个版本.Flume 0.9X 版本的统称 Flume O

flume组件汇总 source、sink、channel

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

日志收集系统Flume调研笔记第2篇 - Flume配置及使用实例

上篇笔记对Flume的使用场景和系统架构做了介绍,本篇笔记以实例说明Flume的配置方法.下面开始正文. 1. Flume使用实例 1.1 配置 Flume agent的3个组件及其拓扑关系是在配置文件中指定的,总的原则是必须列出source/channel/sink的name/type等重要的配置项,并通过channel将source(s)和sink(s)连接起来,此外,1个source可以指定多个channel,而1个sink只能接收来自1个channel的数据. 这里给出的是部署1套含1个

日志采集框架Flume

概述 Flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统. Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS.hbase.hive.kafka等众多外部存储系统中 一般的采集需求,通过对flume的简单配置即可实现 Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景 运行机制 1. Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形