Flume Source

转自:http://www.cnblogs.com/zhangmiao-chp/archive/2011/05/18/2050465.html

近期学习Flume相关内容,收集的一些资料

flume 参考文档 (三)

Flume Source

   

   

1、Flume’s Tiered Event Sources


collectorSource[(port)]


Collector source,监听端口汇聚数据


autoCollectorSource


通过master协调物理节点自动汇聚数据


logicalSource


逻辑source,由master分配端口并监听rpcSink

 

2、Flume’s Basic Sources


null

 

console


监听用户编辑历史和快捷键输入,只在node_nowatch模式下可用


stdin


监听标准输入,只在node_nowatch模式下可用,每行将作为一个event source


rpcSource(port)


由rpc框架(thrift/avro)监听tcp端口


text("filename")


一次性读取一个文本,每行为一个event


tail("filename"[,startFromEnd=false])


每行为一个event。监听文件尾部的追加行,如果startFromEnd为true,tail将从文件尾读取,如果为false,tail将从文件开始读取全部数据


multitail("filename"[, file2[,file3… ] ])


同上,同时监听多个文件的末尾


tailDir("dirname"[, fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]])


监听目录中的文件末尾,使用正则去选定需要监听的文件(不包含目录),recurseDepth为递归监听其下子目录的深度


seqfile("filename")


监听hdfs的sequencefile,全路径


syslogUdp(port)


监听Udp端口


syslogTcp(port)


监听Tcp端口


syslogTcp1(port)


只监听Tcp端口的一个链接


execPeriodic("cmdline", ms)


周期执行指令,监听指令的输出,整个输出都被作为一个event


execStream("cmdline")


执行指令,监听指令的输出,输出的每一行被作为一个event


exec("cmdline"[,aggregate=false[,restart=false[,period=0]]])


执行指令,监听指令的输出,aggregate如果为true,整个输出作为一个event如果为false,则每行作为一个event。如果restart为true,则按period为周期重新运行


synth(msgCount,msgSize)


随即产生字符串event,msgCount为产生数量,msgSize为串长度


synthrndsize(msgCount,minSize,maxSize)


同上,minSize – maxSize


nonlsynth(msgCount,msgSize)

 

asciisynth(msgCount,msgSize)


Ascii码字符


twitter("username","pw"[,"url"])


尼玛twitter的插件啊


irc("server",port, "nick","chan")

 

scribe[(+port)]


Scribe插件


report[(periodMillis)]


生成所有physical node报告为事件源

时间: 2024-07-31 09:04:52

Flume Source的相关文章

Flume Source 实例

Avro Source 监听avro端口,接收外部avro客户端数据流.跟前面的agent的Avro Sink可以组成多层拓扑结构. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 a1.sources=s1 a1.sinks=k1 a1.channels=c1    a1.sources.s1.channels=c1 a1.sinks.k1.channel=c1    a1.sources.s1.type=avro a1.sources.s1.bind=vm1 a

<Flume><Source Code><Flume源码阅读笔记>

Overview source采集的日志首先会传入ChannelProcessor, 在其内首先会通过Interceptors进行过滤加工,然后通过ChannelSelector选择channel. Source和Sink之间是异步的,sink只需要监听自己关系的Channel的变化即可. sink存在写失败的情况,flume提供了如下策略: 默认是一个sink,若写入失败,则该事务失败,稍后重试. 故障转移策略:给多个sink定义优先级,失败时会路由到下一个优先级的sink.sink只要抛出一

Flume(3)source组件之NetcatSource使用介绍

一.概述: 本节首先提供一个基于netcat的source+channel(memory)+sink(logger)的数据传输过程.然后剖析一下NetcatSource中的代码执行逻辑. 二.flume配置文件: 下面的配置文件netcat.conf中定义了source使用netcat,它会监听44444端口. # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe

Flume NG 学习笔记(四)Source配置

首先.这节水的东西就比较少了,大部分是例子. 一.Avro Source与Thrift Source Avro端口监听并接收来自外部的Avro客户流的事件.当内置Avro 去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑结构.官网说的比较绕,当然我的翻译也很弱,其实就是flume可以多级代理,然后代理与代理之间用Avro去连接 下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了. Property Name Default Description channel

Hadoop实战-Flume之自定义Source(十八)

import java.nio.charset.Charset; import java.util.HashMap; import java.util.Random; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; im

flume 使用 spool source的时候字符集出错

1. 错误所在 2016-04-21 02:23:05,508 (pool-3-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source source1: { spoolDir: /home/hadoop_admin/movielog/ }: Unca

flume高并发优化——(8)多文件source扩展断点续传

在很多情况下,我们为了不丢失数据,一般都会为数据收集端扩展断点续传,而随着公司日志系统的完善,我们在原有的基础上开发了断点续传的功能,以下是思路,大家共同讨论: 核心流程图: 源码: /* * 作者:许恕 * 时间:2016年5月3日 * 功能:实现tail 某目录下的所有符合正则条件的文件 * Email:[email protected] * To detect all files in a folder */ package org.apache.flume.source; import

【Apache Flume系列】Flume-ng案例分享及source编码格式问题

转载请注明源地址:http://blog.csdn.net/weijonathan/article/details/41749151 最近忙于在整一个客户的流式抽取的方案,结果遇到了很多问题:主要还是编码问题:先说下场景 场景: 用户生成每一个小时的开始生成一个日志文件,不停的往日志文件中写入.而我这块则是实时读取客户的日志文件然后解析入库: 这里我们选择的方案还是以前的由flume来读取:然后写入kafka,最后到storm中进行解析到最后入库: 这一个流程方案大家应该都比较熟悉了.也不用我在

flume组件汇总 source、sink、channel

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载