创建kafka生产者
要往kafka写入消息,首先要创建一个生产者对象,并设置一些熟悉。kafka生产者有3个必选的属性。
1.bootstrap.servers 该属性指定broker的地址清单,地址的格式为host:port 一般建议至少要提供两个broker的信息,一旦其中一个宏机,生产者仍然能够连接到集群中。
2.key.serialier broker希望收到的消息的键和值都是字节数组,生产者接口允许使用参数化类型,因此可以把java对象作为键和值发送给broker。
3.value.serializer 与key.serializer一样,value.serializer指定的类将值序列化。
构建kafkaProduce对象步骤
1.构建一个properties对象
2.将上诉3个参数按照格式put到properties中。
3.new kafkaProduce对象,将properties放入kafkaproduce构造函数
kafka生产者发送的3种方式
1.发送并忘记(fire-and-forget),我们把消息发送给服务器,但不关心它是否正常到达,大多数情况会正常到达,因为kafka是高可用的,而且生产者会自动尝试重发。不过这种方式会存在丢失一些消息。
2.同步发送,我们使用send()方法发送消息,它会返回一个futured对象,调用get()方法进行等待,就可以知道消息时候发送成功。
2.异步发送,我们调用send(),并指定一个回调函数,服务器在返回响应时调用该函数。
发送消息到kafka过程
最简单的一个代码示例
ProducerRecord<String,String> record = new ProducerRecord<>("test","test","test")
try{
producer.send(record);
}catch(Exception e){
e.printStackTrace()
}
1.生产者的send()方法将producerRecord对象作为参数,所以我们先创建一个producerRecord对象。
2.执行send()方法先是放进缓存区,然后使用单独的线程发送到服务器端,send()会返回一个包含RecordMetadata的Future对象。
3.我们忽略发送至服务器的异常,但是在发送之前,生产者还是有可能发生其他异常。这些异常有可能是serializationException(说明序列化消息失败),bufferExhaustedException或TimeoutException(说明缓存区已经满了) ,又或者是InterrupException(说明发送线程被中断)
同步发送消息
1.首先producer.send()方法先返回一个future对象,然后调用future.get()方法来等待kafka响应。如果没有发生异常就会得到一个recordMetadata对象,可以用它来获取消息的偏移量。
2.如果在发送数据之前或者已经超过重发的次数,那么就会抛异常。
kafkaProducer一般会发送两类错误。其中一种就是重试错误,这类错误可以通过重发消息来解决,对于连接错误,可以通过再次连接来解决,无主(no leader) 错误则可以通过重新分区选举来解决
还有一类就是消息太大比kafka配置还大。
异步发送消息
1.为了使用回调,需要一个实现了producer.callback接口
2.如果kafka返回一个错误,onCompletion方法会抛出一个非空异常
生产者的配置
1.acks 参数指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响,如果acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢了。这样设置可以达到很高的吞吐量。
acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务成功响应。如果消息无法到达首领节点,生产者就会收到一个错误的响应,为了避免数据丢失,生产者会重发消息。
acks=all 只要到所以参与复制的节点全部收到消息时,生产者才会收到一个来自服务器成功的消息,这种模式是最安全的。
2.buffer.memory 设置生产者内存缓冲器的大小,生产者用它缓冲要发送到服务器的消息。如果空间不足一般send()阻塞或者抛异常。
3.compressionType 默认情况下,消息发送时不会被压缩。该参数可以设置为snappy,gzip,lz4 它指定消息发送给broker之前使用哪一种压缩算法进行压缩。
4.retries 决定生产者可以重发消息的次数,如果达到重发次数,生产放弃发送并返回错误。
5.batch.size 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定一个批次可以使用的内存大小,安装字节数计算而不是消息个数。
6.linger.ms 该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,批次会在linger.ms达到上限时把批次发送出去。
7.client.id 该参数是任意字符串,服务器会用它来识别消息的来源。
8.max.in.fight.requests.per.connection 该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,占用的内存越大。
9.timeout.ms request.timeout.ms和metadata.fetch.timeout.ms
request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间
metadata.fetch.timeout.ms 指定了生产在获取元数据时等待服务器返回响应的时间,如果超时了那么生产者要么重试发送数据,要么返回错误。
timeout.ms 指定了broker等待同步副本返回消息确认的时间,与acks的配置相匹配。如果指定时间没有收到副本的确认,那么broker就会返回一个错误。
10.max.request.ms 该参数指定了调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。
11.max.request.size 该参数用于控制生产者发送的请求大小。
12.receive.buffer.bytes和send.buffer.bytes
这两个参数分别指定了TCP socket接受和发送数据包的缓存区大小。
提一下: kafka可以保证一个分区的消息时有序的。
序列化器
创建一个生产者对象必须指定序列化器,kafka提供了整型和字符数组序列化器,不过它们还不足满足大部分需求。
自定义序列化器
一般实现serializer接口来实现序列化,但是这一般都用序列化框架,比如,Avro,thrift,protobuf
kafka使用apache avro序列化
apache avro 是一种与编程语言无关的序列化格式,arvo目的是提供一种共享数据文件的方式。
avro数据通过与语言无关的schema来定义。schema通过JSON来描述,数据被序列化成二进制文件或JSON文件,不过一般会只用二进制文件。
在kafka中使用Avro
生产者配置的两个配置参数key.serializer 和value.serializer 在properties中将这两个参数设置为key ,vaue值为io.conflunt.kafka.serializers.kafkaAvroSerializer,还要设置schema.registry.url参数,值为scheme的存储位置,其他和上述生产者对象一样。
分区
我们知道producerRecord对象包含了目标主题,键和值。kafka消息是一个个键值对,键可以设置为null。
键有两个作用
1.可以作为消息的附加消息
2.可以用来决定消息该被写到主题的那个分区。
如果键为null,并且使用默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询算法将消息均衡地分布到各个分区上。
实现自定义分区
这里实现Partition接口来自定义分区 这个接口包含configure partition和close方法。规则由自己去定。
原文地址:https://www.cnblogs.com/Seeasunnyday/p/9230584.html