flume sink processor

sink groups使多个不同的sink组成一个整体,而sink processor提供了组内负载均衡和故障转移的功能。

有三种sink processor :default sink processor,failover sink processor,Load balancing Sink Processor。

default sink processor

一般的单独的sink

failover sink processor

维护了一个sinks的优先级列表,保证只要有一个sink事件就可以被处理(即故障转移)。

sink优先级高的会被优先激活,若没有设置优先级则按照snk被声明的顺序来决定优先级。

设置如下:

inks 多个sink用空格分开。
processor.type default 组件的名称,必须是:failover
processor.priority.<sinkName> 优先级值。<sinkName> 必须是sinks中有定义的。优先级值高Sink会更早被激活。值越大,优先级越高。
:多个sinks的话,优先级的值不要相同,如果优先级相同的话,只会有一个生效。且failover时,同优先级的不会Failover,就算是同优先级的还存在也会报All sinks failed to process。
processor.maxpenalty 30000 失败的Sink最大的退避时间(单位:毫秒)(退避算法(退避算法为我们在解决重试某项任务的时候,提供了一个比较好的等待思想。),参考:http://qiuqiang1985.iteye.com/blog/1513049)

示例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

Load balancing Sink Processor

提供了多个sinks负载均衡的能力。它维护了一个active sinks的索引列表,列表中fenb的sinks的负载必须是分布式的。

通过round_robin (轮询)或 random(随机)选择机制实现了分布式负载。选择机制默认为round_robin ,也可通过设置重载。自定义选举类须继承AbstractSinkSelector。

当被调用时,选择器根据配置文件的选择机制挑选下一个sink,并且调用该sink。如果所选的Sink传递Event失败,则通过选择机制挑选下一个可用的Sink,以此类推。失效的sink不会被加入到黑名单里,选择器会继续尝试所有可用的sink。所有的被调用的sink都失败后,选择器才会把失败发送给sink runner。

要是backoff属性可用,sink processor会把失败的sink拉黑,并移除在给定的超时时间内把它们从选择中移除。

设置如下:

processor.sinks 多个sink用空格分开。
processor.type default 组件的名称,必须是:load_balance
processor.backoff false 是否以指数的形式退避失败的Sinks。
processor.selector round_robin 选择机制。必须是round_robin,random或者自定义的类,该类继承了AbstractSinkSelector
processor.selector.maxTimeOut 30000
默认是30000毫秒,屏蔽故障sink的时间

示例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
时间: 2024-10-10 12:49:11

flume sink processor的相关文章

Flume sink 相关内容

SinkRunner.java 开启线程调用相应的Processor(Policy) , 根据  Policy调用process的返回值来决定线程睡眠时间,每次默认延后1s,最大默认为5s. public class SinkRunner implements LifecycleAware { private static final Logger logger = LoggerFactory .getLogger(SinkRunner.class); private static final

自定义Flume Sink:ElasticSearch Sink

Flume Sink的目的是从Flume Channel中获取数据然后输出到存储或者其他Flume Source中.Flume Agent启动的时候,它会为每一个Sink都启动一个SinkRunner的对象,SinkRunner.start()方法会启动一个新的线程去管理每一个Sink的生命周期.每一个Sink需要实现start().Stop()和process()方法.你可以在start方法中去初始化Sink的参数和状态,在stop方法中清理Sink的资源.最关键的是process方法,它将处

找不到org.apache.spark.streaming.flume.sink.SparkFlumeProtocol$Callback

java.lang.NoClassDefFoundError: org/apache/spark/streaming/flume/sink/SparkFlumeProtocol$Callback at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:84) at org.apache.spark.streaming.flume

flume sink运行过程简单分析

没有运行,直接看源码得到sink简单运行过程 SinkRunner负责运行sink程序 内部类 PollingRunner implements Runnable {  private SinkProcessor policy; } 负责运行sink run方法 while (!shouldStop.get()) { try { if (policy.process().equals(Sink.Status.BACKOFF)) { counterGroup.incrementAndGet("ru

8.Processor

1.概述 Sink Group允许用户将多个Sink组合成一个实体. Flume Sink Processor 可以通过切换组内Sink用来实现负载均衡的效果,或在一个Sink故障时切换到另一个Sink. sinks – 用空格分隔的Sink集合 processor.type default 类型名称,必须是 default.failover 或 load_balance 2.Default Sink Processor Default Sink Processor 只接受一个 Sink. 不要

Sink Prosessor - Flume的可靠性保证:故障转移、负载均衡

Flume的一些组件(如Spooling Directory Source.File Channel)能够保证agent挂掉后不丢失数据. 1.负载均衡 1)Load balancing Sink Processor source里的event流经channel,进入sink组,在sink组内部根据负载算法(round_robin.random)选择sink,后续可以选择不同机器上的agent实现负载均衡. 实例如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16

flume组件汇总 source、sink、channel

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

【原创】大数据基础之Flume(2)Sink代码解析

flume sink核心类结构 1 核心接口Sink org.apache.flume.Sink /** * <p>Requests the sink to attempt to consume data from attached channel</p> * <p><strong>Note</strong>: This method should be consuming from the channel * within the bounds

Hadoop实战-Flume之自定义Sink(十九)

import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeli