Flink入门(五)——DataSet Api编程指南

Apache Flink

Apache Flink 是一个兼顾高吞吐、低延迟、高性能的分布式处理框架。在实时计算崛起的今天,Flink正在飞速发展。由于性能的优势和兼顾批处理,流处理的特性,Flink可能正在颠覆整个大数据的生态。

DataSet API

首先要想运行Flink,我们需要下载并解压Flink的二进制包,下载地址如下:https://flink.apache.org/downloads.html

我们可以选择Flink与Scala结合版本,这里我们选择最新的1.9版本Apache Flink 1.9.0 for Scala 2.12进行下载。

下载成功后,在windows系统中可以通过Windows的bat文件或者Cygwin来运行Flink。

在linux系统中分为单机,集群和Hadoop等多种情况。

请参考:Flink入门(三)——环境与部署

Flink的编程模型,Flink提供了不同的抽象级别以开发流式或者批处理应用,本文我们来介绍DataSet API ,Flink最常用的批处理编程模型。

Flink中的DataSet程序是实现数据集转换的常规程序(例如,Filter,映射,连接,分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

示例程序

以下程序是WordCount的完整工作示例。您可以复制并粘贴代码以在本地运行它。

Java

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Scala

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

数据集转换

数据转换将一个或多个DataSet转换为新的DataSet。程序可以将多个转换组合到复杂的程序集中。

DataSet API 中最重要的就是这些算子,我们将数据接入后,通过这些算子对数据进行处理,得到我们想要的结果。

Java版算子如下:

转换 描述
Map 采用一个数据元并生成一个数据元。data.map(new MapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
FlatMap 采用一个数据元并生成零个,一个或多个数据元。data.flatMap(new FlatMapFunction<String, String>() { public void flatMap(String value, Collector<String> out) { for (String s : value.split(" ")) { out.collect(s); } } });
MapPartition 在单个函数调用中转换并行分区。该函数将分区作为Iterable流来获取,并且可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。data.mapPartition(new MapPartitionFunction<String, Long>() { public void mapPartition(Iterable<String> values, Collector<Long> out) { long c = 0; for (String s : values) { c++; } out.collect(c); } });
Filter 计算每个数据元的布尔函数,并保存函数返回true的数据元。 重要信息:系统假定该函数不会修改应用谓词的数据元。违反此假设可能会导致错误的结果。data.filter(new FilterFunction<Integer>() { public boolean filter(Integer value) { return value > 1000; } });
Reduce 通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce可以应用于完整数据集或分组数据集。data.reduce(new ReduceFunction<Integer> { public Integer reduce(Integer a, Integer b) { return a + b; } });如果将reduce应用于分组数据集,则可以通过提供CombineHintto 来指定运行时执行reduce的组合阶段的方式 setCombineHint。在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如1/10)。
ReduceGroup 将一组数据元组合成一个或多个数据元。ReduceGroup可以应用于完整数据集或分组数据集。data.reduceGroup(new GroupReduceFunction<Integer, Integer> { public void reduce(Iterable<Integer> values, Collector<Integer> out) { int prefixSum = 0; for (Integer i : values) { prefixSum += i; out.collect(prefixSum); } } });
Aggregate 将一组值聚合为单个值。聚合函数可以被认为是内置的reduce函数。聚合可以应用于完整数据集或分组数据集。Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);您还可以使用简写语法进行最小,最大和总和聚合。Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
Distinct 返回数据集的不同数据元。它相对于数据元的所有字段或字段子集从输入DataSet中删除重复条目。data.distinct();使用reduce函数实现Distinct。您可以通过提供CombineHintto 来指定运行时执行reduce的组合阶段的方式 setCombineHint。在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如1/10)。
Join 通过创建在其键上相等的所有数据元对来连接两个数据集。可选地使用JoinFunction将数据元对转换为单个数据元,或使用FlatJoinFunction将数据元对转换为任意多个(包括无)数据元。请参阅键部分以了解如何定义连接键。result = input1.join(input2) .where(0) // key of the first input (tuple field 0) .equalTo(1); // key of the second input (tuple field 1)您可以通过Join Hints指定运行时执行连接的方式。提示描述了通过分区或广播进行连接,以及它是使用基于排序还是基于散列的算法。有关可能的提示和示例的列表,请参阅“ 转换指南”。 如果未指定提示,系统将尝试估算输入大小,并根据这些估计选择最佳策略。// This executes a join by broadcasting the first data set // using a hash table for the broadcast data result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST) .where(0).equalTo(1);请注意,连接转换仅适用于等连接。其他连接类型需要使用OuterJoin或CoGroup表示。
OuterJoin 在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有数据元对。此外,如果在另一侧没有找到匹配的Keys,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配数据元对(或一个数据元和null另一个输入的值)被赋予JoinFunction以将数据元对转换为单个数据元,或者转换为FlatJoinFunction以将数据元对转换为任意多个(包括无)数据元。请参阅键部分以了解如何定义连接键。input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins .where(0) // key of the first input (tuple field 0) .equalTo(1) // key of the second input (tuple field 1) .with(new JoinFunction<String, String, String>() { public String join(String v1, String v2) { // NOTE: // - v2 might be null for leftOuterJoin // - v1 might be null for rightOuterJoin // - v1 OR v2 might be null for fullOuterJoin } });
CoGroup reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅keys部分以了解如何定义coGroup键。data1.coGroup(data2) .where(0) .equalTo(1) .with(new CoGroupFunction<String, String, String>() { public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) { out.collect(...); } });
Cross 构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用CrossFunction将数据元对转换为单个数据元DataSet<Integer> data1 = // [...] DataSet<String> data2 = // [...] DataSet<Tuple2<Integer, String>> result = data1.cross(data2);注:交叉是一个潜在的非常计算密集型 算子操作它甚至可以挑战大的计算集群!建议使用crossWithTiny()crossWithHuge()来提示系统的DataSet大小。
Union 生成两个数据集的并集。DataSet<String> data1 = // [...] DataSet<String> data2 = // [...] DataSet<String> result = data1.union(data2);
Rebalance 均匀地Rebalance 数据集的并行分区以消除数据偏差。只有类似Map的转换可能会遵循Rebalance 转换。DataSet<String> in = // [...] DataSet<String> result = in.rebalance() .map(new Mapper());
Hash-Partition 散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionByHash(0) .mapPartition(new PartitionMapper());
Range-Partition Range-Partition给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionByRange(0) .mapPartition(new PartitionMapper());
Custom Partitioning 手动指定数据分区。 注意:此方法仅适用于单个字段键。DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)
Sort Partition 本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接sortPartition()调用来完成对多个字段的排序。DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING) .mapPartition(new PartitionMapper());
First-n 返回数据集的前n个(任意)数据元。First-n可以应用于常规数据集,分组数据集或分组排序数据集。分组键可以指定为键选择器函数或字段位置键。DataSet<Tuple2<String,Integer>> in = // [...] // regular data set DataSet<Tuple2<String,Integer>> result1 = in.first(3); // grouped data set DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0) .first(3); // grouped-sorted data set DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0) .sortGroup(1, Order.ASCENDING) .first(3);

数据源

数据源创建初始数据集,例如来自文件或Java集合。创建数据集的一般机制是在InputFormat后面抽象的 。Flink附带了几种内置格式,可以从通用文件格式创建数据集。他们中的许多人在ExecutionEnvironment上都有快捷方法。

基于文件的:

  • readTextFile(path)/ TextInputFormat- 按行读取文件并将其作为字符串返回。
  • readTextFileWithValue(path)/ TextValueInputFormat- 按行读取文件并将它们作为StringValues返回。StringValues是可变字符串。
  • readCsvFile(path)/ CsvInputFormat- 解析逗号(或其他字符)分隔字段的文件。返回元组或POJO的DataSet。支持基本java类型及其Value对应作为字段类型。
  • readFileOfPrimitives(path, Class)/ PrimitiveInputFormat- 解析新行(或其他字符序列)分隔的原始数据类型(如String或)的文件Integer
  • readFileOfPrimitives(path, delimiter, Class)/ PrimitiveInputFormat- 解析新行(或其他字符序列)分隔的原始数据类型的文件,例如StringInteger使用给定的分隔符。
  • readSequenceFile(Key, Value, path)/ SequenceFileInputFormat- 创建一个JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们作为Tuple2 <Key,Value>返回。

基于集合:

  • fromCollection(Collection) - 从Java Java.util.Collection创建数据集。集合中的所有数据元必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。
  • fromElements(T ...) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 并行地从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。
  • generateSequence(from, to) - 并行生成给定间隔中的数字序列。

通用:

  • readFile(inputFormat, path)/ FileInputFormat- 接受文件输入格式。
  • createInput(inputFormat)/ InputFormat- 接受通用输入格式。

例子

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// read text file from a HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                           .types(Integer.class, String.class, Double.class);

// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")  // take the first and the fourth field
                           .types(String.class, Double.class);

// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");

// read a file from the specified path of type SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
 env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData =
    env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                     .setDBUrl("jdbc:derby:memory:persons")
                     .setQuery("select name, age from persons")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );

// Note: Flink's program compiler needs to infer the data types of the data items which are returned
// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
// manually provide the type information as shown in the examples above.

收集数据源和接收器

通过创建输入文件和读取输出文件来完成分析程序的输入并检查其输出是很麻烦的。Flink具有特殊的数据源和接收器,由Java集合支持以简化测试。一旦程序经过测试,源和接收器可以很容易地被读取/写入外部数据存储(如HDFS)的源和接收器替换。

在开发中,我们经常直接使用接收器对数据源进行接收。

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

// Create a DataSet from a list of elements
DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataSet from any Java collection
List<Tuple2<String, Integer>> data = ...
DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataSet from an Iterator
Iterator<Long> longIt = ...
DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);

广播变量

除了常规的 算子操作输入之外,广播变量还允许您为 算子操作的所有并行实例提供数据集。这对于辅助数据集或与数据相关的参数化非常有用。然后,算子可以将数据集作为集合访问。

// 1. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcast DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }

    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

分布式缓存

Flink提供了一个分布式缓存,类似于Apache Hadoop,可以在本地访问用户函数的并行实例。此函数可用于共享包含静态外部数据的文件,如字典或机器学习的回归模型。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

以上就是DataSet API 的使用,其实和spark非常的相似,我们将数据接入后,可以利用各种算子对数据进行处理。

Flink Demo代码

Flink系列文章:

Flink入门(一)——Apache Flink介绍
Flink入门(二)——Flink架构介绍

Flink入门(三)——环境与部署

Flink入门(四)——编程模型

更多实时计算,Flink,Kafka等相关技术博文,欢迎关注实时流式计算

原文地址:https://www.cnblogs.com/tree1123/p/12155955.html

时间: 2024-08-02 06:10:26

Flink入门(五)——DataSet Api编程指南的相关文章

&lt;译&gt;Flink编程指南

Flink 的流数据 API 编程指南 Flink 的流数据处理程序是常规的程序 ,通过再流数据上,实现了各种转换 (比如 过滤, 更新中间状态, 定义窗口, 聚合).流数据可以来之多种数据源 (比如, 消息队列, socket 流, 文件). 通过sink组件落地流计算的最终结果,比如可以把数据落地文件系统,标准输出流比如命令行界面, Flink 的程序可以运行在多种上下文环境 ,可以单独只是Flink api,也可以嵌入其他程序. execution可以运行在本地的 JVM里, 也可以 运行

Flink Program Guide (2) -- DataStream API编程指导 -- For Java

v\:* {behavior:url(#default#VML);} o\:* {behavior:url(#default#VML);} w\:* {behavior:url(#default#VML);} .shape {behavior:url(#default#VML);} 张安 张安 2 1 2016-08-02T10:56:00Z 2016-08-02T10:56:00Z 1 2945 16790 139 39 19696 16.00 false false false false

Maya API编程快速入门

一.Maya API编程简介 Autodesk? Maya? is an open product. This means that anyone outside of Autodesk can change Maya's existing features or add entirely new features. There are several ways you can modify Maya: · MEL?-(Maya Embedded Language) is a powerful

Mysql C语言API编程入门讲解

原文:Mysql C语言API编程入门讲解 软件开发中我们经常要访问数据库,存取数据,之前已经有网友提出让鸡啄米讲讲数据库编程的知识,本文就详细讲解如何使用Mysql的C语言API进行数据库编程.  API,全称Application Programming Interfaces,即应用程序编程接口,我们可以调用这些接口,执行API函数提供的功能.  Mysql C语言API就是用C语言编写的Mysql编程接口,使用这些接口函数可以实现对Mysql数据库的查询等操作.  Mysql的安装  要进

Android修行笔记(五):ListView 良好编程指南

构成Android应用的一个重要的UI构件就要算ListView了,我们项目中用得很多,所以总结了以下的经验进行分享探讨 1.重用Adapter getView函数中的ConvertView 列表的每一行Item通常都是可复用的,重用可以避免多次创建,让滚动更加顺滑. (注意:2.3系统在复用View的时候有些状态会被保留,比如GONE的状态,所以在getView中重用convertView时要对Visibility属性重新进行设置) 2.使用ViewHolder来保存通过findViewByI

mybatis从入门到精通(五) sqlSession API的使用

mybatis从入门到精通(五) sqlSession API的使用  一丶简介 SqlSession类似于mybatis对外的接口层, 它几乎囊括了所有对外的api, 因此, 学习SqlSession的使用方法对于了解mybatis还是有必要的. 对应官方文档 二丶配置SqlSession的环境<environment/> <environment/> 主要包括了两种配置, 事务管理和数据源. 这里的配置仅仅是用于学习, 实际应用一般是将事务交由Spring容器管理, 数据源一般

Flink入门宝典(详细截图版)

本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本.需要安装Netcat进行简单调试. 这里简述安装过程,并使用IDEA进行开发一个简单流处理程序,本地调试或者提交到Flink上运行,Maven与JDK安装这里不做说明. 一.Flink简介 Flink诞生于欧洲的一个大数据研究项目StratoSphere.该项目是柏林工业大学的一个研究性项目.早期,Flink是做Batch计算的,但是在2014年,StratoSphere里面的核心成员孵化出

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Posix线程编程指南(5)

Posix线程编程指南(5) 杨沙洲 原文地址:http://www.ibm.com/developerworks/cn/linux/thread/posix_threadapi/part5/ 杂项 这是一个关于Posix线程编程的专栏.作者在阐明概念的基础上,将向您详细讲述Posix线程库API.本文是第五篇将向您讲述pthread_self().pthread_equal()和pthread_once()等杂项函数. 在Posix线程规范中还有几个辅助函数难以归类,暂且称其为杂项函数,主要包