Kafka学习-Producer和Customer

  在上一篇kafka入门的基础之上,本篇主要介绍Kafka的生产者和消费者。

Kafka 生产者  

  kafka Producer发布消息记录到Kakfa集群。生产者是线程安全的,在线程之间共享生产者实例。一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main方法里就能直接运行,

public class ProducerDemo {
    private static final String KAFKA_TOPIC="kafka-topic";
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("acks", "all");
        configs.put("retries", 0);
        configs.put("batch.size", 16384);
        configs.put("linger.ms", 1);
        configs.put("buffer.memory", 33554432);
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer=new KafkaProducer<String, String>(configs);
        ProducerRecord<String, String> record=null;

        for (int i = 0; i <10; i++) {
            record=new ProducerRecord<String, String>(KAFKA_TOPIC, "record-"+i);
            Future<RecordMetadata> future=producer.send(record);
            try {
                RecordMetadata recordMetadata=future.get();
                System.out.format("PARTITION: %d OFFSET: %d\n", recordMetadata.partition(),recordMetadata.offset());
            }
            catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
}

  • 生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。
  • 生产者的send()方法是异步的,send()方法添加消息到缓冲区等待发送,并立即返回,这样并行发送多条消息而不阻塞去等待每一条消息的响应。为了减少请求的数量,生产者将单个的消息批量在一起发送来提高效率。
  • acks是判断消息是否成功发送的条件,将acks指定了"all"将会阻塞消息,当所有的副本都返回后才表明该消息发送成功,这种设置性能最低,但是是最可靠的。
  • retries表示重试的次数,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
  • batch.size指定了缓冲区的大小,kafka的Producer会缓存每个分区未发送消息。
  • linger.ms指示生产者发送请求前等待一段时间,等待等多的消息来填满批中。默认缓冲可立即发送,即遍缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.time大于0。如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使linger.time=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
  • buffer.memory:控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。
  • key.serializer和value.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节。

Producer的Send()

  kafka的producer的send()方法提供多种重载:

  send是异步的,一旦消息被保存在等待发送的消息缓存中,此方法就立即返回,这样可以你并行发送多条消息而不阻塞去等待每一条消息的响应。发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间,如果topic使用的是LogAppendTime,时间戳是broker的本地时间。由于send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。如果要模拟一个简单的阻塞调用,你可以调用get()方法。

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = new
 ProducerRecord<byte[],byte[]>("my-topic", key, value)
 producer.send(record).get();

  完全无阻塞的话,可以利用参数提供的回调函数处理请求完成时的回调通知。

record=new ProducerRecord<String, String>(KAFKA_TOPIC, "CallbackRecord-"+i);
            producer.send(record,new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.format("PARTITION: %d OFFSET: %d\n", metadata.partition(),metadata.offset());
                }
            });

  发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 之前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

  注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果需要执行阻塞或耗时的回调,建议在callback主体中使用自己的Executor来并行处理。

时间: 2024-10-09 23:03:26

Kafka学习-Producer和Customer的相关文章

Kafka的Producer和Consumer源码学习

先解释下两个概念: high watermark (HW) 它表示已经被commited的最后一个message offset(所谓commited, 应该是ISR中所有replica都已写入),HW以下的消息都已被ISR中各个replica同步,从而保持一致.HW以上的消息可能是脏数据:部分replica写成功,但最终失败了. Kafka Partition:  1> 均衡各个Broker之间的数据和请求压力: 2> 分摊处理不同的消费者进程: 3> 在partition内可以保证局部

kafka学习之路(二)——提高

kafka学习之路(二)--提高 消息发送流程 因为Kafka内在就是分布式的,一个Kafka集群通常包括多个代理.为了均衡负载,将话题分成多个分区,每个代理存储一或多个分区.多个生产者和消费者能够同时生产和获取消息.     过程: 1.Producer根据指定的partition方法(round-robin.hash等),将消息发布到指定topic的partition里面 2.kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否

kafka学习笔记:知识点整理

一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕. 3.扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可. 4.

1、Kafka学习分享-V1.0

Kafka学习分享 .1       什么是Kafka Apache Kafka是一个开源的流处理平台,由 Apache Software Foundation使用Scala and Java编写发展而来.Kafka?用于构建实时数据管道和流媒体应用. 它具有水平可扩展性,容错性,快速性,并在数千家公司生产中运行. 它的主要功能:数据流的发布和订阅.数据流的处理.数据流的存储.像一个消息系统一样发布和订阅数据流,有效且实时地处理数据流,在一个分布式备份的集群中安全地处理存储数据流. .2    

[Big Data - Kafka] kafka学习笔记:知识点整理

一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕. 3.扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可. 4.

kafka java producer consumer实践

java提供了方便的API进行kafka消息处理.简单总结一下: 学习参考:http://www.itnose.net/st/6095038.html POM配置(关于LOG4J的配置参看 http://www.cnblogs.com/huayu0815/p/5341712.html) <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka

Kafka学习-简介

Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark都支持与Kafka集成. Kafka创建背景 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础.现在它已被作为多种类型的数据管道和消息系统使用.活动流数据是几乎所有站点在对其网站使用情

3、Kafka学习分享|快速入门-V3.0

Kafka学习分享|快速入门 这个教程假定你刚开始是新鲜的,没有现存的Kafka或者Zookeeper 数据.由于Kafka控制控制脚本在Unix和Windows平台不同,在Windows平台使用bin\windows\ 代替 bin/,并且更改脚本扩展名为.bat. 第一步:下载编码 下载0.10.2.0版本并且解压它. 第二步:启动服务器 Kafka使用Zookeeper,因此如果你没有Zookeeper server,你需要先启动a ZooKeeper server.你可以使用Kafka的

kafka 学习(二)

kafka 学习(二) 一.Kafka的架构 如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU.Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群.Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance.Producer