【Flink】Flink基础之WordCount实例(Java与Scala版本)

简述

WordCount(单词计数)作为大数据体系的标准示例,一直是入门的经典案例,下面用java和scala实现Flink的WordCount代码;

采用IDEA + Maven + Flink 环境;文末附 pom 文件和相关技术点总结;

Java批处理版本

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountBatchByJava {
    public static void main(String[] args) throws Exception {

        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 加载或创建源数据
        DataSet<String> text = env.fromElements("this a book", "i love china", "i am chinese");

        // 转化处理数据
        DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);

        // 输出数据到目的端
        ds.print();

        // 执行任务操作
        // 由于是Batch操作,当DataSet调用print方法时,源码内部已经调用Excute方法,所以此处不再调用,如果调用会出现错误
        //env.execute("Flink Batch Word Count By Java");

    }

    static class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word:line.split(" ")) {
                collector.collect(new Tuple2<>(word,1));
            }
        }
    }
}

运行输出结果如下:

(a,1)
(am,1)
(love,1)
(china,1)
(this,1)
(i,2)
(book,1)
(chinese,1)

Java流处理版本

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCountStreamingByJava {
    public static void main(String[] args) throws Exception {

        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置socket数据源
        DataStreamSource<String> source = env.socketTextStream("192.168.1.111", 9999, "\n");
        // 转化处理数据
        DataStream<WordWithCount> dataStream = source.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String line, Collector<WordWithCount> collector) throws Exception {
                for (String word : line.split(" ")) {
                    collector.collect(new WordWithCount(word, 1));
                }
            }
        }).keyBy("word")//以key分组统计
                .timeWindow(Time.seconds(2),Time.seconds(2))//设置一个窗口函数,模拟数据流动
                .sum("count");//计算时间窗口内的词语个数

        // 输出数据到目的端
        dataStream.print();

        // 执行任务操作
        env.execute("Flink Streaming Word Count By Java");

    }

    public static class WordWithCount{
        public String word;
        public int count;

        public WordWithCount(){

        }

        public WordWithCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

启动一个shell窗口,联通9999端口,输入数据:

[[email protected] flink-1.6.2]# nc -l 9999
山东 天津 北京 河北 河南 山东 上海 北京
山东 海南 青海 西藏 四川 海南

IDEA 输出结果如下:

4> WordWithCount{word='北京', count=2}
1> WordWithCount{word='上海', count=1}
5> WordWithCount{word='天津', count=1}
4> WordWithCount{word='河南', count=1}
7> WordWithCount{word='山东', count=2}
3> WordWithCount{word='河北', count=1}
------------------------为了区分前后时间窗口结果,手动加的这条线--------------------------
8> WordWithCount{word='海南', count=2}
8> WordWithCount{word='四川', count=1}
7> WordWithCount{word='山东', count=1}
1> WordWithCount{word='西藏', count=1}
5> WordWithCount{word='青海', count=1}

Scala批处理版本

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment

object WordCountBatchByScala {
  def main(args: Array[String]): Unit = {

    //获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //加载数据源
    val source = env.fromElements("china is the best country","beijing is the capital of china")

    //转化处理数据
    val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)

    //输出至目的端
    ds.print()

    // 执行操作
    // 由于是Batch操作,当DataSet调用print方法时,源码内部已经调用Excute方法,所以此处不再调用,如果调用会出现错误
    //env.execute("Flink Batch Word Count By Scala")

  }
}

运行结果如下:

(is,2)
(beijing,1)
(the,2)
(china,2)
(country,1)
(of,1)
(best,1)
(capital,1)

Scala流处理版本

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object WordCountStreamingByScala {
  def main(args: Array[String]): Unit = {

    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //加载或创建数据源
    val source = env.socketTextStream("192.168.1.111",9999,'\n')

    //转化处理数据
    val dataStream = source.flatMap(_.split(" "))
      .map((_,1))
      .keyBy(0)
      .timeWindow(Time.seconds(2),Time.seconds(2))
      .sum(1)

    //输出到目的端
    dataStream.print()

    //执行操作
    env.execute("Flink Streaming Word Count By Scala")

  }
}

启动shell窗口,开启9999端口通信,输入词语:

[[email protected] flink-1.6.2]# nc -l 9999
time is passed what is the time?
time is nine time passed again

运行结果如下:

4> (what,1)
5> (time,1)
8> (is,2)
5> (time?,1)
8> (passed,1)
5> (the,1)
------------------------为了区分前后时间窗口结果,手动加的这条线--------------------------
8> (is,1)
5> (time,2)
8> (passed,1)
7> (nine,1)
6> (again,1)

POM文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ssrs</groupId>
    <artifactId>flinkdemo</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.8.4</hadoop.version>
        <flink.version>1.6.1</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>

总结

  1. flink处理任务流程如下:

    ? ① 获取执行环境 (Environment)

    ? ② 加载或者创建数据源(source)

    ? ③ 转化处理数据(transformation)

    ? ④ 输出目的端(sink)

    ? ⑤ 执行任务(execute)

  2. 在批处理中,如果输出目的端,执行的 print 命令(除此之外,还有count,collect方法),则执行任务Execute不需要调用(因为这些方法内部已经调用了Execute方法);如果调用,虽然也有正确结果,但是会有错误信息输出;错误如下:
    Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
     at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940)
     at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922)
     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:85)
     at com.ssrs.WordCountBatchByJava.main(WordCountBatchByJava.java:27)
  3. 如果批处理代码中,输出目的端调用writeAsCsv、writeAsText等其他方法,则后面需要调用Execute;
  4. 批处理获取执行环境用ExecutionEnvironment,流处理获取环境用StreamExecutionEnvironment
  5. 批处理后的数据是DataSet,流处理后的数据是DataStream.

原文地址:https://www.cnblogs.com/ShadowFiend/p/11951948.html

时间: 2024-08-29 20:49:19

【Flink】Flink基础之WordCount实例(Java与Scala版本)的相关文章

小记--------sparksql和DataFrame的小小案例java、scala版本

sparksql是spark中的一个模块,主要用于进行结构化数据的处理,他提供的最核心的编程抽象,就是DataFrame.同时,sparksql还可以作为分布式的sql查询引擎. 最最重要的功能就是从hive中查询数据.     Dataframe可以理解为:以列的形式组织的,分布式的数据集合.     Dataframe可以通过很多来源进行构建,包括:结构化的数据文件.hive中的表.外部的关系型数据库.以及RDD   使用sparksql 首先需要创建一个sqlContext对象,或者是它的

Hadoop3 在eclipse中访问hadoop并运行WordCount实例

前言:       毕业两年了,之前的工作一直没有接触过大数据的东西,对hadoop等比较陌生,所以最近开始学习了.对于我这样第一次学的人,过程还是充满了很多疑惑和不解的,不过我采取的策略是还是先让环境跑起来,然后在能用的基础上在多想想为什么.       通过这三个礼拜(基本上就是周六周日,其他时间都在加班啊T T)的探索,我目前主要完成的是: 1.在Linux环境中伪分布式部署hadoop(SSH免登陆),运行WordCount实例成功. http://www.cnblogs.com/Pur

JVM 并发性: Java 和 Scala 并发性基础

处理器速度数十年来一直持续快速发展,并在世纪交替之际走到了终点.从那时起,处理器制造商更多地是通过增加核心来提高芯片性能,而不再通过增加时钟速率来提高芯片性能.多核系统现在成为了从手机到企业服务器等所有设备的标准,而这种趋势可能继续并有所加速.开发人员越来越需要在他们的应用程序代码中支持多个核心,这样才能满足性能需求. 在本系列文章中,您将了解一些针对 Java 和 Scala 语言的并发编程的新方法,包括 Java 如何将 Scala 和其他基于 JVM 的语言中已经探索出来的理念结合在一起.

Java web基础总结三之—— java web 服务器

Java web基础总结三之-- java web 服务器 一.什么是java web服务器 首先来看一下什么是web服务器,它一般指网站服务器,是指一个再互联网一个主机上的一个程序.它可以解析客户端发送来的遵循http协议的请求,并且经过逻辑业务处理后,以http协议向浏览器等Web客户端提供文档. 互联网上供外界访问的Web资源可以分为两种:一个是静态web资源(如html 页面),指web页面中供人们浏览的数据始终是不变.另一个动态web资源,指web页面中供人们浏览的数据是由程序动态产生

Java基础9:解读Java回调机制

Java基础9:解读Java回调机制 模块间的调用 本部分摘自https://www.cnblogs.com/xrq730/p/6424471.html 在一个应用系统中,无论使用何种语言开发,必然存在模块之间的调用,调用的方式分为几种: (1)同步调用 同步调用是最基本并且最简单的一种调用方式,类A的方法a()调用类B的方法b(),一直等待b()方法执行完毕,a()方法继续往下走.这种调用方式适用于方法b()执行时间不长的情况,因为b()方法执行时间一长或者直接阻塞的话,a()方法的余下代码是

wordcount实例

scala的wordcount实例 package com.wondersgroup.myscala import scala.actors.{Actor, Future} import scala.collection.mutable.ListBuffer import scala.io.Source //首先统计每个文本中出现的频率=>汇总 case class SubmitTask(f:String) case object StopTask //统计一个文本中单词出现的次数 class

大数据计算引擎之Flink Flink状态管理和容错

原文地址:大数据计算引擎之Flink Flink状态管理和容错 有状态计算 在Flink架构体系中,有状态计算可以说是Flink非常重要的特征之一.有状态计算是指在程序计算过程中,在Flink程序内部,存储计算产生的中间结果,并提供给Functions 或 孙子计算结果使用.如图所示: 状态数据可以维系在本地存储中,这里的存储可以是 Flink 的堆内存或者堆外内存,也可以借助第三方的存储介质,例如:Flink中已经实现的RocksDB,当然用户也可以自己实现相应的缓存系统去存储状态信息,以完成

java基础知识回顾之java Thread类学习(八)--java多线程通信等待唤醒机制经典应用(生产者消费者)

 *java多线程--等待唤醒机制:经典的体现"生产者和消费者模型 *对于此模型,应该明确以下几点: *1.生产者仅仅在仓库未满的时候生产,仓库满了则停止生产. *2.消费者仅仅在有产品的时候才能消费,仓空则等待. *3.当消费者发现仓储没有产品可消费的时候,会唤醒等待生产者生产. *4.生产者在生产出可以消费的产品的时候,应该通知等待的消费者去消费. 下面先介绍个简单的生产者消费者例子:本例只适用于两个线程,一个线程生产,一个线程负责消费. 生产一个资源,就得消费一个资源. 代码如下: pub

java基础知识回顾之java Thread类学习(七)--java多线程通信等待唤醒机制(wait和notify,notifyAll)

1.wait和notify,notifyAll: wait和notify,notifyAll是Object类方法,因为等待和唤醒必须是同一个锁,不可以对不同锁中的线程进行唤醒,而锁可以是任意对象,所以可以被任意对象调用的方法,定义在Object基类中. wait()方法:对此对象调用wait方法导致本线程放弃对象锁,让线程处于冻结状态,进入等待线程的线程池当中.wait是指已经进入同步锁的线程,让自己暂时让出同步锁,以便使其他正在等待此锁的线程可以进入同步锁并运行,只有其它线程调用notify方