Reading Kafka data with Flume

This article describes how to read data from Kafka with Flume by implementing a custom Kafka source.

Code:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.source.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Flume source that reads messages from Kafka. It has been used in a
 * production environment with good performance: a single source can read
 * over 100k messages per second from Kafka.<p>
 * <tt>zookeeper.connect: </tt> the ZooKeeper quorum used by Kafka.<p>
 * <tt>topic: </tt> the Kafka topic to read from.<p>
 * <tt>group.id: </tt> the consumer group id.<p>
 */
public class KafkaSource extends AbstractSource implements Configurable, PollableSource {

    private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);

    private ConsumerConnector consumer;
    private ConsumerIterator<byte[], byte[]> it;
    private String topic;

    public Status process() throws EventDeliveryException {
        List<Event> eventList = new ArrayList<Event>();
        MessageAndMetadata<byte[], byte[]> message;
        Event event;
        Map<String, String> headers;
        String strMessage;
        try {
            if (it.hasNext()) {
                message = it.next();
                event = new SimpleEvent();
                headers = new HashMap<String, String>();
                headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
                // Prepend the ingestion timestamp to the original message body, separated by "|".
                strMessage = String.valueOf(System.currentTimeMillis()) + "|" + new String(message.message());
                log.debug("Message: {}", strMessage);
                event.setBody(strMessage.getBytes());
                //event.setBody(message.message());
                event.setHeaders(headers);
                eventList.add(event);
            }
            getChannelProcessor().processEventBatch(eventList);
            return Status.READY;
        } catch (Exception e) {
            log.error("KafkaSource EXCEPTION, {}", e.getMessage());
            return Status.BACKOFF;
        }
    }

    public void configure(Context context) {
        topic = context.getString("topic");
        if (topic == null) {
            throw new ConfigurationException("Kafka topic must be specified.");
        }
        try {
            this.consumer = KafkaSourceUtil.getConsumer(context);
        } catch (IOException e) {
            log.error("IOException occur, {}", e.getMessage());
        } catch (InterruptedException e) {
            log.error("InterruptedException occur, {}", e.getMessage());
        }
        // Request a single stream for the configured topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        if (consumerMap == null) {
            throw new ConfigurationException("topicCountMap is null");
        }
        List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
        if (topicList == null || topicList.isEmpty()) {
            throw new ConfigurationException("topicList is null or empty");
        }
        KafkaStream<byte[], byte[]> stream = topicList.get(0);
        it = stream.iterator();
    }

    @Override
    public synchronized void stop() {
        consumer.shutdown();
        super.stop();
    }
}
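
Because process() prepends the ingestion timestamp and a "|" separator to every Kafka message body, anything reading the file_roll output later has to strip that prefix off again. Below is a minimal sketch of such a parser; the TimestampedMessage class and its field names are illustrative only and not part of the original code.

// Minimal sketch: parsing the "timestamp|payload" body produced by KafkaSource.
// Class and field names are hypothetical, for illustration only.
public class TimestampedMessage {

    public final long ingestTimestamp;
    public final String payload;

    private TimestampedMessage(long ingestTimestamp, String payload) {
        this.ingestTimestamp = ingestTimestamp;
        this.payload = payload;
    }

    public static TimestampedMessage parse(String line) {
        // Split only on the first '|' so that '|' characters inside the payload survive.
        int sep = line.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("No '|' separator found: " + line);
        }
        long ts = Long.parseLong(line.substring(0, sep));
        return new TimestampedMessage(ts, line.substring(sep + 1));
    }
}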

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.source.kafka;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

import com.google.common.collect.ImmutableMap;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.flume.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSourceUtil {

    private static final Logger log = LoggerFactory.getLogger(KafkaSourceUtil.class);

    /**
     * Copies all source parameters except "type" and "channel" from the Flume
     * context into a Properties object for the Kafka consumer.
     */
    public static Properties getKafkaConfigProperties(Context context) {
        log.info("context={}", context.toString());
        Properties props = new Properties();
        ImmutableMap<String, String> contextMap = context.getParameters();
        for (Map.Entry<String, String> entry : contextMap.entrySet()) {
            String key = entry.getKey();
            if (!key.equals("type") && !key.equals("channel")) {
                props.setProperty(entry.getKey(), entry.getValue());
                log.info("key={},value={}", entry.getKey(), entry.getValue());
            }
        }
        return props;
    }

    public static ConsumerConnector getConsumer(Context context) throws IOException, InterruptedException {
        ConsumerConfig consumerConfig = new ConsumerConfig(getKafkaConfigProperties(context));
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        return consumer;
    }
}
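
As a rough illustration of how the agent configuration reaches the Kafka consumer, the sketch below builds a Context by hand with the same parameter values used in the configuration file that follows, and prints the Properties that getKafkaConfigProperties() would hand to ConsumerConfig. The demo class name is made up for this example; the values are just the example values from this article.

// Rough sketch: how KafkaSourceUtil turns Flume source parameters into Kafka
// consumer properties. Values are the example values used in this article.
import java.util.Properties;
import org.apache.flume.Context;
import org.apache.flume.source.kafka.KafkaSourceUtil;

public class KafkaSourceUtilDemo {
    public static void main(String[] args) {
        Context context = new Context();
        context.put("type", "org.apache.flume.source.kafka.KafkaSource");
        context.put("zookeeper.connect", "node3:2181,node4:2181,node5:2181");
        context.put("topic", "kkt-test-topic");
        context.put("group.id", "test");

        Properties props = KafkaSourceUtil.getKafkaConfigProperties(context);
        // "type" is dropped; zookeeper.connect, topic and group.id are passed through.
        props.list(System.out);
    }
}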

Configuration file (/etc/flume/conf/flume-kafka-file.properties):

agent_log.sources = kafka0
agent_log.channels = ch0
agent_log.sinks = sink0

agent_log.sources.kafka0.channels = ch0
agent_log.sinks.sink0.channel = ch0

agent_log.sources.kafka0.type = org.apache.flume.source.kafka.KafkaSource
agent_log.sources.kafka0.zookeeper.connect = node3:2181,node4:2181,node5:2181
agent_log.sources.kafka0.topic = kkt-test-topic
agent_log.sources.kafka0.group.id = test

agent_log.channels.ch0.type = memory
agent_log.channels.ch0.capacity = 2048
agent_log.channels.ch0.transactionCapacity = 1000

agent_log.sinks.sink0.type = file_roll
agent_log.sinks.sink0.sink.directory = /data/flumeng/data/test
agent_log.sinks.sink0.sink.rollInterval = 300

Startup script:

sudo su -l -s /bin/bash flume -c '/usr/lib/flume/bin/flume-ng agent --conf /etc/flume/conf --conf-file /etc/flume/conf/flume-kafka-file.properties -name agent_log -Dflume.root.logger=INFO,console'

Note: the timestamp-related lines in process() (the strMessage assignment and the commented-out event.setBody(message.message())) prepend an ingestion timestamp to the original message data.

Versions: flume-1.4.0.2.1.1.0 + kafka_2.8.0-0.8.0

Reference: https://github.com/baniuyao/flume-kafka

Libraries needed for compilation:

flume-ng-configuration-1.4.0.2.1.1.0-385
flume-ng-core-1.4.0.2.1.1.0-385
flume-ng-sdk-1.4.0.2.1.1.0-385
flume-tools-1.4.0.2.1.1.0-385
guava-11.0.2
kafka_2.8.0-0.8.0
log4j-1.2.15
scala-compiler
scala-library
slf4j-api-1.6.1
slf4j-log4j12-1.6.1
zkclient-0.3
zookeeper-3.3.4
