Flume-ng-mongodb-sink

本文主要介绍使用Flume传输数据到MongoDB的过程,内容涉及环境部署和注意事项。

一、环境搭建

1、flune-ng下载地址:http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
2、mongodb java driver jar包下载地址:https://oss.sonatype.org/content/repositories/releases/org/mongodb/mongo-java-driver/2.13.0/mongo-java-driver-2.13.0.jar
3、flume-ng-mongodb-sink 源码下载地址:https://github.com/leonlee/flume-ng-mongodb-sink
     flume-ng-mongodb-sink 需要自己编译jar包,从github上下载代码,解压之后执行mvn package,即可生成。需要先安装maven用于编译jar包

二、Flume配置

1、env配置

将mongo-java-driver和flume-ng-mongodb-sink两个jar包放到flume\lib目录下,并将路径加入到flume-env.sh文件的FLUME_CLASSPATH变量中;
  JAVA_OPTS变量: 加上-Dflume.monitoring.type=http -Dflume.monitoring.port=xxxx,可以在[hostname:xxxx]/metrics 上看到监控信息; -Xms指定JVM初始内存,-Xmx指定JVM最大内存
  FLUME_HOME变量: 设定FLUME根目录
  JAVA_HOME变量: 设定JAVA根目录

2、 log配置

在调试时,将日志设置为debug并打到文件:flume.root.logger=DEBUG,LOGFILE

3、 传输配置

采用 Exec Source、file-channel、flume-ng-mongodb-sink

my_agent.sources.my_source_1.channels = my_channel_1
my_agent.sources.my_source_1.type = exec
my_agent.sources.my_source_1.command = python  xxx.py
my_agent.sources.my_source_1.shell = /bin/bash -c
my_agent.sources.my_source_1.restartThrottle = 10000
my_agent.sources.my_source_1.restart = true
my_agent.sources.my_source_1.logStdErr = true
my_agent.sources.my_source_1.batchSize = 1000
my_agent.sources.my_source_1.interceptors = i1 i2 i3
my_agent.sources.my_source_1.interceptors.i1.type = static
my_agent.sources.my_source_1.interceptors.i1.key = db
my_agent.sources.my_source_1.interceptors.i1.value = cswuyg_test
my_agent.sources.my_source_1.interceptors.i2.type = static
my_agent.sources.my_source_1.interceptors.i2.key = collection
my_agent.sources.my_source_1.interceptors.i2.value = cswuyg_test
my_agent.sources.my_source_1.interceptors.i3.type = static
my_agent.sources.my_source_1.interceptors.i3.key = op
my_agent.sources.my_source_1.interceptors.i3.value = upsert

字段说明:采用exec source,指定执行命令行为python xxx.py,在xxx.py代码中处理日志,并按照跟flume-ng-mongodb-sink的约定,print出json格式的数据,如果update类操作必须带着_id字段,print出来的日志被当作Event的Body,我再通过interceptors给它加上自定义Event Header;

static interceptors用于为Event Header添加信息,这里我为它加上了db=cswuyg_test、collection=cswuyg_test、op=upsert,这三个key是跟flume-ng-mongodb-sink 约定的,用于指定mongodb中的db、collection名以及操作类型为update。

my_agent.channels.my_channel_1.type = file
my_agent.channels.my_channel_1.checkpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint
my_agent.channels.my_channel_1.useDualCheckpoints = true
my_agent.channels.my_channel_1.backupCheckpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint2
my_agent.channels.my_channel_1.dataDirs = /home/work/flume/file-channel/my_channel_1/data
my_agent.channels.my_channel_1.transactionCapacity = 10000
my_agent.channels.my_channel_1.checkpointInterval = 30000
my_agent.channels.my_channel_1.maxFileSize = 4292870142
my_agent.channels.my_channel_1.minimumRequiredSpace = 524288000
my_agent.channels.my_channel_1.capacity = 100000

sink配置:

my_agent.sinks.my_mongo_1.type = org.riderzen.flume.sink.MongoSink
my_agent.sinks.my_mongo_1.host = xxxhost
my_agent.sinks.my_mongo_1.port = yyyport
my_agent.sinks.my_mongo_1.model = DYNAMIC/SINGLE ---查看源码仅支持此二种方式,并且必须大小my_agent.sinks.my_mongo_1.db = XXX --mongo表名,默认名称为eventsmy_agent.sinks.my_mongo_1.username = XXX --mongo用户名my_agent.sinks.my_mongo_1.password = YYY --mongo密码my_agent.sinks.my_mongo_1.collecion = log
my_agent.sinks.my_mongo_1.batch = 10
my_agent.sinks.my_mongo_1.channel = my_channel_1
my_agent.sinks.my_mongo_1.timestampField = _S

参见:http://www.cnblogs.com/cswuyg/p/4498804.html

时间: 2024-07-29 11:39:25

Flume-ng-mongodb-sink的相关文章

Flume NG 学习笔记(一)简介

一.简介 Flume是一个分布式.可靠.高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力. Flume在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG(next generation),0.9.x的称为Flume OG(originalgeneration). 对于OG版本, Flume NG (1.x.x)的主要变化如下: 1.sources和sinks 使用chann

Flume NG 学习笔记(五)Sinks和Channel配置

一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. 目前,它支持HDFS的文本和序列文件格式,以及支持两个文件类型的压缩.支持将所用的时间.数据大小.事件的数量为操作参数,对HDFS文件进行关闭(关闭当前文件,并创建一个新的).它还可以对事源的机器名(hostname)及时间属性分离数据,即通过时间戳将数据分布到对应的文件路径. HDFS目录路径可

【Flume NG用户指南】(2)构造

作者:周邦涛(Timen) Email:[email protected] 转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575 上一篇请參考[Flume NG用户指南](1)设置 3. 配置 前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件里读取的. 3.1 定义数据流 要在一个Flume Agent中定义数据流,你须要通过一个Channel将Source和Sin

【Flume NG用户指南】(2)配置

作者:周邦涛(Timen) Email:[email protected] 转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575 上一篇请参考[Flume NG用户指南](1)设置 3. 配置 前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件中读取的. 3.1 定义数据流 要在一个Flume Agent中定义数据流,你需要通过一个Channel将Source和Sin

Flume 学习笔记之 Flume NG+Kafka整合

Flume NG集群+Kafka集群整合: 修改Flume配置文件(flume-kafka-server.conf),让Sink连上Kafka hadoop1: #set Agent name a1.sources = r1 a1.channels = c1 a1.sinks = k1 #set channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacit

Flume 学习笔记之 Flume NG高可用集群搭建

Flume NG高可用集群搭建: 架构总图: 架构分配: 角色 Host 端口 agent1 hadoop3 52020 collector1 hadoop1 52020 collector2 hadoop2 52020 agent1配置(flume-client.conf): #agent1 name agent1.channels = c1 agent1.sources = r1 agent1.sinks = k1 k2 #set gruop agent1.sinkgroups = g1 #

【转】Flume(NG)架构设计要点及配置实践

Flume(NG)架构设计要点及配置实践 Flume NG是一个分布式.可靠.可用的系统,它能够将不同数据源的海量日志数据进行高效收集.聚合.移动,最后存储到一个中心化数据存储系统中.由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本.经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡. 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元

Flume 学习笔记之 Flume NG概述及单节点安装

Flume NG概述: Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持 Failover和负载均衡.其中Agent包含Source,Channel和 Sink,三者组建了一个Agent.三者的职责如下所示: Source:用来消费(收集)数据源到Channel组件中 Channel:中转临时存储,保存所有Source组件信息 Sink:从Channel中读取,读取成功后会删除Channel中的

Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了

Flume NG源码分析(三)使用Event接口表示数据流

Flume NG有4个主要的组件: Event表示在Flume各个Agent之间传递的数据流 Source表示从外部源接收Event数据流,然后传递给Channel Channel表示对从Source传递的Event数据流的临时存储 Sink表示从Channel中接收存储的Event数据流,并传递给下游的Source或者终点仓库 这篇看一下Event接口表示的数据流.Source, Channel, Sink操作的数据流都是基于Event接口的封装. public interface Event