Flume基本操作

Flume基本操作
 
1.把Telnet产生的内容写入到控制台显示

bin/flume-ng agent -c conf -n a1 -f conf/a1.conf -Dflume.root.logger=DEBUG,console 

a.conf内容如下:

##### define agent name ####
a1.sources = src1
a1.channels = channel1
a1.sinks = sink1

####  define source  ####
a1.sources.src1.type = netcat
a1.sources.src1.bind = haoguan-HP-Compaq-Pro-6380-MT
a1.sources.src1.port = 44444

####  define channel  ####
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

####  define sink  ####
a1.sinks.sink1.type = logger
a1.sinks.sink1.maxBytesToLog = 1024

#### bind the source and sink to the channel
a1.sources.src1.channels = channel1
a1.sinks.sink1.channel = channel1  

2.把hive中产生的log写入到hdfs

bin/flume-ng agent -c conf -n a2 -f conf/flume-hive.conf -Dflume.root.logger=DEBUG,console 

flume-hive.conf内容如下:

##### define agent name ####
a2.sources = src2
a2.channels = channel2
a2.sinks = sink2

####  define source  ####
a2.sources.src2.type = exec
a2.sources.src2.command = tail -f /opt/modules/cdh/hive-0.13.1-cdh5.3.6/log/hive.log
a2.sources.src2.shell = /bin/bash -c

####  define channel  ####
a2.channels.channel2.type = memory
a2.channels.channel2.capacity = 1000
a2.channels.channel2.transactionCapacity = 100

####  define sink  ####
a2.sinks.sink2.type = hdfs
a2.sinks.sink2.hdfs.path = hdfs://haoguan-HP-Compaq-Pro-6380-MT:9000/flume_hive_log/%Y%m%d  #可以指定时间戳作为分区目录
a2.sinks.sink2.hdfs.filePrefix = events-
a2.sinks.sink2.hdfs.fileType = DataStream
a2.sinks.sink2.hdfs.writeFormat = Text
a2.sinks.sink2.hdfs.batchSize = 10
a2.sinks.sink2.hdfs.rollInterval = 30      #设置flush间隔,30秒flush一次,无论有没到达rollSize大小
a2.sinks.sink2.hdfs.rollSize = 10240       #设置文件大小(byte),到指定大小flush一次,无论有没到达rollInterval间隔
a2.sinks.sink2.hdfs.rollCount = 0          #rollCount必须设置成0,不然会影响rollInterval,rollSize的设置
a2.sinks.sink2.hdfs.idleTimeout=0
a2.sinks.sink2.hdfs.useLocalTimeStamp = true    #使用时间戳作为分区必须设置useLocalTimeStamp为true

#### bind the source and sink to the channel
a2.sources.src2.channels = channel2
a2.sinks.sink2.channel = channel2  

如果是HA架构需要把HA的core-site.xml与hdfs-site.xml放入到/opt/modules/cdh/flume-1.5.0-cdh5.3.6/conf中
a2.sinks.sink2.hdfs.path = hdfs://haoguan-HP-Compaq-Pro-6380-MT:9000/flume_hive_log
换成
a2.sinks.sink2.hdfs.path = hdfs://ns1/flume_hive_log
 
 
3. spooldir方式抽取文件到hdfs中

bin/flume-ng agent -c conf -n a3 -f conf/flume-app.conf -Dflume.root.logger=DEBUG,console 

flume-app.conf内容如下:

##### define agent name ####
a3.sources = src3
a3.channels = channel3
a3.sinks = sink3

####  define source  ####
a3.sources.src3.type = spooldir
a3.sources.src3.spoolDir = /opt/modules/cdh/flume-1.5.0-cdh5.3.6/spoollogs #指定被抽取的文件夹
a3.sources.src3.ignorePattern = ^.*\.log$  #过滤被抽取文件夹中指定的文件
a3.sources.src3.fileSuffix = _COMP         #文件抽取完成以后更改后缀

####  define channel  ####
a3.channels.channel3.type = file
a3.channels.channel3.checkpointDir = /opt/modules/cdh/flume-1.5.0-cdh5.3.6/filechannel/checkpoint
a3.channels.channel3.dataDirs = /opt/modules/cdh/flume-1.5.0-cdh5.3.6/filechannel/data
a3.channels.channel3.capacity = 1000
a3.channels.channel3.transactionCapacity = 100

####  define sink  ####
a3.sinks.sink3.type = hdfs
a3.sinks.sink3.hdfs.path = hdfs://haoguan-HP-Compaq-Pro-6380-MT:9000/flume_app_log
a3.sinks.sink3.hdfs.filePrefix = events-
a3.sinks.sink3.hdfs.fileType = DataStream
a3.sinks.sink3.hdfs.writeFormat = Text
a3.sinks.sink3.hdfs.batchSize = 10
a3.sinks.sink3.hdfs.rollInterval = 30
a3.sinks.sink3.hdfs.rollSize = 10240
a3.sinks.sink3.hdfs.rollCount = 0
a3.sinks.sink3.hdfs.idleTimeout=0
#a3.sinks.sink3.hdfs.useLocalTimeStamp = true

#### bind the source and sink to the channel
a3.sources.src3.channels = channel3
a3.sinks.sink3.channel = channel3  
时间: 2024-11-08 22:21:54

Flume基本操作的相关文章

Flume Spool Source 源码过程分析(未运行)

主要涉及到的类: SpoolDirectorySource 读取用户配置,并按照batchSize去读取这么多量的Event从用户指定的Spooling Dir中.SpoolDirectorySource 不会去读取某一个具体的文件,而是通过内部的reader去读取.文件切换等操作,都是reader去实现 内部类:SpoolDirectoryRunnable是一个线程,其中的run方法,完成从Spooling Dir读取Event(使用reader去读取) 1 @Override 2 publi

Flume 入门--几种不同的Sources

1.flume概念 flume是分布式的,可靠的,高可用的,用于对不同来源的大量的日志数据进行有效收集.聚集和移动,并以集中式的数据存储的系统. flume目前是apache的一个顶级项目. flume需要java运行环境,要求java1.6以上,推荐java1.7. 将下载好的flume安装包解压到指定目录即可. 2.flume中的重要模型 2.1.1.flume Event: flume 事件,被定义为一个具有有效荷载的字节数据流和可选的字符串属性集. 2.1.2.flume Agent:

flume初探

配置 demoagent.conf # example.conf: A single-node Flume configuration # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1 # Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.

<二叉树的基本操作>

#include<stdio.h> #include<stdlib.h> #include<string.h> #define num 100 #define OK 1 typedef int Status; typedef char DataType; typedef struct node { DataType data; struct node *lchild,*rchild; }BinTNode,*BinTree; Status CreateBiTree(Bin

日志系统之Flume采集加morphline解析

概述 这段时间花了部分时间在处理消息总线跟日志的对接上.这里分享一下在日志采集和日志解析中遇到的一些问题和处理方案. 日志采集-flume logstash VS flume 首先谈谈我们在日志采集器上的选型.由于我们选择采用ElasticSearch作为日志的存储与搜索引擎.而基于ELK(ElasticSearch,Logstash,Kibana)的技术栈在日志系统方向又是如此流行,所以把Logstash列入考察对象也是顺理成章,Logstash在几大主流的日志收集器里算是后起之秀,被Elas

iOS_UITextField 基本操作

基本操作 UITextField *userNameTextField = [[UITextField alloc] init]; userNameTextField.frame = CGRectMake(30, 100, 220, 50); [self.window addSubview:userNameTextField]; [userNameTextField release]; // 设置样式 userNameTextField.borderStyle = UITextBorderSty

Mongodb入门系列(4)——Mongodb体系结构与客户端基本操作及注意细节

说到Mongodb的体系结构,免不了与关系型数据库做个对比.这里以MySQL为例,我们进行一些比较: 从逻辑结构上对比: MySQL层次概念 MongoDB层次概念 数据库(database) 数据库(database) 表(table) 集合(collection) 记录(row) 文档(document) 在MongoDB中没有行.列.关系的概念,集合中的文档相当于一条记录,这体现了模式自由的特点. 从数据存储结构上对比: MySQL的每个数据库存放在一个与数据库同名的文件夹中,MySQL如

Oracle的基本操作-解除用户,赋予权限

一.表的基本操作 1. 用户管理 1.1 初始状态下系统的用户 在系统用户下,执行下面的查询语句可以查看到当前系统的所有用户  select * from dba_users; 1.2 创建一个用户 SQL> create user xp identified by xp; User created. 授予连接数据库的权限:SQL> grant connect to xp; Grant succeeded. SQL> conn xp/xp;Connected. 1.3 解锁一个用户并修改

数据结构中线性表的基本操作-合并两个线性表-依照元素升序排列

#include<iostream> #include<stdlib.h> #define LIST_INIT_SIZE 10/*线性表初始长度*/ #define LIST_CREATENT 2/*每次的增量*/ typedef int ElemType; using namespace std; typedef struct SqList/*线性表的数据结构定义*/ { ElemType *elem;/*线性表基址*/ int length;/*当前线性表所含的元素个数*/ i