flink入门实例-Windows下本地模式跑SocketWordCount

一般情况下,开发大数据处理程序,我们希望能够在本地编写代码并调试通过,能够在本地进行数据测试,然后在生产环境去跑“大”数据。

一、nc工具

配置windows的nc端口,在网上下载nc.exe(https://eternallybored.org/misc/netcat/)

使用命令开始nc制定端口为9000(nc -L -p 9000 -v) 启动插件

二、idea中配置,代码以及设置参数

maven配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>lims</groupId>
    <artifactId>flink-project</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.7.2</flink.version>
    </properties>

    <dependencies>
        <!--log4j-->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!--flink-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

</project>
WordCount:
package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Description: TODO
 * @Date: 2019/2/25 23:49
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        //定义socket的端口号
        int port;
        try{
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("没有指定port参数,使用默认值9000");
            port = 9000;
        }

        //获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n");

        //计算数据
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word:splits) {
                    out.collect(new WordWithCount(word,1L));
                }
            }
        })//打平操作,把每行的单词转为<word,count>类型的数据
                .keyBy("word")//针对相同的word数据进行分组
                .timeWindow(Time.seconds(2),Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
                .sum("count");

        //把数据打印到控制台
        windowCount.print()
                .setParallelism(1);//使用一个并行度
        //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("streaming word count");

    }

    /**
     * 主要为了存储单词以及单词出现的次数
     */
    public static class WordWithCount{
        public String word;
        public long count;
        public WordWithCount(){}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word=‘" + word + ‘\‘‘ +
                    ", count=" + count +
                    ‘}‘;
        }
    }
}

三、运行结果

cmd中输入单词,空格分割,并换行,在idea的控制台中观察输出

本地开发调试实例完成

原文地址:https://www.cnblogs.com/limaosheng/p/10434848.html

时间: 2024-08-28 13:44:53

flink入门实例-Windows下本地模式跑SocketWordCount的相关文章

Angular2的五分钟入门在Windows下的实现

Angular2的五分钟入门在Windows下的实现 官网提供的是在linux的步骤,而实际直接拿这些步骤在windows下也可以实现,但唯一就是无法 --watch TypeScript文件,那就是扯蛋,改一下代码就要重新编译,谁受得了. 那么我来尝试一下直接使用Gulp来搭建. 一.创建项目 虽然Angular2允许我们使用TypeScript.Dart.ES5.ES6来写代码,但是出于Angular2也拥抱TypeScript,那么变成我们唯一最好的选择也是TypeScript. 首先创建

windows下本地或者远程连接MYSQL数据库,报1130错误的解决方法

重装MySQL,由于不知道重装之前的root密码,使用重装之后的密码连接Mysql数据,总报 ERROR 1130: host 'localhost' not allowed to connect to this MySQLserver,不能连接数据库,猜测用户权限和密码的问题. 1.用root用户登录mysql数据库 (1)停止MySQL服务,执行net stop mysql; (2)在mysql的安装路径下找到配置文件my.ini, 找到[mysqld]   输入:skip-grant-ta

【转+修正】在Windows和Rstudio下本地安装SparkR

(根据最新情况进行修正) 毋庸置疑,Spark已经成为最火的大数据工具,本文详细介绍安装SparkR的方法,让你在5分钟之内能在本地使用. ?环境要求:java 7+ .R 及 Rstudio                  Rtools (下载地址:https://cran.r-project.org/bin/windows/Rtools/) 第一步:下载Spark ?在浏览器打开 http://spark.apache.org/,点击右边的绿色按钮“Download Spark” 你会看到

Windows下当地RabbitMQ服务的安装

Windows下本地RabbitMQ服务的安装 本文参考:刘若泽相关技术文档 当然这些内容页可以通过RabbitMQ官方网站获得. RabbitMQ配置说明手册 一.RaibbitMQ服务器配置 1. 准备工作.如果之前安装过RabbitMQ软件,若想重新安装,必须先把之前的RabbitMQ相关软件卸载. 2. 安装ERLANG语言包.首先到http://www.erlang.org/download.html这个页面下载 Erlang Windows Binary File并且运行.这个过程大

Storm集群上的开发 ,本地模式报错问题(插曲)

打包上传到集群上跑是没问题的,在本地模式跑,报客户端没有所需特权,此处客户端指的是MyEclipse,右击用管理员模式打开myclipse即可. 错误日志 : 4573 [SLOT_1027] ERROR o.a.s.d.s.Slot - Error when processing event java.nio.file.FileSystemException: C:\Users\ADMINI~1\AppData\Local\Temp\6d36a211-4aed-4485-ac2f-156088

solr在windows下的安装及配置

solr在windows下的安装及配置 2017-04-28 13:59 122人阅读 评论(0) 收藏 举报 .embody { padding: 10px 10px 10px; margin: 0 -20px; border-bottom: solid 1px #ededed } .embody_b { margin: 0; padding: 10px 0 } .embody .embody_t,.embody .embody_c { display: inline-block; margi

如何在Windows下用cpu模式跑通py-faster-rcnn 的demo.py

关键字:Windows.cpu模式.Python.faster-rcnn.demo.py 声明:本篇blog暂时未经二次实践验证,主要以本人第一次配置过程的经验写成.计划在7月底回家去电脑城借台机子试试验证步骤的正确性,本blog将根据实际遇到的问题持续更新.另外blog中除提到的下载链接外我还会给出网盘链接方便下载,包括我的整个工程的网盘链接.如果有些报错解决不了可直接拿本人的相关文件替换,本篇blog具有较高的参考性. 本人微软版caffe工程     下载链接:http://pan.bai

Spark+ECLIPSE+JAVA+MAVEN windows开发环境搭建及入门实例【附详细代码】

http://blog.csdn.net/xiefu5hh/article/details/51707529 Spark+ECLIPSE+JAVA+MAVEN windows开发环境搭建及入门实例[附详细代码] 标签: SparkECLIPSEJAVAMAVENwindows 2016-06-18 22:35 405人阅读 评论(0) 收藏 举报  分类: spark(5)  版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 前言 本文旨在记录初学Spark时,根据官网快速

深入学习:Windows下Git入门教程(下)

声明:由于本人对于Git的学习还处于摸索阶段,对有些概念的理解或许只是我断章取义,有曲解误导的地方还请见谅指正! 一.分支 1.1分支的概念. 对于的分支的理解,我们可以用模块化这个词来解释:在日常工作中,一个项目的开发模式往往是模块化,团队协作式的开发.这样我们项目的进度可以称得上多核并发式的开发了.这种模块化的开发要求我们尽可能的高内聚低耦合以免造成一只胳膊没了整个人都废了的局面.因此在所有的版本控制器对代码进行管理的时候都引入了分支这个概念.那么分支是什么呢? 分支是相对于主干来说的,或者