storm集成kafka的应用,从kafka读取,写入kafka

storm集成kafka的应用,从kafka读取,写入kafka

                                                      by 小闪电

0前言

  storm的主要作用是进行流式的实时计算,对于一直产生的数据流处理是非常迅速的,然而大部分数据并不是均匀的数据流,而是时而多时而少。对于这种情况下进行批处理是不合适的,因此引入了kafka作为消息队列,与storm完美配合,这样可以实现稳定的流式计算。下面是一个简单的示例实现从kafka读取数据,并写入到kafka,以此来掌握storm与kafka之间的交互。

1程序框图

  实质上就是storm的kafkaspout作为一个consumer,kafkabolt作为一个producer。

  框图如下:

        

2 pom.xml

  建立一个maven项目,将storm,kafka,zookeeper的外部依赖叠加起来。

  

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.tony</groupId>
    <artifactId>storm-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
            <!--<scope>provided</scope>-->
        </dependency>

       <dependency>

         <groupId>com.google.protobuf</groupId>

         <artifactId>protobuf-java</artifactId>

         <version>2.5.0</version>

        </dependency>

        <!-- storm-kafka模块需要的依赖 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <url>http://repo1.maven.org/maven2/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>clojars</id>
            <url>https://clojars.org/repo/</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>scala-tools</id>
            <url>http://scala-tools.org/repo-releases</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>conjars</id>
            <url>http://conjars.org/repo/</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>UTF-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3 kafkaspout的消费逻辑,修改MessageScheme类,其中定义了俩个字段,key和message,方便分发到kafkabolt。代码如下

package com.tony.storm_kafka.util;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/*
 *author: hi
 *public class MessageScheme{ }
 **/
public class MessageScheme implements Scheme {

    @Override
    public List<Object> deserialize(byte[] arg0) {
        try{
         String msg = new String(arg0, "UTF-8");
         String msg_0 = "hello";
         return new Values(msg_0,msg);
        }
        catch (UnsupportedEncodingException  e) {
            // TODO: handle exception
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public Fields getOutputFields() {

        return new Fields("key","message");
    }

}

4.编写topology主类,配置kafka,提交topology到storm的代码,其中kafkaspout的zkhost有动态和静态俩种配置,尽量使用动态自寻的方式。

package org.tony.storm_kafka.common;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TridentKafkaState;

import java.util.Arrays;
import java.util.Properties;
import org.tony.storm_kafka.bolt.ToKafkaBolt;
import com.tony.storm_kafka.util.MessageScheme;

public class KafkaBoltTestTopology {

    //配置kafka spout参数
    public static String kafka_zk_port = null;
    public static String topic = null;
    public static String kafka_zk_rootpath = null;
    public static BrokerHosts brokerHosts;
    public static String spout_name = "spout";
    public static String kafka_consume_from_start = null;

    public static class PrinterBolt extends BaseBasicBolt {

        /**
         *
         */
            private static final long serialVersionUID = 9114512339402566580L;

            //    @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }

         //   @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                System.out.println("-----"+(tuple.getValue(1)).toString());
            }

        }

    public StormTopology buildTopology(){
        //kafkaspout 配置文件
        kafka_consume_from_start = "true";
        kafka_zk_rootpath = "/kafka08";
        String spout_id = spout_name;
        brokerHosts = new ZkHosts("192.168.201.190:2191,192.168.201.191:2191,192.168.201.192:2191", kafka_zk_rootpath+"/brokers");
        kafka_zk_port = "2191";
      
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, "testfromkafka", kafka_zk_rootpath, spout_id);
        spoutConf.scheme = new SchemeAsMultiScheme(new MessageScheme());
        spoutConf.zkPort = Integer.parseInt(kafka_zk_port);
        spoutConf.zkRoot = kafka_zk_rootpath;
        spoutConf.zkServers = Arrays.asList(new String[] {"10.9.201.190", "10.9.201.191", "10.9.201.192"});

        //是否從kafka第一條數據開始讀取
        if (kafka_consume_from_start == null) {
            kafka_consume_from_start = "false";
        }
        boolean kafka_consume_frome_start_b = Boolean.valueOf(kafka_consume_from_start);
        if (kafka_consume_frome_start_b != true && kafka_consume_frome_start_b != false) {
            System.out.println("kafka_comsume_from_start must be true or false!");
        }
        System.out.println("kafka_consume_from_start: " + kafka_consume_frome_start_b);
        spoutConf.forceFromStart=kafka_consume_frome_start_b;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConf));
        builder.setBolt("forwardToKafka", new ToKafkaBolt<String, String>()).shuffleGrouping("spout");
        return builder.createTopology();
    }

    public static void main(String[] args) {

        KafkaBoltTestTopology kafkaBoltTestTopology = new KafkaBoltTestTopology();
        StormTopology stormTopology = kafkaBoltTestTopology.buildTopology();

        Config conf = new Config();
        //设置kafka producer的配置
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.10.43.150:9092");
        props.put("producer.type","async");
        props.put("request.required.acks", "0"); // 0 ,-1 ,1
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
        conf.put("topic","testTokafka");

        if(args.length > 0){
            // cluster submit.
            try {
                 StormSubmitter.submitTopology("kafkaboltTest", conf, stormTopology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }else{
            new LocalCluster().submitTopology("kafkaboltTest", conf, stormTopology);
        }

    }
}

5 示例结果,testfromkafka topic里面的数据可以通过另外写个类来进行持续的生产。

  topic testfromkafka的数据

  topic testTokafka的数据

时间: 2024-12-23 01:03:03

storm集成kafka的应用,从kafka读取,写入kafka的相关文章

Storm集成Kafka应用的开发

我们知道storm的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用storm做实时计算的话可能因为数据拥堵而导致服务器挂掉,应对这种情况,使用kafka作为消息队列是非常合适的选择,kafka可以将不均匀的数据转换成均匀的消息流,从而和storm比较完善的结合,这样才可以实现稳定的流式计算,那么我们接下来开发一个简单的案例来实现storm和kafka的结合 s

5、Storm集成Kafka

1.pom文件依赖 <!--storm相关jar --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <!--排除相关依赖 --> <exclusions> <exclusion>

filebeat读取nginx日志并写入kafka

filebeat写入kafka的配置: filebeat.inputs: - type: log paths: - /tmp/access.log tags: ["nginx-test"] fields: type: "nginx-test" log_topic: "nginxmessages" fields_under_root: true processors: - drop_fields: fields: ["beat"

java实时监听日志写入kafka(转)

原文链接:http://www.sjsjw.com/kf_cloud/article/020376ABA013802.asp 目的 实时监听某目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整) 源码: import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File;

java实时监听日志写入kafka

目的 实时监听某目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整) 源码: [java] view plain copy import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import j

java实时监听日志写入kafka(多目录)

目的 实时监听多个目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整) 源码 [java] view plain copy import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import j

Kafka学习之路 (五)Kafka在zookeeper中的存储

当kafka启动的时候,就会向zookeeper里面注册一些信息,这些数据也称为Kafka的元数据信息. 一.Kafka在zookeeper中存储结构图 二.分析 根目录下的结构 服务端开启的情况下,进入客户端的命令:{zookeeper目录}/bin/zkCli.sh # {zookeeper目录}/bin/zkCli.sh [zk: localhost:2181(CONNECTED) 1] ls / [cluster, controller_epoch, controller, broker

Kafka基础系列第2讲:Kafka技术架构剖析

使用过 Kafka 框架的朋友都知道,启动 Kafka 框架只需要两个关联的组件,分别是:Zookeeper 和 Kafka.如果你还没使用过 Kafka 框架,建议先阅读<Kafka 快速入门教程>把玩一下,对 Kafka 有一个感性的认识. 当我们熟悉了 Kafka 的使用之后,我们自然有一些疑惑:Kafka 到底是如何工作的?消息从生产者到 Kafka Server 这中间到底做了什么事情?而 Zookeeper Server 在这过程中有起到什么作用?带着这些疑问,今天我们来深入了解一

kafka解释三的具体:发展Kafka应用

一个.整体外观Kafka 我们知道.Kafka系统有三大组件:Producer.Consumer.broker . producers 生产(produce)消息(message)并推(push)送给brokers,consumers从brokers把消息提取(pull)出来消费(consume). 二.开发一个Producer应用 Producers用来生产消息并把产生的消息推送到Kafka的Broker.Producers能够是各种应用.比方web应用.server端应用,代理应用以及log