Kafka发布订阅消息

Maven

	<dependency>
    		<groupId>org.apache.kafka</groupId>
    		<artifactId>kafka-clients</artifactId>
    		<version>0.11.0.0</version>
    	</dependency>
   	    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.11.0.0</version>
    	</dependency>

生产者Producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

    private final KafkaProducer<String, String> producer;

    public final static String TOPIC = "test5";

    private ProducerDemo() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092,1xxx:9092,xxx:9092");//xxx服务器ip
        props.put("acks", "all");//所有follower都响应了才认为消息提交成功,即"committed"
        props.put("retries", 0);//retries = MAX 无限重试,直到你意识到出现了问题:)
        props.put("batch.size", 16384);//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数
        //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
        props.put("linger.ms", 1);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理
        props.put("buffer.memory", 33554432);//producer可以用来缓存数据的内存大小。
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(props);
    }

    public void produce() {
        int messageNo = 1;
        final int COUNT = 5;

        while(messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = String.format("hello KafkaProducer message %s from hubo 06291018 ", key);

            try {
                producer.send(new ProducerRecord<String, String>(TOPIC, data));
            } catch (Exception e) {
                e.printStackTrace();
            }

            messageNo++;
        }

        producer.close();
    }

    public static void main(String[] args) {
        new ProducerDemo().produce();
    }
}

消费者Consumer

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class UserKafkaConsumer extends Thread {

        public static void main(String[] args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");//xxx是服务器集群的ip
            properties.put("group.id", "jd-group");
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("auto.offset.reset", "latest");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Arrays.asList("test5"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("-----------------");
                    System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                    System.out.println();
                }
            }

        }
}

原文地址:https://www.cnblogs.com/kaleidoscope/p/9768747.html

时间: 2024-08-01 09:45:28

Kafka发布订阅消息的相关文章

Kafka是分布式发布-订阅消息系统

https://www.biaodianfu.com/kafka.html Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据. 在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转.传统的企业消息系统并不是非常适合大规模的数据处理.为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志

分布式发布订阅消息系统 Kafka 架构设计[转]

分布式发布订阅消息系统 Kafka 架构设计 转自:http://www.oschina.net/translate/kafka-design 我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部

Kafka logo分布式发布订阅消息系统 Kafka

kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息. 支持通过kafka服务器和消费机集群来分区消息. 支持Hadoop并行数据加载. 卡夫卡的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素.

高吞吐量的分布式发布订阅消息系统Kafka--安装及测试

一.Kafka概述 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素. 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决. 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案.Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费. 二.Kafka相关

JMS发布/订阅消息传送例子

阅读目录 前言 在Tomcat中配置JNDI 在Web工厂中编写代码 参考资料 前言 基于上篇文章"基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送"很容易就可以编写一个发布/订阅消息传送例子,相关环境准备与该篇文章基本类似,主要的区别如下. 在Tomcat中配置JNDI 配置连接工厂和话题 <Resource name="topic/connectionFactory" auth="Container" ty

ActiveMQ发布-订阅消息模式

一.订阅杂志我们很多人都订过杂志,其过程很简单.只要告诉邮局我们所要订的杂志名.投递的地址,付了钱就OK.出版社定期会将出版的杂志交给邮局,邮局会根据订阅的列表,将杂志送达消费者手中.这样我们就可以看到每一期精彩的杂志了. 仔细思考一下订杂志的过程,我们会发现这样几个特点:1.消费者订杂志不需要直接找出版社:2.出版社只需要把杂志交给邮局:3.邮局将杂志送达消费者.邮局在整个过程中扮演了非常重要的中转作用,在出版社和消费者相互不需要知道对方的情况下,邮局完成了杂志的投递. 二. 发布-订阅消息模

发布/订阅消息传送模型

1.发布/订阅模型概览 发布/订阅(publish-and-subscribe)模型通常被简写为pub/sub模型.在这个模型中,消息生产者成为发布者(publisher),而消息消费者则称为订阅者(subscribe).在点对点模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题.发布/订阅模型最重要的特性如下: 消息通过一个称为主题的虚拟通道进行交换. 每条消息都会传送给称为订阅者的多个消息消费者.订阅者有许多类型,包括持久性.非持久性和动态性. 发布者通常不会知道.也

开源的.NET发布-订阅消息服务器Laharsub

Laharsub是一种开源的.NET发布-订阅消息服务器,用于实时的web应用程序,像聊天.在线写作.新闻或者股票交易更新等等. Laharsub是一种构建在三层架构之上的发布-订阅消息服务器: 前端--客户端,中间层--web服务,后端--带有发布-订阅功能和存储能力的系统. 客户端一般是浏览器,但是可以是所有已知能够做出HTTP请求的程序. 中间层是一种WCF的HTTP服务,它会从客户端接收消息,并向其发送消息,而后端会包含真正的与消息相关的逻辑. 客户端可以创建主题,并通过RESTful

分布式发布订阅消息系统Kafka架构设计

我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部分.活动数据包括页面访问量(page view).被查看内容方面的信息以及搜索情况等内容.这种数据通常的处理方式是先把各种活动以日志的形式写入某种