Flink资料(4) -- 类型抽取和序列化

类型抽取和序列化

本文翻译自Type Extraction and Serialization

Flink处理类型的方式比较特殊,包括它自己的类型描述,一般类型抽取和类型序列化框架。该文档描述这些概念并解释其机理。

Java API和Scala API处理类型信息的方式有根本性的区别,所以本文描述的问题仅与其中一种API相关

一、Flink中对类型的处理

一般处理类型时,我们并不干涉,而是让编程语言和序列化框架来自动处理类型。与之相反的,Flink想要尽可能掌握进出用户函数的数据类型的信息。

1.    为了可以使用持久化类(POJO)以及通过成员名来grouping/joining,Flink需要数据类型信息来在job被执行前进行检查(拼写错误和类型兼容性)

2.    信息知道得越多,编译器/优化器就可以开发出更好的序列化(serialization)和数据分布模式(data layout scheme)。这对Flink的“memory usage paradigm”是非常重要的(它用于序列化从堆中出入的数据,并使得序列化开销变得十分低)

3.    对于即将推出的逻辑程序(logic program,我们需要这些信息来识别函数的“模式(scheme)”

4.    最后,这些信息使得用户免于考虑序列化框架,以及考虑将类型注册到这些框架中去

二、Flink的TypeInformation类

TypeInformation是所有类型描述类的基类。它包括了一些类型的基本属性,并可以动过它来生成序列化器(serializer),特殊情况下可以生成类型的比较器。(Note:Flink中的比较器不仅仅是定义大小,它们是处理keys时的基本辅助工具)

Flink对类型做出如下区分:

1.    基本类型:所有Java基本数据类型和对应装箱类型,加上void, String, Date

2.    基本数组和Object数组

3.    复合类型:

a.     Flink Java Tuple(Flink Java API的一部分)

b.    Scala case 类(包括Scala Tuple)

c.     POJO类:遵循类bean模式的类

4.    Scala辅助类型(Option,Either,Lists,Maps…)

5.    泛型(Generic):这些类型将不会由Flink自己序列化,而是借助Kryo来序列化

POJO类支持复杂类型的创建,并且在定义keys时可以使用成员的名字:dataSet.join(another).where("name").equalTo("personName")。同时,POJO类对于运行时是透明的,这使得Flink可以十分高效地处理它们。

POJO类型的规则

当以下条件满足时,Flink将以POJO类型识别一个数据类型,并允许以成员名引用:

1.    该类是public并且独立的(即没有非静态的内部类)

2.    该类拥有一个public的无参数构造函数

3.    该类(以及该类的超类)的成员要么是public的,要么拥有public的符合Java bean对Getter和Setter命名规则的Getter和Setter函数。

三、Scala API中的类型信息

四、Java API中的类型信息

Java一般会清除泛型信息。仅对泛型类的子类,由子类存储泛型变量捆绑的类型。

Flink对实现用户函数的(匿名anonymous)类使用反射(reflection)来确定该函数的泛型参数的类型。该逻辑还包含简单的类型推论,用于函数返回的类型由输入类型决定的情况,例如如下的泛型函数:

1 public class AppendOne<T> extends MapFunction<T, Tuple2<T, Long>>
2 {
3     public Tuple2<T, Long> map(T value)
4     {
5          return new Tuple<T, Long>(value, 1L);
6     }
7 }

Flink并不可能在所有情况下都可靠地识别出函数的数据类型。在泛型Lambda表达式(我们试图在Java社区解决这个问题,见下)和泛型变量推断上仍然存在一些问题。

4.1 Java API中的Type Hints

为了帮助Flink无法重建被清除的泛型信息的情况,自版本0.9后提供了type hint的Java API。Type hint通知系统一个函数产生的数据集的类型。下面是一个Type hint的例子:

DataSet<SomeType> result = dataSet
  .map(new MyGenericNonInferrableFunction<Long, SomeType>())
  .returns(SomeType.class);

在本例中,returns语句通过一个类指定了产生的类型。Type hint支持如下几种方法进行类型定义:

1.    类,针对无参数的类型(非泛型)

2.    String,语法为returns("Tuple2<Integer, my.SomeType>"),该字符串将被分析并转换为TypeInformation对象

3.    一个TypeInfomation对象

4.2 针对Java 8 Lambda表达式的类型抽取

由于Lambda表达式不涉及继承函数接口的实现类,Java 8 Lambda的类型抽取与非lambda表达式的机理并不相同。

现在,Flink正尝试确定如何实现Lambda表达式,并使用Java的泛型签名(generic signature)来决定参数类型和返回类型。但是,并不是所有的编译器中的签名都是为Lambda表达式生成的(本文写作时该文档仅完全适用在Milestone 2的Eclipse JDT编译器4.5及以前的编译器下)

4.2.1 类型信息对Java Lambda的改善

Flink的提交者之一(Timo Walther)活跃于Eclipse JDT 编译器社区和OpenJDK社区,并向编译器提交了改善类型信息对Java 8 Lambda表达式可用性的补丁。

Eclipse JDT 编译器在版本4.5 M4添加了对该方面的支持,而有关OpenJDK编译器的有关方面正在讨论当中。

4.3 POJO类型的序列化

类PojoTypeInfomation用于创建针对POJO中所有成员的序列化器。Flink自带了针对诸如int,long,String等标准类型的序列化器,对于所有其他的类型,我们交给Kryo处理。

如果Kryo不能处理某类型,则我们可以通过PojoTypeInfo来使用Avro来序列化POJO,为了达成该目的,我们需要调用如下接口:

1 1 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2 2 env.getConfig().enableForceAvro();

注意Flink自动序列化POJO对象,该对象由Avro用Avro序列化器产生。

如果我们想使得整个POJO类型都由Kryo序列化器处理,则我们如下设置:

1 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2 env.getConfig().enableForceKryo();

如果Kryo不能序列化该POJO对象,我们可以添加一个自定义序列化器到Kryo,使用代码如下:

1 env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

这些方法还有许多不同的重载形式可用

时间: 2024-09-30 21:09:15

Flink资料(4) -- 类型抽取和序列化的相关文章

Flink资料(6) -- 如何添加一个新的Operator

false false false false EN-US ZH-CN X-NONE /* Style Definitions */ table.MsoNormalTable {mso-style-name:普通表格; mso-tstyle-rowband-size:0; mso-tstyle-colband-size:0; mso-style-noshow:yes; mso-style-priority:99; mso-style-parent:""; mso-padding-alt

调用webservices中 枚举类型没有被序列化问题

引用服务后,代理类为自动为所有枚举类型生成了一个Bool类型相关字段,命名方式:比如枚举类名为“PayType”,生成的相关字段为“PayTypeSpecified”,此字段有何作用? PayTypeSpecified字段默认值为false,请求webservices的时候是不会序列化“PayType”这个字段的,当设定“PayTypeSpecified”为True的时候,就会被序列化了.

Flink资料(5) -- Job和调度

该文档翻译自Jobs and Scheduling ----------------------------------------------- 该文档简单描述了Flink是如何调度Job的,以及如何在JobManager上表现并跟踪Job状态. 一.调度 Flink通过任务槽(Task Slot)定义执行资源.每个TaskManager都有一或多个任务槽,每个任务槽都可以运行一个流水线并行任务.一个流水线包括多个连续的任务,如一个MapFunction的第n个并行实例与一个ReduceFun

Flink资料(8) -- Flink代码贡献的指导及准则

本文翻译自Contributing Code ----------------------------------------- Apache Flink是由自愿的代码贡献者维护.优化及扩展的.Apache Flink社区鼓励任何人贡献源代码.为了使得代码贡献者及复查者之便利,以及保存高质量的代码基础,我们遵循着一个贡献代码的过程,该过程将在本文档中详细描述. 本文包括有关向Flink贡献代码所需知晓的所有事宜,描述了从前期准备,测试以及代码提交的过程,同时解释了代码编写的准则以及Flink基础

第二节:使类型可序列化

设计一个类型时,设计人员必须郑重地决定是否允许类型的实例序列化.类型默认是不可序列化的.例如,以下代码可能不会像你希望的那样工作: internal struct Point { public Int32 x, y;} private static void OptInSerialization() { Point pt = new Point { x = 1, y = 2 }; using (var stream = new MemoryStream()) { new BinaryFormat

C# 序列化过程中的已知类型(Known Type)

WCF下的序列化与反序列化解决的是数据在两种状态之间的相互转化:托管类型对象和XML.由于类型定义了对象的数据结构,所以无论对于序列化还是反序列化,都必须事先确定对象的类型.如果被序列化对象或者被反序列化生成的对象包含不可知的类型,序列化或者反序列化将会失败.为了确保DataContractSerializer的正常序列化和反序列化,我们需要将“未知”类型加入DataContractSerializer“已知”类型列表中. 一.未知类型导致序列化失败 .NET的类型可以分为两种:声明类型和真实类

.NET提供了哪几种可进行序列化操作的类型

分析问题 为了序列化具体实例到某种专业的格式,.NET提供了三种对象序列格式化类型:BinaryFormatter.SoapFormatter和XmlSerializer. BinaryFormatter用于将可序列化的对象序列化成二进制的字节流,而SoapFormatter则致力于把可序列化的类型序列化成符合SOAP规范的XML文档,以供使用.对于那些使用Serializable特性来申明成可序列化类型的对象,BinaryFormatter和SoapFormatter可以方便地把它们序列化成特

利用&lt;JavascriptSerializer类&gt; 进行Json对象的序列化和反序列化

1. 首先, JavascriptSerializer类所在名空间: using System.Web.Script.Serialization; 2. 相关的3篇文章, 标记下: 使用JavaScriptSerializer进行JSON序列化 注意:    是复杂对象. JSON是Javascript中常用的数据格式,然而,在.NET 2.0中没有内置序列化JSON的类,原因估计是当时Ajax尚未兴起.后来就有人写了一个Json.NET类库..NET 3.5新增了一个把对象序列化为JSON字符

跟我一起学WCF(7)——WCF数据契约与序列化详解

一.引言 在前面博文介绍到,WCF的契约包括操作契约.数据契约.消息契约和错误契约,前面一篇博文已经结束了操作契约的介绍,接下来自然就是介绍数据契约了.所以本文要分享的内容就是数据契约. 二.数据契约的介绍 在WCF中,服务契约定义了可供调用的服务操作方法,而数据契约则是定义了服务端和客户端之间传送的自定义类型,在WCF项目中,必不可少地是传递数据,把客户端需要传递的数据传送到服务中,服务接收到数据再对其进行处理.然而在WCF中,传递的类型必须标记为DataContractAttribute属性