Flink Java Demo(Windows)

关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽。本文主要记录一下Java使用Flink的简单例子。

首先,去官网下载Flink的zip包(链接就不提供了,你已经是个成熟的程序员了,该有一定的搜索能力了),解压后放到你想放的地方。

进入主目录后,是这样子的

image.png

你可以简单的看下其目录结构,然后就回到你喜欢的IDE创建一个工程吧。

使用IDEA创建一个maven项目,然后加入相应的依赖即可。也可以按照Flink官网的方式去创建一个maven工程,然后导入你喜欢的IDE。下面是官网的quickstart里的maven依赖。

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

创建工程后我们就可以写代码了,以下的例子和官网上的差不多,直接上代码

package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Skeleton for a Flink Streaming Job.
 *
 * <p>For a tutorial how to write a Flink streaming application, check the
 * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
 *
 * <p>To package your appliation into a JAR file for execution, run
 * ‘mvn clean package‘ on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for ‘mainClass‘).
 */
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * Here, you can start creating your execution plan for Flink.
         *
         * Start with getting some data from the environment, like
         *  env.readTextFile(textPath);
         *
         * then, transform the resulting DataStream<String> using operations
         * like
         *  .filter()
         *  .flatMap()
         *  .join()
         *  .coGroup()
         *
         * and many more.
         * Have a look at the programming guide for the Java API:
         *
         * http://flink.apache.org/docs/latest/apis/streaming/index.html
         *
         */

        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);
        DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = s.toLowerCase().split("\\W+");

                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1);

        dataStream.print();
        // execute program
        env.execute("Java WordCount from SocketTextStream Example");
    }
}

大家都是文化人,注释已经很详尽了,就不翻译了,唯一需要注意的是,IDEA好像不支持它的lambda表达式,所以我这里没有直接变lambda。

接下来就是激动人心的运行环节,Windows需要安装个瑞士军刀来支持nc命令(直接在官网下个zip包,解压,配置到环境变量即可)。在命令行中执行 nc -l -p 9000,然后运行上边那个程序(如果先运行程序会因为连接不到socket报错)

image.png

随便输入,然后在IDEA的console中可以看到如下的结果。

image.png

以上因为没启动Flink服务,所以不需要像其他博主那样,去localhost:8081的webUI中进行监控
,StreamExecutionEnvironment.getExecutionEnvironment()会创建一个LocalEnvironment然后在Java虚拟机上执行。

Windows单机模式下启动Flink相当简单,进入到bin目录,直接双击start-cluster.bat,会启动Flink的JobManager和TaskManager两个服务。如果想将上述程序提交到Flink,需要执行maven命令打成jar包,然后在命令行中,进入到bin目录下执行 flink.bat run xxxx/xxx/xxx.jar 即可,输出结果会在TaskManager的服务窗口中输出。

作者:瓜尔佳_半阙
链接:https://www.jianshu.com/p/f8b66afd32bf
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

原文地址:https://www.cnblogs.com/duwamish/p/10376605.html

时间: 2024-10-13 07:51:07

Flink Java Demo(Windows)的相关文章

Java反射机制demo(四)—获取一个类的父类和实现的接口

Java反射机制demo(四)—获取一个类的父类和实现的接口 1,Java反射机制得到一个类的父类 使用Class类中的getSuperClass()方法能够得到一个类的父类 如果此 Class 表示 Object 类.一个接口.一个基本类型或 void,则返回 null.如果此对象表示一个数组类,则返回表示该 Object 类的 Class 对象. 测试代码: package com.aaron.reflect; public class Demo4 { public static void

java反序列化原理-Demo(一)

java反序列化原理-Demo(一) 0x00 什么是java序列化和反序列? Java 序列化是指把 Java 对象转换为字节序列的过程便于保存在内存.文件.数据库中,ObjectOutputStream类的 writeObject() 方法可以实现序列化.Java 反序列化是指把字节序列恢复为 Java 对象的过程,ObjectInputStream 类的 readObject() 方法用于反序列化. 0x01 java反序列漏洞原理分析 首先先定义一个user类需继承Serializabl

.net辗转java系列(一)视野

本文目的在于扩展你我视野,求各位大神帮忙补充下表格中的内容,特别是Java的相关内容. 下面的文字纯是为了凑足150个字. 本人作为一名普通的.net程序员,也快混了十年了.在.net方面的知识面较广,但是深度严重不够.我们从最下层次的开发说起: 1.         嵌入系统wince开发(基于.net compack framwork, Visual Studio 2008之后就不支持了) 2.         上位机开发(Winform为主,主要是硬件信号的收集) 3.         桌

Java虚拟机(JVM)中的内存设置详解

在一些规模稍大的应用中,Java虚拟机(JVM)的内存设置尤为重要,想在项目中取得好的效率,GC(垃圾回收)的设置是第一步. PermGen space:全称是Permanent Generation space.就是说是永久保存的区域,用于存放Class和Meta信息,Class在被Load的时候被放入该区域Heap space:存放Instance. GC(Garbage Collection)应该不会对PermGen space进行清理,所以如果你的APP会LOAD很多CLASS的话,就很

java 基础(二)

java 基础(二)java 基础(二) 2016-2-1 by Damon 61. 编写多线程程序有几种实现方式 Java 5以前实现多线程有两种实现方法:一种是继承Thread类:另一种是实现Runnable接口.两种方式都要通过重写run()方法来定义线程的行为,推荐使用后者,因为Java中的继承是单继承,一个类有一个父类,如果继承了Thread类就无法再继承其他类了,显然使用Runnable接口更为灵活. 补充:Java 5以后创建线程还有第三种方式:实现Callable接口,该接口中的

r与java整合(转)

http://jliblog.com/archives/10 R是统计计算的强大工具,而JAVA是做应用系统的主流语言,两者天然具有整合的需要.关于整合,一方面,R中可以创建JAVA对象调用JAVA方法,另一方面,JAVA中可以转换R的数据类型调用R的函数,互相取长补短.现在也有一个项目JGR,用JAVA做R的图形界面,可以实现高亮显示自动补全等,还能让JAVA和R互相调用. 关于R中调用JAVA,我想主要是为了利用其面向对象的特性,毕竟R语言近来很致力于向面向对象发展,有个很好的项目rJava

Java 笔记(8)

JSP 技术 day8 JSP语法 + EL + JSTL day9 案例 2-3 个 综合小案例 day10 Servlet+JSP 综合练习 为什么sun推出 JSP技术 ? Servlet 生成网页比较复杂,本身不支持HTML语法,html代码需要通过response输出流输出,JSP支持HTML语法,生成HTML方便. JSP技术与Servlet 技术区别和关系? JSP和Servlet技术都是用来动态生成网页的,Servlet不支持HTML语法,生成网页麻烦,JSP支持HTML语法,生

Java笔记(11)

day1 -- day3 XML Java基础加强 day4 -- day7 Servlet编程 day8 -- day10 JSP 综合案例 day11 -- day16 MySQL 数据库 练习SQL语句 JDBC编程 关系化数据模型? 常见关系化数据库有哪些? 收费产品 免费产品 Microsoft SQL Server : 微软公司产品,中等规模数据库 收费产品,运行在windows平台上 --- .net平台+SQLServer进行开发 Oracle :甲骨文公司产品,大型商业数据层,

跟老杨学java系列(五) JDK的安装与配置

跟老杨学java系列(五) JDK的安装与配置 提示:本节内容对于java入门是非常关键的,对于刚接触java的同学一定要认真学习,欢迎大家留言探讨技术问题.其他问题概不回复. (书接上回)上节课程我们简单介绍了java项目的开发过程及常用的开发工具,这节课我们详细讲解一下JDK的安装与配置.根据上一节的学习,我们知道编写完java代码后,需要先对java代码进行编译,然后再执行.而java程序的编译与执行都是通过JDK来完成的.所以做java开发,首先我们需要学会安装和配置JDK.下面我们就来