kafka多线程消费

建立kafka消费类ConsumerRunnable ,实现Runnable接口:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.*;

/**
 * @Auther: lyl
 * @Date: 2019/9/12 16:28
 * @Description:
 */
@Slf4j
public class ConsumerRunnable implements Runnable {

    // 每个线程维护私有的KafkaConsumer实例
    private final KafkaConsumer<String, String> consumer;

    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        try {
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(100);   // 本例使用100ms作为获取超时时间
                    for (ConsumerRecord<String, String> record : records) {
                        // 这里面写处理消息的逻辑
                        String value = record.value();
                        if (value.startsWith("obj_vehicle_pass")) {
//                            System.out.println(value);
                            value = value.substring(17, value.length());
                            JSONObject parse = JSONObject.parseObject(value);

                        }
                    }

                } catch (Exception e) {
                    log.error("kafka数据消费异常=============");
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            log.error("初始化kafka异常=============");
            e.printStackTrace();
        }
    }

}

  在编写一个类,用来初始化上面这个类,并通过线程启动

import java.util.ArrayList;
import java.util.List;

/**
 * @Auther: lyl
 * @Date: 2019/9/12 16:29
 * @Description:
 */
public class ConsumerGroup {
    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }

    public void execute() {
        for (ConsumerRunnable task : consumers) {
            new Thread(task).start();
        }
    }

}

  最后项目启动时先初始化一下ConsumerGroup这个类,在调用一下execute()方法就能进行消费

原文地址:https://www.cnblogs.com/liaoyanglong/p/11730215.html

时间: 2024-11-04 09:35:15

kafka多线程消费的相关文章

Kafka重复消费和丢失数据研究

Kafka重复消费原因 底层根本原因:已经消费了数据,但是offset没提交. 原因1:强行kill线程,导致消费后的数据,offset没有提交. 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费.例如: try { consumer.unsubscribe(); } catch (Exception e) { } try { consumer.close(); }

Flume简介与使用(三)——Kafka Sink消费数据之Kafka安装

前面已经介绍了如何利用Thrift Source生产数据,今天介绍如何用Kafka Sink消费数据. 其实之前已经在Flume配置文件里设置了用Kafka Sink消费数据 agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent1.sinks.kafkaSink.topic = TRAFFIC_LOG agent1.sinks.kafkaSink.brokerList = 10.208.129.3:90

Kafka 温故(五):Kafka的消费编程模型

Kafka的消费模型分为两种: 1.分区消费模型 2.分组消费模型 一.分区消费模型 二.分组消费模型 Producer : package cn.outofmemory.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * Hello wo

kafka查看消费数据

一.如何查看 在老版本中,使用kafka-run-class.sh 脚本进行查看.但是对于最新版本,kafka-run-class.sh 已经不能使用,必须使用另外一个脚本才行,它就是kafka-consumer-groups.sh 普通版 查看所有组 要想查询消费数据,必须要指定组.那么线上运行的kafka有哪些组呢?使用以下命令: bin/kafka-consumer-groups.sh --bootstrap-server kafka-1.default.svc.cluster.local

kafka Consumer分区数与多线程消费topic

单线程消费数据适合在本地跑. 参考文档: http://kafka.apache.org/documentation.html 对于一个topic,可以发送给若干个partitions. partition在创建topic的时候就指定分区的数目. 分区.Offset.消费线程.group.id的关系 1)一组(类)消息通常由某个topic来归类,我们可以把这组消息"分发"给若干个分区(partition),每个分区的消息各不相同: 2)每个分区都维护着他自己的偏移量(Offset),记

NET中解决KafKa多线程发送多主题的问题

一般在KafKa消费程序中消费可以设置多个主题,那在同一程序中需要向KafKa发送不同主题的消息,如异常需要发到异常主题,正常的发送到正常的主题,这时候就需要实例化多个主题,然后逐个发送. 在NET中用RdKafka组件来做消息处理,在Nuget中引用. 在程序中初始化Producer,并创建多个Topic private string comtopic = "topic1"; private string errtopic = "topic2"; private

关于kafka重新消费数据问题

我们在使用consumer消费数据时,有些情况下我们需要对已经消费过的数据进行重新消费,这里介绍kafka中两种重新消费数据的方法. 1. 修改offset 我们在使用consumer消费的时候,每个topic会产生一个偏移量,这个偏移量保证我们消费的消息顺序且不重复.Offest是在zookeeper中存储的,我们可以设置consumer实时或定时的注册offset到zookeeper中.我们修改这个offest到我们想重新消费的位置,就可以做到重新消费了.具体修改offest的方法这里就不详

storm-kafka(storm spout作为kafka的消费端)

storm是grovvy写的 kafka是scala写的 storm-kafka  storm连接kafka consumer的插件 下载地址: https://github.com/wurstmeister/storm-kafka-0.8-plus 除了需要storm和kafka相关jar包还需要google-collections-1.0.jar 以及zookeeper相关包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar 以前由co

spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)

application-test.properties 1 #kafka 2 kafka.consumer.zookeeper.connect=*:2181 3 kafka.consumer.servers=*:9092 4 kafka.consumer.enable.auto.commit=true 5 kafka.consumer.session.timeout=6000 6 kafka.consumer.auto.commit.interval=1000 7 #保证每个组一个消费者消费同一