flume从kafka读取数据到hdfs中的配置

#source的名字
agent.sources = kafkaSource
# channels的名字,建议按照type来命名
agent.channels = memoryChannel
# sink的名字,建议按照目标来命名
agent.sinks = hdfsSink

# 指定source使用的channel名字
agent.sources.kafkaSource.channels = memoryChannel
# 指定sink需要使用的channel的名字,注意这里是channel
agent.sinks.hdfsSink.channel = memoryChannel

#-------- kafkaSource相关配置-----------------
# 定义消息源类型
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# 定义kafka所在zk的地址
#
# 这里特别注意: 是kafka的zookeeper的地址
#
agent.sources.kafkaSource.zookeeperConnect = 127.0.0.1:2181
# 配置消费的kafka topic
#agent.sources.kafkaSource.topic = testtopic# 配置消费者组的id
agent.sources.kafkaSource.groupId = flume
# 消费超时时间,参照如下写法可以配置其他所有kafka的consumer选项。注意格式从kafka.xxx开始是consumer的配置属性
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100

#------- memoryChannel相关配置-------------------------
# channel类型
agent.channels.memoryChannel.type = memory
# channel存储的事件容量
agent.channels.memoryChannel.capacity=10000
# 事务容量
agent.channels.memoryChannel.transactionCapacity=1000

#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
# 注意, 我们输出到下面一个子文件夹datax中
agent.sinks.hdfsSink.hdfs.path = hdfs://lenovo:9000/user/hive/warehouse/test/%Y%m%d%H
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream

agent.sinks.hdfsSink.hdfs.rollSize = 1024
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.rollInterval = 60

#配置前缀和后缀
agent.sinks.hdfsSink.hdfs.filePrefix=test
agent.sinks.hdfsSink.hdfs.fileSuffix=.data

#避免文件在关闭前使用临时文件
agent.sinks.hdfsSink.hdfs.inUserPrefix=_
agent.sinks.hdfsSink.hdfs.inUserSuffix=

#自定义拦截器
agent.sources.kafkaSource.interceptors=i1
agent.sources.kafkaSource.interceptors.i1.type=com.hadoop.flume.FormatInterceptor$Builder
时间: 2024-12-25 04:31:18

flume从kafka读取数据到hdfs中的配置的相关文章

Ubuntu_12.04 server amd64安装读取数据失败以及samba的配置

Ubuntu_12.04 server amd64.iso 找了好多工具都会出现在安装组件的时候报错: 从光盘读取数据失败,请检查光盘的完整性: 那首先需要怀疑的是制作u盘启动盘工具错误,我试过很多建议不要使用UltraISO,LinuxLive USB Creator ,Win32 Disk Imager,LaoMaoTao_V2014,WinImage,SaleaeLogic_1.1.16C... 这些都是血淋淋的教训. 建议使用Universal-USB-Installer-1.9.5.4

hbase 从hdfs上读取数据到hbase中

1 <dependencies> 2 <dependency> 3 <groupId>org.apache.hbase</groupId> 4 <artifactId>hbase-client</artifactId> 5 <version>2.0.2</version> 6 </dependency> 7 <dependency> 8 <groupId>org.apache

7.从Hbase中读取数据写入hdfs

/** public abstract classTableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable,Result, KEYOUT, VALUEOUT> { }  *@author [email protected]  *  */ public class HbaseReader {          publicstatic String flow_fields_import = "fl

利用Flume将MySQL表数据准实时抽取到HDFS

转自:http://blog.csdn.net/wzy0623/article/details/73650053 一.为什么要用到Flume 在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问.这种方式只需要很少量的配置即可完成数据抽取任务,但缺点同样明显,那就是实时性.Sqoop使用MapReduce读写数据,而MapReduce是为了批处理场景设计的,目标是大吞吐量,并不太关心低延时问题.就像实验中所做的,每天定

Flume和Kafka

本文是学习时的自我总结,用于日后温习.如有错误还望谅解,不吝赐教 此处附上部分内容所出博客:http://blog.csdn.net/ymh198816/article/details/51998085 Flume+Kafka+Storm+Redis实时分析系统基本架构 1)    整个实时分析系统的架构是 2)    先由电商系统的订单服务器产生订单日志, 3)    然后使用Flume去监听订单日志, 4)    并实时把每一条日志信息抓取下来并存进Kafka消息系统中, 5)    接着由

JAVA BufferedReader 类从标准输入读取数据

1,建立输入流: BufferedReader localReader = new BufferedReader( new InputStreamReader(System.in)); System.in 表示标准输入,一般指键盘. 建立输入流,从标准输入读取数据到缓冲区中. 当在标准输入中输入一行字符串时,按回车之后,这行数据就会被读取到缓冲区中. 比如: abc  \r\n(表示按下回车键) 那么msg就会赋值为 abc String msg = null; while ((msg = lo

sas数据读取详解 四种读取数据方式以及数据指针的位置 、读取mess data的两个小工具、特殊的读取技巧、infile语句及其选项(dsd dlm missover truncover obs firstobs)、proc import、自定义缺失值

(The record length is the number of characters, including spaces, in a data line.) If your data lines are long, and it looks like SAS is not reading all your data, then use the LRECL= option in the INFILE statement to specify a record length at least

C#实现从数据库读取数据到Excel

用第三方组件:NPOI来实现 先去官网:http://npoi.codeplex.com/下载需要引入dll(可以选择.net2.0或者.net4.0的dll),然后在网站中添加引用.使用 NPOI 你就可以在没有安装 Office 或者相应环境的机器上对 WORD/EXCEL 文档进行读写. 创建一个实体类: [Table("Customer") ] public class Customer { [Key] public int Id { get; set; } public st

tensorflow读取数据之CSV格式

tensorflow要想用起来,首先自己得搞定数据输入.官方文档中介绍了几种,1.一次性从内存中读取数据到矩阵中,直接输入:2.从文件中边读边输入,而且已经给设计好了多线程读写模型:3.把网络或者内存中的数据转化为tensorflow的专用格式tfRecord,存文件后再读取. 其中,从文件中边读边输入,官方文档举例是用的CSV格式文件.我在网上找了一份代码,修改了一下,因为他的比较简略,我就补充一下遇到的问题 先贴代码 #coding=utf-8import tensorflow as tf