【Flume】flume中通道channel的简单分析梳理

Channels are the repositories where the events are staged on a agent. Source adds the events and Sink removes it.

通道就是事件暂存的地方,source负责往通道中添加event,sink负责从通道中移出event

flume1.5.2内置的通道有:内存,文件,jdbc

1、内存通道memory-channel

时间存储在内存队列中,对于性能要求高且能接受agent失败时数据丢失的情况是很好的选择

capacity:默认该通道中最大的可以存储的event数量是100,

trasactionCapacity:每次最大可以source中拿到或者送到sink中的event数量也是100

keep-alive:event添加到通道中或者移出的允许时间

byte**:即event的字节量的限制,只包括eventbody

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

该通道的最大缺陷就是数据会丢失

2、JDBC通道JDBC-channel

事件是持久化存储在数据库中,该通道目前支持flume1.5.2内置的derby数据库。

这是一个耐用的通道,对于可恢复性要求高的情况是很理想的。

当然derby这种数据库对你来说肯定不适合,互联网公司现在都是mysql,所以想要将jdbc-channel和mysql进行结合的话,就需要使用者结合自身情况进行定制化的开发了。

3、文件通道File-channel

By default the File Channel uses paths for checkpoint and data directories that are within the user home as specified
above. As a result if you have more than one File Channel instances active within the agent, only one will be able to lock the directories and cause the other channel initialization to fail. It is therefore necessary that you provide explicit paths to all
the configured channels, preferably on different disks. Furthermore, as file channel will sync to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not
available for checkpoint and data directories.

自然是将通道数据同步到磁盘上,性能就会下降,但是添加了检查点机制,防止数据丢失。

针对变形的内存通道,也就是内存通道和文件通道结合使用的,我们在此不进行讲解,因为这种混合使用,官方也给出提示——不建议在生产环境使用。

原因还是没有解决数据丢失的问题,或者一旦线上出现问题,排查问题又更加复杂了。

时间: 2024-10-16 05:11:19

【Flume】flume中通道channel的简单分析梳理的相关文章

WebRTC中的AppRTCDemo.apk简单分析

以前一直在QQ空间记录一些简单的关于webrtc的笔记.博说不如发布CSDN,想想也可以,解决了一些小问题,也可以帮助一下其它碰到该同样问题的人. 上周试着将WebRTC中的PeerConnection_client进行改写,拿 掉了PeerConnection_Server端,改用openfire服务器,信令采用xmpp,主要代码来源于call.主要还是将http请求用xmpp重写了,原理上很简单,合并后,效果还可以. 现在开始看android版本的peerconnection,也就是标题的A

Java动态替换InetAddress中DNS的做法简单分析1

在java.net包描述中, 简要说明了一些关键的接口. 其中负责networking identifiers的是Addresses. 这个类的具体实现类是InetAddress, 底层封装了Inet4Address与Inet6Address的异同, 可以看成一个Facade工具类. A Low Level API, which deals with the following abstractions: Addresses, which are networking identifiers,

java.util.ComparableTimSort中的sort()方法简单分析

TimSort算法是一种起源于归并排序和插入排序的混合排序算法,设计初衷是为了在真实世界中的各种数据中能够有较好的性能. 该算法最初是由Tim Peters于2002年在Python语言中提出的. TimSort 是一个归并排序做了大量优化的版本号. 对归并排序排在已经反向排好序的输入时表现O(n2)的特点做了特别优化.对已经正向排好序的输入降低回溯.对两种情况混合(一会升序.一会降序)的输入处理比較好. 在jdk1.7之后.Arrays类中的sort方法有一个分支推断,当LegacyMerge

Java动态替换InetAddress中DNS的做法简单分析2

import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.HttpURLConnection; import java.net.InetAddress;

【Java】【Flume】Flume-NG启动过程源码分析(一)

从bin/flume 这个shell脚本可以看到Flume的起始于org.apache.flume.node.Application类,这是flume的main函数所在. main方法首先会先解析shell命令,如果指定的配置文件不存在就甩出异常. 根据命令中含有"no-reload-conf"参数,决定采用那种加载配置文件方式:一.没有此参数,会动态加载配置文件,默认每30秒加载一次配置文件,因此可以动态修改配置文件:二.有此参数,则只在启动时加载一次配置文件.实现动态加载功能采用了

【Java】【Flume】Flume-NG启动过程源码分析(二)

本节分析配置文件的解析,即PollingPropertiesFileConfigurationProvider.FileWatcherRunnable.run中的eventBus.post(getConfiguration()).分析getConfiguration()方法.此方法在AbstractConfigurationProvider类中实现了,并且这个类也初始化了三大组件的工厂类:this.sourceFactory = new DefaultSourceFactory();this.s

【Java】【Flume】Flume-NG启动过程源码分析(三)

本篇分析加载配置文件后各个组件是如何运行的? 加载完配置文件订阅者Application类会收到订阅信息执行: @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); } MaterializedConfiguration conf就是getConfiguration()

rocketmq中的NettyRemotingClient类的简单分析

rocketmq中的NettyRemotingClient类的简单分析 Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_

Collections中sort()方法源代码的简单分析

Collections的sort方法代码: public static <T> void sort(List<T> list, Comparator<? super T> c) { Object[] a = list.toArray(); Arrays.sort(a, (Comparator)c); ListIterator i = list.listIterator(); for (int j=0; j<a.length; j++) { i.next(); i.