Getting Started with Flink

WordCount

Dependencies to add to the POM file:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
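
The dependencies above reference a flink.version Maven property, which must be defined in the POM's <properties> section. A minimal sketch, assuming version 1.7.1 (the version the original Scala entries were pinned to):

<properties>
    <flink.version>1.7.1</flink.version>
</properties>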

  

Batch (offline) code:

Java version:

package flink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordExample {
    public static void main(String[] args) throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // create a DataSet from a collection of strings
        DataSet<String> text = env.fromElements(
                "flink test",
                "I think I hear them. Stand, ho! Who's there?");

        // split each line into words, group by word, and count occurrences per key
        DataSet<Tuple2<String, Integer>> wordCount = text.flatMap(new LineSplitter())
                .groupBy(0).sum(1);

        wordCount.print();
    }
}
package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // emit a (word, 1) tuple for every space-separated word in the line
        for (String word : line.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
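
Running WordExample prints one (word, count) tuple per distinct token. Because the splitter only breaks on spaces, punctuation stays attached to the words; the output should resemble the following (ordering may vary between runs):

(flink,1)
(test,1)
(I,2)
(think,1)
(hear,1)
(them.,1)
(Stand,,1)
(ho!,1)
(Who's,1)
(there?,1)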

Scala version:

package flink

import org.apache.flink.api.scala._

object WordCountExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements("Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1)).groupBy(0).sum(1)

    counts.print()
  }
}

Streaming code:

Java version:

package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.out.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordWithCount> collector) {
                        for (String word : s.split("\\s")) {
                            collector.collect(new WordWithCount(word, 1L));
                        }
                    }
                }).keyBy("word").timeWindow(Time.seconds(10), Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount wordWithCount, WordWithCount t1) throws Exception {
                        return new WordWithCount(wordWithCount.word, wordWithCount.count + t1.count);
                    }
                });

        // print the result with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}
package flink;

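// Flink treats this class as a POJO because it has public fields and a
// public no-argument constructor; that is what lets the job above use
// keyBy("word") to reference the field by name.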
public class WordWithCount {
    public String word;
    public long count;

    public WordWithCount() {
    }

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + ":" + count;
    }
}

Scala version:

package flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    //the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //parse input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCount = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCount.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }
}

Run and pass arguments:

Use the nc command in a terminal to send test data to the socket port (9999 in this example).
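
A minimal sketch of the workflow, assuming port 9999 and the OpenBSD/Linux netcat (the Windows nc.exe build typically uses -L -p 9999 instead):

# start a socket server that keeps listening between connections
nc -lk 9999

Then start the job with --port 9999 as its program argument (for example in the IDE run configuration, or via bin/flink run when submitting the packaged jar) and type words into the nc session.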

Run results:
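
Typing a line such as hello world hello into the nc session should produce output along these lines for the Java job; because the 10-second window slides every 5 seconds, a word's count is re-emitted for each window that still contains it (the Scala job prints the case-class form instead, e.g. WordWithCount(hello,2)):

hello:2
world:1
hello:2
world:1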

Notes:

    Be very careful not to import the wrong packages: Java code must import the Java API and Scala code the Scala API. If you mix them up, the program will not run.
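
For example, the batch ExecutionEnvironment exists in both APIs under different packages, and importing the wrong one leads to confusing compile errors:

Java API:
import org.apache.flink.api.java.ExecutionEnvironment;

Scala API:
import org.apache.flink.api.scala.ExecutionEnvironment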

Original article: https://www.cnblogs.com/Gxiaobai/p/10290990.html
