Flink入门wordCount

Flink的编程模型
1、获取Flink上下文;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2、加载、创建数据;
DataSet
3、数据转换;
Transformation
4、数据结果存放;
5、触发执行。
env.execution

下面为flink输出wordcount数据:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkMain {

@SuppressWarnings("serial")
public static class LineSplit implements FlatMapFunction<String,Tuple2<String, Integer>>{

    @SuppressWarnings("rawtypes")
    @Override
    /**
     * @param value 原数据
     * @param out 输出的数据
     */
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.split(" ");
        for (String token : tokens) {
            if(token!=null && token.length()>0){
                Tuple2 t = new Tuple2<String, Integer>(token,1);
                out.collect(t);
            }
        }
    }

}

public static void main(String[] args) throws Exception {
    //创建flink上下文
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //创建数据集
    DataSet<String> text = env.fromElements("to be","or no to be","is question");
    //对数据集转换
    DataSet<Tuple2<String, Integer>> count = text.flatMap(new LineSplit());
    //输出转换后的数据集(print中包含了env.execute执行)
    count.print();
    System.out.println("-----------------------");
    //对数据集分组统计转换,0,1是下标,对应Tuple2类中的参数
    count = count.groupBy(0).sum(1);
    //控制台输出数据集
    count.print();
    System.out.println("-----------------------");
}

}

Flink使用sql方式转换数据
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class FlinkMain2 {

@SuppressWarnings({ "unchecked", "rawtypes" })
public static void main(String[] args) throws Exception {

    //创建flink上下文
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    List<WordCount> list = new ArrayList();
    String workStr = "to be or no to be is question";
    String[] tokens = workStr.split(" ");
    for (String token : tokens) {
        if(token!=null && token.length()>0){
            list.add( new WordCount(token,1));
        }
    }
    //创建数据集
    DataSet<WordCount> input = env.fromCollection(list);
    //注册为数据表wordCount为数据库表,word,frequency为wordCount表字段
    tEnv.registerDataSet("wordCount", input, "word, frequency");

    Table table = tEnv.sqlQuery(" SELECT word, SUM(frequency) as frequency FROM wordCount GROUP BY word" );

    DataSet<WordCount> res = tEnv.toDataSet(table, WordCount.class);
    //控制台输出
    res.print();

}

public static class WordCount    {
    public String word;
    public long frequency;
    public WordCount(){}

    public WordCount(String word, long frequency) {
        this.word = word;
        this.frequency = frequency;
    }

    @Override
    public String toString() {
        return "词语:" + word + ",词频:" + frequency;
    }
}

}

原文地址:https://blog.51cto.com/14538371/2436853

时间: 2024-10-01 08:52:20

Flink入门wordCount的相关文章

Flink入门(五)——DataSet Api编程指南

Apache Flink Apache Flink 是一个兼顾高吞吐.低延迟.高性能的分布式处理框架.在实时计算崛起的今天,Flink正在飞速发展.由于性能的优势和兼顾批处理,流处理的特性,Flink可能正在颠覆整个大数据的生态. DataSet API 首先要想运行Flink,我们需要下载并解压Flink的二进制包,下载地址如下:https://flink.apache.org/downloads.html 我们可以选择Flink与Scala结合版本,这里我们选择最新的1.9版本Apache

apache flink 入门

配置环境 包括 JAVA_HOME jobmanager.rpc.address jobmanager.heap.mb 和 taskmanager.heap.mb taskmanager.numberOfTaskSlots taskmanager.tmp.dirs slaves文件 启动关闭bin/start-cluster.shbin/stop-cluster.sh   初步使用 public static void main(String[] args) throws Exception {

flink入门

wordCount POM文件需要导入的依赖: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>

flink入门实例-Windows下本地模式跑SocketWordCount

一般情况下,开发大数据处理程序,我们希望能够在本地编写代码并调试通过,能够在本地进行数据测试,然后在生产环境去跑“大”数据. 一.nc工具 配置windows的nc端口,在网上下载nc.exe(https://eternallybored.org/misc/netcat/) 使用命令开始nc制定端口为9000(nc -L -p 9000 -v) 启动插件 二.idea中配置,代码以及设置参数 maven配置: <?xml version="1.0" encoding="

Flink入门宝典(详细截图版)

本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本.需要安装Netcat进行简单调试. 这里简述安装过程,并使用IDEA进行开发一个简单流处理程序,本地调试或者提交到Flink上运行,Maven与JDK安装这里不做说明. 一.Flink简介 Flink诞生于欧洲的一个大数据研究项目StratoSphere.该项目是柏林工业大学的一个研究性项目.早期,Flink是做Batch计算的,但是在2014年,StratoSphere里面的核心成员孵化出

Apache Flink 入门示例demo

在本文中,我们将从零开始,教您如何构建第一个Apache Flink (以下简称Flink)应用程序. 开发环境准备 Flink 可以运行在 Linux, Max OS X, 或者是 Windows 上.为了开发 Flink 应用程序,在本地机器上需要有 Java 8.x 和 maven 环境. 如果有 Java 8 环境,运行下面的命令会输出如下版本信息: $ java -versionjava version "1.8.0_65" Java(TM) SE Runtime Envir

Flink入门(二)——Flink架构介绍

1.基本组件栈 了解Spark的朋友会发现Flink的架构和Spark是非常类似的,在整个软件架构体系中,同样遵循着分层的架构设计理念,在降低系统耦合度的同时,也为上层用户构建Flink应用提供了丰富且友好的接口. https://mmbiz.qpic.cn/mmbiz_png/mqibsuEhdUyIVKMN1mHneQiantTzuhJYqwSD0k9gn8RCcJZHeD19KxcLj8ydCUr9KuepDWu6fk2J47oKx6dyQlfQ/640?wx_fmt=png&wxfrom

Flink入门(一)——Apache Flink介绍

Apache Flink是什么? 在当代数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题.随着雅虎对hadoop的开源,越来越多的大数据处理技术开始涌入人们的视线,例如目前比较流行的大数据处理引擎Apache Spark,基本上已经取代了MapReduce成为当前大数据处理的标准.但是随着数据的不断增长,新技术的不断发展,人们逐渐意识到对实时数据处理的重要性.相对于传统的数据处理模式,流式数据处理有着更高的处理效率

【Flink】Flink基础之WordCount实例(Java与Scala版本)

简述 WordCount(单词计数)作为大数据体系的标准示例,一直是入门的经典案例,下面用java和scala实现Flink的WordCount代码: 采用IDEA + Maven + Flink 环境:文末附 pom 文件和相关技术点总结: Java批处理版本 import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apa