Flink+kafka实现Wordcount实时计算

1. Flink

Flink介绍:

Flink 是一个针对流数据和批数据的分布式处理引擎。它主要是由 Java 代码实现。目前主要还是依靠开源社区的贡献而发展。对 Flink 而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。再换句话说,Flink 会把所有任务当成流来处理,这也是其最大的特点。Flink 可以支持本地的快速迭代,以及一些环形的迭代任务。

Flink的特性:

Flink是个分布式流处理开源框架:

1>. 即使数据源是无序的或者晚到达的数据,也能保持结果准确性

2>. 有状态并且容错,可以无缝的从失败中恢复,并可以保持exactly-once

3>. 大规模分布式

4>. 实时计算场景的广泛应用(阿里双十一实时交易额使用的Blink就是根据Flink改造而来)

Flink可以确保仅一次语义状态计算;Flink有状态意味着,程序可以保持已经处理过的数据;

Flink支持流处理和窗口事件时间语义,Flink支持灵活的基于时间窗口,计数,或会话数据驱动的窗户;

Flink容错是轻量级和在同一时间允许系统维持高吞吐率和提供仅一次的一致性保证,Flink从失败中恢复,零数据丢失;

Flink能够高吞吐量和低延迟;

Flink保存点提供版本控制机制,从而能够更新应用程序或再加工历史数据没有丢失并在最小的停机时间。

2. Kafka

Kafka介绍

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Kafka特性

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

1>. 通过磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

2>. 高吞吐量即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。

3>. 支持通过Kafka服务器和消费机集群来分区消息。

4>. 支持Hadoop并行数据加载。

Kafka的安装配置及基础使用

因为此篇博客是本地Flink消费Kafka的数据实现WordCount,所以Kafka不需要做过多配置,从Apache官网下载安装包直接解压即可使用

这里我们创建一个名为test的topic

在producer输入数据流:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在consumer监控从producer输入的数据流:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

3. Flink Java API实现Flink消费Kafka的数据实现WordCount过程

1>. 创建maven project

2>. 配置flink和flink-kafka需要的依赖pom文件

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.0.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>

3>. 引入Flink StreamExecutionEnvironment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

4>. 设置监控数据流时间间隔(官方叫状态与检查点)

env.enableCheckpointing(1000);

5>. 配置kafka和zookeeper的ip和端口

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
properties.setProperty("group.id", "test");

6>. 将kafka和zookeeper配置信息加载到Flink StreamExecutionEnvironment

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),properties);

7>. 将Kafka的数据转成flink的DataStream类型

DataStream<String> stream = env.addSource(myConsumer);

8>. 实施计算模型并输出结果

DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

counts.print();

计算模型具体逻辑代码

public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

4. 验证

1>. Kafka producer输入

2>. Flink客户端立刻得出结果

完整代码

package com.scn;

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class FilnkCostKafka {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
        properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env.addSource(myConsumer);

        DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

        counts.print();

        env.execute("WordCount from Kafka data");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

}

原文地址:https://www.cnblogs.com/jiashengmei/p/9025535.html

时间: 2024-11-05 12:13:16

Flink+kafka实现Wordcount实时计算的相关文章

Storm 大数据 视频教程 安装 Spark Kafka Hadoop 分布式实时计算

视频资料都逐个核对,清晰高质量,而且包含各种文档.软件安装包和源码!永久免费更新! 技术团队永久免费解答各种技术问题:Hadoop.Redis.Memcached.MongoDB.Spark.Storm.云计算.R语言.机器学习.Nginx.Linux.MySQL.Java EE..NET.PHP,节省您的时间! 获取视频资料和技术支持地址 ------------------------------------------------------------------------------

Flink通过SQLClinet创建kafka源表并进行实时计算

1.通过自建kafka的生产者来产生数据 /bin/kafka-console-producter.sh --broker-list 192.168.58.177:9092 --topic my_topic 数据 {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior":

可以穿梭时空的实时计算框架——Flink对时间的处理

Flink对于流处理架构的意义十分重要,Kafka让消息具有了持久化的能力,而处理数据,甚至穿越时间的能力都要靠Flink来完成. 在Streaming-大数据的未来一文中我们知道,对于流式处理最重要的两件事,正确性,时间推理工具.而Flink对两者都有非常好的支持. Flink对于正确性的保证 对于连续的事件流数据,由于我们处理时可能有事件暂未到达,可能导致数据的正确性受到影响,现在采取的普遍做法的通过高延迟的离线计算保证正确性,但是也牺牲了低延迟. Flink的正确性体现在计算窗口的定义符合

storm消费kafka实现实时计算

大致架构 * 每个应用实例部署一个日志agent * agent实时将日志发送到kafka * storm实时计算日志 * storm计算结果保存到hbase storm消费kafka 创建实时计算项目并引入storm和kafka相关的依赖 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.

【Streaming】30分钟概览Spark Streaming 实时计算

本文主要介绍四个问题: 什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark Streaming相对其他实时计算框架该如何技术选型? 本文主要针对初学者,如果有不明白的概念可了解之前的博客内容. 1.什么是Spark Streaming? 与其他大数据框架Storm.Flink一样,Spark Streaming是基于Spark Core基础之上用于处理实时计算业务的框架.其实

实时计算,流数据处理系统简介与简单分析

转自:http://www.csdn.net/article/2014-06-12/2820196-Storm 摘要:实时计算一般都是针对海量数据进行的,一般要求为秒级.实时计算主要分为两块:数据的实时入库.数据的实时计算.今天这篇文章详细介绍了实时计算,流数据处理系统简介与简单分析. 编者按:互联网领域的实时计算一般都是针对海量数据进行的,除了像非实时计算的需求(如计算结果准确)以外,实时计算最重要的一个需求是能够实时响应计算结果,一般要求为秒级.实时计算的今天,业界都没有一个准确的定义,什么

实时计算平台

实时计算平台中的弹性集群资源管理 本文系微博运维数据平台(DIP)在实时计算平台的研发过程中集群资源管理方面的一些经验总结和运用,主要关注以下几个问题: 异构资源如何整合? 实时计算应用之间的物理资源如何隔离? 集群资源利用率如何提高? 集群运维成本如何降低? 1. 背景 这是我们初期的一个实时计算架构,大致划分为三个部分: (1)日志收集: 使用Rsynlog.Flume.Scribe汇聚各个业务方发送过来的日志数据:如果条件允许,业务方也可以直接将数据写入Kafka. (2)日志传输: 使用

实时计算平台中的弹性集群资源管理

本文系微博运维数据平台(DIP)在实时计算平台的研发过程中集群资源管理方面的一些经验总结和运用,主要关注以下几个问题: 异构资源如何整合? 实时计算应用之间的物理资源如何隔离? 集群资源利用率如何提高? 集群运维成本如何降低? 1. 背景 这是我们初期的一个实时计算架构,大致划分为三个部分: (1)日志收集: 使用Rsynlog.Flume.Scribe汇聚各个业务方发送过来的日志数据:如果条件允许,业务方也可以直接将数据写入Kafka. (2)日志传输: 使用Kafka作为日志收集组件与实时应

权威详解 | 阿里新一代实时计算引擎 Blink,每秒支持数十亿次计算

王峰,淘宝花名"莫问",2006年毕业后即加入阿里巴巴集团,长期从事搜索和大数据基础技术研发工作,目前在计算平台事业部,负责实时计算北京研发团队. 在阿里巴巴的11年工作期间,持续专注大数据计算与存储技术领域,基于Hadoop开源生态打造的数据基础设施一直服务于搜索.推荐等阿里核心电商业务场景,最近一年带领团队对Apache Flink进行了大量架构改进.功能完善和性能提升,打造出了阿里新一代实时计算引擎: Blink.目前数千台规模的Blink生产集群已经开始在线支持搜索.推荐.广告