Summary of Flume Sources and Sinks, and Common Usage Scenarios

Sources

RPC-based data exchange between heterogeneous systems

  • Avro Source
  • Thrift Source

File and directory change monitoring

  • Exec Source
  • Spooling Directory Source
  • Taildir Source (a minimal config sketch follows this list)
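
Among these, Taildir Source is usually preferred for tailing files because it records read offsets in a position file and survives agent restarts. A minimal config sketch (the file paths and position-file location below are illustrative, not from the original article):

a1.sources = s1
a1.channels = c1

# Taildir source: tail every file matching the group pattern and track offsets in a JSON position file
a1.sources.s1.type = TAILDIR
a1.sources.s1.positionFile = /tmp/flume/taildir_position.json
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /tmp/logs/.*\.log
a1.sources.s1.channels = c1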

Continuous consumption from message queues (MQ)

  • JMS Source
  • SSL and JMS Source
  • Kafka Source

Network-based data exchange

  • NetCat TCP Source
  • NetCat UDP Source
  • HTTP Source
  • Syslog Sources
  • Syslog TCP Source
  • Multiport Syslog TCP Source
  • Syslog UDP Source

Custom sources

  • Custom Source

Sinks

  • HDFS Sink
  • Hive Sink
  • Logger Sink
  • Avro Sink
  • Thrift Sink
  • IRC Sink
  • File Roll Sink
  • HBaseSinks
  • HBaseSink
  • HBase2Sink
  • AsyncHBaseSink
  • MorphlineSolrSink
  • ElasticSearchSink
  • Kite Dataset Sink
  • Kafka Sink
  • HTTP Sink
  • Custom Sink

Examples

1. Monitoring file changes

exec-memory-logger.properties (the Exec source tails a file and sends events to a downstream Avro endpoint; the receiving agent is configured in example 7):

# Name the agent's sources, sinks and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

# Configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.1.103
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Start:

flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/exec-memory-logger.properties --name a1 -Dflume.root.logger=INFO,console

Test:

echo "asfsafsf" >> /tmp/log.txt

2. NetCat TCP listener

netcat.properties:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start:

flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/netcat.properties --name a1 -Dflume.root.logger=INFO,console

Test:

telnet localhost 44444

3. Kafka read and write (read: Kafka → logger; write: file → Kafka)

The two configuration files are shown in turn below.

read-kafka.properties (Kafka source → logger sink):

# Name the agent's sources, sinks and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1  

# Configure the Kafka source
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.channels = c1
a1.sources.s1.batchSize = 5000
a1.sources.s1.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.servers = 192.168.1.103:9092
a1.sources.s1.kafka.topics = test1
a1.sources.s1.kafka.consumer.group.id = custom.g.id

# Bind the source to the channel
a1.sources.s1.channels = c1

# Configure the sink
a1.sinks.k1.type = logger

# Bind the sink to the channel
a1.sinks.k1.channel = c1  

# Configure the channel
a1.channels.c1.type = memory

write-kafka.properties (Exec source → Kafka sink):

# Name the agent's sources, sinks and channels
a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /tmp/kafka.log
a1.sources.s1.channels=c1 

# Configure the Kafka sink
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address (brokerList is the legacy property name; newer Flume releases prefer kafka.bootstrap.servers)
a1.sinks.k1.brokerList=192.168.1.103:9092
# Topic to publish to
a1.sinks.k1.topic=test1
# Serialization class for messages sent to Kafka
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.channel=c1     

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100   

Start (one agent per configuration file):

flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/read-kafka.properties --name a1 -Dflume.root.logger=INFO,console

flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/write-kafka.properties --name a1 -Dflume.root.logger=INFO,console

Test:

# Create the test topic
bin/kafka-topics.sh --create --bootstrap-server 192.168.1.103:9092 --replication-factor 1 --partitions 1 --topic test1
# Start a console producer to send test data:
bin/kafka-console-producer.sh --broker-list 192.168.1.103:9092 --topic test1
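
To verify the write path (Exec source → Kafka sink), attach a console consumer to the same topic; lines appended to /tmp/kafka.log should show up here (assuming a Kafka release whose console consumer supports --bootstrap-server):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.103:9092 --topic test1 --from-beginning

echo "hello kafka sink" >> /tmp/kafka.log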

4. Custom source

A minimal agent configuration that wires in a user-defined source class (see section 9 for the implementation):

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
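
For the agent to load org.example.MySource, the compiled class has to be on Flume's classpath. One documented way is the plugins.d layout, which flume-ng picks up automatically (directory and jar names below are illustrative):

$FLUME_HOME/plugins.d/my-source/lib/my-source.jar      # the custom component
$FLUME_HOME/plugins.d/my-source/libext/                # its third-party dependencies, if any

Dropping the jar directly into $FLUME_HOME/lib/ also works for quick tests.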

5. HDFS Sink

spooling-memory-hdfs.properties: watch a directory for new files and upload them to HDFS

# Name the agent's sources, sinks and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1  

# Configure the spooling directory source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /tmp/log2
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
# Bind the source to the channel
a1.sources.s1.channels = c1

# Configure the HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
# Output file type: the default is SequenceFile, while DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the sink to the channel
a1.sinks.k1.channel = c1

# Configure the channel
a1.channels.c1.type = memory
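
Note that by default the HDFS sink rolls to a new file every 30 seconds, 1024 bytes, or 10 events, whichever comes first, which tends to create many small files. A sketch of the roll-control properties (the values below are illustrative; 0 disables a criterion):

a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0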

Test:

hdfs dfs -ls /flume/events/19-11-21/15

6. Hive Sink

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg
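
The Hive sink writes through Hive's streaming API, so the target table must be bucketed, stored as ORC, and have transactions enabled on the Hive side. A sketch of a table matching the fieldnames and partition settings above (adapted from the Flume user guide example; verify the ACID and bucketing requirements against your Hive version):

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;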

7. Avro Source and Avro Sink

The two configuration files, exec-memory-avro.properties and avro-memory-log.properties, form a two-agent chain (exec → avro → logger).

exec-memory-avro.properties (agent a1: Exec source → Avro sink):

# Name the agent's sources, sinks and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

# Configure the Avro sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.1.103
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

avro-memory-log.properties (agent a2: Avro source → logger sink):

# Name the agent's sources, sinks and channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2

# Configure the Avro source
a2.sources.s2.type = avro
a2.sources.s2.bind = 192.168.1.103
a2.sources.s2.port = 8888

# Bind the source to the channel
a2.sources.s2.channels = c2

# Configure the sink
a2.sinks.k2.type = logger

# Bind the sink to the channel
a2.sinks.k2.channel = c2

# Configure the channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

Start (the downstream Avro listener must be started first):

First:
flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/avro-memory-log.properties --name a2 -Dflume.root.logger=INFO,console

Then:
flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/exec-memory-avro.properties --name a1 -Dflume.root.logger=INFO,console

Test: send events to the agent programmatically with a Flume RPC client.
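
The receiving agent (a2) listens with an Avro source on 192.168.1.103:8888, so the simplest client is the default Avro RpcClient from the Flume SDK (flume-ng-sdk). A minimal sketch, assuming the host/port from the configuration above (the class name is arbitrary):

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class AvroClientDemo {
  public static void main(String[] args) throws EventDeliveryException {
    // getDefaultInstance returns the default (Avro/Netty) RPC client
    RpcClient client = RpcClientFactory.getDefaultInstance("192.168.1.103", 8888);
    try {
      Event event = EventBuilder.withBody("Hello Flume!", Charset.forName("UTF-8"));
      client.append(event);   // delivered to the Avro source of agent a2
    } finally {
      client.close();
    }
  }
}

For comparison, the secure (Kerberos) Thrift RPC client example from the Flume developer guide is reproduced below; note that it targets a Thrift source rather than the Avro source configured above: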

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.api.SecureRpcClientFactory;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClient;
import java.nio.charset.Charset;
import java.util.Properties;

public class MyApp {
  public static void main(String[] args) {
    MySecureRpcClientFacade client = new MySecureRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    Properties props = new Properties();
    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(8888));

    // Initialize client with the Kerberos authentication related properties
    props.setProperty("kerberos", "true");
    props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
    props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
    props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
    client.init(props);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with a Thrift source (with Kerberos enabled).
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MySecureRpcClientFacade {
  private RpcClient client;
  private Properties properties;

  public void init(Properties properties) {
    // Setup the RPC connection
    this.properties = properties;
    // Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory
    this.client = SecureRpcClientFactory.getThriftInstance(properties);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = SecureRpcClientFactory.getThriftInstance(properties);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}

8. Elasticsearch Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
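
The ElasticSearchSink does not ship the Elasticsearch client itself: the elasticsearch client jar and a matching lucene-core jar (versions must match the target cluster) have to be placed on the agent's classpath before starting, for example (file names below are illustrative):

cp elasticsearch-<version>.jar lucene-core-<version>.jar /usr/app/apache-flume-1.8.0-bin/lib/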

9. Custom Source and Sink development

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(e);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      // Transactions must always be closed after commit or rollback
      txn.close();
    }
    return status;
  }
}
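
The sink above is wired into an agent like any built-in sink, by giving its fully qualified class name as the type (snippet from the Flume developer guide):

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1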

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }

  // Required by the PollableSource interface (Flume 1.7+): how long the framework
  // backs off when process() returns Status.BACKOFF.
  @Override
  public long getBackOffSleepIncrement() {
    return 1000;
  }

  @Override
  public long getMaxBackOffSleepInterval() {
    return 5000;
  }
}

Original article: https://www.cnblogs.com/starcrm/p/11909979.html
