02 使用Flink的本地模式完成词频统计

前面我们已经安装了flink,与storm一样,flink也有两种模式,一是本地模式,主要用于学习和测试,另一个是集群模式,实际生产中使用这种模式。本节将阐述如何使用本地模式的flink进行词频统计。

1 系统、软件以及前提约束

2 操作

  • 1 在idea中创建一个maven项目
  • 2 修改该maven项目的pom.xml中的依赖
   <dependencies>
        <dependency>
            <!--spark依赖-->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!--scala依赖-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <!--hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-cdh5.7.0</version>
        </dependency>
        <!--hbase依赖-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.0-cdh6.0.1</version>
        </dependency>
        <!--storm依赖-->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
            <version>1.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
    </dependencies>
  • 3 在src/main/java中添加SocketWindowWordCountWithFlink.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountWithFlink {
    public static void main(String[] args) throws Exception {
        // final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // local模式
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");
        @SuppressWarnings("serial")
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word").timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount(zyl_test)");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
  • 4 测试

    (1)打开windows命令行,执行以下命令:

nc -l -p 9999

(2)在idea中执行SocketWindowWordCountWithFlink.java

(3)在nc窗口输入字符串,观察idea中的控制台,会有统计结果打印。

以上就是使用Flink的本地模式进行的词频统计过程,在本实验中,我们通过人输入字符串来模拟源源不断到来的数据流。

原文地址:https://www.cnblogs.com/alichengxuyuan/p/12576766.html

时间: 2024-11-02 11:36:32

02 使用Flink的本地模式完成词频统计的相关文章

MapReduce词频统计

自定义Mapper实现 import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * KEYIN: Map任务读取数据的key类型,offset,是每行数据起始位置的偏移量,

Windows8.1+Eclipse搭建Hadoop2.7.2本地模式开发环境

下面介绍如何在Windows8.1上搭建hadoop2.7.2的本地模式开发环境,为后期做mapreduce的开发做准备. 在搭建开发环境之前,首先选择开发工具,就是大家都很熟悉的Eclipse(本人这次使用的是eclipse4.4.2版本),Eclipse提供了hadoop的插件,我们通过这个插件,就可以在eclipse中编写mapreduce.但是,这个插件可能会随着hadoop的版本升级或者eclipse的版本升级,而需要相应的去进行编译.所以,在我们开发之前,学会编译这个eclipse的

IntelliJ IDEA(Ultimate版本)的下载、安装和WordCount的初步使用(本地模式和集群模式)Ultimate

不多说,直接上干货! IntelliJ IDEA号称当前Java开发效率最高的IDE工具.IntelliJ IDEA有两个版本:社区版(Community)和旗舰版(Ultimate).社区版时免费的.开源的,但功能较少,旗舰版提供了较多的功能,是收费的,可以试用30天. 强烈推荐,新手刚入门,可以去用社区版,但是,立马还是用旗舰版,我可是走了弯路,当然,体会到其中的棘手还是很不错! IDEA Community(社区版)再谈之无奈之下还是去安装社区版 IntelliJ IDEA(Communi

Hive的三种安装方式(内嵌模式,本地模式远程模式)

一.安装模式介绍:     Hive官网上介绍了Hive的3种安装方式,分别对应不同的应用场景.     1.内嵌模式(元数据保村在内嵌的derby种,允许一个会话链接,尝试多个会话链接时会报错)     2.本地模式(本地安装mysql 替代derby存储元数据)     3.远程模式(远程安装mysql 替代derby存储元数据) 二.安装环境以及前提说明:     首先,Hive是依赖于hadoop系统的,因此在运行Hive之前需要保证已经搭建好hadoop集群环境.     本文中使用的

Pig安装及本地模式实战

Pig是Apache的一个开源项目,用于简化MapReduce的开发,实质Pig将转化为MapReduce作业,使开发人员更多专注数据而不是执行的本质,是不懂java人员的福利. Pig由以下两部分组成: 1.表达数据流的语言,成为Pig Latin. 2.运行Pig Latin程序的执行环境.目前有两种环境:在单个JVM本地执行和在HADOOP集群上分步执行 废话不多说,实战起来: 和Apache其他项目一样,安装Pig都很简单,在/etc/profile文件中设置环境变量. #set pig

Hadoop学习笔记(2)-搭建Hadoop本地模式

0.前言 hadoop总共有三种运行方式.本地模式(Local (Standalone) Mode),伪分布式(Pseudo-Distributed Mode),分布式(Fully-Distributed Mode).后面足一讲解搭建本地以及伪分布式,分布式读者自行搭建. 参考资料(官网为主,网络资料为铺): http://hadoop.apache.org/docs/r2.6.4/hadoop-project-dist/hadoop-common/SingleCluster.html#Stan

【node.js】本地模式安装express:&#39;express&#39; 不是内部或外部命令,也不是可运行的程序或批处理文件。

今天闲来无事想起了node.js,因此到网上下载了一个node.js的安装程序进行安装.其中: 安装程序:node-v0.11.13-x64.msi PC系统:Windows 7 自定义安装路径:D:\TOOLS\NodeJs 安装完成后,执行: D:\TOOLS\NodeJs>node -v v0.11.13 安装框架express,从网站上下载了一个安装文档,说安装express可分全局模式和本地模式,个人觉得全局模式就是默认的没什么意思,就选择本地模式进行安装,执行: D:\TOOLS\N

55.storm 之 hello word(本地模式)

strom hello word 概述 然后卡一下代码怎么实现的: 编写数据源类:Spout.可以使用两种方式: 继承BaseRichSpout类 实现IRichSpout接口 主要需要实现或重写几个方法:open.nextTuple.declareOutputFields 继续编写数据处理类:Bolt.可以使用两种方式: 继承BaseBasicBolt类 实现IRichBolt接口 终点实现或重写几个方法:execute.declareOutputFields 最后编写主函数(Topology

Hive本地模式安装及遇到的问题和解决方案

Apache Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行. 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析. Hive有三种运行模式: 1.内嵌模式:将元数据保存在本地内嵌的Derby数据库中,这得使用Hive最简单的方式,不过使用内嵌模式的话,缺点也比较明显,因为一个内嵌的D