《OD大数据实战》Flume实战

一、netcat source + memory channel + logger sink

1. 修改配置

1)修改$FLUME_HOME/conf下的flume-env.sh文件,修改内容如下

export JAVA_HOME=/opt/modules/jdk1.7.0_67

2)在$FLUME_HOME/conf目录下,创建agent子目录,新建netcat-memory-logger.conf,配置内容如下:

# netcat-memory-logger

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = beifeng-hadoop-02
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. 启动flume并测试

1) 启动

bin/flume-ng agent -n a1 -c conf/ -f conf/agent/netcat-memory-logger.conf -Dflume.root.logger=INFO,console

2) 测试

nc beifeng-hadoop-02 44444

输入任意字符串,观察服务器的日志文件即可。

使用linux的nc命令,如果命令不存在则先安装一下。

安装netcat:sudo yum -y install nc

二、agent: avro source + file channel + hdfs sink

1. 增加配置

在$FLUME_HOME/conf目录下,创建agent子目录,新建avro-file-hdfs.conf,配置内容如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = beifeng-hadoop-02
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://beifeng-hadoop-02:9000/flume/events/%Y-%m-%d
# default:FlumeData
a1.sinks.k1.hdfs.filePrefix = FlumeData
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 0
# 一般接近block 128 120 125
a1.sinks.k1.hdfs.rollSize = 10240
a1.sinks.k1.hdfs.fileType = DataStream
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint
a1.channels.c1.dataDirs = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. 启动并测试

1)启动flume agent

bin/flume-ng agent -n a1 -c conf/ -f conf/agent/avro-file-hdfs.conf -Dflume.root.logger=INFO,console

2)使用flume自带的avro-client测试

bin/flume-ng avro-client --host beifeng-hadoop-02 --port 4141 --filename /home/beifeng/order_info.txt
时间: 2024-10-19 21:13:29

《OD大数据实战》Flume实战的相关文章

Flume 实战(1) -- 初体验

前言: Flume-ng是数据收集/聚合/传输的组件, Flume-ng抛弃了Flume OG原本繁重的zookeeper和Master, Collector, 其整体的架构更加的简洁和明了. 其基础组件就Agent进程, 内部又可以细分为Source, Channel, Sink三个组件, Source是数据的输入源, channel作为消息的管道, 而sink是作为数据流的输出, Source可以配置多个channel, sink和channel一一对应. *) 初体验Flume-ng 以C

Flume 实战(2)--Flume-ng-sdk源码分析

具体参考: 官方用户手册和开发指南 http://flume.apache.org/FlumeDeveloperGuide.html *) 定位和简单例子 1). Flume-ng-sdk是用于编写往flume agent发送数据的client sdk2). 简单示例 RpcClient client = null; try { client = RpcClientFactory.getDefaultInstance("127.0.0.1", 41414); Event event =

flume实战应用解析

业务背景: 将java项目生成的日志文件分门别类的输出给flume 第一步: 将日志输出到flume中,在java程序中编写log4j,并指定输出到哪个flume服务器中 log4j.rootLogger=INFO,flume log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname=192.168.13.132 log4j.appender.fl

Flume实战案例

1. 日志的采集和汇总1.1. 案例场景A.B两台日志服务机器实时生产日志主要类型为access.log.nginx.log.web.log现在要求:把A.B 机器中的access.log.nginx.log.web.log 采集汇总到C机器上然后统一收集到hdfs中.但是在hdfs中要求的目录为:/source/logs/access/20160101//source/logs/nginx/20160101//source/logs/web/20160101/**1.2. 场景分析1.3. 数

flume实战

业务背景: A,B两台机器实时生产日志主要类型为access.log,ugcheader.log,ugctail.log 实现要求: 把A,B机器中的access.log,ugcheader.log,ugctail.log汇总到C机器上然后统一收集到HDFS中 但是在HDFS中要求的目录为: /source/access/20160101/** /source/ugcheader/20160101/** /source/ugctail/20160101/**

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming

日志采集框架Flume

概述 Flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统. Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS.hbase.hive.kafka等众多外部存储系统中 一般的采集需求,通过对flume的简单配置即可实现 Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景 运行机制 1. Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形

第88讲:Spark Streaming从Flume Poll数据

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming