Kafka 生产者 自定义序列化

Kafka在生产者中序列化为二进制对象推送给Broker,下面是一个自定义序列化的示例,序列化一个User对象;

首先,引入jackson-mapper-asl

<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.12</version>
</dependency>

然后定义需要被序列化的实体类:

package cn.org.fubin;

public class User {
    private String firstName;
    private String lastName;
    private int age;
    private String address;

    public User() {
    }

    public User(String firstName, String lastName, int age, String address) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.address = address;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User{" +
                "firstName=‘" + firstName + ‘\‘‘ +
                ", lastName=‘" + lastName + ‘\‘‘ +
                ", age=" + age +
                ", address=‘" + address + ‘\‘‘ +
                ‘}‘;
    }
}

接下来,创建序列化类,实现Kafka客户端提供的Serializer接口:

import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.Map;

public class UserSerializer implements Serializer {

    private ObjectMapper objectMapper;

    public void configure(Map configs, boolean isKey) {
        objectMapper = new ObjectMapper();
    }

    public byte[] serialize(String topic, Object data) {
        byte[] ret = null;
        try {
            ret = objectMapper.writeValueAsString(data).getBytes("utf-8");
        } catch (IOException e) {
            System.out.println("序列化失败");
            e.printStackTrace();
        }
        return ret;
    }

    public void close() {

    }
}

Kafka默认提供如下实现:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.RetriableException;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 *
 * 可重试异常
 * 1. 分区副本不可用
 * 2. Controller当前不可用
 * 3. 网络瞬时故障
 *
 * 可自行恢复,超过重试次数也需要自行处理
 *
 *
 * 不可重试异常
 * 1. 发送消息尺寸过大
 * 2. 序列化失败异常
 * 3. 其他类型异常
 *
 *
 */

public class KafkaProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put("value.serializer", "cn.org.fubin.UserSerializer");

        properties.put("acks", "-1");
        System.out.println(ProducerConfig.ACKS_CONFIG);
        properties.put("retries", "3");
        properties.put("batch.size", 1048576);
        properties.put("linger.ms", 10);
        properties.put("buffer.memory", "33554432");
        System.out.println(ProducerConfig.BUFFER_MEMORY_CONFIG);
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
        properties.put("max.block.ms", "3000");

        String topic = "test-topic";
        Producer<String,User> producer = new KafkaProducer<String, User>(properties);

        User user = new User("a","b",23,"china");
        ProducerRecord<String ,User> record = new ProducerRecord<String, User>(topic,user);
        producer.send(record).get();
        producer.close();

    }

}

然后在主类中指定声明好的序列化类,并发送一个User实体:

原文地址:https://www.cnblogs.com/fubinhnust/p/11967891.html

时间: 2024-10-05 04:55:08

Kafka 生产者 自定义序列化的相关文章

java实现Kafka生产者示例

使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.lisg.kafkatest; import

kafka producer serializer序列化(六)

生产者需要将要发送的数据转换成字节数组才能通过网络发送给kafka,对于一些简单的数据,kafka自带了一些序列化工具, 如:StringSerializer Double Long Integer Byte,它们都实现了  Serializer 接口,但是如果你要发送的数据是一个对象 Persion,那么就需要自定义序列化才能将数据发送给kafka 原文地址:https://www.cnblogs.com/MrRightZhao/p/11345777.html

Kafka 系列(三)—— Kafka 生产者详解

一.生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区.在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输. 接下来,数据被传给分区器.如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情.如果没有指定分区 ,那么分区器

kafka 生产者

KafkaProducer 创建一个 KafkaThread 来运行 Sender.run 方法. 1. 发送消息的入口在 KafkaProducer#doSend 中,但其实是把消息加入到 batches 中: kafka 生产者是按 batch 发送消息,RecordAccumulator 类有个变量 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches,KafkaProducer#doSend 方法会把当前的这

Kafka 生产者、消费者与分区的关系

kafka 生产者.消费者与分区的关系 背景 最近和海康整数据对接, 需要将海康产生的结构化数据拿过来做二次识别. 基本的流程: 海康大数据 --> kafka server --> 平台 Kafka 的 topic 正常过车 topic: BAYONET_VEHICLEPASS 违法过车 topic: BAYONET_VEHICLEALARM 前言 首先我们需要对kafka中的一些名词有一定的了解, 有过一些使用经验, 一般来说, 生产者发送消息到主题, 而消费者从主题消费数据 ( 我初次接

Kafka生产者——结合spring开发

目录 Kafka生产者端 可靠性保证: spring-kafka生产端 Kafka生产者端 可靠性保证: producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据: 1.不丢失 2.不重复 producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性. acks 参数配置 为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledg

自定义序列化过程

除了使用事件以外,还可以通过自定义序列化来完成这一过程. 自定义序列化只需要实现ISerializable接口就可以了.它位于using System.Runtime.Serialization; SerializationInfo有点儿类似于BinaryWriter和BinaryReader,用于写入和读取对象的属性值.它主要包括一组AddValue()方法和一组Get<Type>()方法,还有其他一些属性用于获得程序集和类型的信息.StreamingContext则极少使用. 得到和前面类

MSMQ实现自定义序列化存储

http://www.cnblogs.com/smark/archive/2013/05/31/3110208.html 在使用MSMQ的时候一般只会使用默认的XML序列化来对消息进行存储,但XML存储的缺点是序列化体积相对比较大和效率上有点低.其实.net提供非常简单的方式让我们实现不同序列化方式来存储MSMQ信息,如json,protobuf等.为了能够让开发人员实现自定义序列化的消息存储,.NET提供了IMessageFormatter这样一个接口,只需要简单地实现这个接口就可以对MSMQ

Java Serializable接口(序列化)理解及自定义序列化

  1 Serializable接口 (1)简单地说,就是可以将一个对象(标志对象的类型)及其状态转换为字节码,保存起来(可以保存在数据库,内存,文件等),然后可以在适当的时候再将其状态恢复(也就是反序列化).serialization 不但可以在本机做,而且可以经由网络操作.它自动屏蔽了操作系统的差异,字节顺序等.比如,在 Windows 平台生成一个对象并序列化之,然后通过网络传到一台 Unix 机器上,然后可以在这台Unix机器上正确地重构(deserialization)这个对象. 不必