Kafka实战-Kafka到Storm

1.概述

  在《Kafka实战-Flume到Kafka》一文中给大家分享了Kafka的数据源生产,今天为大家介绍如何去实时消费Kafka中的数据。这里使用实时计算的模型——Storm。下面是今天分享的主要内容,如下所示:

  • 数据消费
  • Storm计算
  • 预览截图

  接下来,我们开始分享今天的内容。

2.数据消费

  Kafka的数据消费,是由Storm去消费,通过KafkaSpout将数据输送到Storm,然后让Storm安装业务需求对接受的数据做实时处理,下面给大家介绍数据消费的流程图,如下图所示:

  从图可以看出,Storm通过KafkaSpout获取Kafka集群中的数据,在经过Storm处理后,结果会被持久化到DB库中。

3.Storm计算

  接着,我们使用Storm去计算,这里需要体检搭建部署好Storm集群,若是未搭建部署集群,大家可以参考我写的《Kafka实战-Storm Cluster》。这里就不多做赘述搭建的过程了,下面给大家介绍实现这部分的代码,关于KafkaSpout的代码如下所示:

  • KafkaSpout类:
package cn.hadoop.hdfs.storm;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * @Date Jun 10, 2015
 *
 * @Author dengjie
 *
 * @Note Data sources using KafkaSpout to consume Kafka
 */
public class KafkaSpout implements IRichSpout {

    /**
     *
     */
    private static final long serialVersionUID = -7107773519958260350L;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSpout.class);

    SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public KafkaSpout(String topic) {
        this.topic = topic;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void close() {
        // TODO Auto-generated method stub

    }

    public void activate() {
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            LOGGER.info("(consumer)==>" + value);
            collector.emit(new Values(value), value);
        }
    }

    public void deactivate() {
        // TODO Auto-generated method stub

    }

    public void nextTuple() {
        // TODO Auto-generated method stub

    }

    public void ack(Object msgId) {
        // TODO Auto-generated method stub

    }

    public void fail(Object msgId) {
        // TODO Auto-generated method stub

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("KafkaSpout"));
    }

    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

}
  • KafkaTopology类:
package cn.hadoop.hdfs.storm.client;

import cn.hadoop.hdfs.storm.FileBlots;
import cn.hadoop.hdfs.storm.KafkaSpout;
import cn.hadoop.hdfs.storm.WordsCounterBlots;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * @Date Jun 10, 2015
 *
 * @Author dengjie
 *
 * @Note KafkaTopology Task
 */
public class KafkaTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("testGroup", new KafkaSpout("test"));
        builder.setBolt("file-blots", new FileBlots()).shuffleGrouping("testGroup");
        builder.setBolt("words-counter", new WordsCounterBlots(), 2).fieldsGrouping("file-blots", new Fields("words"));
        Config config = new Config();
        config.setDebug(true);
        if (args != null && args.length > 0) {
            // online commit Topology
            config.put(Config.NIMBUS_HOST, args[0]);
            config.setNumWorkers(3);
            try {
                StormSubmitter.submitTopologyWithProgressBar(KafkaTopology.class.getSimpleName(), config,
                        builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // Local commit jar
            LocalCluster local = new LocalCluster();
            local.submitTopology("counter", config, builder.createTopology());
            try {
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            local.shutdown();
        }
    }
}

4.预览截图

  首先,我们启动Kafka集群,目前未生产任何消息,如下图所示:

  接下来,我们启动Flume集群,开始收集日志信息,将数据输送到Kafka集群,如下图所示:

  接下来,我们启动Storm UI来查看Storm提交的任务运行状况,如下图所示:

  最后,将统计的结果持久化到Redis或者MySQL等DB中,结果如下图所示:

5.总结

  这里给大家分享了数据的消费流程,并且给出了持久化的结果预览图,关于持久化的细节,后面有单独有一篇博客会详细的讲述,给大家分享其中的过程,这里大家熟悉下流程,预览结果即可。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

时间: 2024-11-08 21:35:26

Kafka实战-Kafka到Storm的相关文章

Kafka实战-Storm Cluster

1.概述 在<Kafka实战-实时日志统计流程>一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Storm Cluster的搭建部署.以下是今天的分享目录: Storm简述 基础软件 安装部署 效果预览 下面开始今天的内容分享. 2.Storm简述 Twitter将Storm开源了,这是一个分布式的.容错的实时计算系统,已被贡献到Apache基金会,下载地址如下所示: http://stor

Kafka实战-Flume到Kafka (转)

原文链接:Kafka实战-Flume到Kafka 1.概述 前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览 下面开始今天的分享内容. 2.数据来源 Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka(供实时计算处理)和HDFS(离线计算处理).关于Flume集群的Ag

大数据日志传输之Kafka实战教程

大数据日志传输之Kafka实战 本套课程围绕Kafka架构详细讲解kafka的核心 架构组件,broker,consumer,producer,以及日志的分段存储,稀疏索引,副本平衡,重分区, 数据同步,Kafka的核心组控制器和消费者控制器等机制. 全面讲解java 最新版的api ,指定分区消费,流控制,手动commit,异步Callback,同步的按照Partition进行批量commit等.实战集成Springboot,spring,以及会讲解到最新的exactly-once, 集成序列

Kafka实战分析

1. Kafka概要设计 kafka在设计之初就需要考虑以下4个方面的问题: 吞吐量/延时 消息持久化 负载均衡和故障转移 伸缩性 1.1 吞吐量/延时 对于任何一个消息引擎而言,吞吐量都是至关重要的性能指标.那么何为吞吐量呢?通常来说,吞吐量是某种处理能力的最大值.而对于Kafka而言,它的吞吐量就是每秒能够处理的消息数或者每秒能够处理的字节数.很显然,我们自然希望消息引擎的吞吐量越大越好. 消息引擎系统还有一个名为延时的性能指标.它衡量的是一段时间间隔,可能是发出某个操作与接收到操作响应(r

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

漫游kafka实战篇之搭建Kafka开发环境

转载注明出处:http://blog.csdn.net/honglei915/article/details/37563647 上篇文章中我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息.下面我们来搭建kafka的开发环境. 添加依赖 搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了.不过我们使用另一种更加流行的方式:使用maven管理jar包依赖. 创建好mav

第89课:SparkStreaming on Kafka之Kafka解析和安装实战

本篇博文将从以下方面组织内容: 1. Kafka解析 2. 消息组件Kafka 3. Kafka安装 实验搭建所需要的软件: kafka_2.10-0.9.0.1 Zookeeper集群已经安装好.在上一篇博文有安装步骤,不清楚的朋友可以参考下. 一:Kafka解析 1. Kafka是生产者和消费者模式中广播概念,Kafka也可以实现队列的方式. 2. Kafka不仅是一个消息中间键,还是一个存储系统,可以将流进来的数据存储一段时间.这就与传统的流式处理不一样,传统的流式处理处理完数据之后就消失

Kafka实战-KafkaOffsetMonitor

1.概述 前面给大家介绍了Kafka的背景以及一些应用场景,并附带上演示了Kafka的简单示例.然后,在开发的过程当中,我们会发现一些问题,那就是消息的监控情况.虽然,在启动Kafka的相关服务后,我们生产消息和消费消息会在终端控制台显示这些记录信息,但是,这样始终不够友好,而且,在实际开发中,我们不会有权限去一直观看终端控制台,那么今天就为大家来介绍Kafka的一个监控系统——KafkaOffsetMonitor.下面是今天所分享的目录内容: KafkaOffsetMonitor简述 Kafk

Kafka实战系列--Kafka API使用体验

前言: kafka是linkedin开源的消息队列, 淘宝的metaq就是基于kafka而研发. 而消息队列作为一个分布式组件, 在服务解耦/异步化, 扮演非常重要的角色. 本系列主要研究kafka的思想和使用, 本文主要讲解kafka的一些基本概念和api的使用. *) 准备工作1) 配置maven依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</