storm和kafka整合

storm和kafka整合

依赖

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

App

package test;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class App
{
    public static void main( String[] args ) throws Exception{

        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
            .builder("worker1:9092,worker2:9092,worker3:9092", "test") // 你的kafka集群地址和topic
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "consumer") // 设置消费者组,随便写
            .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024 * 4)
            // .setRecordTranslator(new MyRecordTranslator())
            .setRecordTranslator( // 翻译函数,就是将消息过滤下,具体操作自己玩
                    new MyRecordTranslator(),
                    new Fields("word")
                )
            .setRetry( // 某条消息处理失败的策略
                    new KafkaSpoutRetryExponentialBackoff(
                        new TimeInterval(500L, TimeUnit.MICROSECONDS),
                        TimeInterval.milliSeconds(2),
                        Integer.MAX_VALUE,
                        TimeInterval.seconds(10)
                    )
                )
            .setOffsetCommitPeriodMs(10000)
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
            .setMaxUncommittedOffsets(250)
            .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout<String, String>(conf), 1);
        builder.setBolt("Recieve", new RecieveBolt(), 1).globalGrouping("KafkaSpout");
        builder.setBolt("Consume", new ConsumeBolt(), 1).globalGrouping("Recieve");
        builder.createTopology();

        // 集群运行
        // Config config = new Config();
        // config.setNumWorkers(3);
        // config.setDebug(true);
        // StormSubmitter.submitTopology("teststorm", config, builder.createTopology());

        // 本地测试
        // Config config = new Config();
        // config.setNumWorkers(3);
        // config.setDebug(true);
        // config.setMaxTaskParallelism(20);
        // LocalCluster cluster = new LocalCluster();
        // cluster.submitTopology("teststorm", config, builder.createTopology());
        // Utils.sleep(60000);
        // // 执行完毕,关闭cluster
        // cluster.shutdown();
    }
}

class MyRecordTranslator implements Func<ConsumerRecord<String, String>, List<Object>> {

    private static final long serialVersionUID = 1L;

    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        return new Values(record.value());
    }

}

ConsumeBolt

package test;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ConsumeBolt extends BaseRichBolt {

    private static final long serialVersionUID = -7114915627898482737L;

    private FileWriter fileWriter = null;

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;

        try {
            fileWriter = new FileWriter("/usr/local/tmpdata/" + UUID.randomUUID());
            // fileWriter = new FileWriter("C:\\Users\\26401\\Desktop\\test\\" + UUID.randomUUID());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void execute(Tuple tuple) {

        try {
            String word = tuple.getStringByField("word") + "......." + "\n";
            fileWriter.write(word);
            fileWriter.flush();
            System.out.println(word);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

RecieveBolt

package test;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class RecieveBolt extends BaseRichBolt {

    private static final long serialVersionUID = -4758047349803579486L;

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // 将spout传递过来的tuple值进行转换
        this.collector.emit(new Values(tuple.getStringByField("word") + "!!!"));
    }

    // 声明发送消息的字段名
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

原文地址:https://www.cnblogs.com/ye-hcj/p/10264092.html

时间: 2024-07-30 09:09:20

storm和kafka整合的相关文章

Storm集成Kafka应用的开发

我们知道storm的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用storm做实时计算的话可能因为数据拥堵而导致服务器挂掉,应对这种情况,使用kafka作为消息队列是非常合适的选择,kafka可以将不均匀的数据转换成均匀的消息流,从而和storm比较完善的结合,这样才可以实现稳定的流式计算,那么我们接下来开发一个简单的案例来实现storm和kafka的结合 s

storm集成kafka

kafkautil: import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; public class KafkaUtil { @Value("#{sys['connect']}") private static

storm消费kafka实现实时计算

大致架构 * 每个应用实例部署一个日志agent * agent实时将日志发送到kafka * storm实时计算日志 * storm计算结果保存到hbase storm消费kafka 创建实时计算项目并引入storm和kafka相关的依赖 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.

Spark Streaming和Kafka整合开发指南(一)

Apache Kafka是一个分布式的消息发布-订阅系统.可以说,任何实时大数据处理工具缺少与Kafka整合都是不完整的.本文将介绍如何使用Spark Streaming从Kafka中接收数据,这里将会介绍两种方法:(1).使用Receivers和Kafka高层次的API:(2).使用Direct API,这是使用低层次的KafkaAPI,并没有使用到Receivers,是Spark 1.3.0中开始引入的.这两种方法有不同的编程模型,性能特点和语义担保.下文将会一一介绍. 基于Receiver

storm集成kafka的应用,从kafka读取,写入kafka

storm集成kafka的应用,从kafka读取,写入kafka by 小闪电 0前言 storm的主要作用是进行流式的实时计算,对于一直产生的数据流处理是非常迅速的,然而大部分数据并不是均匀的数据流,而是时而多时而少.对于这种情况下进行批处理是不合适的,因此引入了kafka作为消息队列,与storm完美配合,这样可以实现稳定的流式计算.下面是一个简单的示例实现从kafka读取数据,并写入到kafka,以此来掌握storm与kafka之间的交互. 1程序框图 实质上就是storm的kafkasp

【转】Spark Streaming和Kafka整合开发指南

基于Receivers的方法 这个方法使用了Receivers来接收数据.Receivers的实现使用到Kafka高层次的消费者API.对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据. 然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在Spark Streaming中使用WAL日志,这是在Spark 1.2.0才引入的功能,这使得我们可以将接收到的数据保存到WA

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

SpringBoot Kafka 整合实例教程

1.使用IDEA新建工程引导方式,创建消息生产工程 springboot-kafka-producer. 工程POM文件代码如下: 1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instanc

storm 与mysql整合问题

首先说明下问题的情况, 1.我storm 环境已经搭建完成,在本地测试wordcount是没问题的, 2.我在wordcount中加入一个MysqlBolt,此Bolt只是简单的把 wordcount的结果存入mysql数据库中,在本地模式测试测试时,完全可以把结果插入指定表. 3.我的每个storm 节点都已经把mysql-connector-java-5.1.23.jar 放到storm的lib目录下. 4.每个节点均可以访问指定数据库,都已经开通相应权限 5.并且在远程模式下执行原始wor