大数据采集工具flume各种采集方案案例

以下是整理flume的各种采集方式 代码直接用

一、source类型是netcat
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
命令./flume-ng agent -n a1 -f ../conf/netcat.conf -Dflume.root.logger=INFO,console
二、source类型是spooldir
a1.sources = r1 ##给agent的source起名
a1.sinks = k1 ##给agent的sinks起名
a1.channels = c1 ##给agent的channels起名
a1.sources.r1.type = spooldir ##文件夹
a1.sources.r1.spoolDir = /root/flume ##要采集的目录
a1.sources.r1.fileHeader = true ##采集过的文件是否需要添加一个后缀
a1.sinks.k1.type = logger
a1.channels.c1.type = memory ##把缓存数据放到内存
a1.channels.c1.capacity = 1000 ##管道里面最多可以存放多少事件
a1.channels.c1.transactionCapacity = 100 ##每次对最接收多少事件
a1.sources.r1.channels = c1 ## 把source和channels连接上
a1.sinks.k1.channel = c1 ##把sinks和channel连接上
命令:…/bin/flume-ng agent -n a1 -f ../conf/spooldir.conf -Dflume.root.logger=INFO,console
三、source 类型是avro
这个配置source是avro类型,是一个服务器,这个服务器
开启一个端口8088,目是接收数据的
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = linux1 ##当前这一台机器的ip
a1.sources.r1.port = 8088
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
命令:…/bin/flume-ng agent -n a1 -f ../conf/server.conf -Dflume.root.logger=INFO,console
四、source是socket类型
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type =syslogtcp
a1.sources.r1.bind=linux1
a1.sources.r1.port=8080

a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
五、sink类型是avro
数据发送者 是一个客户端 目的就是发送数据
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2 ##当前这一台机器的ip
a1.sources.r1.port = 666

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

./bin/flume-ng agent -n a1 -f ./conf/client.conf -Dflume.root.logger=INFO,console
六、sink类型是两个avro
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2
a1.sources.r1.port = 666

a1.sources.r2.type = netcat
a1.sources.r2.bind = linux2
a1.sources.r2.port = 777

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux1
a1.sinks.k2.port = 8088
a1.sinks.k2.batch-size = 2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
七、sink 类型是hdfs
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666

a1.sinks.k1.type = hdfs #sink到hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
##filePrefix 默认值:FlumeData
##写入hdfs的文件名前缀,可以使用flume提供的日期及%{host}表达式。
a1.sinks.k1.hdfs.filePrefix = events-
##默认值:30
##hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒
##如果设置成0,则表示不根据时间来滚动文件
#注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,
#并新打开一个临时文件来写入数据;
a1.sinks.k1.hdfs.rollInterval = 30
##默认值:1024
##当临时文件达到该大小(单位:bytes)时,滚动成目标文件;
##如果设置成0,则表示不根据临时文件大小来滚动文件;
a1.sinks.k1.hdfs.rollSize = 0
##默认值:10
##当events数据达到该数量时候,将临时文件滚动成目标文件;
##如果设置成0,则表示不根据events数据来滚动文件;
a1.sinks.k1.hdfs.rollCount = 0
##batchSize 默认值:100
##每个批次刷新到HDFS上的events数量;
a1.sinks.k1.hdfs.batchSize = 1
##useLocalTimeStamp
##默认值:flase
##是否使用当地时间。
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
八、sink类型是kafka类型的
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = Hellokafka
a1.sinks.k1.brokerList = linux1:9092,linux2:9092,linux3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
九、Source 是mysql
a1.sources = r1
a1.sinks = k1
a1.channels = c1

###########sources#################

r1

a1.sources.r1.type = org.keedio.flume.source.SQLSource
a1.sources.r1.hibernate.connection.url = jdbc:mysql://localhost:3306/test
a1.sources.r1.hibernate.connection.user = root
a1.sources.r1.hibernate.connection.password = 123456
a1.sources.r1.hibernate.connection.autocommit = true
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.r1.run.query.delay=10000
a1.sources.r1.status.file.path = /root/data/flume/
a1.sources.r1.status.file.name = sqlSource.status
a1.sources.r1.start.from = 0
a1.sources.r1.custom.query = select id,userName from user where id > [email protected]$ order by id asc
a1.sources.r1.batch.size = 1000
a1.sources.r1.max.rows = 1000
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.r1.hibernate.c3p0.min_size=1
a1.sources.r1.hibernate.c3p0.max_size=10

a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

原文地址:https://blog.51cto.com/14464937/2428412

时间: 2024-11-07 05:57:56

大数据采集工具flume各种采集方案案例的相关文章

详解大数据采集引擎之Sqoop&采集oracle数据库中的数据

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一.Sqoop的简介: Sqoop是一个数据采集引擎/数据交换引擎,采集关系型数据库(RDBMS)中的数据,主要用于在RDBMS与HDFS/Hive/HBase之间进行数据传递,可以通过sqoop import命令将RDBMS中的数据导入到HDFS/Hive/HBase中,也可以通过sqoop export命令将HDFS/Hive/HBase中的

数据采集工具flume

概述    Apache的flume是一个分布式的,可靠的,和可用的系统.能有效地收集,汇总和移动大量的从许多不同的来源,一个集中式数据存储日志数据.Apache的flume的使用不仅限于日志数据聚集.由于数据来源是可定制的,flume可以用来大量事件(每一行数据被当做一个event)数据包括但不限于传输网络数据,社交媒体产生的数据,电子邮件和几乎任何数据源的可能.    Apache的flume是Apache软件基金会的顶级项目,目前有两个版本的代码,版本0.9.x和1.x.1.x是全新的架构

带你看懂大数据采集引擎之Flume&采集目录中的日志

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一.Flume的介绍: Flume由Cloudera公司开发,是一种提供高可用.高可靠.分布式海量日志采集.聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于采集数据:同时,flume提供对数据进行简单处理,并写到各种数据接收方的能力,如果能用一句话概括Flume,那么Flume是实时采集日志的数据采集引擎. 二.Flume的体

大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例

[TOC] 1 大数据处理的常用方法 大数据处理目前比较流行的是两种方法,一种是离线处理,一种是在线处理,基本处理架构如下: 在互联网应用中,不管是哪一种处理方式,其基本的数据来源都是日志数据,例如对于web应用来说,则可能是用户的访问日志.用户的点击日志等. 如果对于数据的分析结果在时间上有比较严格的要求,则可以采用在线处理的方式来对数据进行分析,如使用Spark.Storm等进行处理.比较贴切的一个例子是天猫双十一的成交额,在其展板上,我们看到交易额是实时动态进行更新的,对于这种情况,则需要

【2020】DBus,一个更能满足企业需求的大数据采集平台

功能远超Sqoop.DataX.Flume.Logatash.Filebeat等采集工具 注:由于文章篇幅有限,完整文档可扫免费获取 深知其他组件的局限性,才能彰显DBus的优越感 当前有很多数据采集工具(Sqoop.DataX.Flume.Logatash.Filebeat等),他们或多或少都存在一些局限性. 一个共性问题是缺乏统一的数据源端管控,所以也就无法找到统一的数据入口,那后续处理元数据或者血缘分析会异常困难.除此之外,现有各种数据采集工具的数据同步方法也有一定的局限性.比如: (1)

【2020】 DBus,一个更能满足企业需求的大数据采集平

功能远超Sqoop.DataX.Flume.Logatash.Filebeat等采集工具 深知其他组件的局限性,才能彰显DBus的优越感 当前有很多数据采集工具(Sqoop.DataX.Flume.Logatash.Filebeat等),他们或多或少都存在一些局限性.一个共性问题是缺乏统一的数据源端管控,所以也就无法找到统一的数据入口,那后续处理元数据或者血缘分析会异常困难.除此之外,现有各种数据采集工具的数据同步方法也有一定的局限性.比如:(1)各个数据使用方在业务低峰期各种抽取所需数据(缺点

一共81个,开源大数据处理工具汇总(下)

接上一部分:一共81个,开源大数据处理工具汇总(上),第二部分主要收集整理的内容主要有日志收集系统.消息系统.分布式服务.集群管理.RPC.基础设施.搜索引擎.Iaas和监控管理等大数据开源工具. 日志收集系统 一.Facebook Scribe 贡献者:Facebook 简介:Scribe是Facebook开源的日志收集系统,在Facebook内部已经得到大量的应用.它能够从各种日志源上收集日志,存储到一个中央存储系统(可以是NFS,分布式文件系统等)上,以便于进行集中统计分析处理.它为日志的

一共81个,开源大数据处理工具汇总(下),包括日志收集系统/集群管理/RPC等

作者:大数据女神-诺蓝(微信公号:dashujunvshen).本文是36大数据专稿,转载必须标明来源36大数据. 接上一部分:一共81个,开源大数据处理工具汇总(上),第二部分主要收集整理的内容主要有日志收集系统.消息系统.分布式服务.集群管理.RPC.基础设施.搜索引擎.Iaas和监控管理等大数据开源工具. 日志收集系统 一.Facebook Scribe 贡献者:Facebook 简介:Scribe是Facebook开源的日志收集系统,在Facebook内部已经得到大量的应用.它能够从各种

动态网页数据的采集方案

我在上一篇文章中介绍了使用ScrapySharp快速从网页中采集数据,这种方式是通过直接发送的Http请求来获取的原始页面信息,对于静态网页非常有效,但还有许多网站中的页面内容并非全部存放在原始的页面中,很多内容是通过javascript来动态生成的,这些数据用前面的方式就抓取不到了.本文这里就简单的介绍一下动态网页的采集方案. 对于这样的网页数据的采集,往往是利用一个浏览器引擎来实现整个页面的加载,输出加载完后的完整页面,然后就可以利用ScrapySharp等工具解析了.常用有如下几种方式: