监听kafka消息

1、main方法中(1.0以上)

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Kafka消息消费者
 * 〈功能详细描述〉
 *
 * @author 17090889
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */
public class ConsumerSample {
    public static void main(String[] args) {
        String topic = "test-topic";
        Properties props = new Properties();
        // Kafka集群,多台服务器地址之间用逗号隔开
        props.put("bootstrap.servers", "localhost:9092");
        // 消费组ID
        props.put("group.id", "test_group1");
        // Consumer的offset是否自动提交
        props.put("enable.auto.commit", "true");
        // 自动提交offset到zk的时间间隔,时间单位是毫秒
        props.put("auto.commit.interval.ms", "1000");
        // 消息的反序列化类型
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 订阅的话题
        consumer.subscribe(Arrays.asList(topic));
        // Consumer调用poll方法来轮询Kafka集群的消息,一直等到Kafka集群中没有消息或者达到超时时间100ms为止
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.println(record.partition() + record.offset());
                System.out.println(record.key());
                System.out.println(record.value());
            }
        }
    }
}

2、Spring下kafka1.0以上版本(不依赖Spring-Kafka)

3、Spring下kafka 0.8版本

  1)kafka消费者抽象工厂类

/**
 * kafka消费者抽象工厂类
 * 〈功能详细描述〉
 *
 * @author
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public abstract class BaseKafkaConsumerFactory implements InitializingBean, DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerFactory.class);

    /**
     * 消费的Topic与消费线程数组成的Map
     */
    private Map<String, Integer> topicThreadMap;
    /**
     * Consumer实例所需的配置
     */
    private Properties properties;

    /**
     * 线程池
     */
    private ThreadPoolExecutor taskExecutor;

    private ConsumerConnector consumerConnector;

    /**
     * zkConnect
     */
    private String zkConnect;

    @Value("${kafka.groupId}")
    private String groupId;

    /**
     * sessionTimeOut
     */
    @Value("${kafka.sessionTimeOut}")
    private String sessionTimeOut;

    /**
     * syncTime
     */
    @Value("${kafka.syncTime}")
    private String syncTime;

    /**
     * commitInterval
     */
    @Value("${kafka.commitInterval}")
    private String commitInterval;

    /**
     * offsetReset
     */
    @Value("${kafka.offsetReset}")
    private String offsetReset;

    @Override
    public void afterPropertiesSet() {
        logger.info("afterPropertiesSet-start");
        // 初始化properties
        if(properties==null){
            properties = new Properties();
            properties.put("zookeeper.connect", zkConnect);
            logger.info("zkConnect={}", zkConnect);
            // group 代表一个消费组
            properties.put("group.id", groupId);
            logger.info("groupId={}", groupId);
            // zk连接超时
            properties.put("zookeeper.session.timeout.ms", sessionTimeOut);
            properties.put("zookeeper.sync.time.ms", syncTime);
            properties.put("auto.commit.interval.ms", commitInterval);
            properties.put("auto.offset.reset", offsetReset);
            // 序列化类
            properties.put("serializer.class", "kafka.serializer.StringEncoder");

            properties.put("rebalance.max.retries", "10");
            // 当rebalance发生时,两个相邻retry操作之间需要间隔的时间。
            properties.put("rebalance.backoff.ms", "3100");
        }

        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(topicThreadMap);
        // 实际有多少个stream,就设置多少个线程处理
//        int messageProcessThreadNum = 0;
//        for (List<KafkaStream<byte[], byte[]>> streamList : topicMessageStreams.values()) {
//            messageProcessThreadNum = messageProcessThreadNum + streamList.size();
//        }
        // 创建实际处理消息的线程池
        taskExecutor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000));
        for (List<KafkaStream<byte[], byte[]>> streams : topicMessageStreams.values()) {
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                taskExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        ConsumerIterator<byte[], byte[]> it = stream.iterator();
                        while (it.hasNext()) {
                            MessageAndMetadata<byte[], byte[]> data = it.next();
                            try {
                                String kafkaMsg = new String(data.message(),"UTF-8");
                                logger.info("来自topic:{}的消息:{}", topicThreadMap.keySet(), kafkaMsg);
                                // 消息处理
                                onMessage(data);
                            } catch (RuntimeException e) {
                                logger.error("处理消息异常.", e);
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }

                        }
                    }

                });
            }
        }

    }

    /**
     * 消息处理类
     * @param data
     */
    protected abstract void onMessage(MessageAndMetadata<byte[], byte[]> data);

    @Override
    public void destroy() throws Exception {
        try {
            if (consumerConnector != null) {
                consumerConnector.shutdown();
            }
        } catch (Exception e) {
            logger.warn("shutdown consumer failed", e);
        }
        try {
            if (taskExecutor != null) {
                taskExecutor.shutdown();
            }
        } catch (Exception e) {
            logger.warn("shutdown messageProcessExecutor failed", e);
        }
        logger.info("shutdown consumer successfully");
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    public Map<String, Integer> getTopicThreadMap() {
        return topicThreadMap;
    }

    public void setTopicThreadMap(Map<String, Integer> topicThreadMap) {
        this.topicThreadMap = topicThreadMap;
    }

    public String getZkConnect() {
        return zkConnect;
    }

    public void setZkConnect(String zkConnect) {
        this.zkConnect = zkConnect;
    }
}

  2)具体的kafka消费者实现类

import com.xxx.sfmms.common.util.JsonConvertUtil;
import com.xxx.sfmms.common.util.RedisUtil;
import com.xxx.sfmms.common.util.StringUtil;
import com.xxx.sfmms.service.intf.RecommendService;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

/**
 * 实名kafka消费者
 * 〈功能详细描述〉
 *
 * @author 17090889
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */
public class RealNameKafkaConsumer extends BaseKafkaConsumerFactory {

    private final Logger LOGGER = LoggerFactory.getLogger(RealNameKafkaConsumer.class);

    private static final String STR_INVOKENO = "invokeNo";

    @Autowired
    private RecommendService recommendService;

    /**
     * 消息处理
     * @param data
     */
    @Override
    protected void onMessage(MessageAndMetadata<byte[], byte[]> data) {
        MDC.put(STR_INVOKENO, StringUtil.getUuid());
        String msg="";
        try {
            msg=new String(data.message(),"UTF-8");
            LOGGER.info("RealNameKafkaConsumer-data={},topic={}",msg,data.topic());
        } catch (UnsupportedEncodingException e) {
            LOGGER.info("字节数组转字符串异常");
            e.printStackTrace();
        }
        // 实名的事后kafka数据
        Map<String, String> map = JsonConvertUtil.json2pojo(msg, Map.class);
        LOGGER.info("RealNameKafkaConsumer-map={}", map);
        String userNo = map.get("eppAccountNO");
        LOGGER.info("RealNameKafkaConsumer-userNo={}", userNo);
        String flag = RedisUtil.getString("PULLNEW:RACCOUNTNO_" + userNo, "MEIS");
        // 不是渠道6被邀请用户
        if(!"1".equals(flag)){
            LOGGER.info("不是渠道6拉新用户");
            return;
        }
        // 20-初级认证 30-高级实名认证   40- 实名申诉降级、50-高级到期降级 60-实名撤销(人工手动降级) 70-申诉找回身份降级
        String authenStatus=map.get("authenStatus");
        // 真实姓名
        String realName=map.get("realName");
        // 身份证号码
        String idNo = map.get("idNO");
        // apptoken
        String appToken=map.get("appToken");
        // 校验任务
        Map<String, String> paramMap = new HashMap<String, String>(4);
        paramMap.put("userNo", userNo);
        paramMap.put("authenStatus",authenStatus);
        paramMap.put("realName",realName);
        paramMap.put("idNo", idNo);
        paramMap.put("appToken",appToken);
        Map<String,String> resultMap=recommendService.checkRulesAndRiskSendMoney(paramMap);
        LOGGER.info("resultMap={}", resultMap);
        MDC.remove(STR_INVOKENO);
    }
}

  3)实现类的bean注入配置

<bean id="realNameKafkaConsumer" class="com.xxx.sfmms.service.RealNameKafkaConsumer">
   <property name="topicThreadMap">
      <map>
         <entry key="${realTopic}" value="5"/>
      </map>
   </property>
   <property name="zkConnect">
      <value>${realZkConnect}</value>
   </property>
</bean>

<bean id="preCreditKafkaConsumer" class="com.xxx.sfmms.service.PreCreditKafkaConsumer">
   <property name="topicThreadMap">
      <map>
         <entry key="${rxdTopic}" value="5"/>
      </map>
   </property>
   <property name="zkConnect">
      <value>${rxdZkConnect}</value>
   </property>
</bean>

  4)kafka consumer参数配置

#kafka监听配置
#实zk
realZkConnect=xxx
#topic
realTopic=xxx
#任zk
rxdZkConnect=xxx
#任性贷topic
rxdTopic=xxx
kafka.sessionTimeOut=6000
kafka.syncTime=2000
kafka.commitInterval=30000
kafka.offsetReset=smallest
kafka.groupId=xxx

  5)依赖包配置

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.9.2</artifactId>
   <version>0.8.1.1</version>
   <exclusions>
      <exclusion>
         <artifactId>jmxtools</artifactId>
         <groupId>com.sun.jdmk</groupId>
      </exclusion>
      <exclusion>
         <artifactId>jmxri</artifactId>
         <groupId>com.sun.jmx</groupId>
      </exclusion>
   </exclusions>
</dependency>

END

原文地址:https://www.cnblogs.com/yangyongjie/p/12520348.html

时间: 2024-08-29 07:51:07

监听kafka消息的相关文章

Rabbitmq无法监听后续消息

现象: 消息队列在处理完一条消息后,无法继续监听后续消息. 首先,系统启动时要启动接收方法如下: 1 protected void Application_Start() 2 { 3 RouteTable.Routes.MapHubs(); 4 AreaRegistration.RegisterAllAreas(); 5 6 RegisterGlobalFilters(GlobalFilters.Filters); 7 RegisterRoutes(RouteTable.Routes); 8 R

微信小程序监听WebSocket消息事件wx.onSocketMessage(CALLBACK)

微信小程序WebSocket消息wx.onSocketMessage(CALLBACK) wx.onSocketMessage(CALLBACK) ? 监听WebSocket接受到服务器的消息事件 CALLBACK返回参数: 参数 类型 说明 data String 服务器返回的消息 示例代码: wx.connectSocket({ url:"qkxue.net" }); wx.onSocketMessage(function(res){ console.log("收到服务器

mqtt实现自动监听服务器消息

本示例借助meteor的一个环境跑,和我们平时用的node自己搭的环境或java,php的环境本质一样,在此不多讨论. 首先需求是:多系统对接进行消息实时传递. 安装好mqtt:  npm install mqtt --save 本地服务(可以直接配在java中):这里采用mosca 安装好mosca: npm install mosca --save var mosca=reqire('mosca'); var mqttServer=new mosca.Server({port:8000});

消费滚动滴log日志文件(flume监听,kafka消费,zookeeper协同)

第一步:数据源 手写程序实现自动生成如下格式的日志文件: 15837312345,13737312345,2017-01-09 08:09:10,0360 打包放到服务器,使用如下命令执行,模拟持续不断的日志文件: java -cp ct_producter-1.0-SNAPSHOT.jar producter.ProductLog ./awen.tsv 第二步:监听log.tsv日志 使用Flume监控滚动的awen.tsv日志,编写flume # Name the components on

Spring整合ActiveMQ及多个Queue消息监听的配置

消息队列(MQ)越来越火,在java开发的项目也属于比较常见的技术,MQ的相关使用也成java开发人员必备的技能.笔者公司采用的MQ是ActiveMQ,且消息都是用的点对点的模式.本文记录了实现Spring整合ActivateMQ的全过程及如何使用MQ,便于后续查阅. 一.项目的搭建 采用maven构建项目,免去了copy jar包的麻烦.因此,我们创建了一个java类型的Maven Project (1)项目结构图 先把项目结构图看一下,便于对项目的理解. (2)pom.xml 我们需要加入以

Yii2.0 安装yii2-queue并在Linux启动守护进程监听消息

一.什么是yii2-queue? Yii2-queue是Yii2.0 PHP框架下一个消息队列拓展插件,支持基于DB, Redis, RabbitMQ, AMQP, Beanstalk 和 Gearman等.yii2-queue GitHub地址:https://github.com/yiisoft/yii2-queue 二.如何安装yii2-queue? php composer.phar require --prefer-dist yiisoft/yii2-queue 三.Linux sys

C#全局键盘监听(Hook)的使用

一.为什么需要全局键盘监听? 在某些情况下应用程序需要实现快捷键执行特定功能,例如大家熟知的QQ截图功能Ctrl+Alt+A快捷键,只要QQ程序在运行(无论是拥有焦点还是处于后台运行状态),都可以按下快捷键使用此功能... 这个时候在程序中添加键盘监听肯定不能满足需求了,当用户焦点不在App上时(如最小化,或者用户在处理其它事物等等)键盘监听就失效了 二.怎样才能实现全局键盘监听? 这里需要用到Windows API,源码如下:(可以作为一个工具类[KeyboardHook.cs]收藏起来) u

linux epoll机制对TCP 客户端和服务端的监听C代码通用框架实现

1 TCP简介 tcp是一种基于流的应用层协议,其"可靠的数据传输"实现的原理就是,"拥塞控制"的滑动窗口机制,该机制包含的算法主要有"慢启动","拥塞避免","快速重传". 2 TCP socket建立和epoll监听实现 数据结构设计 linux环境下,应用层TCP消息体定义如下: typedef struct TcpMsg_s { TcpMsgHeader head; void* msg; }TcpM

【转】【C#】全局键盘监听

using System; using System.Collections.Generic; using System.Text; using System.Runtime.InteropServices; using System.Windows.Forms; using System.Reflection; namespace 梦琪动漫屋 { /// <summary> /// 键盘钩子/// </summary> class KeyboardHook { public ev