Storm-Kafka模块常用接口分析及使用方式

使用storm-kafka模块读取kafka中的数据,按照以下两步进行构建(我使用的版本是0.9.3)

1. 使用BrokerHosts接口来配置kafka broker host与partition的mapping信息;

2. 使用KafkaConfig来配置一些与kafka自身相关的选项,如fetchSizeBytes、socketTimeoutMs

下面分别介绍这两块的实现:

对于配置1,目前支持两种实现方式:zk配置、静态ip端口方式

第一种方式:Zk读取(比较常见)

ZkHosts支持两种创建方式,
public ZkHosts(String brokerZkStr, String brokerZkPath)
//使用默认brokerZkPath:"/brokers"
public ZkHosts(String brokerZkStr)

通过这种方式访问的时候,经过60s会刷新一下host->partition的mapping

   

第二步:构建KafkaConfig对象

目前提供两种构造函数,

public KafkaConfig(BrokerHosts hosts, String topic)
//clientId如果不想每次随机生成的话,就自己设置一个
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

代码参考:

        //这个地方其实就是kafka配置文件里边的zookeeper.connect这个参数,可以去那里拿过来。
        String brokerZkStr = "10.100.90.201:2181/kafka_online_sample";
        String brokerZkPath = "/brokers";
        ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);

        String topic = "mars-wap";
        //以下:将offset汇报到哪个zk集群,相应配置
//        String offsetZkServers = "10.199.203.169";
        String offsetZkServers = "10.100.90.201";
        String offsetZkPort = "2181";
        List<String> zkServersList = new ArrayList<String>();
        zkServersList.add(offsetZkServers);
        //汇报offset信息的root路径
        String offsetZkRoot = "/stormExample";
        //存储该spout id的消费offset信息,譬如以topoName来命名
        String offsetZkId = "storm-example";

        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);
        kafkaConfig.zkRoot = offsetZkRoot;
        kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);
        kafkaConfig.zkServers = zkServersList;
        kafkaConfig.id = offsetZkId;
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout spout = new KafkaSpout(kafkaConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", spout, 1);
        builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");

        Config config = new Config();

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", config, builder.createTopology());

        // cluster submit.
//        try {
//            StormSubmitter.submitTopology("storm-kafka-example",config,builder.createTopology());
//        } catch (AlreadyAliveException e) {
//            e.printStackTrace();
//        } catch (InvalidTopologyException e) {
//            e.printStackTrace();
//        }

完整的使用例子,见github源码

https://github.com/tonylee0329/storm-example/blob/master/src/main/java/org/tony/storm_kafka/common/ZkTopology.java

参考:

https://github.com/apache/storm/blob/v0.9.3/external/storm-kafka/README.md

时间: 2024-11-09 20:52:25

Storm-Kafka模块常用接口分析及使用方式的相关文章

VUE中常用的几种import(模块、文件)引入方式

VUE中常用的几种import(模块.文件)引入方式:https://blog.csdn.net/weixin_38930535/article/details/80177445 1.引入js文件 在用的那一页,引入文件 相应的js文件里面,必须是暴露出来的 2.引入组件 3.引入外部组件 4.引入外部js插件 原文地址:https://www.cnblogs.com/bydzhangxiaowei/p/12237156.html

SAP MM模块 常用Bapi

  1.sap货物移动相关的bapi(MIGO/MB1A) 货物移动的bapi  BAPI_GOODSMVT_CREATE 其中 参数 : GOODSMVT_CODE 有 GMCODE Table T158G - 01 - MB01 - Goods Receipts for Purchase Order *                     02 - MB31 - Goods Receipts for Prod Order *                     03 - MB1A -

【REACT NATIVE 系列教程之十三】利用LISTVIEW与TEXTINPUT制作聊天/对话框&&获取组件实例常用的两种方式

本站文章均为 李华明Himi 原创,转载务必在明显处注明: 转载自[黑米GameDev街区] 原文链接: http://www.himigame.com/react-native/2346.html 本篇Himi来利用ListView和TextInput这两种组件实现对话.聊天框. 首先需要准备的有几点:(组件的学习就不赘述了,简单且官方有文档) 1. 学习下 ListView: 官方示例:http://reactnative.cn/docs/0.27/tutorial.html#content

PA模块常用表

SELECT * FROM pa_projects_all; --项目 SELECT * FROM pa_project_types; --项目类型 SELECT * FROM pa_project_statuses; --项目状态 SELECT * FROM pa_project_options; --项目选项 SELECT * FROM pa_lookups l WHERE l.lookup_type = 'PA_OPTIONS' ; SELECT * FROM pa_project_cus

样式里常用的字段截取方式

<td style="width:120px; word-break: break-all;"><%= switch.download_url %></td> word-break overflow-wrap word-wrap 参考: 你真的了解word-wrap和word-break的区别吗? Difference between overflow-wrap and word-break overflow: hidden; white-space

tomcat的虚拟目录映射常用的几种方式

  我们在项目部署的时候,可以采用多种方式,接下来我们将在实际中比较常用的几种方式总结如下. 1.可以直接将我们的项目丢到tomcat的webapps目录下,这样当tomcat重启的时候,我们就可以访问到项目中的页面了 举例: ①明确了项目要提供外界访问的名称为:JavaWebProject(里面有一个页面welcome.html) ②明确了tomcat的端口号8080 ③这样重启tomcat,我们就可以通过http://localhost:8080/JavaWebProject/welcome

数据导入HBase最常用的三种方式及实践分析

数据导入HBase最常用的三种方式及实践分析         摘要:要使用Hadoop,需要将现有的各种类型的数据库或数据文件中的数据导入HBase.一般而言,有三种常见方式:使用HBase的API中的Put方法,使用HBase 的bulk load工具和使用定制的MapReduce Job方式.本文均有详细描述. [编者按]要使用Hadoop,数据合并至关重要,HBase应用甚广.一般而言,需要 针对不同情景模式将现有的各种类型的数据库或数据文件中的数据转入至HBase 中.常见方式为:使用H

常用的页面提交方式

本文来总结一下,常用的页面提交方式. 提到常用,大家就要注意了.如果你选用的方式不常用,那么你就该思量一下自己的设计或者程序了.还是要站在巨人的肩膀上,才能创造出好的东西. 其实,本文将要讲的几种方式都比较常用. 常用就是合适的时候,做合适的事情.合适的技术,用在合适的地方,就是好的设计:就像合适的时间,遇到合适的人一样心有灵犀. 方式一: 常用的form提交. 该种方式提交页面,页面会进行跳转.这种方式可以选择post提交方式,还是get提交方式,一般情况下,提交form通用post.这种方式

cocos2d-x 3.0 常用对象的创建方式

cocos2d-x 3.0 中所有对象几乎都可以用create函数来创建,其他的创建方式也是有create函数衍生. 下面来介绍下create函数创建一般对象的方法,省得开发中经常忘记啥的. 1.精灵Sprite的4种创建方式 (1)根据图片资源路径来创建 ? 1 2 3 4 //根据图片路径来创建 auto sprite1 = Sprite::create(filepath); //根据图片路径来创建,并设置要显示的图片大小 auto sprite2 = Sprite::create(file