Kafka Streams入门指南

应该会有不少人觉得我标题打错了,是不是想讲SparkStreaming或者Kafka+SparkStreaming啊?实际上这不是笔误,Kafka Streams是Kafka 0.10提供的新能力,用于实时处理Kafka中的数据流,和现有的流处理技术如SparkStreaming,Storm,Flink还是有些区别的。

1 概况

Kafka Streams是一套处理分析Kafka中存储数据的客户端类库,处理完的数据或者写回Kafka,或者发送给外部系统。它构建在一些重要的流处理概念之上:区分事件时间和处理时间、开窗的支持、简单有效的状态管理等。Kafka Streams入门的门槛很低:很容易编写单机的示例程序,然后通过在多台机器上运行多个实例即可水平扩展从而达到高吞吐量。Kafka Streams利用Kafka的并发模型以实现透明的负载均衡。

一些亮点:

  • 设计成简单和轻量级的客户端类库,可以和现有Java应用、部署工具轻松整合。
  • 除了Kafka自身外不依赖其他外部系统。利用Kafka的分区模型来实现水平扩展并保证有序处理。
  • 支持容错的本地状态,这使得快速高效处理一些有状态的操作(如连接和开窗聚合)成为可能。
  • 支持一次一条记录的处理方式以实现低延迟,也支持基于事件时间的开窗操作。
  • 提供了两套流处理原语:高层的流DSL和低层的处理器API。

2 开发

2.1 核心概念

流处理拓扑

  • “流”是Kafka Streams最重要的抽象,代表了一个无边界的、持续更新的数据集。流是一种有序的、可回放的、容错的、不可变的数据记录序列,“数据记录”指一个键值对。
  • 一个流处理应用程序通过一或多个“处理器拓扑”来定义其计算逻辑,一个处理器拓扑就是一张以流处理器(节点)和流(边)构成的图。(实际为DAG,太熟悉了吧)
  • “流处理器”是处理器拓扑中的节点,表示一个转换流中数据的处理步骤,它从上游处理器一次接受一条输入记录,操作记录,然后输出一或多条输出记录到下游处理器。

Kafka Streams提供两种定义流处理拓扑的方式(上面已提到):流DSL提供最常用的数据变换操作如map和filter;低层的处理器API允许随意连接自定义处理器并与“状态仓库”交互。

时间

时间的概念在流处理中很关键,比如开窗这种操作就是根据时间边界来定义的。

上面也提到过两个常见概念:

  • 事件时间:事件或数据记录发生的时刻。
  • 处理时间:事件或数据记录被流处理应用开始处理的时刻,比如记录开始被消费。处理时间可能比事件时间晚几毫秒到几天不等。

Kafka Streams通过TimestampExtractor接口给每个数据记录赋一个时间戳。可以根据不同的需要来确定时间戳的实现,如使用数据记录的内置时间戳来实现事件时间的语义,或者打上处理器开始消费的时间来实现处理时间的语义。开发者可以根据业务需求来选择一种。

状态

一些流处理应用不需要管理状态,这意味着一条消息和另一条消息是独立的。(如Storm,不过需要区分Acker中的“状态”,那个是用来确保单条消息exactly once语义的而不是消息间的)如果管理状态的话可以提供很多比较复杂的流处理应用:如在流中连接、分组或聚合数据记录等。大量有状态的操作方法在流DSL中提供。

Kafka Streams提供了一种“状态仓库”,可被流处理应用用来存储和查询状态数据。这对实现有状态的操作很关键。每个Kafka Streams使用一或多个状态仓库,可通过API来存取数据。这种状态仓库可以是持久化的键值对、内存中的hashmap、或其他各类数据结构。Kafka Streams对本地状态仓库提供了容错和自动恢复。

2.2 低层处理器API

处理器

可通过实现Processor接口来自定义处理逻辑,该接口有两个主要方法,process方法会被作用于每条收到的记录,punctuate方法基于时间的流逝周期性地执行。另外,处理器可使用init方法中创建的ProcessorContext实例来维护当前上下文,并使用上下文来调用周期性任务(context().schedule),或将修改的、新的键值对推送给下游处理器(context().forward),或提交当前的处理进度(context().commit),等等。

<span style="font-size:12px;">    public class MyProcessor extends Processor {
        private ProcessorContext context;
        private KeyValueStore kvStore;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(1000);
            this.kvStore = (KeyValueStore) context.getStateStore("Counts");
        }

        @Override
        public void process(String dummy, String line) {
            String[] words = line.toLowerCase().split(" ");

            for (String word : words) {
                Integer oldValue = this.kvStore.get(word);

                if (oldValue == null) {
                    this.kvStore.put(word, 1);
                } else {
                    this.kvStore.put(word, oldValue + 1);
                }
            }
        }

        @Override
        public void punctuate(long timestamp) {
            KeyValueIterator iter = this.kvStore.all();

            while (iter.hasNext()) {
                KeyValue entry = iter.next();
                context.forward(entry.key, entry.value.toString());
            }

            iter.close();
            context.commit();
        }

        @Override
        public void close() {
            this.kvStore.close();
        }
    };</span>

处理器拓扑

有了在处理器API中自定义的处理器,然后就可以使用TopologyBuilder来将处理器连接到一起从而构建处理器拓扑:

<span style="font-size:12px;">    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE", "src-topic")

        .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
        .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");</span>

以上代码中包含了这些步骤:

  • 首先一个名为“SOURCE”的源节点被加入拓扑,使用addSource方法,且实用“src-topic”这一Kafka topic来提供数据。
  • 随后使用addProcessor方法加入三个处理器节点。这里第一个处理器是“SOURCE”节点的子节点,且是后两个节点的父节点。
  • 最后使用addSink方法加入三个sink节点,每个都从一个父处理器节点中获取数据病写到一个topic中。

本地状态仓库

处理器API不仅可以处理当前到达的记录,也可以管理本地状态仓库以使得已到达的记录都可用于有状态的处理操作中(如聚合或开窗连接)。为利用本地状态仓库的优势,可使用TopologyBuilder.addStateStore方法以便在创建处理器拓扑时创建一个相应的本地状态仓库;或将一个已创建的本地状态仓库与现有处理器节点连接,通过TopologyBuilder.connectProcessorAndStateStores方法。

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE", "src-topic")

        .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
        // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
        .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
        .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

        // connect the state store "COUNTS" with processor "PROCESS2"
        .connectProcessorAndStateStores("PROCESS2", "COUNTS");

        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");

2.3 高层流DSL

为使用流DSL来创建处理器拓扑,可使用KStreamBuilder类,其扩展自TopologyBuilder类。Kafka的源代码中在streams/examples包中提供了一个示例。

从Kafka创建源端流

Kafka Streams为高层流定义了两种基本抽象:记录流(定义为KStream)可从一或多个Kafka topic源来创建,更新日志流(定义为KTable)可从一个Kafka topic源来创建。两者的区别是,前者更像是传统意义上的流,每一个键值对可以看成独立的,而后者更接近Map的概念,同一个key输入两次的话,后者会将前者覆盖。

    KStreamBuilder builder = new KStreamBuilder();

    KStream source1 = builder.stream("topic1", "topic2");
    KTable source2 = builder.table("topic3");

转换一个流

KStream和KTable相应地都提供了一系列转换操作。每个操作可产生一或多个KStream和KTable对象,可被翻译成一或多个相连的处理器。所有这些转换方法连接在一起形成一个复杂的处理器拓扑。因为KStream和KTable是强类型的,这些转换操作都被定义为通用函数,使得用户可指定输入和输出数据类型。

这些转换中,filter、map、mapValues等是无状态的,可用于KStream和KTable两者,通常用户会传一个自定义函数给这些函数作为参数,例如Predicate给filter,KeyValueMapper给map等:

    // written in Java 8+, using lambda expressions
    KStream mapped = source1.mapValue(record -> record.get("category"));

无状态的转换不依赖于处理的状态,因此不需要状态仓库。有状态的转换则需要存取相应状态以处理和生成结果。例如,在join和aggregate操作里,一个窗口状态用于保存当前预定义窗口中收到的记录。于是转换可以获取状态仓库中累积的记录,并执行计算。

    // written in Java 8+, using lambda expressions
    KTable, Long> counts = source1.aggregateByKey(
        () -> 0L,  // initial value
        (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
        HoppingWindows.of("counts").with(5000L).every(1000L), // intervals in milliseconds
    );

    KStream joined = source1.leftJoin(source2,
        (record1, record2) -> record1.get("user") + "-" + record2.get("region");
    );

将流写回Kafka

在处理的最后,用户可选择连续地将最终结果写回某个Kafka topic,通过KStream.to或KTable.to:

    joined.to("topic4");

如果应用需要在记录被物化到topic中继续读和处理它们,你可能会想到创建一个新的流从这个输出topic中读取。Kafka Streams提供了一个方便的函数称为through:

    // equivalent to
    //
    // joined.to("topic4");
    // materialized = builder.stream("topic4");
    KStream materialized = joined.through("topic4");

2.4 配置参数

所有参数见下表。


名称

描述

类型

默认值

application.id

流处理应用的标识,对同一个应用需要一致,因为它是作为消费的group_id的

string

bootstrap.servers

host1:port1,host2:port2 这样的列表,是用来发现所有Kafka节点的种子,因此不需要配上所有的Kafka节点

list

client.id

应用的一个客户端的逻辑名称,设定后可以区分是哪个客户端在请求

string

“"

zookeeper.connect

zookeeper连接串

string

“"

key.serde

键的序列化/反序列化类

class

class org.apache.kafka.common.serialization.Serdes$ByteArraySerde

partition.grouper

用于分区组织的类,需要实现PartitionGrouper接口

class

class org.apache.kafka.streams.processor.DefaultPartitionGrouper

replication.factor

流处理应用会创建change log topic和repartition topic用于管理内部状态,这个参数设定这些topic的副本数

int

1

state.dir

状态仓库的存储路径

string

/tmp/kafka-streams

timestamp.extractor

时间戳抽取类,需要实现TimestampExtractor接口

class

class org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor

value.serde

值的序列化/反序列化类

class

class org.apache.kafka.common.serialization.Serdes$ByteArraySerde

buffered.records.per.partition

每个分区缓存的最大记录数

int

1000

commit.interval.ms

存储处理器当前位置的间隔毫秒数

long

30000

metric.reporters

用于性能报告的类列表。需要实现MetricReporter接口。JmxReporter会永远开启不需要指定

list

[]

metric.num.samples

计算性能需要的采样数

int

2

metric.sample.window.ms

性能采样的时间间隔

long

30000

num.standby.replicas

每个任务的后备副本数

int

0

num.stream.threads

执行流处理的线程数

int

1

poll.ms

等待输入的毫秒数

long

100

state.cleanup.delay.ms

一个分区迁移后,在删除状态前等待的毫秒数

long

60000

3 总结

综上,Kafka Streams的价值体现在以下几点,首先它提供了两套轻量且易用的API有效降低了Kafka数据流处理的开发成本,在这之前可以使用SparkStreaming(不支持单条消费)、Storm(必须使用Trident才支持时间窗),或者自己写consumer(以前高层API还好,低层API是初学者的噩梦,最欢乐的是官方将低层API称为“Simple API”),现在至少又多了一种选择。其次用它开发的应用支持跑在Yarn、Mesos、Docker或者纯Java应用内,比较灵活。再次是数据流的两种抽象比较有意思,目前我还没有深入研究,但觉得用来处理不去重/去重的场景简直太方便了。当然缺点也有,首先目前不支持异步操作,这就需要开发者小心在处理方法中不能有高开销动作,否则整个处理线程阻塞。另外如果需要SQL接口或者ML能力,那还是去找SparkStreaming吧。

时间: 2024-10-05 12:19:33

Kafka Streams入门指南的相关文章

Microsoft Orleans 之 入门指南

Microsoft Orleans 在.net用简单方法构建高并发.分布式的大型应用程序框架. 原文:http://dotnet.github.io/orleans/ 在线文档:http://dotnet.github.io/orleans/What's-new-in-Orleans 源码地址:https://github.com/dotnet/orleans 简介:Orleans 框架可以构建大规模.高并发.分布式应用程序,而不需要学习专业分布式以及并发知识框架.它是由微软研究和设计应用于云计

分布式消息系统Jafka入门指南之二

分布式消息系统Jafka入门指南之二 作者:chszs,转载需注明.博客主页:http://blog.csdn.net/chszs 三.Jafka的文件夹结构 1.安装tree命令 $ sudo yum install tree 2.查看文件夹 $ tree -L 1 . ?..? ? bin ? ..?? conf ?..?? data ? ..?? lib ? ..?? LICENSE ?..? ? logs ?..?? VERSION 说明:bin文件夹:命令行脚本conf文件夹:存放配置

分布式消息系统Jafka入门指南

分布式消息系统Jafka入门指南 作者:chszs,转载需注明.博客主页:http://blog.csdn.net/chszs 一.JafkaMQ简介 JafkaMQ是一个分布式的发布/订阅消息系统,它是Apache Kafka的Java移植版. 2013年11月28日,JafkaMQ发布了1.2.3版. JafkaMQ的特征如下: 1)消息持久化到磁盘的算法时间复杂度为O(1),即使是TB级的消息存储,也能保证常量时间的执行性能.2)高吞吐量:即使是低配制的硬件条件,单个Broker也能支持每

Apache NiFi 入门指南

本指南使用于谁? 本指南适用于从未使用过,在NiFi中有限度接触或仅完成特定任务的用户.本指南不是详尽的说明手册或参考指南.“ 用户指南”提供了大量信息,旨在提供更加详尽的资源,并且作为参考指南非常有用.相比之下,本指南旨在为用户提供所需的信息,以便了解如何使用NiFi,以便快速轻松地构建强大而灵活的数据流. 一些因为本指南中的某些信息仅适用于初次使用的用户,而其他信息可能适用于那些使用过NiFi的人,本指南分为几个不同的部分,其中一些可能对某些部分没用读者.随意跳转到最适合您的部分. 本指南确

Quartz.NET简介及入门指南

Quartz.NET简介 Quartz.NET是一个功能完备的开源调度系统,从最小的应用到大规模的企业系统皆可适用. Quartz.NET是一个纯净的用C#语言编写的.NET类库,是对非常流行的JAVA开源调度框架 Quartz 的移植. 入门指南 本入门指南包括以下内容: 下载 Quartz.NET 安装 Quartz.NET 根据你的特定项目配置 Quartz 启动一个样例程序 下载和安装 你可以下载 zip 文件或使用 Nuget 程序包.Nuget 程序包只包含 Quartz.NET 运

Java程序员的Golang入门指南(上)

Java程序员的Golang入门指南 1.序言 Golang作为一门出身名门望族的编程语言新星,像豆瓣的Redis平台Codis.类Evernote的云笔记leanote等. 1.1 为什么要学习 如果有人说X语言比Y语言好,两方的支持者经常会激烈地争吵.如果你是某种语言老手,你就是那门语言的"传道者",下意识地会保护它.无论承认与否,你都已被困在一个隧道里,你看到的完全是局限的.<肖申克的救赎>对此有很好的注脚: [Red] These walls are funny.

【翻译Autofac的帮助文档】1.入门指南

[写在前面]尝试做完一件工作之外自我觉得有意义的一件事,那就从翻译Autofac的帮助文档吧. 入门指南 将Autofac集成你的应用程序的步骤通常很简单,一般是: 时刻以IOC(控制反转)的思想来规划你的应用程序 在你的Porject中添加Autofac引用 按照如下步骤设计应用程序的启动环节 创建一个ContainerBuilder 向ContainerBuilder注册组件 通过ContainerBuilder的Build()方法获得Container(后续需用到) 在应用程序运行环节时,

Markdown——入门指南

导语: Markdown 是一种轻量级的「标记语言」,它的优点很多,目前也被越来越多的写作爱好者,撰稿者广泛使用.看到这里请不要被「标记」.「语言」所迷惑,Markdown 的语法十分简单.常用的标记符号也不超过十个,这种相对于更为复杂的 HTML 标记语言来说,Markdown 可谓是十分轻量的,学习成本也不需要太多,且一旦熟悉这种语法规则,会有一劳永逸的效果. Ulysses for Mac 一,认识 Markdown 在刚才的导语里提到,Markdown 是一种用来写作的轻量级「标记语言」

Win32编程API 基础篇 -- 1.入门指南 根据英文教程翻译

入门指南 本教程是关于什么的 本教程的目的是向你介绍使用win32 API编写程序的基础知识(和通用的写法).使用的语言是C,但大多数C++编译器也能成功编译,事实上,教程中的绝大多数内容都适用于任何可以连接API的语言,包括Java.Assembly和Visual Basic:我不会向你呈现任何跟这些语言相关的代码,这需要你在本教程的指导下自己去完成,有一些人在本API的基础上使用其他语言进行编程取得了相当的成功. 本教程不会教你C语言,也不会告诉你怎样去运行你特定的编译器(Borland C