ambari下的flume和kafka整合

1、配置flume

 1 #扫描指定文件配置
 2 agent.sources = s1
 3 agent.channels = c1
 4 agent.sinks = k1
 5
 6 agent.sources.s1.type=exec
 7 agent.sources.s1.command=tail -F /home/flume/test/test.log
 8 agent.sources.s1.channels=c1
 9 agent.channels.c1.type=memory
10 agent.channels.c1.capacity=10000
11 agent.channels.c1.transactionCapacity=100
12
13 #设置Kafka接收器
14 agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
15 #设置Kafka的broker地址和端口号
16 agent.sinks.k1.brokerList=172.16.38.159:6667
17 #设置Kafka的Topic
18 agent.sinks.k1.topic=test
19 #设置序列化方式
20 agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
21
22 agent.sinks.k1.channel=c1

二、配置kafka

  1、创建一个topic

#创建一个test的topic
bin/kafka-topics.sh --zookeeper 172.16.38.159:2181 --create --topic test --replication-factor 1 --partitions 1

  2、使用kafka的监控来查看

java -cp KafkaOffsetMonitor-assembly-0.2.0.jar  com.quantifind.kafka.offsetapp.OffsetGetterWeb  --zk 172.16.38.159:2181  --port 8089  --refresh 10.seconds  --retain 1.days

  3、使用customer来查看收到的数据

./kafka-console-consumer.sh -zookeeper 172.16.38.159:2181 --from-beginning --topic test
时间: 2024-11-01 11:39:40

ambari下的flume和kafka整合的相关文章

Flume 学习笔记之 Flume NG+Kafka整合

Flume NG集群+Kafka集群整合: 修改Flume配置文件(flume-kafka-server.conf),让Sink连上Kafka hadoop1: #set Agent name a1.sources = r1 a1.channels = c1 a1.sinks = k1 #set channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacit

大数据入门第二十四天——SparkStreaming(2)与flume、kafka整合

前一篇中数据源采用的是从一个socket中拿数据,有点属于“旁门左道”,正经的是从kafka等消息队列中拿数据! 主要支持的source,由官网得知如下: 获取数据的形式包括推送push和拉取pull 一.spark streaming整合flume 1.push的方式 更推荐的是pull的拉取方式 引入依赖: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streami

Flume+Kafka整合

一.准备工作 准备5台内网服务器创建Zookeeper和Kafka集群 服务器地址: 192.168.2.240 192.168.2.241 192.168.2.242 192.168.2.243 192.168.2.244 服务器系统:Centos 6.5  64位 下载安装包 Zookeeper:http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz Flume:http://apache.fayea.

非kerberos环境下,flume采集日志到kafka

kafkaflume.conf agent.sources = s1 agent.channels = c1 agent.sinks = k1 agent.sources.s1.type=exec agent.sources.s1.command=tail -F /usr/local/src/flume/testflume2.log agent.sources.s1.channels=c1 agent.channels.c1.type=memory agent.channels.c1.capac

Flume 与Kafka区别

今天开会讨论日志处理为什么要同时使用Flume和Kafka,是否可以只用Kafka 不使用Flume?当时想到的就只用Flume的接口多,不管是输入接口(socket 和 文件)以及输出接口(Kafka/HDFS/HBase等). 考虑单一应用场景,从简化系统的角度考虑,在满足应用需求的情况下可能只使用一个比较好.但是考虑到现有系统业务发展,为了后面的灵活扩展,在先用系统设计时留有一定的扩展性感觉更重要,可能使用Flume+kafka架构相对只使用Kafka会多占用1-2台机器做Flume日志采

flume写kafka topic覆盖问题fix

结构: nginx-flume->kafka->flume->kafka(因为牵扯到跨机房问题,在两个kafka之间加了个flume,蛋疼..) 现象: 在第二层,写入kafka的topic和读取的kafka的topic相同,手动设定的sink topic不生效 打开debug日志: source实例化: 21 Apr 2015 19:24:03,146 INFO [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFac

Kafka实战-Flume到Kafka (转)

原文链接:Kafka实战-Flume到Kafka 1.概述 前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览 下面开始今天的分享内容. 2.数据来源 Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka(供实时计算处理)和HDFS(离线计算处理).关于Flume集群的Ag

Flume连接Kafka的broker出错

在启动Flume的时候,出现下面的异常,但是程序照样能运行,Kafka也能够收到数据,只是偶尔会断点. 2016-08-25 15:32:54,561 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:2,host:10.208.129.5,port:9092 with

Flume、Kafka结合

Todo: 对Flume的sink进行重构,调用kafka的消费生产者(producer)发送消息; 在Sotrm的spout中继承IRichSpout接口,调用kafka的消息消费者(Consumer)来接收消息,然后经过几个自定义的Bolt,将自定义的内容进行输出 编写KafkaSink 从$KAFKA_HOME/lib下复制 kafka_2.10-0.8.2.1.jar kafka-clients-0.8.2.1.jar scala-library-2.10.4.jar 到$FLUME_H