Flume -- 初识flume、source和sink

Flume – 初识flume、source和sink

目录
基本概念
常用源 Source
常用sink

基本概念

? 什么叫flume?
  分布式,可靠的大量日志收集、聚合和移动工具。

? events
  事件,是一行数据的字节数据,是flume发送文件的基本单位。

? flume配置文件
  重命名flume-env.sh.template为flume-env.sh,并添加[export JAVA_HOME=/soft/jdk]

? flume的Agent
  source //从哪儿读数据。 负责监控并收集数据。相对于channel是生产者。
  channel //数据通道。 通道,相当于数据缓冲区。
  sink //将数据传送往哪儿。 沉槽,负责将数据放置在指定位置。相对于channel是消费者。

? flume如何使用
  在flume的conf文件下,创建conf后缀的文件,使用flume命令启动

? flume命令
  启动:flume-ng agent -f /soft/flume/conf/example.conf -n a1

常用源 Source

? 执行源:Exec Sour //通过linux命令作为source。缺点:失败后数据会丢失,不能保证数据的完整性。
  #定义源:exec
  a1.source.r1.type = exec
  a1.source.r1.command = tail -F /home/centos/1.txt
? 滚动目录源:Spooling Directory Source //监控目录,如果目录下有新文件产生,机会将其消费
  #定义源:spoodir
  a1.source.r1.type = spooldir
  #指定监控目录
  a1.source.r1.spoolDir = /home/centos/log
? 指定类型的文件:tailDir source #监控目录中指定类型的文件,并监控其消费偏移量;
 通过~/.flume/taildir_position.json监控并实时记录文件偏移量,可通过a1.sources.r1.positionFile配置进行修改
  #定义源:TAILDIR
  a1.source.r1.type = TAILDIR
  #指定监控文件组
  a1.source.r1.filegroups = g1
  #指定g1组中包含的文件
  a1.source.r1.filegroups.g1 = /home/centos/log/.*log
? 顺序数字源:Sequence Generator Source //产生顺序数字的源,用作测试
  #定义源:seq
  a1.source.r1.type = seq
  #定义一次RPC产生的批次数量
  a1.source.r1.batchSize = 1024
? 压力源:Stress Source //测试集群压力,用作负载测试
  #定义源:stress
  a1.source.r1.type = org.apache.flume.source.StressSource
  #一个event产生的数据量
  a1.source.r1.size = 1073741824

常用sink

? 日志&控制台:logger sink
  a1.sinks.k1.type = logger
? 存储在本地文件:File Roll Sink
  #设置滚动文件sink
  a1.sinks.k1.type = file_roll
  #指定文件位置。若文件不存在会报错
  a1.sinks.k1.directory = /home/centos/log2
  #设置滚动周期间隔,0即不滚动;默认30s。
  a1.sinks.k1.sink.rollInterval = 0
? 写入到hdfsL:HDFS Sink //默认SequenceFile,可以通过hdfs.fileType指定(SequenceFile, DataStream or CompressedStream)
  #指定类型
  a1.sinks.k1.type = hdfs
  #指定路径,不用单独创建文件夹
  a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H
  #时间相关的配置,必须指定时间戳
  a1.sinks.k1.hdfs.useLocalTimeStamp = true
  #实例化文件的前缀
  a1.sinks.k1.hdfs.filePrefix = events-
  #滚动间隔,0为不滚动
  a1.sinks.k1.hdfs.rollInterval = 0
  #滚动大小;默认1024
  a1.sinks.k1.hdfs.rollSize = 1024
  #指定数据类型;默认为 sequenceFile
  a1.sinks.k1.hdfs.fileType = CompressedStream
  #指定压缩编解码器
  a1.sinks.k1.hdfs.codeC = gzip
? 写入到Hbase:hbase sink //需要创建表,无法指定rowkey和col
  #设置类型
  a1.sinks.k1.type = hbase
  a1.sinks.k1.table = ns1:flume
  a1.sinks.k1.columnFaminly = f1
? 写入到Hbase:regexhbase sink //需要创建表,可以手动指定rowKey和col
  #设置正则hbase类型
  a1.sinks.k1.type = hbase
  a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
  #手动指定rowkey和列,[ROW_KEY]必须些,且大写
  a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
  #指定正则,与col对应
  a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
  #指定rowkey索引
  a1.sinks.k1.serializer.rowKeyIndex = 0
  a1.sinks.k1.table = ns1:flume
  a1.sinks.k1.coluFamily = f1
? 写入到Hive:hive sink //需要启动hive的事务性
  # 设置hive sink
  a1.sinks.k1.type = hive
  # 需要启动hive的metastore:hive --service metastore //metastore源数据仓库
  a1.sinks.k1.hive.metastore = thrift://s101:9083
  a1.sinks.k1.hive.database = default
  # 需要创建事务表
  a1.sinks.k1.hive.table = tx2
  # 指定列和字段的映射
  a1.sinks.k1.serializer = DELIMITED
  # 指定输入的格式,必须是双引号
  a1.sinks.k1.serializer.delimiter = "\t"
  # 指定hive存储文件展现方式,必须是单引号
  a1.sinks.k1.serializer.serdeSeparator = ‘\t‘
  a1.sinks.k1.serializer.fieldnames =id,name,age
? 写入到hive补充
  1、首先将/soft/hive/hcatalog/share/hcatalog中的所有jar拷贝到hive的lib库中
    cp /soft/hive/hcatalog/share/hcatalog/* /soft/hive/lib/
  2、启动hive的metastore
    hive --service metastore
  3、启动hive并创建事务表
    SET hive.support.concurrency = true;
    SET hive.enforce.bucketing = true;
    SET hive.exec.dynamic.partition.mode = nonstrict;
    SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
    SET hive.compactor.initiator.on = true;
    SET hive.compactor.worker.threads = 1;
    create table tx2(id int ,name string, age int ) clustered by (id) into 2 buckets stored as orc TBLPROPERTIES(‘transactional‘=‘true‘);
  4、启动flume,并使用以上的配置文件
    flume-ng agent -f k_hive.conf -n a1
  5、输入数据验证
    1 tom 18

原文地址:https://www.cnblogs.com/yiwanfan/p/9470320.html

时间: 2024-12-20 04:52:08

Flume -- 初识flume、source和sink的相关文章

flume组件汇总 source、sink、channel

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

【Flume】从入口Application来分析flume的source和sink是如何与channel交互的

大家在启动flume的时候,输入的命令就可以看出flume的启动入口了 [[email protected] apache-flume-1.5.2-bin]# sh bin/flume-ng agent -c conf -f conf/server.conf -n a1 Info: Sourcing environment configuration script /home/flume/apache-flume-1.5.2-bin/conf/flume-env.sh + exec /home/

flume的source, channel, sink 列表

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

Flume的Source、Sink总结,及常用使用场景

数据源Source RPC异构流数据交换 Avro Source Thrift Source 文件或目录变化监听 Exec Source Spooling Directory Source Taildir Source MQ或队列订阅数据持续监听 JMS Source SSL and JMS Source Kafka Source Network类数据交换 NetCat TCP Source NetCat UDP Source HTTP Source Syslog Sources Syslog

日志收集框架 Flume 组件之Source使用

上一篇简单介绍了Flume几个组件,今天介绍下组件其一的source,整理这些,也是二次学习的过程,也是梳理知识的过程. Source 中文译为来源,源作用:采集数据,然后把数据传输到channel上.例如:监控某个文件或者某个端口或某个目录,新增数据,新增文件的变化,然后传输到channel. 常用的的source类型,也是平常用的比较多的几种类型,如下: source类型 说明 Avro Source 支持avro协议,内置支持 Thrift Source 支持Thirft rpc ,内置支

flume初识

一.flume特点 flume是目前大数据领域数据采集的一个利器,当然除了flume还有Fluentd和logstash,其他的目前来说并没有深入的了解,但是我觉得flume能够在大数据繁荣的今天屹立不倒,应该有以下几点: 1. Flume可以将应用产生的数据存储到任何集中存储器中,完美的介入HDFS和HBASE等,便于后期进行数据处理 2. 当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和

【Flume】flume输出sink到hbase的实现

flume 1.5.2 hbase 0.98.9 hadoop 2.6 zk 3.4.6 以上是基础的软件及对应版本,请先确认以上软件安装成功! 1.添加jar包支持 将hbase的lib下的这些jar包拷贝到flume的lib下 2.配置flume 注意看以上的serializer配置,采用的是官方的RegexHbaseEventSerializer, 当然还有一个SimpleHbaseEventSerializer 如果你使用了SimpleHbaseEventSerializer 就会出现如

【Flume】flume中transactionCapacity和batchSize概念的具体分析和解惑

不知道各位用过flume的读者对这两个概念是否熟悉了解 一开始本人的确有点迷惑,觉得这是不是重复了啊? 没感觉到transactionCapacity的作用啊? batchSize又是干啥的啊? -- -- 带着这些问题,我们深入源码来看一下: batchSize batchSize这个概念首先它出现在哪里呢? kafkaSink的process方法 HDFS Sink Exec Source 通过上面这三张图,相信大家应该知道batchSize从哪来的了 batchSize是针对Source和

【Flume】 flume 负载均衡环境的搭建 load_balance

flume的负载均衡即每次按照一定的算法选择sink输出到指定地方,如果在文件输出量很大的情况下,负载均衡还是很有必要的,通过多个通道输出缓解输出压力 flume内置的负载均衡的算法默认是round robin,轮询算法,按序选择 下面看一下具体实例: # Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 # Describe/configure the source a1.so