Storm集成Kafka应用的开发

  我们知道storm的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用storm做实时计算的话可能因为数据拥堵而导致服务器挂掉,应对这种情况,使用kafka作为消息队列是非常合适的选择,kafka可以将不均匀的数据转换成均匀的消息流,从而和storm比较完善的结合,这样才可以实现稳定的流式计算,那么我们接下来开发一个简单的案例来实现storm和kafka的结合

  storm和kafka结合,实质上无非是之前我们说过的计算模式结合起来,就是数据先进入kafka生产者,然后storm作为消费者进行消费,最后将消费后的数据输出或者保存到文件、数据库、分布式存储等等,具体框图如下:

  

  这张图片摘自博客地址:http://www.cnblogs.com/tovin/p/3974417.html 在此感谢作者的奉献

  首先我们保证在服务器上zookeeper、kafka、storm正常运行,然后我们开始写程序,这里使用eclipse for javaee IDE

  和之前一样,建立一个maven项目,在pom.xml写入如下代码:

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3   <modelVersion>4.0.0</modelVersion>
 4
 5   <groupId>kafkastorm</groupId>
 6   <artifactId>kafkastorm</artifactId>
 7   <version>0.0.1-SNAPSHOT</version>
 8   <packaging>jar</packaging>
 9
10   <name>kafkastorm</name>
11   <url>http://maven.apache.org</url>
12
13   <properties>
14     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15   </properties>
16
17   <dependencies>
18     <dependency>
19       <groupId>junit</groupId>
20       <artifactId>junit</artifactId>
21       <version>3.8.1</version>
22       <scope>test</scope>
23     </dependency>
24     <dependency>
25         <groupId>org.apache.storm</groupId>
26         <artifactId>storm-core</artifactId>
27         <version>0.9.6</version>
28         <scope>provided</scope>
29     </dependency>
30     <dependency>
31         <groupId>org.apache.kafka</groupId>
32         <artifactId>kafka_2.9.2</artifactId>
33         <version>0.8.2.2</version>
34         <exclusions>
35             <exclusion>
36                 <groupId>org.apache.zookeeper</groupId>
37                 <artifactId>zookeeper</artifactId>
38             </exclusion>
39             <exclusion>
40                 <groupId>log4j</groupId>
41                 <artifactId>log4j</artifactId>
42             </exclusion>
43         </exclusions>
44     </dependency>
45     <dependency>
46         <groupId>org.apache.storm</groupId>
47         <artifactId>storm-kafka</artifactId>
48         <version>0.9.6</version>
49     </dependency>
50   </dependencies>
51
52   <build>
53     <plugins>
54       <plugin>
55         <artifactId>maven-assembly-plugin</artifactId>
56         <configuration>
57           <descriptorRefs>
58             <descriptorRef>jar-with-dependencies</descriptorRef>
59           </descriptorRefs>
60         </configuration>
61         <executions>
62           <execution>
63             <id>make-assembly</id>
64             <phase>package</phase>
65             <goals>
66               <goal>single</goal>
67             </goals>
68           </execution>
69         </executions>
70       </plugin>
71     </plugins>
72   </build>
73 </project>

  主要是导入的zookeeper、storm、kafka外部依赖这些叠加起来,还有<plugin>插件便于我们后续对程序进程maven的打包

  和之前一样首先编写storm消费kafka的逻辑,MessageScheme类,代码如下:

 1 package net.zengzhiying;
 2
 3 import java.io.UnsupportedEncodingException;
 4 import java.util.List;
 5
 6 import backtype.storm.spout.Scheme;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Values;
 9
10 public class MessageScheme implements Scheme {
11
12     public List<Object> deserialize(byte[] arg0) {
13         try {
14             String msg = new String(arg0, "UTF-8");
15             return new Values(msg);
16         } catch (UnsupportedEncodingException e) {
17             e.printStackTrace();
18         }
19         return null;
20     }
21
22     public Fields getOutputFields() {
23         return new Fields("msg");
24     }
25
26 }

  逻辑很简单,就是对kafka出来的数据转换成字符串,接下来我们想办法来处理strom清洗之后的数据,我们为了简单就把输出保存到一个文件中,Bolt逻辑SenqueceBolt类的代码如下:

 1 package net.zengzhiying;
 2
 3 import java.io.DataOutputStream;
 4 import java.io.FileNotFoundException;
 5 import java.io.FileOutputStream;
 6 import java.io.IOException;
 7
 8 import backtype.storm.topology.BasicOutputCollector;
 9 import backtype.storm.topology.OutputFieldsDeclarer;
10 import backtype.storm.topology.base.BaseBasicBolt;
11 import backtype.storm.tuple.Fields;
12 import backtype.storm.tuple.Tuple;
13 import backtype.storm.tuple.Values;
14
15 public class SenqueceBolt extends BaseBasicBolt {
16
17     public void execute(Tuple arg0, BasicOutputCollector arg1) {
18         String word = (String) arg0.getValue(0);
19         String out = "output:" + word;
20         System.out.println(out);
21
22         //写文件
23         try {
24             DataOutputStream out_file = new DataOutputStream(new FileOutputStream("kafkastorm.out"));
25             out_file.writeUTF(out);
26             out_file.close();
27         } catch (FileNotFoundException e) {
28             // TODO Auto-generated catch block
29             e.printStackTrace();
30         } catch (IOException e) {
31             // TODO Auto-generated catch block
32             e.printStackTrace();
33         }
34
35         arg1.emit(new Values(out));
36     }
37
38     public void declareOutputFields(OutputFieldsDeclarer arg0) {
39         arg0.declare(new Fields("message"));
40     }
41
42 }

  就是把输出的消息放到文件kafkastorm.out中

  然后我们编写主类,也就是配置kafka提交topology到storm的代码,类名为StormKafkaTopo,代码如下:

 1 package net.zengzhiying;
 2
 3 import java.util.HashMap;
 4 import java.util.Map;
 5
 6 import backtype.storm.Config;
 7 import backtype.storm.LocalCluster;
 8 import backtype.storm.StormSubmitter;
 9 import backtype.storm.generated.AlreadyAliveException;
10 import backtype.storm.generated.InvalidTopologyException;
11 import backtype.storm.spout.SchemeAsMultiScheme;
12 import backtype.storm.topology.TopologyBuilder;
13 import backtype.storm.utils.Utils;
14 import storm.kafka.BrokerHosts;
15 import storm.kafka.KafkaSpout;
16 import storm.kafka.SpoutConfig;
17 import storm.kafka.ZkHosts;
18 import storm.kafka.bolt.KafkaBolt;
19
20 public class StormKafkaTopo {
21     public static void main(String[] args) {
22         BrokerHosts brokerHosts = new ZkHosts("192.168.1.216:2181/kafka");
23
24         SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/kafka", "kafkaspout");
25
26         Config conf = new Config();
27         Map<String, String> map = new HashMap<String, String>();
28
29         map.put("metadata.broker.list", "192.168.1.216:9092");
30         map.put("serializer.class", "kafka.serializer.StringEncoder");
31         conf.put("kafka.broker.properties", map);
32         conf.put("topic", "topic2");
33
34         spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
35
36         TopologyBuilder builder = new TopologyBuilder();
37         builder.setSpout("spout", new KafkaSpout(spoutConfig));
38         builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
39         builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");
40
41         if(args != null && args.length > 0) {
42             //提交到集群运行
43             try {
44                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
45             } catch (AlreadyAliveException e) {
46                 e.printStackTrace();
47             } catch (InvalidTopologyException e) {
48                 e.printStackTrace();
49             }
50         } else {
51             //本地模式运行
52             LocalCluster cluster = new LocalCluster();
53             cluster.submitTopology("Topotest1121", conf, builder.createTopology());
54             Utils.sleep(1000000);
55             cluster.killTopology("Topotest1121");
56             cluster.shutdown();
57         }
58
59
60
61     }
62 }

  注意上面代码的配置,和之前单独运行storm和kafka代码不太一样,配置也很简单,注意区别即可,如果细心的话会注意到这里建了两个topic一个是topic1,一个是topic2,topic1的含义kafka接收生产者过来的数据所需要的topic,topic2是KafkaBolt也就是storm中的bolt生成的topic,当然这里topic2这行配置可以省略,是没有任何问题的,类似于一个中转的东西,另外我们这次测试是上传到服务器执行,本地模式的代码没有执行到,当然原理是一样的

  之前一般网上的教程到这里就完毕了,这样我们会引起一种没有生产者的误区,注意:上面3个类实现的功能是kafka消费者输出的数据被storm消费!生产者的代码可以看成独立的其他来源,可以写在其他项目中,根据数据源的情况来,下面我们为了示例,编写一个类来进行生产,代码和之前kafka单独的一样:

 1 package net.zengzhiying;
 2
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.Properties;
 6
 7 import kafka.javaapi.producer.Producer;
 8 import kafka.producer.KeyedMessage;
 9 import kafka.producer.ProducerConfig;
10
11 public class DataProducerInsert {
12     private static Producer<Integer,String> producer;
13     private final Properties props=new Properties();
14     public DataProducerInsert(){
15             //定义连接的broker list
16             props.put("metadata.broker.list", "192.168.1.216:9092");
17             //定义序列化类 Java中对象传输之前要序列化
18             props.put("serializer.class", "kafka.serializer.StringEncoder");
19             //props.put("advertised.host.name", "192.168.1.216");
20             producer = new Producer<Integer, String>(new ProducerConfig(props));
21     }
22     public static void main(String[] args) {
23             DataProducerInsert sp=new DataProducerInsert();
24             //定义topic
25             String topic="topic1";
26             //开始时间统计
27             long startTime = System.currentTimeMillis();
28             //定义要发送给topic的消息
29             String messageStr = "This is a message";
30             List<KeyedMessage<Integer, String>> datalist = new ArrayList<KeyedMessage<Integer, String>>();
31
32             //构建消息对象
33             KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
34             datalist.add(data);
35
36             //结束时间统计
37             long endTime = System.currentTimeMillis();
38             KeyedMessage<Integer, String> data1 = new KeyedMessage<Integer, String>(topic, "用时" + (endTime-startTime)/1000.0);
39             datalist.add(data1);
40
41             //推送消息到broker
42             producer.send(data);
43             producer.close();
44     }
45 }

  注意,这里我们定义的topic是topic1,正好和前面的topic1数据源对应,是整个kafka保持一致的topic,也就是说kafka生产者topic和消费者topic是必须名称相同才可以响应,下面简单添加了一点时间统计的代码,也很简单

  另外还要注意kafka配置文件host.name尽量改成ip,和之前说过的一样

  到现在项目就编写完成了,然后我们使用maven命令对项目打包,首先得保证我们windows上安装好了maven,我们运行cmd,进入到当前项目目录下,执行命令: mvn assembly:assembly 进行打包,打包的前提就是之前pom.xml的所有配置,执行后maven会自动下载相应的依赖并完成打包,需要耐心等待一会:

  

  看到如图所示的BUILD SUCCESS返回之后,那么打包就成功了,现在进入项目目录下的target目录中,会看到2个jar包

  

  其中后面那个文件名较长的大小也比较大,是包含相关依赖的包,接下来我们将这个包上传到服务器,然后使用storm执行jar包将我们的topology上传到集群中运行,注意是使用storm执行jar包,而不是java

/usr/storm/apache-storm-0.9.6/bin/storm jar kafkastorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar net.zengzhiying.StormKafkaTopo kafkagostorm

  前面是storm的绝对路径,参数jar执行jar包,后面跟的是上传topology的主类,最后kafkagostorm是我们上传拓扑的名称

  

  这里执行完之后会回到命令行,现在就在后台集群中开始分发运行了,这就是集群模式,之前我们讲的storm案例会不断滚动大量数据,那个属于本地模式,如果我们现在开启ui界面的话,那么访问我们的地址http://192.168.1.216:8080/可以看到正在运行的Topology

  

  可以看到状态是active正在运行了,我们上面代码中kafkabolt创建了一个topic2的消息,我们现在可以测试一下,消费者这里只是简单地原样输出,我们进入kafka目录,执行下面命令:

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic topic2 --from-beginning

  后面参数--from-beginning不添加也是可以的,添加包括旧信息,不添加就是新的输出

  现在界面卡住,待会我们来观察输出,现在我们新开一个窗口,还是使用storm执行刚才的生产者类DataProducerInsert来生产一条消息,命令如下:

/usr/storm/apache-storm-0.9.6/bin/storm jar kafkastorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar net.zengzhiying.DataProducerInsert

  回车之后,等待界面滚动2s程序跑完之后,我们看到另一个窗口输出了消息:

  

  然后,我们的输出文件在哪呢,刚才我们使用storm执行的生产者代码,所以输出的kafkastorm.out就在storm的安装目录下,我们使用cat或者vim都可以看到文件内容,如果有时间统计的话两行内容显示可能会有点问题,因为后续要进行简单的转换,去掉时间统计代码只输出消息的内容如下,这里使用vim打开的:

  

  这样storm集成kafka的测试案例就完成了,并且实现了一定的功能,只要我们灵活掌握了怎么写kafka和storm结合的整体拓扑结构,那么主要的代码就集中在数据源也就是kafka生产者的发送和storm消费后的存储问题,这所有的代码都是在storm和kafka给好的方法内写逻辑,而不用关心底层,这样使开发更加简单快捷,比如我们消费后的数据既可以写到文件、数据库还可以索引到solr,存到Hbase等,这样就可以灵活运用了;其实最关键的还是要了解这些框架底层的实现原理,这样遇到问题才可以知其然知其所以然

时间: 2024-11-03 21:27:52

Storm集成Kafka应用的开发的相关文章

storm集成kafka的应用,从kafka读取,写入kafka

storm集成kafka的应用,从kafka读取,写入kafka by 小闪电 0前言 storm的主要作用是进行流式的实时计算,对于一直产生的数据流处理是非常迅速的,然而大部分数据并不是均匀的数据流,而是时而多时而少.对于这种情况下进行批处理是不合适的,因此引入了kafka作为消息队列,与storm完美配合,这样可以实现稳定的流式计算.下面是一个简单的示例实现从kafka读取数据,并写入到kafka,以此来掌握storm与kafka之间的交互. 1程序框图 实质上就是storm的kafkasp

storm集成kafka

kafkautil: import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; public class KafkaUtil { @Value("#{sys['connect']}") private static

5、Storm集成Kafka

1.pom文件依赖 <!--storm相关jar --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <!--排除相关依赖 --> <exclusions> <exclusion>

Storm集群上的开发 ,任务计算输出到mysql数据库,集成jdbc(十)

storm集成jdbc,把计算结果保存到mysql中. 首先在mysql中建表 ,表的字段与输出的tuple的schema一致: create table result( word varchar(20), total int ); 编写一个连接提供器,用于获取mysql数据库连接: 需要引入jar :/usr/local/apps/apache-storm-1.0.3/external/storm-jdbc 的 storm-jdbc-1.0.3.jar package mystorm.word

spark streaming集成kafka

Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark等都支持与Kafka集成. Spark streaming集成kafka是企业应用中最为常见的一种场景. 一.安装kafka 参考文档: http://kafka.apache.org/quickstart#quickstart_createtopic 1.安

详解Kafka: 大数据开发最火的核心技术

详解Kafka: 大数据开发最火的核心技术 架构师技术联盟 2019-06-10 09:23:51 本文共3268个字,预计阅读需要9分钟. 广告 大数据时代来临,如果你还不知道Kafka那你就真的out了(快速掌握Kafka请参考文章:如何全方位掌握Kafka核心技术)!据统计,有三分之一的世界财富500强企业正在使用Kafka,包括所有TOP10旅游公司,7家TOP10银行,8家TOP10保险公司,9家TOP10电信公司等等. LinkedIn.Microsoft和Netflix每天都用Ka

storm 整合 kafka之保存MySQL数据库

整合Kafka+Storm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程序Storm做实时分析,这时候我们需要讲Storm中的Spout中读取Kafka中的消息,然后交由具体的Bolt组件分析处理.实际上在 apache-storm-0.9.3这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置. 1.配置Maven依赖包 [ht

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

SpringCloud学习之SpringCloudStream&amp;集成kafka

一.关于Spring-Cloud-Stream Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架.通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理. 在这里我先放一张官网的图: 应用程序通过Spring Cloud Stream注入到其中的输入和输出通道与外界进行通信,应用程序并不关心到底和