浅谈kafka streams

  随着数据时代的到来,数据的实时计算也越来越被大家重视。实时计算的一个重要方向就是实时流计算,目前关于流计算的有很多成熟的技术实现方案,比如Storm、Spark Streaming、flink等。
我今天要讲的kafka streams体量上来说没有那么大,都算不上一个框架,只是kafka的一个类库。麻雀虽小,五脏俱全。kafka streams能提供强大的流处理的功能,并且具备一些大框架不具备的灵活特点。
这篇文章的目标是把流计算这个事讲清楚,并介绍kafka streams是如何来做流计算的如有欠妥之处,欢迎指出。

大纲

  • 什么是流计算
  • 什么是kafka streams
  • kafka streams的特点、架构、优缺点
  • word count示例

一、什么是流计算

在介绍流计算之前,我们先把在它之前的批计算讲一下。

批计算是在计算之前将这次计算的源数据一次性到位,按数据块来处理数据,每一个task接收一定大小的数据块,然后经过批计算在这次计算的结果一次性返还给调用者。

批计算的处理的对象是有限数据(bound data),得到的结果也是一个有限结果集,因此批量计算中的每个任务都是短任务,任务在处理完其负责的数据后关闭。
流计算与之相反,流计算处理的对象是无限数据(unbound data),流式计算的上游算子处理完一条数据后,会立马发送给下游算子,所以一条数据从进入流式系统到输出结果的时间间隔较短,经过流计算得到的结果也是无限的结果集。

流式计算往往是长任务,每个work一直运行,持续接受数据源传过来的数据。

二、什么是kafka streams

  说到流计算,很多人会想到Storm、Spark Streaming、Flink。确实这些框架目前都已经完美的支持流计算,并且很多大厂都有相应的使用案例,但是这些框架使用起来门槛很高,首先要学习框架的使用,各种规范,然后要讲业务迁移到框架中,其次线上使用这些流计算框架,部署也是一个很头疼的事。但是今天要讲的主角Kafka Streams,是Kafka 在0.10版本加入的一个新的类库,官方定位是轻量级的流计算类库。简单体现在以下几个方面:

  1)由于Kafka Streams是Kafka的一个lib,所以实现的程序不依赖单独的环境

  2)基于功能实现时比较简洁,只需要基于规范实现业务逻辑即可,规模和Failover等问题有Kafka本身的特性保证。

                   

三、Kafka Streams的特点、组件架构

  1、Kafka Streams的特点   

    1) 轻量级java应用,除了kafka,无需依赖资源调度框架

      2) 毫秒级延迟

    3)支持stateful(有状态的)处理,如join,aggregation等。

    4)试错成本很低,相比较其他框架,

    5)支持exactly-once语义支持

  2、组件  

    1)Stream Topology:Processor 处理后后结果输出

    2)Processor: Stream Topology中的节点,是一个基本的计算节点

    3)State Store:本地信息存储

      类型:
        1)Key-value based
        2)Window based
      容错性
        1)本地RocksDB备份
        2)远程由changelog topic备份在broker上

        

    3、架构图

           

                          (该图摘自Confluent官网)

  这里简单介绍下Kafka Stream如何实现有状态的处理,为了实现状态的概念,Kafka Streams有两个重要抽象:KStream 和 KTable。分别对应数据流和数据库,区别在于key-value对值如何被解释。Kafka Streams作为流处理技术,很好的将存储状态的表(table)和作为记录的流(stream)无缝地结合在了一起,这个也叫流-表对偶性。

四、来一个wordcount例子

  一般编程领域学习一个新技术都会以hello-world开始,但是在大数据计算,则是以word-count开始,顾名思义,统计单词数量。

  1、启动zookeeper

    zkServer.cmd

  2、启动kafka

    kafka-server-start.bat d:\soft\tool\Kafka\kafka_2.12-2.1.0\config\server.properties

  3、创建一个用于存储输入数据的topic

    kafka-console-producer.bat --broker-list localhost:9092 --topic streams-file-input < file-input.txt

    为了方便演示,其中file-input.txt我是直接放到kafka的bin目录下

  4、在idea中创建一个简单的项目,书写以下代码:

    

/**
 * ymm56.com Inc.
 * Copyright (c) 2013-2019 All Rights Reserved.
 */
package wikiedits;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author LvHuiKang
 * @version $Id: KafkaStreamTest.java, v 0.1 2019-03-26 19:45 LvHuiKang Exp $$
 */
public class KafkaStreamTest {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        Serde<String> sdeStr = Serdes.String();
        Serde<Long> sdeLong = Serdes.Long();
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> inputLines = builder.stream(sdeStr, sdeStr, "streams-file-input");
        KTable<String, Long> wordCounts = inputLines.flatMapValues(inputLine -> Arrays.asList(inputLine.toLowerCase().split("\\W+"))).groupBy((key, word) -> word).count("Counts");
        wordCounts.to(sdeStr, sdeLong, "streams-wordcount-output");
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        System.out.println();

    }
}

pom 依赖如下:

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>0.11.0.0</version>
</dependency>
<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.11.0.0</version>
</dependency>

然后启动main方法,运行如下:

  5、启动consumer:

    kafka-console-consumer.bat  --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

  展示如下:

  

五、总结

  本文简单介绍了kafka streams这个作为轻量级流计算引擎的架构、主要组件已经区别其他流计算引擎的特点,并通过word-count简单演示了kafka streams的使用。本文也是在我研究流计算时无意发现的一个技术,人有很多关键的技术点没有吃透并给大家讲解,后续研究后会追加。感谢你的阅读,欢迎指正不足,并进行讨论。

原文地址:https://www.cnblogs.com/hklv/p/10692999.html

时间: 2024-11-05 03:26:16

浅谈kafka streams的相关文章

浅谈分布式消息技术 Kafka

http://www.linkedkeeper.com/1016.html Kafka的基本介绍 Kafka是最初由Linkedin公司开发,是一个分布式.分区的.多副本的.多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志.访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目. 主要应用场景是:日志收集系统和消息系统. Kafka主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化

浅谈分布式消息技术 Kafka(转)

一只神秘的程序猿. Kafka的基本介绍 Kafka是最初由Linkedin公司开发,是一个分布式.分区的.多副本的.多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志.访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目. 主要应用场景是:日志收集系统和消息系统. Kafka主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能.

搞懂分布式技术21:浅谈分布式消息技术 Kafka

搞懂分布式技术21:浅谈分布式消息技术 Kafka 浅谈分布式消息技术 Kafka 本文主要介绍了这几部分内容: 1基本介绍和架构概览 2kafka事务传输的特点 3kafka的消息存储格式:topic和parition 4副本(replication)策略:主从broker部署和partition备份,以及选主机制 5kafka消息分组,通过comsumergroup实现主体订阅 6push和pull的区别,顺序写入和消息读取,零拷贝机制 Kafka的基本介绍 Kafka是最初由Linkedi

[翻译和注解]Kafka Streams简介: 让流处理变得更简单

Introducing Kafka Streams: Stream Processing Made Simple 这是Jay Kreps在三月写的一篇文章,用来介绍Kafka Streams.当时Kafka Streams还没有正式发布,所以具体的API和功能和0.10.0.0版(2016年6月发布)有所区别.但是Jay Krpes在这简文章里介绍了很多Kafka Streams在设计方面的考虑,还是很值得一看的. 以下的并不会完全按照原文翻译,因为那么搞太累了……这篇文件的确很长,而且Jay

浅谈ELK日志分析平台

作者:珂珂链接:https://zhuanlan.zhihu.com/p/22104361来源:知乎著作权归作者所有.商业转载请联系作者获得授权,非商业转载请注明出处. 小编的话 "技术干货"系列文章意在分享技术牛人的知识干货,每期主题都不一样哟!期待各位读者在文后发表留言,来一场技术上的交流和思想上的碰撞! 2016年7月20日,本期品高云公开课由叶春草带来"可视化案发现场--浅谈ELK日志分析平台"的分享. 分享嘉宾 叶春草现就职于品高云软件技术支持工程师.就职

【转】浅谈分布式服务协调技术 Zookeeper

非常好介绍Zookeeper的文章, Google的三篇论文影响了很多很多人,也影响了很多很多系统.这三篇论文一直是分布式领域传阅的经典.根据MapReduce,于是我们有了Hadoop:根据GFS,于是我们有了HDFS:根据BigTable,于是我们有了HBase.而在这三篇论文里都提及Google的一个Lock Service —— Chubby,哦,于是我们有了Zookeeper. 随着大数据的火热,Hxx们已经变得耳熟能详,现在作为一个开发人员如果都不知道这几个名词出门都好像不好意思跟人

[转帖]浅谈响应式编程(Reactive Programming)

浅谈响应式编程(Reactive Programming) https://www.jianshu.com/p/1765f658200a 例子写的非常好呢. 0.9312018.02.14 21:22:16字数 1877阅读 9816 这是告别CSDN后第一次使用简书写IT类的博客,还在适应.最不适应的就是不能直接手输markdown语法标记.(好像原因是我没有切换编辑器) 什么是响应式编程(Reactive Programming) In computing, reactive program

.net中对象序列化技术浅谈

.net中对象序列化技术浅谈 2009-03-11 阅读2756评论2 序列化是将对象状态转换为可保持或传输的格式的过程.与序列化相对的是反序列化,它将流转换为对象.这两个过程结合起来,可以轻松地存储和传输数 据.例如,可以序列化一个对象,然后使用 HTTP 通过 Internet 在客户端和服务器之间传输该对象.反之,反序列化根据流重新构造对象.此外还可以将对象序列化后保存到本地,再次运行的时候可以从本地文件 中“恢复”对象到序列化之前的状态.在.net中有提供了几种序列化的方式:二进制序列化

浅谈——页面静态化

现在互联网发展越来越迅速,对网站的性能要求越来越高,也就是如何应对高并发量.像12306需要应付上亿人同时来抢票,淘宝双十一--所以,如何提高网站的性能,是做网站都需要考虑的. 首先网站性能优化的方面有很多:1,使用缓存,最传统的一级二级缓存:2,将服务和数据库分开,使用不同的服务器,分工更加明确,效率更加高:3,分布式,提供多台服务器,利用反向代理服务器nginx进行反向代理,将请求分散开来:4,数据库的读写分离,不同的数据库,将读操作和写操作分开,并实时同步即可:5,分布式缓存,使用memc