Flume 测试 Kafka 案例

Flume Kafka 测试案例,Flume 的配置。

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = netcat
a1.sources.s1.bind = master
a1.sources.s1.port = 44444

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = t1 # kafka topic 不需要加 k1.kafka.topic,直接去掉 kafka
a1.sinks.k1.brokerList = master:9092 # 新的使用 brokerList,旧的使用 kafka.bootstrap.servers

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

  1. 启动 kafka。

kafka-server-start.sh config/server.properties

  2. 创建 kafka topic,flume配置中的 topic 为 t1。

# 这里 --replication-factor 为1,是因为只启动了master上的kafka,从节点上面没有启动kafka,如果设置大于1的,需要将从节点的kafka也启动
# partitions 分区数量保持大于 replication-factor,分区大的话可以缓解数据过大的问题,解决内存不够,但是解决内存本质上还是需要从机器上解决。
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 2 --topic t1

  3. 启动 flume。

flume-ng agent -c conf -f conf/kafka_test.conf -n a1 -Dflume.root.logger=INFO,console

  4. 启动 kafka 的消费者,来观察看是否成功。

kafka-console-consumer.sh --bootstrap-server master:9092 --topic t1

  5. 由于 flume 配置文件中监控的命令是 netcat,启动一个远程,来发送消息。

# 如果没有 telnet, 使用 yum install telnet 进行安装
# localhost 本机
# 端口 44444,是flume配置文件中指定的,flume启动就会启动对应的端口监听
telnet localhost 44444 

  6. 测试

telnet localhost 44444
> hello
>world
>nice

  查看 kafka 的消费者窗口,会发现已经有了对应的内容

# kafka-console-consumer.sh --bootstrap-server master:9092 --topic t1
 hello
world
nice

  总结:一开始由于 flume 的配置文件没有写对,调试很久才调通,真是不应该。其次,flume启动之后要学会看对应的日志信息,比如启动flume后,就应该可以观察到kafka对应的topic,但是由于没有仔细看,发现前几次调试都是不通的,不论怎么做kafka 的消费者就是拿不到数据。但是最后发现如果 flume 配置文件不正确的话,启动 flume,监听的topic 是默认的 default-topic,所以最后问题出现在 flume 的配置文件上面,把对应的 flume 中关于 sink 部分的配置要注意,由于版本不一样有的配置需要做一点转换才能跑成功。一定要注意检查日志。

  

  

原文地址:https://www.cnblogs.com/hanwen1014/p/11260307.html

时间: 2024-08-04 23:49:40

Flume 测试 Kafka 案例的相关文章

Flume 学习笔记之 Flume NG+Kafka整合

Flume NG集群+Kafka集群整合: 修改Flume配置文件(flume-kafka-server.conf),让Sink连上Kafka hadoop1: #set Agent name a1.sources = r1 a1.channels = c1 a1.sinks = k1 #set channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacit

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

Flume、Kafka结合

Todo: 对Flume的sink进行重构,调用kafka的消费生产者(producer)发送消息; 在Sotrm的spout中继承IRichSpout接口,调用kafka的消息消费者(Consumer)来接收消息,然后经过几个自定义的Bolt,将自定义的内容进行输出 编写KafkaSink 从$KAFKA_HOME/lib下复制 kafka_2.10-0.8.2.1.jar kafka-clients-0.8.2.1.jar scala-library-2.10.4.jar 到$FLUME_H

Flume+LOG4J+Kafka

基于Flume+LOG4J+Kafka的日志采集架构方案 本文将会介绍如何使用 Flume.log4j.Kafka进行规范的日志采集. Flume 基本概念 Flume是一个完善.强大的日志采集工具,关于它的配置,在网上有很多现成的例子和资料,这里仅做简单说明不再详细赘述.Flume包含Source.Channel.Sink三个最基本的概念: Source——日志来源,其中包括:Avro Source.Thrift Source.Exec Source.JMS Source.Spooling D

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming

使用flume将kafka数据sink到HBase【转】

1. hbase sink介绍 1.1 HbaseSink 1.2 AsyncHbaseSink 2. 配置flume 3. 运行测试flume 4. 使用RegexHbaseEventSerializer来处理些HBASE的值 5. 效率测试 1. hbase sink介绍 如果还不了解flume请查看我写的其他flume下的博客. 接下来的内容主要来自flume官方文档的学习. 顺便也强烈推荐flume 1.6 官方API hbase的sink主要有以下两种.两种方式都提供和HBASE一样的

flume 整合 kafka

flume 整合 kafka: flume 采集业务日志,发送到kafka 安装部署Kafka Download 1.0.0 is the latest release. The current stable version is 1.0.0. You can verify your download by following these procedures and using these KEYS. 1.0.0 Released November 1, 2017 Source downloa

flume整合kafka

一.需求 利用flume采集Linux下的文件信息,并且传入到kafka集群当中. 环境准备zookeeper集群和kafka集群安装好. 二.配置flume 官网下载flume.博主自己这里使用的是flume1.6.0. 官网地址http://flume.apache.org/download.html 解压缩.tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /usr/apps/ 创建flume配置文件. cd /usr/apps/flume/apache

Flume整合Kafka完成实时数据采集

agent选择 agent1 exec source + memory channel + avro sink agent2 avro source + memory channel 模拟实际工作中的场景,agent1 为A机器,agent2 为B机器. avro source: 监听avro端口,并且接收来自外部avro信息, avro sink:一般用于跨节点传输,主要绑定数据移动目的地的ip和port 在创建agent2配置文件 cd /app/flume/flume/conf vi te