Kafka与Flink集成

Apache Flink是新一代的分布式流式数据处理框架,它统一的处理引擎既可以处理批数据(batch data)也可以处理流式数据(streaming data)。在实际场景中,Flink利用Apache Kafka作为上下游的输入输出十分常见,本文将给出一个可运行的实际例子来集成两者。

1. 目标

本例模拟中将集成Kafka与Flink:Flink实时从Kafka中获取消息,每隔10秒去统计机器当前可用的内存数并将结果写入到本地文件中。

2. 环境准备

  • Apache Kafka 0.11.0.0
  • Apache Flink 1.3.1
  • Gradle 3.5 (版本号不是强要求)

本例运行在Windows环境,但可以很容易地移植到其他平台上。

3. 创建Flink Streaming工程

本例使用Intellij IDEA作为项目开发的IDE。首先创建Gradle project,group为‘huxihx.flink.demo‘,artifact id为‘flink-kafka-demo’,version为‘1.0-SNAPSHOT’。整个项目结构如图所示:

4. 增加kafka和kafka-connector依赖

增加下列gradle依赖:

    compile group: ‘org.apache.flink‘, name: ‘flink-connector-kafka-0.10_2.11‘, version: ‘1.3.1‘
    compile group: ‘org.apache.flink‘, name: ‘flink-streaming-java_2.11‘, version: ‘1.3.1‘
    compile group: ‘org.apache.kafka‘, name: ‘kafka-clients‘, version: ‘0.11.0.0‘

设置gradle打包依赖

jar {
    manifest {
        attributes(
                "Manifest-Version": 1.0,
                "Main-Class": "huxihx.KafkaMessageStreaming")
    }
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    into(‘assets‘) {
        from ‘assets‘
    }
}

5. 启动Flink环境(本例使用local测试环境)

F:\SourceCode\flink-1.3.1
> bin\start-local.bat
Starting Flink job manager. Webinterface by default on http://localhost:8081/.
Don‘t close this batch window. Stop job manager by pressing Ctrl+C.

6. 启动Kafka单节点集群

启动Zookeeper:

cd F:\SourceCode\zookeeper
> bin\zkServer.cmd

启动Kafka broker:

> cd F:\SourceCode\kafka_1
> set JMX_PORT=9999
> bin\windows\kafka-server-start.bat F:\\SourceCode\\configs\\server.properties

7. 代码开发

代码主要由两部分组成:

  • MessageSplitter类、MessageWaterEmitter类和KafkaMessageStreaming类:Flink streaming实时处理Kafka消息类
  • KafkaProducerTest类和MemoryUsageExtrator类:构建Kafka测试消息

本例中,Kafka消息格式固定为:时间戳,主机名,当前可用内存数。其中主机名固定设置为machine-1,而时间戳和当前可用内存数都是动态获取。由于本例只会启动一个Kafka producer来模拟单台机器发来的消息,因此在最终的统计结果中只会统计machine-1这一台机器的内存。下面我们先来看flink部分的代码实现。

MessageSplitter类(将获取到的每条Kafka消息根据“,”分割取出其中的主机名和内存数信息)

public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        if (value != null && value.contains(",")) {
            String[] parts = value.split(",");
            out.collect(new Tuple2<>(parts[1], Long.parseLong(parts[2])));
        }
    }
}

MessageWaterEmitter类(根据Kafka消息确定Flink的水位)

public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        if (lastElement != null && lastElement.contains(",")) {
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0L;
    }
}

KafkaMessageStreaming类(Flink入口类,封装了对于Kafka消息的处理逻辑。本例每10秒统计一次结果并写入到本地文件)

public class KafkaMessageStreaming {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // 非常关键,一定要设置启动检查点!!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-group");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple2<String, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))

                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {
                        long sum = 0L;
                        int count = 0;
                        for (Tuple2<String, Long> record: input) {
                            sum += record.f1;
                            count++;
                        }
                        Tuple2<String, Long> result = input.iterator().next();
                        result.f1 = sum / count;
                        out.collect(result);
                    }
                });

        keyedStream.writeAsText(args[1]);
        env.execute("Flink-Kafka demo");
    }
}

实现了这些代码之后我们已然可以打包进行部署了,不过在其之前我们先看下Kafka producer测试类的实现——该类每1秒发送一条符合上面格式的Kafka消息供下游Flink集群消费。

MemoryUsageExtrator类(很简单的工具类,提取当前可用内存字节数)

public class MemoryUsageExtrator {

    private static OperatingSystemMXBean mxBean =
            (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    /**
     * Get current free memory size in bytes
     * @return  free RAM size
     */
    public static long currentFreeMemorySizeInBytes() {
        return mxBean.getFreePhysicalMemorySize();
    }
}

KafkaProducerTest类(发送Kafka消息)

public class KafkaProducerTest {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        int totalMessageCount = 10000;
        for (int i = 0; i < totalMessageCount; i++) {
            String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
            producer.send(new ProducerRecord<>("test", value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("Failed to send message with exception " + exception);
                    }
                }
            });
            Thread.sleep(1000L);
        }
        producer.close();
    }

    private static long currentMemSize() {
        return MemoryUsageExtrator.currentFreeMemorySizeInBytes();
    }
}

8. 部署Flink jar包

8.1 打包Flink jar包

> cd flink-kafka-demo
> gradle clean build

生成的jar包在项目目录下的build/libs/下,本例中是flink-kafka-demo-1.0-SNAPSHOT.jar

8.2 部署jar包

> bin\flink.bat run -c huxihx.KafkaMessageStreaming  F:\\Projects\\flink-kafka-demo\\build\\libs\\flink-kafka-demo-1.0-SNAPSHOT.jar test F:\\temp\result.txt  

KafkaMessageStreaming类接收两个命令行参数,第一个是Kafka topic名字,第二个是输出文件路径

部署成功之后,可以在Flink控制台(本例中是http://localhost:8081/)中看到job已成功部署,如下图所示:

8. 运行KafkaProducerTest

运行Kafka producer,给Flink job创建输入数据,然后启动一个终端,监控输出文件的变化,

> cd F:\temp
> tail -f result.txt
(machine-1,3942129078)
(machine-1,3934864179)
(machine-1,4044071321)
(machine-1,4091437056)
(machine-1,3925701836)
(machine-1,3753678438)
(machine-1,3746314649)
......

可以看到,Flink每隔10s就会保存一条新的统计记录到result.txt文件中,该记录会统计主机名为machine-1的机器在过去10s的平均可用内存字节数。

9. 总结

本文给出了一个可运行的Flink + Kafka的项目配置及代码实现。值得注意的是,上面例子中用到的Flink Kafka connector使用了Kafka新版本consumer的API,因此不再需要连接Zookeeper信息。

时间: 2024-12-30 03:27:49

Kafka与Flink集成的相关文章

spring+springmvc+kafka分布式消息中间件集成方案

Honghu的消息服务平台已经抛弃了之前的ActiveMQ,改用高吞吐量比较大的Kafka分布式消息中间件方案: kafka消息平台使用spring+kafka的集成方案,详情如下: 1. 使用最高版本2.1.0.RELEASE集成jar包:spring-integration-kafka 2. Zookeeper.Kafka分布式集群使用init.properties配置化方案. Java代码 kafka.servers=127.0.0.1:9092 kafka.topic=xxxooo [j

Flink KAFKA

https://data-artisans.com/blog/kafka-flink-a-practical-how-to https://github.com/dataArtisans/kafka-example http://training.data-artisans.com/exercises/toFromKafka.html Kafka与Flink集成 原文地址:https://www.cnblogs.com/WCFGROUP/p/9074852.html

Flink读写Kafka

Flink 读写Kafka 在Flink中,我们分别用Source Connectors代表连接数据源的连接器,用Sink Connector代表连接数据输出的连接器.下面我们介绍一下Flink中用于读写kafka的source & sink connector. Apache Kafka Source Connectors Apache Kafka 是一个分布式的流平台,其核心是一个分布式的发布-订阅消息系统,被广泛用于消费与分发事件流. Kafka将事件流组织成为topics.一个topic

整合 KAFKA+Flink 实例(第一部分,趟坑记录)

2017年后,一大波网络喧嚣,说流式处理如何牛叉,如何高大上,抱歉,工作满负荷,没空玩那个: 今年疫情隔离在家,无聊,开始学习 KAFKA+Flink ,目前的打算是用爬虫抓取网页数据,传递到Kafka中,再用Flink计算. 个人性格原因,我不愿意过分沉迷于纸质或者电子教程材料,也不是特别喜欢网上某些培训机构已经过时了的所谓培训视频, 喜欢动手直接写代码,所以简单翻看一点PDF教程,看了两集“培训视频”,也没说Kafka.flink两组件咋结合使用,不耐烦,直接开码(码农的糙性): 之前我写过

新一代大数据处理引擎 Apache Flink

https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/index.html 大数据计算引擎的发展 这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有 Hadoop.Storm,以及后来的 Spark,他们都有着各自专注的应用场景.Spark 掀开了内存计算的先河,也以内存为赌注,赢得了内存计算的飞速发展.Spark 的火热或多或少的掩盖了其他分布式计算的系统身影.就像 Flink,也就在这个时候默默的发

使用ApacheFlink和Kafka进行大数据处理

Flink是一个开源流处理框架,注意它是一个处理计算框架,类似Spark框架,Flink在数据摄取方面非常准确,在保持状态的同时能轻松地从故障中恢复. Flink内置引擎是一个分布式流数据流引擎,支持 流处理和批处理 ,支持和使用现有存储和部署基础架构的能力,它支持多个特定于域的库,如用于机器学习的FLinkML.用于图形分析的Gelly.用于复杂事件处理的SQL和FlinkCEP.Flink的另一个有趣的方面是现有的大数据作业(Hadoop M / R,Cascading,Storm)可以 通

flink与hbase交互

1. HBase连接的方式概况 主要分为: 纯Java API读写HBase的方式: Spark读写HBase的方式: Flink读写HBase的方式: HBase通过Phoenix读写的方式: 第一种方式是HBase自身提供的比较原始的高效操作方式,而第二.第三则分别是Spark.Flink集成HBase的方式,最后一种是第三方插件Phoenix集成的JDBC方式,Phoenix集成的JDBC操作方式也能在Spark.Flink中调用. 注意: 这里我们使用HBase2.1.2版本,flink

Idea下Kafka源码阅读编译环境搭建

Kafka源码编译阅读环境搭建 开发环境: Oracle Java 1.7.0_25 + Idea + Scala 2.10.5 +Gradle 2.1 + Kafka 0.9.0.1 一.Gradle安装配置 Kafka代码自0.8.x之后就使用Gradle来进行编译和构建了,因此首先需要安装Gradle.Gradle集成并吸收了Maven主要优点的同时还克服了Maven自身的一些局限性--你可以访问https://www.gradle.org/downloads/ 下载最新的Gradle版本

kafka教程

一.理论介绍(一)相关资料1.官方资料,非常详细:?? http://kafka.apache.org/documentation.html#quickstart2.有一篇翻译版,基本一致,有些细节不同,建议入门时先读此文,再读官方文档.若自认英语很强,请忽视:?? http://www.linuxidc.com/Linux/2014-07/104470.htm3.还有一文也可以:http://www.sxt.cn/info-2871-u-324.html其主要内容来源于以下三篇文章:日志:每个