flume与kafka结合上传文件到HDFS上

实现如图的的效果(详细步骤请参考官方文档(http://flume.apache.org/FlumeUserGuide.html),flume更新版本比较快)

flume1.conf的配置文件内容

a1.sources = r1
a1.sinks = k1
a1.channels = c1

#具体定义source
a1.sources.r1.type = spooldir
#先创建此目录,保证里面空的
a1.sources.r1.spoolDir = /logs 

#sink到kafka里面
a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的Topic
a1.sinks.k1.kafka.topic = haha1
#设置Kafka的broker地址和端口号
a1.sinks.k1.kafka.bootstrap.servers = zhiyou01:9092,zhiyou02:9092,zhiyou03:9092
#配置批量提交的数量
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type= snappy

#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume2.conf的配置文件内容

参考官方文档截图如下

详细配置如下

a1.sources = r1
a1.sinks = k1
a1.channels = c1

#对于source的配置描述 监听avro
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize=5000
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers = han01:9092,han02:9092,han03:9092
a1.sources.r1.kafka.topics=test

#定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#对于sink的配置描述 传递到hdfs上面
a1.sinks.k1.type = hdfs
#集群的nameservers名字
#单节点的直接写:hdfs://han01/xxx
a1.sinks.k1.hdfs.path = hdfs://ns/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a1.sinks.k1.hdfs.rollCount = 0
#HDFS上的文件达到128M时生成一个文件
a1.sinks.k1.hdfs.rollSize = 134217728
#HDFS上的文件达到60秒生成一个文件
a1.sinks.k1.hdfs.rollInterval = 60

#对于channel的配置描述 使用内存缓冲区域做数据的临时缓存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

原文地址:https://www.cnblogs.com/han-guang-xue/p/9966078.html

时间: 2024-10-31 03:00:18

flume与kafka结合上传文件到HDFS上的相关文章

hadoop上传文件到hdfs出错

通过命令: Hadoop fs -put /opt/program/userall20140828 hdfs://localhost:9000/tmp/tvbox/ 上传文件到hdfs是出现错误 14/12/11 17:57:49 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /tmp/tvbox/behavior_2014

PHP上传文件时无法上传成功,$_FILES['screenshot']['tmp_name']为空

最近在学习<HeadFirst PHP & MySQL>一书的第5章"使用存储在文件中的数据",做一个文件上传的应用时,出现了错误,就是文件无法成功上传.这个问题困扰了我很久,不过还好最后终于解决了.原因是我上传的图片文件大小超过了HTML 表单中 MAX_FILE_SIZE 选项指定的值32768Bytes即32KB导致无法上传成功. 我使用了XAMPP(Apache + MySQL + PHP + Perl)集成开发包和Zend Studio 10.6作为PHP

使用HttpRequest.Files 获取上传文件,实现上传附件功能

使用HttpRequest.Files 获取上传文件,实现上传附件功能,不同浏览器会有差异: 获得在 Google 浏览器上传后得到的 HttpRequest.Files  (客户端上载文件的集合) 单个文件查看:对应的FileName 是上传文件的原始文件名:例:开发管理手册2017版.docx 获取IE浏览器上传后HttpRequest.Files: 单个文件查看:对应的FileName 是上传文件 带路径的文件名 例:C:\\Users\\XXX\\Desktop\\开发管理手册2017版

异步无刷新上传文件并且上传文件可以带上参数

关于异步上传文件并且带上参数,网上有很多关于这样的插件,而我最喜欢用的插件是ajaxfileupload.js,该插件的代码如下: /*   131108-xxj-ajaxFileUpload.js 无刷新上传图片 jquery 插件,支持 ie6-ie10    依赖:jquery-1.6.1.min.js   主方法:ajaxFileUpload 接受 json 对象参数   参数说明:   fileElementId:必选,上传文件域ID   url:必选,发送请求的URL字符串   fi

异步无刷新上传文件而且上传文件能够带上參数

关于异步上传文件而且带上參数,网上有非常多关于这种插件.而我最喜欢用的插件是ajaxfileupload.js.该插件的代码例如以下: /*   131108-xxj-ajaxFileUpload.js 无刷新上传图片 jquery 插件.支持 ie6-ie10    依赖:jquery-1.6.1.min.js   主方法:ajaxFileUpload 接受 json 对象參数   參数说明:   fileElementId:必选,上传文件域ID   url:必选,发送请求的URL字符串  

HDFS设计思路,HDFS使用,查看集群状态,HDFS,HDFS上传文件,HDFS下载文件,yarn web管理界面信息查看,运行一个mapreduce程序,mapreduce的demo

26 集群使用初步 HDFS的设计思路 l 设计思想 分而治之:将大文件.大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析: l 在大数据系统中作用: 为各类分布式运算框架(如:mapreduce,spark,tez,--)提供数据存储服务 l 重点概念:文件切块,副本存放,元数据 26.1 HDFS使用 1.查看集群状态 命令:   hdfs  dfsadmin –report 可以看出,集群共有3个datanode可用 也可打开web控制台查看HDFS集群

批量上传文件到HDFS的Shell脚本

在做Hadoop数据挖掘项目的时候,我们第一步是源数据的获取,即把相应的数据放置到HDFS中,以便Hadoop进行计算,手动将文件上传到HDFS中,未免太费时费力,所以我们可以采取像Flume一样的框架,或者采用Shell脚本进行文件的上传.下面主要提供Shell脚本的大致写法,仅供参考,可以根据不同业务进行相应的修改. 版本1: #!/bin/bash #set java env export JAVA_HOME=/export/servers/jdk export JRE_HOME=${JA

HDFS上传文件错误--hdfs:DFSClient:DataStreamer Exception

今天上传文件的时候发现传上去的文件为空,错误提示如上述所示,原来是IP地址改掉了对呀应etc/hosts下面的IP地址也要改变,永久改ip命令-ifconfig eth0 xxx·xxx·xxx·xxx up.改完IP即可同时ssh其他节点也可以成功

ajax 上传文件,post上传文件,ajax 提交 JSON 格式的数据

ajax简介 前后台做数据交互 前后端做数据交互的方式(三种): (1)浏览器窗口输入地址(get的方式)(2)form表单提交数据(3)ajax提交数据 特点 特点: (1)异步 异步与同步的区别:同步是请求发过去,要等着回应:异步不需要等待,可以进行其他操作 (2)局部刷新 使用 使用: (1)url:匹配的路由 (2)type:发送的的方式 (3)data:发送的数据 (4)success:发送的数据成功回调条数 $('.btn').click(function () { $.ajax({