Flink Stream Processing API (Part 1)

1. Environment

1.1 getExecutionEnvironment

  • Creates an execution environment that represents the context in which the current program runs.
  • If the program is invoked standalone, this method returns a local execution environment.
  • If the program is submitted to a cluster through a command-line client, this method returns that cluster's execution environment.
  • In other words, getExecutionEnvironment decides which kind of environment to return based on how the program is run; it is the most common way to create an execution environment.

Batch processing environment:

val env = ExecutionEnvironment.getExecutionEnvironment

Streaming environment:

val env = StreamExecutionEnvironment.getExecutionEnvironment

If no parallelism is set, the value configured in flink-conf.yaml is used; the default is 1.
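
The configured value can also be overridden per job on the environment itself; a minimal sketch (the parallelism value 4 is an arbitrary example):

val env = StreamExecutionEnvironment.getExecutionEnvironment
// override the flink-conf.yaml default for this job only
env.setParallelism(4)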

1.2 createLocalEnvironment

Returns a local execution environment; the default parallelism needs to be specified when calling it.

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

1.3 createRemoteEnvironment

Returns a cluster execution environment and submits the Jar to the remote server.

When calling it, you need to specify the hostname (or IP) and port of the JobManager, as well as the Jar package to run on the cluster.

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")

2. Source

2.1 Reading data from a collection

def main(args: Array[String]): Unit = {
    val env1: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataList = List(
        ("1", 1231231L, 200),
        ("2", 1231231L, 201),
        ("3", 1231231L, 202)
    ).map{
        case (id, ts, vc) => {
            WaterSensor( id, ts, vc )
        }
    }
    val dataDS: DataStream[WaterSensor] = env1.fromCollection(dataList)
    dataDS.print()
    env1.execute()
}

case class WaterSensor(id: String, ts: Long, vc: Double)

Another version of the same example constructs the WaterSensor objects directly in the collection:

    def main(args: Array[String]): Unit = {

        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        val sensorDS: DataStream[WaterSensor] = env.fromCollection(
            List(
                WaterSensor("ws_001", 1577844001, 45.0),
                WaterSensor("ws_002", 1577844015, 43.0),
                WaterSensor("ws_003", 1577844020, 42.0)
            )
        )

        sensorDS.print()

        env.execute("sensor")
    }

2.2 Reading data from a file

// TODO: read the data source from a file
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// relative path
//val fileDS: DataStream[String] = env.readTextFile("input/word.txt")
// Flink cannot resolve the hdfs:// scheme out of the box; the relevant Hadoop jars must be added (see the dependency sketch after this block)
val fileDS: DataStream[String] = env.readTextFile("hdfs://linux1:9000/directory/app-20191213160742-0000")
fileDS.print("file>>>>")
env.execute()
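
To make the hdfs:// scheme resolvable, the Hadoop client libraries need to be on the classpath. A minimal sketch of the Maven dependency, assuming a Hadoop 2.7.x cluster (the artifact and version are assumptions and must match your environment; Flink's shaded Hadoop uber jar is another option):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>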

2.3 Reading data from Kafka

Add the Kafka connector dependency:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

// TODO: read the data source from Kafka
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// use Kafka as the data source
val properties = new java.util.Properties()
properties.setProperty("bootstrap.servers", "linux1:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val kafkaDS = env.addSource( new FlinkKafkaConsumer011[String]("waterSensor", new SimpleStringSchema(), properties) )
kafkaDS.print("kafka>>>>")
env.execute()

2.4 Custom source

def main(args: Array[String]): Unit = {

    // TODO: use a custom data source
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.addSource( new MySource() ).print("mine>>>>")

    env.execute()

}
// Custom data source
// 1. Extend SourceFunction
// 2. Override its methods
class MySource extends SourceFunction[WaterSensor]{

    private var flg = true

    // run the data-collection logic
    override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = {
        while ( flg ) {
            // emit data through the source context
            ctx.collect(WaterSensor( "1", 1L, 1 ))
            Thread.sleep(200)
        }
    }

    // cancel data collection
    override def cancel(): Unit = {
        flg = false
    }
}

3. Sink

  • Flink has no equivalent of Spark's foreach method that lets users iterate over the output themselves.
  • All output to external systems has to go through a Sink.
  • The final output of a job is typically added in a way similar to the following.

stream.addSink(new MySink(xxxx))

The print method is in fact just another Sink:

public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}

The official distribution ships sinks for a number of frameworks; for anything else, users need to implement their own sink.

3.1 Kafka

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

Add the sink in the main function:

// write data to Kafka
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val ds: DataStream[String] = env.readTextFile("input/word.txt")

ds.addSink( new FlinkKafkaProducer011[String]( "linux1:9092", "waterSensor", new SimpleStringSchema() ) )

env.execute()

Check the result with a Kafka console consumer:

bin/kafka-console-consumer.sh --bootstrap-server linux1:9092 --topic waterSensor

3.2 Redis

Add the dependency:

<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Define a Redis mapper class that specifies the command to use when saving to Redis:

// TODO: write data to Redis
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val ds: DataStream[String] = env.readTextFile("input/word.txt")

val conf = new FlinkJedisPoolConfig.Builder().setHost("linux4").setPort(6379).build()
ds.addSink( new RedisSink[String](conf, new RedisMapper[String] {
    override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET, "word")
    }

    override def getKeyFromData(t: String): String = {
        t.split(" ")(1)
    }

    override def getValueFromData(t: String): String = {
        t.split(" ")(0)
    }
}))

env.execute()

Check the data from the Redis client:

HGETALL word

3.3 Elasticsearch

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

Call it in the main function:

// TODO: write data to Elasticsearch
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val list = List(
    WaterSensor("sensor_1", 150000L, 25),
    WaterSensor("sensor_1", 150001L, 27),
    WaterSensor("sensor_1", 150005L, 30),
    WaterSensor("sensor_1", 150007L, 40)
)

val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list)

val httpHosts = new java.util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("linux1", 9200))
val esSinkBuilder = new ElasticsearchSink.Builder[WaterSensor]( httpHosts, new ElasticsearchSinkFunction[WaterSensor] {
    override def process(t: WaterSensor, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        println("saving data: " + t)
        val json = new java.util.HashMap[String, String]()
        json.put("data", t.toString)
        val indexRequest = Requests.indexRequest().index("water").`type`("readingData").source(json)
        requestIndexer.add(indexRequest)
        println("saved successfully")
    }
} )

waterSensorDS.addSink(esSinkBuilder.build())

env.execute()

Check the result in ES:

  • URL: http://linux1:9200/_cat/indices?v
  • URL: http://linux1:9200/water/_search

3.4 JDBC

Add the dependency:

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

Add MyJdbcSink:

def main(args: Array[String]): Unit = {

    // TODO: write data over JDBC
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val list = List(
        WaterSensor("sensor_1", 150000L, 25),
        WaterSensor("sensor_1", 150001L, 27),
        WaterSensor("sensor_1", 150005L, 30),
        WaterSensor("sensor_1", 150007L, 40)
    )

    val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list)

    waterSensorDS.addSink( new MyJDBCSink )

    env.execute()

}
// Custom Sink
// 1. Extend RichSinkFunction (open/close manage the JDBC connection lifecycle)
// 2. Override its methods
class MyJDBCSink extends RichSinkFunction[WaterSensor] {

    private var conn : Connection = _
    private var pstat : PreparedStatement = _

    override def open(parameters: Configuration): Unit = {
        //Class.forName()
        conn = DriverManager.getConnection("jdbc:mysql://linux1:3306/rdd", "root", "000000")
        pstat = conn.prepareStatement("insert into user (id, name, age) values (?, ?, ?)")
    }
    override def invoke(ws: WaterSensor, context: SinkFunction.Context[_]): Unit = {
        pstat.setInt(1, 1)
        pstat.setString(2, ws.id)
        pstat.setInt(3, ws.vc.toInt) // vc is a Double in WaterSensor; truncated here to fit the integer age column
        pstat.executeUpdate()
    }

    override def close(): Unit = {
        pstat.close()
        conn.close()
    }
}

3.5 HDFS

The BucketingSink has been deprecated since Flink 1.9 and will be removed in subsequent releases. Please use the StreamingFileSink instead.

3.5.1 BucketingSink

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

val input: DataStream[String] = ...

input.addSink(new BucketingSink[String]("/base/path"))

By default the bucketing sink will split by the current system time when elements arrive and will use the datetime pattern "yyyy-MM-dd--HH" to name the buckets.

There are two configuration options that specify when a part file should be closed and a new one started:

  • By setting a batch size (The default part file size is 384 MB)
  • By setting a batch roll over time interval (The default roll over interval is Long.MAX_VALUE)

// the SequenceFileWriter only works with Flink Tuples
import org.apache.flink.api.java.tuple.Tuple2
val input: DataStream[Tuple2[A, B]] = ... 

val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
sink.setWriter(new SequenceFileWriter[IntWritable, Text])
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
sink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins

input.addSink(sink)

This will create a sink that writes to bucket files that follow this schema:

/base/path/{date-time}/part-{parallel-task}-{count}

3.5.2 StreamingFileSink

File Formats

The StreamingFileSink supports both row-wise and bulk encoding formats, such as Apache Parquet. These two variants come with their respective builders that can be created with the following static methods:

  • Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)
  • Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)

Row-encoded Formats

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val input: DataStream[String] = ...

val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .build()

input.addSink(sink)

Bulk-encoded Formats

Flink comes with three built-in BulkWriter factories:

  • ParquetWriterFactory
  • SequenceFileWriterFactory
  • CompressWriterFactory

Parquet format

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-parquet_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.avro.Schema

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...

val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
    .build()

input.addSink(sink)

Hadoop SequenceFile format

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sequence-file</artifactId>
  <version>1.10.0</version>
</dependency>

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.configuration.GlobalConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Text

val input: DataStream[(LongWritable, Text)] = ...
val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
val sink: StreamingFileSink[(LongWritable, Text)] = StreamingFileSink
  .forBulkFormat(
    outputBasePath,
    new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text]))
  .build()

input.addSink(sink)

Original article: https://www.cnblogs.com/hyunbar/p/12632931.html
