Flume、Kafka结合

Todo:

对Flume的sink进行重构,调用kafka的消费生产者(producer)发送消息;

在Sotrm的spout中继承IRichSpout接口,调用kafka的消息消费者(Consumer)来接收消息,然后经过几个自定义的Bolt,将自定义的内容进行输出

编写KafkaSink

从$KAFKA_HOME/lib下复制

kafka_2.10-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

scala-library-2.10.4.jar

到$FLUME_HOME/lib


在Eclipse新建工程,从$FLUME_HOME/lib下导入

commons-logging-1.1.1.jar

flume-ng-configuration-1.6.0.jar

flume-ng-core-1.6.0.jar

flume-ng-sdk-1.6.0.jar

zkclient-0.3.jar

kafka_2.10-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

scala-library-2.10.4.jar

到工程。

新建文件KafkaSink.java

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {
    private static final Log logger = LogFactory.getLog(KafkaSink.class);

    private String topic;
    private Producer<String, String> producer;

    public void configure(Context context) {
        topic = "flume_test";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "localhost:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zookeeper.connect", "localhost:2181");
        props.setProperty("num.partitions", "4"); //
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink初始化完成.");

    }

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("flume向kafka发送消息:" + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}

导出jar包,放到$FLUME_HOME/lib下

(File->Export->Jar File 全部默认参数)

创建kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = KafkaSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

测试

启动kafka

cd ~/app/kafka
./bin/zookeeper-server-start.sh ./config/zookeeper.properties> /dev/null &
./bin/kafka-server-start.sh ./config/server.properties > /dev/null &

创建topic

~/app/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4  --topic flume_test

启动控制台消费者

~/app/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume_test  --from-beginning

启动flume agent

flume-ng agent -c conf  -f ~/test/kafka.conf --name a1 -Dflume.root.logger=INFO,console

发送消息

echo "hey manhua" |nc localhost 5140
echo "nice shot" |nc localhost 5140

flume和kafka结合的一个工具

https://github.com/kevinjmh/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin/src/main/java/org/apache/flume/plugins

时间: 2024-08-07 00:13:02

Flume、Kafka结合的相关文章

flume+kafka+storm+mysql架构设计

前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考. 这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql (项目是maven项目,需要改动mysql配置,提供两种topology:读取本地文件(用来本地测试):读取服务器日志文件.) (是visio画的,图太大,放上来字看起来比较小,如果有需要的朋友留邮箱) 实时日志分析系统架构简介 系统主要分为四部分:                         负责从各节点上

[转载] 利用flume+kafka+storm+mysql构建大数据实时系统

原文: http://mp.weixin.qq.com/s?__biz=MjM5NzAyNTE0Ng==&mid=205526269&idx=1&sn=6300502dad3e41a36f9bde8e0ba2284d&key=c468684b929d2be22eb8e183b6f92c75565b8179a9a179662ceb350cf82755209a424771bbc05810db9b7203a62c7a26&ascene=0&uin=Mjk1ODMy

Flume+Kafka+Strom基于分布式环境的结合使用

目录: 一.Flume.Kafka.Storm是什么,如何安装? 二.Flume.Kafka.Storm如何结合使用? 1) 原理是什么? 2) Flume和Kafka的整合  3) Kafka和Storm的整合  4) Flume.Kafka.Storm的整合    一.Flume.Kafka.Storm是什么,如何安装? Flume的介绍,请参考这篇文章<Flume1.5.0的安装.部署.简单应用> Kafka的介绍,请参考这篇文章<kafka2.9.2的分布式集群安装和demo(j

flume+kafka+hdfs构建实时消息处理系统

flume是一个实时消息收集系统,它定义了多种的source.channel.sink,可以根据实际情况选择. Flume下载及文档: http://flume.apache.org/ Kafka kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息. 支持通过kafka服务器和消费机集群来分区消息. 支持Ha

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

Flume+Kafka+Storm+Redis 大数据在线实时分析

1.实时处理框架 即从上面的架构中我们可以看出,其由下面的几部分构成: Flume集群 Kafka集群 Storm集群 从构建实时处理系统的角度出发,我们需要做的是,如何让数据在各个不同的集群系统之间打通(从上面的图示中也能很好地说明这一点),即需要做各个系统之前的整合,包括Flume与Kafka的整合,Kafka与Storm的整合.当然,各个环境是否使用集群,依个人的实际需要而定,在我们的环境中,Flume.Kafka.Storm都使用集群. 2. Flume+Kafka整合 2.1 整合思路

Flume+Kafka整合

一.准备工作 准备5台内网服务器创建Zookeeper和Kafka集群 服务器地址: 192.168.2.240 192.168.2.241 192.168.2.242 192.168.2.243 192.168.2.244 服务器系统:Centos 6.5  64位 下载安装包 Zookeeper:http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz Flume:http://apache.fayea.

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

一.Hadoop配置安装 注意:apache提供的hadoop-2.6.0的安装包是在32位操作系统编译的,因为hadoop依赖一些C++的本地库, 所以如果在64位的操作上安装hadoop-2.4.1就需要重新在64操作系统上重新编译 1.修改Linux主机名 2.修改IP 3.修改主机名和IP的映射关系 ######注意######如果你们公司是租用的服务器或是使用的云主机(如华为用主机.阿里云主机等) /etc/hosts里面要配置的是内网IP地址和主机名的映射关系 4.关闭防火墙 5.s

flume+kafka+hdfs详解

flume架构图 单节点flume配置 flume-1.4.0  启动flume bin/flume-ng agent --conf ./conf  -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent -n表示配置文件中agent的名字 agent.sources = r1 agent.sinks = s1 agent.channels = c1 agent.sources.r1.channels = 

Flume+Kafka+Storm+Redis实时分析系统基本架构

PS:历史原因作者账号名为:ymh198816,但事实上作者的生日并不是1988年1月6日 今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型.当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要