flink kafka consumer with avro schema. handling null

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> avroType;

    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            T t = reader.read(null, decoder);
            return t;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
 } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }

    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}


https://developer.aliyun.com/ask/131116?spm=a2c6h.13159736

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

原文地址:https://www.cnblogs.com/connie313/p/12609114.html

时间: 2024-08-06 18:10:30

flink kafka consumer with avro schema. handling null的相关文章

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析

参考<在Kafka中使用Avro编码消息:Consumer篇>.<在Kafka中使用Avro编码消息:Producter篇> pom.xml <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.1</version> </dependenc

Kafka中使用Avro编码、解码消息

1.消费者代码 import com.twitter.bijection.Injection; import com.twitter.bijection.avro.GenericAvroCodecs; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clie

Kafka Consumer接口

对于kafka的consumer接口,提供两种版本,   high-level 一种high-level版本,比较简单不用关心offset, 会自动的读zookeeper中该Consumer group的last offset 参考,https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example 不过要注意一些注意事项,对于多个partition和多个consumer 1. 如果consumer比partition

Flink kafka producer with transaction support

Background In flink 1.5 above, it provides a new kafka producer implementation: FlinkKafkaProducer011, aligning with kafka 0.11 above that supports transaction. Kafka transaction allows multiple kafka messages sent by producer to deliver in an atomic

Kafka Consumer应用与高级应用

Kafka Consumer应用与高级应用 PS:本博客仅作学习.总结.交流使用,参考以下博客&资料 1.http://kafka.apache.org/intro.html 2.https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example 3.http://www.cnblogs.com/luotianshuai/p/5206662.html 4.http://www.cnblogs.com/fxj

【原创】Kafka Consumer 多线程 实例

Kafka 0.9版本开始推出了Java版本的consumer,优化了coordinator的设计以及摆脱了对zookeeper的依赖.社区最近也在探讨正式用这套consumer API替换Scala版本的consumer的计划.鉴于目前这方面的资料并不是很多,本文将尝试给出一个利用KafkaConsumer编写的多线程消费者实例,希望对大家有所帮助. 这套API最重要的入口就是KafkaConsumer(o.a.k.clients.consumer.KafkaConsumer),普通的单线程使

Kafka详解五、Kafka Consumer的底层API- SimpleConsumer

Kafka提供了两套API给Consumer The high-level Consumer API The SimpleConsumer API 第一种高度抽象的Consumer API,它使用起来简单.方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情 一个消息读取多次 在一个处理过程中只消费Partition其中的一部分消息 添加事务管理机制以保证消息被处理且仅被处理一次 使用SimpleConsumer有哪些弊端呢? 必须在程序

Kafka consumer在项目中的多线程处理方式

对于KafkaConsumer而言,它不像KafkaProducer,不是线程安全的,状态是在consumer中维护的,所以实现时要注意多线程的使用,一般有2种使用方法: 1:每个Consumer有自己的线程,consumer去拉取数据,并对数据处理,这种方式比较简单,易于实现,容易顺序处理消息 2:消费者处理者方式,创建一个线程池,在consumer拉取数据后,由线程池来中的线程来处理数据,把拉取数据与处理数据解耦,但数据处理有可能破坏partition的消息顺序 从Kafka 文档中我们也可

Kafka设计解析(四)- Kafka Consumer设计解析

本文转发自Jason’s Blog,原文链接 http://www.jasongj.com/2015/08/09/KafkaColumn4 摘要 本文主要介绍了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer实现的语义,以及适用场景.以及未来版本中对High Level Consumer的重新设计–使用Consumer Coordinator解决Split Brain和Herd等问题. H