Flink Program Guide (2) -- DataStream API编程指导 -- For Java

v\:* {behavior:url(#default#VML);}
o\:* {behavior:url(#default#VML);}
w\:* {behavior:url(#default#VML);}
.shape {behavior:url(#default#VML);}

张安
张安
2
1
2016-08-02T10:56:00Z
2016-08-02T10:56:00Z
1
2945
16790
139
39
19696
16.00

false

false
false
false

EN-US
ZH-CN
X-NONE

/* Style Definitions */
table.MsoNormalTable
{mso-style-name:普通表格;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-noshow:yes;
mso-style-priority:99;
mso-style-parent:"";
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.0pt;
font-family:"Times New Roman",serif;}

DataStream API编程指导

文档翻译自Flink DataStream API Programming Guide

-----------------------------------------------------------------------

Flink中的DataStream程序是实现在数据流上的transformation(如filtering,updating state, defining windows,aggregating)的普通程序。创建数据流的来源多种多样(如消息队列,socket流,文件等)。程序通过data sink返回结果,如将数据写入文件,或发送到标准输出(如命令行终端)。Flink程序可以在多种上下文中运行,如独立运行或是嵌入在其他程序中执行。程序的执行可以发生在本地JVM,或者在一个拥有许多设备的集群上。

有关介绍Flink API基础概念的文档,请见basic concepts

为了创建你自己的Flink DataStream程序,我们鼓励你从文档anatomy of a Flink Program开始,且欢迎你添加自己的transformations。该文档接下来的部分是额外的operation和进阶特性的参考文档。

一、示例程序

下面的程序是一个完整的流式窗口word count应用,它计算出在web socket的大小为5秒的窗口中的出现各个单词的数量。你可以复制 & 粘贴代码并在本地运行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public
class
WindowWordCount {

public
static
void main(String[]
args) throws Exception {

StreamExecutionEnvironment
env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String,
Integer>> dataStream = env
  .socketTextStream("localhost",
9999)
  .flatMap(new
Splitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1);

dataStream.print();

env.execute("Window
WordCount");

}

public
static class
Splitter implements
FlatMapFunction<String, Tuple2<String,
Integer>> {
@Override

public
void flatMap(String
sentence, Collector<Tuple2<String, Integer>>
out) throws Exception {

for
(
String word: sentence.split("
")) {

out.collect(new
Tuple2<String, Integer>(word, 1));

}

}

}

}

要运行该示例程序,首先从终端运行netcat来开始输入流

nc -lk
9999

仅需要输入一些单词,这些将是word count程序的输入数据。如果你想看到count大于1的结果,在5秒内重复输入同一个单词。

二、DataStream
Transformations

Data
transformation会将一或多个DataStream转换成一个新的DataStream。程序可以将多个transformation结合形成复杂的拓扑结构(topology)。

本小节给出了所有可用的transformation的描述。


Transformation


描述


Map

DataStream ->
DataStream


获取一个element并产出一个element。下例是一个将输入*2的map方法:

DataStream<Integer>
dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer map(Integer
value) throws Exception {
    return 2 *
value;
  }
});


FlapMap

DataStream -> DataStream


获取一个element,并产生出0、1或多个element。下例是一个为句子分词的flatmap方法

dataStream.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public void flatMap(String value,
Collector<String> outthrows Exception {
    for(String word: value.split("
")){
    out.collect(word);
    }
  }
});


Filter

DataStream -> DataStream


在每个获取的element上运行一个boolean方法,留下那些方法返回true的element。下例是一个过滤掉0值的filter

dataStream.filter(new FilterFunction<Integer>() {
  @Override
  public boolean filter(Integer value) throws
Exception {
    return value != 0;
  }
});


KeyBy
DataStream
-> KeyedStream


将流逻辑分为不相交的分区,每个分区包含的都是具有相同key的element,该分区方法使用hash分区实现。定义key的方法见于Keys。下例是一个返回KeyedDataStream的transformation。

dataStream.keyBy("someKey") // Key by field
"someKey"

dataStream.keyBy(0) // Key by the first element of a Tuple


Reduce

KeyedStream -> DataStream


一个在keyed data stream上“滚动”进行的reduce方法。将上一个reduce过的值和当前element结合,产生新的值并发送出。下例是一个创建部分和的reduce方法。

keyedStream.reduce(new ReduceFunction<Integer>() {
  @Override
  public Integer reduce(Integer
value1, Integer value2throws Exception {
    return value1 +
value2;
  }
});


Fold

KeyedStream -> DataStream


一个在带有初始值的数据流上“滚动”进行的fold方法。将上一个fold的值和当前element结合,产生新的值并发送出。下例是一个fold方法,当应用于序列{1, 2,
3, 4, 5}时,它发出序列{"start-1",
"start-1-2", "start-1-2-3" …}。

DataStream<String>
result keyedStream.fold("start", new FoldFunction<Integer,
String>() {
  @Override
  public String fold(String
current, Integer value) {
    return current + "-" + value;
  }
});


Aggregations

KeyedStream -> DataStream


在一个keyed DataStream上“滚动”进行聚合的方法。其中,min和minBy的区别在于min返回最小值,而minBy返回的是带有在此域中最小值的element(max和maxBy一样如此)。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");


Window

KeyedStream - >
WindowedStream


Window可以定义在已经分区的KeyedStream上。窗口将根据一些特征(如最近5秒到达的数据)将数据按其各自的key集合在一起。有关窗口的完整描述见于windows

//
Last 5 seconds of data

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));


WindowAll

DataStream -> AllWindowedStream


Window可以定义在普通的DataStream上。窗口将根据一些特征(如最近5秒到达的数据)将所有Stream事件集合在一起。有关窗口的完整描述见于windows
警告:该transformation在很多情况下都不是并行化的,所有数据将被收集到一个运行windowAll
Operator的任务上。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data


Window Apply

WindowedStream
-> DataStream

AllWindowedStream
-> DataStream


将一个一般函数应用到window整体上去,下面是一个人工计算window中所有element的总和的应用。
注意:如果你正在使用一个windowAll的transformation,你需要使用AllWindowFunction来代替下例中的参数。

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>,
Integer, Tuple, Window>() {
public void apply (Tuple tuple,
  Window window,
  Iterable<Tuple2<String,
Integer>> values,
  Collector<Integer>
out) throws Exception {
    int sum = 0;
    for (value t: values)
{

      sum += t.f1;
    }
    out.collect (new Integer(sum));
  }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>,
Integer, Window>() {
public void apply (Window window,
  Iterable<Tuple2<String,
Integer>> values,
  Collector<Integer>
out) throws Exception {
    int sum = 0;
    for (value t: values)
{

      sum += t.f1;
    }
    out.collect (new Integer(sum));
  }
});


Window Reduce

WindowedStream
-> DataStream


对窗口应用一个功能性reduce方法并返回reduce的结果

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>()
{

  public Tuple2<String,
Integer> reduce(Tuple2<String,
Integer> value1, Tuple2<String, Integer>
value2) throws Exception {
  return new Tuple2<String,Integer>(value1.f0,
value1.f1 + value2.f1);
  }
};


Window Fold

Windowed Stream
-> DataStream


对窗口应用一个功能性fold方法。下例代码在应用到序列(1, 2,
3, 4, 5)时,它将该序列fold成为字符串"start-1-2-3-4-5"

windowedStream.fold("start-", new FoldFunction<Integer,
String>() {
  public String fold(String
current, Integer value) {
    return current + "-" + value;
  }
};


Aggregations on
windows

WindowedStream
-> DataStream


对窗口中的内容聚合。其中,min和minBy的区别在于min返回最小值,而minBy返回的是带有在此域中最小值的element(max和maxBy一样如此)。

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");


Union

DataStream*
-> DataStream


将2个或多个data stream合并创建出一个新的包含所有stream的element的stream。注意:如果你对一个data
stream自己进行union操作,则在返回的结果中,每个element都会出现2个。

dataStream.union(otherStream1,
otherStream2, ...);


Window Join

DataStream,
DataStream -> DataStream


在给定key和普通window中,将2个DataStream进行Join操作

dataStream.join(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});


Window CoGroup

DataStream,
DataStream -> DataStream


在给定key和普通window中,对2个DataStream进行CoGroup操作。

dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});


Connect

DataStream,
DataStream -> ConnectedStreams


在保留两个DataStream的类型的情况下,将二者"连接"起来。Connect使我们可以共享两个Stream的状态

DataStream<Integer>
someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String>
connectedStreams = someStream.connect(otherStream);


CoMap, CoFlatMap

ConnectedStreams
-> DataStream


该操作类似于map和flatMap针对连接的Data
Stream版本。Sd

connectedStreams.map(new
CoMapFunction<Integer, String, Boolean>() {
  @Override
  public Boolean map1(Integer
value) {
    return true;
  }

  @Override
  public Boolean map2(String
value) {
    return false;
  }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer,
String, String>() {

  @Override
  public void flatMap1(Integer value,
Collector<String> out) {
    out.collect(value.toString());
  }

  @Override
  public void flatMap2(String value,
Collector<String> out) {
    for (String word: value.split("
")) {
      out.collect(word);
    }
  }
});


Split

DataStream ->
SplitStream


根据某些标准将Stream分割成2个或更多的stream

SplitStream<Integer>
split = someDataStream.split(new OutputSelector<Integer>()
{

  @Override
  public Iterable<String>
select(Integer
value) {
    List<String>
output = new ArrayList<String>();
    if (value % 2 == 0) {
      output.add("even");
    }
    else {
      output.add("odd");
    }
    return output;
  }
});


Select

SplitStream ->
DataStream


从SplitStream中选择1个或多个stream

SplitStream<Integer>
split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");


Iterate

DataStream ->
IterativeStream -> DataStream


通过将一个Operator的输出重定向到前面的某个Operator的方法,在数据流图中创建一个“反馈”循环。这在定义持续更新模型的算法时十分有用。下面的例子从一个Stream开始,并持续应用迭代体(Iteration
body)。大于0的element被送回到反馈通道,而其他的element则被转发到下游。相关完整描述请见Iterations

IterativeStream<Long>
iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
  @Override
  public boolean filter(Integer value) throws
Exception {
    return value > 0;
  }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
  @Override
  public boolean filter(Integer value) throws
Exception {
    return value <= 0;
  }
});


Extract Timestamps

DataStream ->
DataStream


通过从数据中抽取时间戳来使得通过使用事件时间语义的窗口可以工作。详情见于Event Time

stream.assignTimestamps (new TimeStampExtractor()
{...});

接下来的Transformation是对Tuple类型的data
stream可用的Transformation:


Transformation


描述


Project

DataStream ->
DataStream


从tuple中选择出域的子集而产生新的DataStream

DataStream<Tuple3<Integer,
Double, String>> in = //
[...]

DataStream<Tuple2<String, Integer>>
out = in.project(2,0);

物理级分割(Physical
Partitioning)

如果需要,Flink同样提供了在进行一次transformation后针对精确stream分割的低层次的控制(low-level
control),它们通过以下几个方法实现。


Transformations


描述


Custom partitioning

DataStream ->
DataStream


使用一个用户自定义的Partitioner来对每个element选择目标任务sd

dataStream.partitionCustom(partitioner,
"someKey");
dataStream.partitionCustom(partitioner, 0);


Random partitioning

DataStream ->
DataStream


根据均匀分布来随机分割element

dataStream.shuffle();


Rebalancing(轮询分割)

DataStream ->
DataStream


轮询分割element,创建相同负荷的分割。对数据变形(data
skew)时的性能优化十分有用s

dataStream.rebalance();


Rescaling

DataStream ->
DataStream


将element轮询分割到下游Operator子集中去。这在你想流水线并行时十分有用,例如,需要从每个并行的source实例中将数据fan out到一个有着一些mapper来分发负载,但是又不想要函数rebalance()那样引起的完全rebalance的效果时。这就需要仅在本地传输数据,而不是需要从网络传输,这需要依赖其他诸如TaskManager的任务槽数量等等configuration值。
上游Operation发送element的下游Operation子集同时依赖于上游和下游两方Operation的并行度。例如,若上游Operation的并行度为2,下游Operation并行度为4,则1个上游Operation将会把它的element分发给2个下游Operation。另一方面,若下游并行度为2而上游并行度为4,则2个上游Operation将会把它们的element分发给1个下游Operation,而另外两个上游Operation则分发给另一个下游Operation。
当一个或是多个上下游Operation的并行度不是倍数关系时,下游的Operation将拥有不同的从上游获得的输入的数量。
下图是上面例子的连接模式图:

dataStream.rescale();


Broadcasting

DataStream ->
DataStream


将element广播到每一个分割中去

dataStream.broadcast();

链接任务以及资源组(Task
chaining & resource groups)

将两个transformation链接起来意味着将它们部署在一起(co-locating),共享同一个线程来获得更好的性能。Flink默认地尽可能地链接Operator(如两个连续的map transformation)。如有需要,API还给出了细粒度的链接控制:

使用StreamExecutionEnvironment.disableOperatorChaining()来关闭整个Job的链接操作。下面表格中的方法则是更加细粒度的控制函数,注意,由于这些函数引用的是前一个transformation,所以它们仅仅在一个DataStream的transformation后使用才是正确的,例如someStream.map(
… ).startNewChain()是正确的,而someStream.startNewChain()是错误的。

一个资源组就是Flink中的一个任务槽,如有需要,你可以人工孤立某个Operator到一个独立的任务槽中。


Transformation


描述


startNewChain()


以当前Operator起点,开始一个新的链接。在下例中,两个mapper将会被链接而filter则不会与第一个mapper链接

someStream.filter(...).map(...).startNewChain().map(...);


disableChaining()


下例中,将不会链接mapOperator。

someStream.map(...).disableChaining();


slotSharingGroup()


设置一个Operation的共享任务槽的分组。Flink将会把同一个任务槽共享组的Operation放到同一个任务槽中,而不在同一个任务槽共享组的Operation放到其他任务槽中。这可以用来孤立任务槽。如果所有的输入Operation都在同一个任务槽共享组中,则该任务槽共享组会继承下来。任务槽共享组的默认名为"default",Operation可以通过调用slotSharingGroup("default")来定义其名称。

someStream.filter(...).slotSharingGroup("name");

三、数据源

数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)来创建数据源。你可以使用Flink提供的source方法,也可以通过实现SourceFunction来编写自定义的非并行数据源,也可以通过实现ParallelSourceFunction接口或继承RichParallelSourceFunction来编写自定义并行数据源。

以下是几个预定义的数据流源,可以通过StreamExecutionEnvironment来访问:

1.    基于文件的:

·       
readTextFile(path) / TextInputFormat
- 以行读取方式读文件并返回字符串

·       
readFile(path) / 任意输入格式 - 按用输入格式的描述读取文件

·       
readFileStream - 创建一个stream,在文件有改动时追加element

2.    基于Socket的:

·       
socketTextStream - 从socket读取,element可以通过分割符来分开

3.    基于Collection的:

·       
fromCollection(Collection) - 从Java.util.Collection创建一个数据流。collection中所有的element都必须是同一类型的。

·       
fromCollection(Iterator, Class) - 从一个迭代器中创建一个数据流。class参数明确了迭代器返回的element的类型。

·       
fromElement(T …) - 从一个给定的对象序列创建一个数据流。所有对象都必须是同一类型的。

·       
fromParallelCollection(SplittableIterator,
Class)

- 从一个迭代器中创建一个并行数据流。class参数明确了迭代器返回的element的类型。

·       
generateSequence(from, to) - 从一个给定区间中生成一个并行数字序列。

4.    自定义:

·       
addSource - 附上一个新的source方法。例如,通过调用addSource(new
FlinkKafkaConsumer08<>(…))来从Apache Kafka读取数据,更多信息见于connector

四、Data
Sink

Data Sink消耗DataStream并将它们转发到文件、socket、外部系统或打印它们。Flink自带了许多内置的输出格式,封装为DataStream的operation中:

·       
writeAsText() / TextOutputFormat
- 以行字符串的方式写文件,字符串通过调用每个element的toString()方法获得。

·       
writeAsCsv(…) / CsvOutputFormat
- 以逗号分隔的值来讲Tuple写入文件,行和域的分隔符是可以配置的。每个域的值是通过调用object的toString()方法获得的。

·       
print() / printToErr()
- 将每个element的toString()值打印在标准输出 / 标准错误流中。可以提供一个前缀(msg)作为输出的前缀,使得在不同print的调用可以互相区分。如果并行度大于1,输出也会以task的标识符(identifier)为产生的输出的前缀。

·       
writeUsingOutputFormat() / FileOutputFormat
- 自定义文件输出所用的方法和基类,支持自定义object到byte的转换。

·       
writeToSocket - 依据SerializationSchema将element写到socket中。

·       
addSink - 调用自定义sink方法,Flink自带连接到其他系统的connector(如Apache
Kafka),这些connector都以sink方法的形式实现。

注意DataStream的write*()函数主要用于debug,它们不参与Flink的检查点,这意味着这些方法通常处于“至少一次(at-least-once)“的执行语义下。flush到目标系统的数据依赖于OutputFormat的实现,这意味着不是所有发送到OutputFormat的element都会立即出现在目标系统中,此外,在失效的情况下,这些数据很可能会丢失。

故为了可靠性以及将stream“恰好一次(exact once)”地传入文件系统,我们应当使用flink-connector-filesystem。此外,通过实现“.addSink(…)”的自定义内容会参加Flink的检查点机制,故会保证“恰好一次”的执行语义。

五、迭代(Iterations)

迭代流程序实现了一个阶段方法并将之嵌入到一个IterativeStream中。作为一个可能永远不会结束的程序,它没有最大迭代数,反之,你需要使用splitfilter的transformation来明确流的哪一部分会被反馈到迭代中,哪一部分则继续转发到下游。这里,我们使用filter作为例子,我们定义IterativeStream

IterativeStream<Integer>
iteration = input.iterate();

然后,我们定义在循环中将要进行的逻辑处理,我们通过一系列transformation来实现(这里用了一个简单的map
transformation):

DataStream<Integer>
iterationBody = iteration.map(/*
this is executed many times */
);

我们可以调用IterativeStreamcloseWith(feedbackStream)函数来关闭一个迭代并定义迭代尾。传递给closeWith方法的DataStream将会反馈回迭代头。分割出用来反馈的stream的部分和向前传播的stream部分通常的方法便是使用filter来进行分割。这些filter可以定义诸如"termination"逻辑,即element将会传播到下游,而不是被反馈回去。

iteration.closeWith(iterationBody.filter(/*
one part of the stream */
));
DataStream<Integer> output = iterationBody.filter(/*
some other part of the stream */
);

默认地,反馈的那部分流将会自动设置为迭代头的输入,要想重载该行为,用户需要设置closeWith函数中的一个boolean参数。例如,下面是一个持续将整数序列中的数字减1知道它们变为0的程序:

DataStream<Long>
someIntegers = env.generateSequence(0,
1000);

IterativeStream<Long>
iteration = someIntegers.iterate();

DataStream<Long>
minusOne = iteration.map(new
MapFunction<Long, Long>() {
  @Override
  public Long map(Long
value) throws Exception {
    return value - 1
;
  }
});

DataStream<Long>
stillGreaterThanZero = minusOne.filter(new
FilterFunction<Long>() {
  @Override
  public boolean
filter(Long
value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long>
lessThanZero = minusOne.filter(new
FilterFunction<Long>() {
  @Override
  public boolean
filter(Long
value) throws Exception {
    return (value <= 0);
  }
});

六、执行参数

StreamExecutionEnvironment包含ExecutionConfig,它可以使用户设置job的确切运行时配置值。

请参考execution
configuration
来查看参数的解释。特别的,以下这些参数仅适用于DataStream
API:

enableTimestamps()
/ disableTimestamps():在每一个source发出的事件上附加上一个时间戳。函数areTimestampsEnabled()可以返回该状态的当前值。

setAutoWatermarkInterval(long
milliseconds):设置自动水印发布(watermark
emission)区间。你可以通过调用函数getAutoWatermarkInterval()来获取当前值。

6.1 容错

文档Fault Tolerance
Documentation
描述了打开并配置Flink的检查点机制的选项和参数

6.2 控制执行时间

默认的,element在网络传输时不是一个个单独传输的(这会导致不必要的网络流量),而是缓存后传输。缓存(是在设备间传输的实际单位)的大小可以在Flink的配置文件中设置。尽管该方法有益于优化吞吐量,他会在stream到达不够快时导致执行时间方面的问题。为了控制吞吐量和执行时间,你可以在执行环境(或独立的Operator)中调用env.setBufferTimeout(timeoutMillis)来设置等待装满buffer的最大等待时间,在这个时间过后,不管buffer是否已满,它都会自动发出。该默认超时时间是100ms。下例是设置API的用法:

LocalStreamEnvironment
env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.genereateSequence(1,10).map(new
MyMapper()).setBufferTimeout(timeoutMillis);

要最大化吞吐量,设置setBufferTimeout(-1)来去除超时时间,则buffer仅在它满后才会被flush。要最小化执行时间,设置timeout为一个接近0的数字(如5ms或10ms)。应当避免设置Timeout为0,因为它会造成严重的性能下降。

七、Debugging

在分布式集群上运行流程序之前,确保算法正确执行很重要。因此,实现数据分析程序通常需要递增的检查结果、debug、优化的过程。

Flink提供了可以显著简化数据分析程序的开发过程的特性,即可以在IDE中本地进行debug、注入测试数据、以及结果数据的收集等。本节对如何简化Flink程序开发提出几点建议。

7.1 本地执行环境

LocalStreamEnvironment在创建它的同一个JVM进程下创建Flink系统。如果你从IDE中启动一个LocalEnvironment,你可以在代码中设置断点来简单地debug你的程序。下例为LocalEnvironment是如何创建并使用的:

final
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String>
lines = env.addSource(/*
some source */
);
// build your program

env.execute();

7.2 Collection数据源

Flink提供基于Java
collection的特殊数据源来方便测试。一旦程序测试之后,source和sink可以简单地替代为对外部系统的读取/写出的source和sink。Collection数据源使用方法如下:

// Create a DataStream from a list of
elements

DataStream<Integer> myInts = env.fromElements(1,
2, 3,
4, 5);

//
Create a DataStream from any Java collection

List<Tuple2<String, Integer>> data =
...

DataStream<Tuple2<String, Integer>>
myTuples = env.fromCollection(data);

//
Create a DataStream from an Iterator

Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt,
Long.class);

注意:当前Collection数据源需要实现Serializable接口的数据类型和迭代器。此外,Collection数据源无法并行执行(并行度=1)

7.3 迭代器Data Sink

Flink同样提供了一个收集测试和debug的DataStream结果的sink,它的使用方式如下:

import org.apache.flink.contrib.streaming.DataStreamUtils

DataStream<Tuple2<String,
Integer>> myResult = ...
Iterator<Tuple2<String, Integer>>
myOutput = DataStreamUtils.collect(myResult)

时间: 2024-10-13 05:47:06

Flink Program Guide (2) -- DataStream API编程指导 -- For Java的相关文章

Flink Program Guide (10) -- Savepoints (DataStream API编程指导 -- For Java)

Savepoint 本文翻译自文档Streaming Guide / Savepoints ------------------------------------------------------------- 使用DataStream API编写的程序可以从一个savepoint处恢复执行.savepoint可以同时更新你的程序和Flink集群而不丢失任何状态.该文档包括了从触发.存储以及销毁(dispose)savepoint的所有内容.有关Flink如何处理状态和失效的详细内容,请见文

Flink Program Guide (3) -- Event Time (DataStream API编程指导 -- For Java)

Event Time 本文翻译自DataStream API Docs v1.2的Event Time ------------------------------------------------------- 一.事件时间 / 处理时间 / 提取时间 Flink支持流程序不同的time概念. ·        Processing time:处理时间指执行对应Operation的设备的系统时间. 当一个流程序以处理时间运行,所有基于时间的operation(如time窗口)将使用运行对应O

Flink Program Guide (6) -- 窗口 (DataStream API编程指导 -- For Java)

窗口(Window) 本文翻译自文档Windows ----------------------------------- Flink使用窗口的概念,根据element的时间戳或者其他指标,将可能无限的DataStream分割为有限的数据切片(slice).我们在处理无限数据流以及进行聚合element的transformation时需要此种窗口分割. 注意:我们在此文档中讨论的大多是keyed windowing,即window是应用在KeyedStream上的.关键字下的窗口具有一定的优势,

Flink Program Guide (5) -- 预定义的Timestamp Extractor / Watermark Emitter (DataStream API编程指导 -- For Java)

本文翻译自Pre-defined Timestamp Extractors / Watermark Emitter ------------------------------------------------------------------------------------------ 正如timestamps and watermark handling中所述,Flink提供了抽象类来让开发者赋值自己的时间戳并发送他们自己的Watermark.更具体来说,开发者需要依照不同用例情况来

Flink Program Guide (7) -- 容错 Fault Tolerance(DataStream API编程指导 -- For Java)

false false false false EN-US ZH-CN X-NONE /* Style Definitions */ table.MsoNormalTable {mso-style-name:普通表格; mso-tstyle-rowband-size:0; mso-tstyle-colband-size:0; mso-style-noshow:yes; mso-style-priority:99; mso-style-parent:""; mso-padding-alt

Flink Program Guide (9) -- StateBackend : Fault Tolerance(Basic API Concepts -- For Java)

State Backends 本文翻译自文档Streaming Guide / Fault Tolerance / StateBackend ----------------------------------------------------------------------------------------- 使用Data Stream API编写的程序通常以多种形式维护状态: ·  窗口将收集element或在它被触发后聚合element ·  Transformation方法可能会

Flink(五) —— DataStream API

package flink import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.api.scala._ case class SensorReading(id: String, timestamp: Long, temperature: Double) object SourceTest { def main(args: Array[String]): Uni

Flink DataStream API Programming Guide

Example Program The following program is a complete, working example of streaming window word count application, that counts the words coming from a web socket in 5 second windows.   public class WindowWordCount { public static void main(String[] arg

Flink入门(五)——DataSet Api编程指南

Apache Flink Apache Flink 是一个兼顾高吞吐.低延迟.高性能的分布式处理框架.在实时计算崛起的今天,Flink正在飞速发展.由于性能的优势和兼顾批处理,流处理的特性,Flink可能正在颠覆整个大数据的生态. DataSet API 首先要想运行Flink,我们需要下载并解压Flink的二进制包,下载地址如下:https://flink.apache.org/downloads.html 我们可以选择Flink与Scala结合版本,这里我们选择最新的1.9版本Apache