Trident整合Kafka

首先编写一个打印函数KafkaPrintFunction

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class KafkaPrintFunction extends BaseFunction {

    @Override
    public void execute(TridentTuple input, TridentCollector collector) {
        String msg = input.getStringByField("str");
        System.out.println(this.getClass().getSimpleName() + ": " + msg);
        collector.emit(new Values(msg));
    }

}

然后编写trident整合kafka的topology

import net.baiqu.shop.report.utils.Constants;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

/**
 * kafka连接trident
 */
public class KafkaTrident {

    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();

        BrokerHosts hosts = new ZkHosts(Constants.ZK_HOSTS);
        String topic = "tridentTestTopic";
        String id = "kafka.queue.tridentTestTopic";
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topic, id);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

        Stream stream = topology.newStream("kafkaSpout", kafkaSpout);
        stream.shuffle().each(new Fields("str"), new KafkaPrintFunction(), new Fields("result"));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafkaTridentDemo", new Config(), topology.build());
    }

}

另一个Java项目发送kafka数据

    @Scheduled(fixedRate = 3000)
    public void shopDataTestJob9() {
        for (int i = 0; i < 1; i++) {
            kafkaTemplate.send("tridentTestTopic", "test kafka trident");
            System.out.println("test kafka trident");
        }
    }

最后运行storm项目以及java项目(需要先运行java项目往kafka发数据,建立此topic,storm才能消费这个topic)

观察结果,storm项目控制台输出

KafkaPrintFunction: test kafka trident
KafkaPrintFunction: test kafka trident
KafkaPrintFunction: test kafka trident

表示storm trident消费kafka数据成功

原文地址:https://www.cnblogs.com/tangzhe/p/9618495.html

时间: 2024-08-29 18:58:30

Trident整合Kafka的相关文章

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

flume 整合 kafka

flume 整合 kafka: flume 采集业务日志,发送到kafka 安装部署Kafka Download 1.0.0 is the latest release. The current stable version is 1.0.0. You can verify your download by following these procedures and using these KEYS. 1.0.0 Released November 1, 2017 Source downloa

storm 整合 kafka之保存MySQL数据库

整合Kafka+Storm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程序Storm做实时分析,这时候我们需要讲Storm中的Spout中读取Kafka中的消息,然后交由具体的Bolt组件分析处理.实际上在 apache-storm-0.9.3这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置. 1.配置Maven依赖包 [ht

spring boot整合kafka

最近项目需求用到了kafka信息中间件,在此做一次简单的记录,方便以后其它项目用到. 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 配置文件 kafka.consumer.servers=127.0.0.1:9092 kafka.consumer.ena

SpringBoot整合Kafka和Storm

前言 本篇文章主要介绍的是SpringBoot整合kafka和storm以及在这过程遇到的一些问题和解决方案. kafka和storm的相关知识 如果你对kafka和storm熟悉的话,这一段可以直接跳过!如果不熟,也可以看看我之前写的博客.一些相关博客如下. kafka 和 storm的环境安装 地址:http://www.panchengming.com/2018/01/26/pancm70/ kafka的相关使用 地址:http://www.panchengming.com/2018/01

spring boot 整合kafka 报错 Exception thrown when sending a message with key=&#39;null&#39; and payload=JSON to topic proccess_trading_end: TimeoutException: Failed to update metadata after 60000 ms.

org.springframework.kafka.support.LoggingProducerListener- Exception thrown when sending a message with key='null' and payload='{"dataDts":["20180329","20180328","20180327","20180326","20180323"]

Nginx整合Kafka

背景 nginx-kafka-module是nginx的一个插件,可以将kafka整合到nginx中,便于web项目中前端页面埋点数据的收集,如前端页面设置了埋点,即可将用户的一些访问和请求数据通过http请求直接发送到消息中间件kafka中,后端可以通过程序消费kafka中的消息来进行实时的计算.比如通过SparkStream来实时的消费Kafka中的数据来分析用户PV,UV.用户的一些行为及页面的漏斗模型转化率,来更好的对系统进行优化或者对来访用户进行实时动态的分析. 具体整合步骤 1.安装

SparkStreaming整合kafka的补充

(1)SparkStreaming 整合 kafka 两种方式对比 Direct 方式的优缺点分析 : 优点: 简化并行(Simplified Parallelism).不现需要创建以及 union 多输入源,Kafka topic 的partition 与 RDD 的 partition 一一对应. 高效(Efficiency).基于 Receiver-based 的方式保证数据零丢失(zero-data loss)需要配置 spark.streaming.receiver.writeAhea

Flume整合Kafka完成实时数据采集

agent选择 agent1 exec source + memory channel + avro sink agent2 avro source + memory channel 模拟实际工作中的场景,agent1 为A机器,agent2 为B机器. avro source: 监听avro端口,并且接收来自外部avro信息, avro sink:一般用于跨节点传输,主要绑定数据移动目的地的ip和port 在创建agent2配置文件 cd /app/flume/flume/conf vi te