6.Sink - Data Sink

!!!1.Logger Sink

Logs events at the INFO level. Typically used for testing and debugging.

Properties:

!channel –

!type – The component type name, needs to be logger

maxBytesToLog 16 Maximum number of bytes of the Event body to log

A log4j configuration file must be present in the directory specified by the --conf parameter.

Alternatively, the log4j settings can be passed on the command line at startup via -Dflume.root.logger=INFO,console.

Example:

See the getting-started example.
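For reference, a minimal sketch of an agent that ends in a logger sink (a hedged example assembled in the style of the later examples in this section, assuming an HTTP source on port 8888):

#Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#Describe the Sink
a1.sinks.k1.type=logger

#Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1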

!!!2.File Roll Sink

Stores events on the local filesystem.

A new file is rolled at a configured interval, and the events collected during that interval are written to it.

Properties:

!channel –

!type – The component type name, needs to be file_roll

!sink.directory – The directory where the files will be stored

sink.rollInterval 30 Roll the file every 30 seconds, i.e., every 30 seconds the data collected in that interval is cut into a separate file. Specifying 0 will disable rolling and cause all events to be written to a single file.

sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of the EventSerializer.Builder interface.

batchSize 100

Example:

Write the configuration file:

#Name the components of Agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Describe/configure the Source
a1.sources.r1.type = http
a1.sources.r1.port = 6666

#Describe the Sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/park/work/apache-flume-1.6.0-bin/mysink

#Describe the in-memory Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the Source and Sink to the Channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template7.conf --name a1 -Dflume.root.logger=INFO,console
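To verify, send a test event to the HTTP source (a hedged example; the JSON body format follows the HTTP source's default JSONHandler) and then check the mysink directory for newly rolled files:

curl -X POST -d '[{ "headers" :{},"body" : "hello~file~roll~"}]' http://0.0.0.0:6666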

!!!3.Avro Sink

The Avro Sink is the foundation for multi-hop flows, fan-out flows (one to many), and fan-in flows (many to one).

Properties:

!channel –

!type – The component type name, needs to be avro.

!hostname – The hostname or IP address of the next hop to send events to.

!port – The port to connect to.

batch-size 100 Number of events to batch together for sending.

connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.

request-timeout 20000 Amount of time (ms) to allow for requests after the first.

reset-connection-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop, allowing it to pick up new hosts behind a hardware load balancer without restarting the agent.

compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of the matching AvroSource.

compression-level 6 The level of compression used to compress events. 0 = no compression; 1-9 = compression. The higher the number, the more compression.

ssl false Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs".

trust-all-certs false If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection.

truststore – The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.

truststore-password – The password for the specified truststore.

truststore-type JKS The type of the Java truststore. This can be "JKS" or another supported Java truststore type.

exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.

maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads.
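Note that compression must be configured symmetrically on both ends of an Avro hop. A hedged sketch, reusing the host/port values from the examples below:

#On the sending agent (Avro Sink)
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.242.138
a1.sinks.k1.port=9988
a1.sinks.k1.compression-type=deflate

#On the receiving agent (Avro Source)
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988
a1.sources.r1.compression-type=deflate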

Example 1 - multi-hop flow

h2:

Write the configuration file:

#Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the Source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988

#Describe the Sink
a1.sinks.k1.type=logger

#Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000

#Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console

h1:

Write the configuration file:

#Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#Describe the Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.242.138
a1.sinks.k1.port=9988

#Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000

#Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console

Send an HTTP request to h1:

curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://192.168.242.133:8888

After a few seconds, h2 receives the message.

Example 2: fan-out flow - replication

h2 and h3:

Write the configuration file:

#Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the Source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988

#Describe the Sink
a1.sinks.k1.type=logger

#Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000

#Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console

h1:

Write the configuration file:

#Name the Agent components
a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2

#Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#Describe the Sinks
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.242.138
a1.sinks.k1.port=9988

a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.242.135
a1.sinks.k2.port=9988

#Describe the in-memory Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000

a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=1000

#Bind the Source and Sinks to the Channels
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
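To verify the replication, send a single HTTP request to h1 (a hedged example reusing h1's address 192.168.242.133 from Example 1); both h2 and h3 should log the same event:

curl -X POST -d '[{ "headers" :{"a" : "a1"},"body" : "hello~replicated~flume~"}]' http://192.168.242.133:8888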

Example 3: fan-out flow - multiplexing (routing)

h2 and h3:

Write the configuration file:

#Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the Source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988

#Describe the Sink
a1.sinks.k1.type=logger

#Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000

#Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console

h1:

Write the configuration file:

#Name the Agent components
a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2

#Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=flag
a1.sources.r1.selector.mapping.aaa=c1
a1.sources.r1.selector.mapping.bbb=c2
a1.sources.r1.selector.default=c1

#Describe the Sinks
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.242.138
a1.sinks.k1.port=9988

a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.242.135
a1.sinks.k2.port=9988

#Describe the in-memory Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000

a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=1000

#Bind the Source and Sinks to the Channels
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console

Send HTTP requests to test; the routing takes effect as expected (see the sketch below).
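A hedged test sketch (the flag values match the selector mappings above, so the first event should arrive only at h2 via c1 and the second only at h3 via c2; events with any other flag value fall through to the default channel c1; h1's address 192.168.242.133 is reused from Example 1):

curl -X POST -d '[{ "headers" :{"flag" : "aaa"},"body" : "routed~to~c1"}]' http://192.168.242.133:8888

curl -X POST -d '[{ "headers" :{"flag" : "bbb"},"body" : "routed~to~c2"}]' http://192.168.242.133:8888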

Example 4: fan-in flow

m3:

Write the configuration file:

#Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the Source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141

#Describe the Sink
a1.sinks.k1.type=logger

#Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000

#Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template.conf --name a1 -Dflume.root.logger=INFO,console

m1 and m2:

Write the configuration file:

#Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#Describe the Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.242.135
a1.sinks.k1.port=4141

#Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000

#Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console

Send an HTTP request from m1 via curl. Because the HTTP source uses JSONHandler by default, the data must be in the expected JSON format:

[root@m1 conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888

Send an HTTP request from m2 via curl, in the same JSON format:

[root@m2 conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888

m3 correctly receives both messages.

!!!4.HDFS Sink

This sink writes events into the Hadoop Distributed File System (HDFS).

It currently supports creating text files and sequence files.

Both file types support compression.

Files can be rolled based on elapsed time, data size, or number of events.

It also buckets/partitions data by attributes like the timestamp or the machine where the event originated.

The HDFS directory path may contain formatting escape sequences that are replaced by the HDFS sink to generate the directory/file name used to store the events.

Using this sink requires that Hadoop be installed, so that Flume can use the Hadoop jars to communicate with HDFS.

Note that a version of Hadoop that supports the sync() call is required.

Properties:

!channel –

!type – The component type name, needs to be hdfs

!hdfs.path – HDFS directory path (e.g. hdfs://namenode/flume/webdata/)

hdfs.filePrefix FlumeData Name prefix for files created by Flume in the HDFS directory

hdfs.fileSuffix – Suffix to append to the file name (e.g. .avro - note: the period is not automatically added)

hdfs.inUsePrefix – Prefix used for temporary files that Flume is actively writing to

hdfs.inUseSuffix .tmp Suffix used for temporary files that Flume is actively writing to

hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)

hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)

hdfs.rollCount 10 Number of events written to the file before it is rolled (0 = never roll based on number of events)

hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files)

hdfs.batchSize 100 Number of events written to the file before it is flushed to HDFS

hdfs.codeC – Compression codec. One of the following: gzip, bzip2, lzo, lzop, snappy

hdfs.fileType SequenceFile File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file, so do not set codeC. (2) CompressedStream requires setting hdfs.codeC to an available codec.

hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.

hdfs.minBlockReplicas – Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.

hdfs.writeFormat – Format for sequence file records. One of “Text” or “Writable” (the default).

hdfs.callTimeout 10000 Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.

hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)

hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling

hdfs.kerberosPrincipal – Kerberos user principal for accessing secure HDFS

hdfs.kerberosKeytab – Kerberos keytab for accessing secure HDFS

hdfs.proxyUser

hdfs.round false Whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t)

hdfs.roundValue 1 The multiple to which the timestamp is rounded down (in the unit configured by hdfs.roundUnit)

hdfs.roundUnit second The unit of the round-down value - second, minute or hour

hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.

hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

hdfs.closeTries 0 Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.

hdfs.retryInterval 180 Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ".tmp" extension.

serializer TEXT Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.

Example:

Write the configuration file:

#Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#Describe the Sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://0.0.0.0:9000/ppp

#Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000

#Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Start Flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console
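To exercise the rolling and bucketing features described above, the sink definition can be extended. A hedged sketch (the property names and path escape sequences come from the property table above; the concrete values are illustrative assumptions, not from the original example):

#Describe the Sink: bucket by date/hour, roll every 60 seconds or 64 MB
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://0.0.0.0:9000/ppp/%Y-%m-%d/%H
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=67108864
a1.sinks.k1.hdfs.rollCount=0
#HTTP events carry no timestamp header by default, so use local time for the escape sequences
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=1
a1.sinks.k1.hdfs.roundUnit=hour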

!!!5.Hive Sink

This sink streams events containing delimited text or JSON data directly into a Hive table or partition.

!!!6.Custom Sink

A custom sink is your own implementation of the Sink interface.

The custom sink's class and its dependencies must be placed on Flume's classpath before the agent is started.

Properties:

type – The component type; must be the fully qualified class name of your Sink implementation
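A hedged wiring sketch (com.example.MySink is a hypothetical class name standing in for your own implementation, whose jar has been placed in Flume's lib or plugins.d directory):

#Describe the Sink: a custom implementation loaded from the classpath
a1.sinks.k1.type=com.example.MySink
a1.sinks.k1.channel=c1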
