References: "Using Avro-Encoded Messages in Kafka: the Consumer Side" and "Using Avro-Encoded Messages in Kafka: the Producer Side".
pom.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.21</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.10</artifactId>
    <version>0.9.2</version>
</dependency>
The project depends on the Avro packages, and it also uses the Kafka client API.
Before using Avro we first need to define a schema. Schemas are usually written in JSON, so there is no need to define corresponding classes by hand. In this article we will use the following schema:
{ "fields": [ { "name": "str1", "type": "string" }, { "name": "str2", "type": "string" }, { "name": "int1", "type": "int" } ], "name": "Iteblog", "type": "record" }
In the schema above we define a record type named Iteblog, which contains two string fields and one int field. Once the schema is defined, we can parse it with the parser that Avro provides:
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Here the USER_SCHEMA variable holds the schema definition shown above.
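Instead of hard-coding the schema as a Java string, the same parser can also read it from a standalone .avsc file. Below is a minimal sketch, assuming a hypothetical iteblog.avsc file containing the JSON above (the file name is not part of the original article):

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;

public class SchemaLoader {
    public static void main(String[] args) throws IOException {
        // "iteblog.avsc" is a hypothetical file holding the JSON schema shown above
        Schema schema = new Schema.Parser().parse(new File("iteblog.avsc"));
        System.out.println(schema.getName());   // Iteblog
        System.out.println(schema.getFields()); // str1, str2, int1
    }
}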
Once the schema has been parsed, we need to serialize objects into byte arrays and deserialize byte arrays back into objects. Avro's own API for this is not very convenient, so this article uses Twitter's open-source Bijection library instead. First we create an Injection object that converts records into byte arrays:
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
Now we can create a record that conforms to the schema defined earlier and serialize it with recordInjection:
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "My first string");
avroRecord.put("str2", "My second string");
avroRecord.put("int1", 42);
byte[] bytes = recordInjection.apply(avroRecord);
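The same Injection also works in the opposite direction: it can turn the byte array back into a GenericRecord, which is exactly what the consumer will do later. A minimal round-trip sketch using the names from the snippet above:

// invert() gives back the GenericRecord wrapped in a Try; get() unwraps it
GenericRecord roundTrip = recordInjection.invert(bytes).get();
System.out.println(roundTrip.get("str1")); // My first string
System.out.println(roundTrip.get("int1")); // 42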
Producer implementation
With the background above, we can now use Avro to serialize the messages we send to Kafka:
package example.avro;

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class AvroKafkaProducter {

    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"Iteblog\","
            + "\"fields\":["
            + "  { \"name\":\"str1\", \"type\":\"string\" },"
            + "  { \"name\":\"str2\", \"type\":\"string\" },"
            + "  { \"name\":\"int1\", \"type\":\"int\" }"
            + "]}";

    public static final String TOPIC = "t-testavro";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
        // Keys are plain strings, values are the Avro-encoded byte arrays
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        for (int i = 0; i < 1000; i++) {
            // Build an Avro record that conforms to the schema and serialize it
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("str1", "Str 1-" + i);
            avroRecord.put("str2", "Str 2-" + i);
            avroRecord.put("int1", i);
            byte[] bytes = recordInjection.apply(avroRecord);

            ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(TOPIC, "" + i, bytes);
            producer.send(record);
            System.out.println(">>>>>>>>>>>>>>>>>>" + i);
        }
        producer.close();
        System.out.println("complete...");
    }
}
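Note that producer.send() above is asynchronous: it only queues the record and returns a Future. If you want to confirm that each message actually reached the broker, one option is to attach a Callback to the send call. A hedged variation of the send() line in the loop above (not part of the original code):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Replace producer.send(record) in the loop with a callback-based send
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace(); // the send failed
        } else {
            System.out.println("sent to partition " + metadata.partition()
                    + ", offset " + metadata.offset());
        }
    }
});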
Because we use the Avro and Bijection libraries, we need to add the following dependencies to the pom.xml file:
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.10</artifactId>
    <version>0.9.2</version>
</dependency>
Reading Avro-formatted messages from Kafka
Reading Avro messages from Kafka works the same way as reading any other kind of message: create the streams and then iterate over them:
ConsumerConnector consumer = ...;
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
    ....
}
The key question is how to convert the Avro byte array we read back into the data we want. Here we again use the schema parser introduced earlier:
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
The USER_SCHEMA here is the message schema introduced above. We then create a recordInjection object, which uses the parsed schema to deserialize the byte arrays we read back into the data that was written:
GenericRecord record = recordInjection.invert(message).get();
Then we can access the written fields as follows:
record.get("str1") record.get("str2") record.get("int1")
Consumer implementation for Kafka 0.9.x
package example.avro;

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class AvroKafkaConsumer {

    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
        consumer.subscribe(Collections.singletonList(AvroKafkaProducter.TOPIC));

        // Reuse the producer's schema to build the Injection for deserialization
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroKafkaProducter.USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Turn the raw bytes back into an Avro GenericRecord
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    String info = String.format("topic = %s, partition = %s, offset = %d, customer = %s,country = %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(),
                            genericRecord.get("str1"));
                    logger.info(info);
                }
            }
        } finally {
            consumer.close();
        }
    }
}
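One more detail: the consumer above spins in while (true) forever and only reaches consumer.close() if something throws. A common way to stop it cleanly, not shown in the original article, is to call consumer.wakeup() from a JVM shutdown hook; the blocked poll() then throws a WakeupException that we can catch before closing. A sketch of that pattern around the poll loop (assuming Java 8, where consumer is effectively final):

// Optional shutdown handling (a sketch, not from the original article)
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        consumer.wakeup();     // makes the blocked poll() throw WakeupException
        try {
            mainThread.join(); // wait until the poll loop has shut down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});

try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(1000);
        // ... handle records exactly as above ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown, nothing to do
} finally {
    consumer.close();
}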
Test run:
producer:
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
>>>>>>>>>>>>>>>>>>0
>>>>>>>>>>>>>>>>>>1
>>>>>>>>>>>>>>>>>>2
>>>>>>>>>>>>>>>>>>3
>>>>>>>>>>>>>>>>>>4
>>>>>>>>>>>>>>>>>>5
>>>>>>>>>>>>>>>>>>6
>>>>>>>>>>>>>>>>>>7
>>>>>>>>>>>>>>>>>>8
>>>>>>>>>>>>>>>>>>9
>>>>>>>>>>>>>>>>>>10
...
>>>>>>>>>>>>>>>>>>997
>>>>>>>>>>>>>>>>>>998
>>>>>>>>>>>>>>>>>>999
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
complete...
consumer:
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4321, customer = 165,country = Str 1-165
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4322, customer = 166,country = Str 1-166
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4323, customer = 167,country = Str 1-167
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4324, customer = 168,country = Str 1-168
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4325, customer = 169,country = Str 1-169
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4326, customer = 170,country = Str 1-170
[main] INFO AvroKafkaConsumer - topic = t-testavro, partition = 0, offset = 4327, customer = 171,country = Str 1-171
Original article: https://www.cnblogs.com/yy3b2007com/p/9286780.html