Flink集群模式部署及案例执行

一.软件要求

  Flink在所有类UNIX的环境【例如linux,mac os x和cygwin】上运行,并期望集群由一个 主节点和一个或多个工作节点组成。在开始设置系统之前,确保在每个节点上都安装了一下软件:

  1.Java1.8.x或更高版本

  2.ssh,必须运行sshd才能使用管理远程组件的Flink脚本

  在所有集群节点上都具有免密码的ssh和相同的目录结构,将使你可以使用flink脚本来控制所有内容。

二.Flink Standalone模式设置

  1.下载

  前往Flink官网下载最新版Flink【我下载的是flink-1.8.2】。若要在Hadoop上使用Flink,则需要下载与Hadoop匹配的版本。下载完成后,上传到几个各个节点并解压

  

  2.配置Flink

  通过编辑conf/flink-conf.yaml来为集群配置flink。设置jobmanager.rpc.address以指定flink主节点。还可以通过设置jobmanager.heap.size和taskmanager.heap.size来指定允许JVM在每个节点上分配的最大内存。这些值都是以MB为单位,如果某些工作程序节点有更多的内存分配给Flink集群,则可以通过FLINK_TM_HEAP在那些特定节点上设置环境变量来覆盖默认值。最后,必须提供集群中所有节点的列表,这些列表将用作工作节点。因此,类似于HDFS配置,编辑文件conf/slaves并输入每个子节点的IP/主机名。每个子节点都将运行TaskManager。

  以下示例说明了具有三个节点(IP地址从10.0.0.1到10.0.0.3且主机名分别为master,worker1,worker2)的设置,并显示了配置文件的内容:

  

  具体配置如下:

jobmanager.rpc.address:192.168.136.7 # 在每个节点上分别指定各自节点的IP/主机名
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1

  解释如下:

    1.jobmanager.heap.size:每个JobManager的可用内存大小,默认为1024M

    2.taskmanager.heap.size:每个TaskManager的可用内存大小,默认为1024M

    3.taskmanager.numberOfTaskSlots:每台计算机可用的CPU数,默认为1

    4.parallelism.default:集群中的CPU总数之和

    5.io.tmp.dirs:临时目录

  3.配置slaves

    

  4.配置环境变量

    

  5.启动flink

    执行bin/start-cluster.sh启动JobManager,并通过SSH连接到slaves文件中列出的所有工作节点,以在每个节点上启动TaskManager。

    

  6.Web UI

    打开浏览器,输入:http://master:8081

  配置成功!

三.本地执行WordCount

  1.代码

package cn.demo

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._ //必须导入

/**
  * Created by Administrator on 2020/1/22.
  */
object WordCount {
  def main(args: Array[String]) {
    val params : ParameterTool = ParameterTool.fromArgs(args)

    // 设置execution执行环境
    val execution = ExecutionEnvironment.getExecutionEnvironment

    // 设置web界面有效参数
    execution.getConfig.setGlobalJobParameters(params)

    val text = execution.fromElements("Apache Flink is an open source platform for distributed stream and batch data processing.",
      "Flink core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. ",
      "Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.")

    val counts = text.flatMap(_.toLowerCase.split(" ").filter(_.nonEmpty))
      .map((_, 1))
      .groupBy(0)//根据第一个元素分组
      .sum(1)
      .sortPartition(0, Order.ASCENDING) //按照分区进行排序
      .first(6)

    counts.print()
  }
}

  2.本地执行结果

    

四.案例执行

  要运行Flink案例,必须有一个正在运行的Flink实例。最简单的方法是运行./bin/start-cluster.sh,默认情况下会启动一个带有JobManager和一个TaskManager的本地集群。每个Flink二进制发行版都包含一个examples目录,其中包含WordCount这个最常用案例。

  要运行WordCount案例,执行以下命令:

  ./bin/flink run ./examples/batch/WordCount.jar --input /data/flink/wordcount --output /data/flink/wcresult

  备注:input路径要提前创建好,其中保存要计算的数据!

  

  

原文地址:https://www.cnblogs.com/yszd/p/12251398.html

时间: 2024-10-10 17:54:34

Flink集群模式部署及案例执行的相关文章

Redis集群模式部署

以下以Linux(CentOS)系统为例 1.1 下载和编译 $ wget http://download.redis.io/releases/redis-4.0.7.tar.gz $ tar xzf redis-4.0.7.tar.gz $ cd redis-4.0.7 $ make 1 2 3 4 编译完成后会在src目录下生成Redis服务端程序redis-server和客户端程序redis-cli. 1.2 启动服务 1.前台运行 src/redis-server 1 该方式启动默认为前

flink部署操作-flink standalone集群安装部署

flink集群安装部署 standalone集群模式 必须依赖 必须的软件 JAVA_HOME配置 flink安装 配置flink 启动flink 添加Jobmanager/taskmanager 实例到集群 个人真实环境实践安装步骤 必须依赖 必须的软件 flink运行在所有类unix环境中,例如:linux.mac.或者cygwin,并且集群由一个master节点和一个或者多个worker节点.在你开始安装系统之前,确保你有在每个节点上安装以下软件. java 1.8.x或者更高 ssh 如

redis解决方案之三种集群模式的概念与部署

上篇文章为大家总结了redis命令并讲述了持久化,今天我们来看一下redis的三种集群模式:主从复制,哨兵集群,Cluster集群 本篇文章先介绍redis-cluster,然后再依次介绍它的哨兵集群与主从复制 一.Cluster集群模式概念 redis集群是一个分布式与容错的redis实现.在集群中不存在代理节点与中心节点.后期可以很好的将其进行扩展 此模式也解决了redis高可用与可扩展的问题.但是redis集群不支持需要同时处理多个Key的redis命令 因为执行这些命令需要在多个redi

56.storm 之 hello world (集群模式)

回顾 在上一小节,我们在PWTopology1 这一个java类中注解掉了集群模式,使用本地模式大概了解一下storm的工作流程.这一节我们注解掉本地模式相关的代码,放开集群模式相关代码,并且将项目打包,在实际环境中运行一下. 集群模式部署步骤 项目打包 pom.xml右键 --> run as --> maven clean --> maven install 将大好的jar包上传到 nimbus 的 /usr/local下 提交拓扑 storm jar storm01.jar bhz

Zookeeper实战之嵌入式执行Zookeeper集群模式

非常多使用Zookeeper的情景是须要我们嵌入Zookeeper作为自己的分布式应用系统的一部分来提供分布式服务.此时我们须要通过程序的方式来启动Zookeeper.此时能够通过Zookeeper API的ZooKeeperServerMain类来启动Zookeeper服务. 以下是一个集群模式下启动Zookeeper服务的样例 这里假定我们执行Zookeeper集群的三台机器名分别为fanbinx1,fanbinx2,fanbinx3  首先是zoo.cfg配置文件 tickTime=200

【待补充】Spark 集群模式 && Spark Job 部署模式

0. 说明 Spark 集群模式 && Spark Job 部署模式 1. Spark 集群模式 [ Local ] 使用一个 JVM 模拟 Spark 集群 [ Standalone ] 启动 master + worker 进程 [ mesos ] -- [ Yarn ] -- 2. Spark Job 部署模式 [ Client ] Driver 程序运行在 Client 端. [ Cluster ] Driver 程序运行在某个 worker 上. spark-shell 只能以

Storm笔记整理(三):Storm集群安装部署与Topology作业提交

[TOC] Storm分布式集群安装部署 概述 Storm集群表面类似Hadoop集群.但在Hadoop上你运行的是"MapReduce jobs",在Storm上你运行的是"topologies"."Jobs"和"topologies"是大不同的,一个关键不同是一个MapReduce的Job最终会结束,而一个topology永远处理消息(或直到你kill它). Storm集群有两种节点:控制(master)节点和工作者(wo

Redis集群模式介绍

前言: 一.为什么要使用redis 1,解决应用服务器的cpu和内存压力 2,减少io的读操作,减轻io的压力(内存中读取) 3,关系型数据库扩展性,不强,难以改变表的结构 二.优点 1,nosql数据库没有关联关系,数据结构简单,扩展容易 2,数据读写快,能够每秒胜任几十万的并发,处理速度快 三.使用场景 1,数据高并发读写 2,海量数据读写 3,对不规则数据也就是扩展性要求高的数据 四.不适合场景 1,需要事务支持,虽然它也有事务但是没有关系型数据库的那么成熟吧 2,基于sql进行操作 五.

Spring Cloud - Nacos注册中心入门单机模式及集群模式

近几年微服务很火,Spring Cloud提供了为服务领域的一整套解决方案.其中Spring Cloud Alibaba是我们SpringCloud的一个子项目,是提供微服务开发的一站式解决方案. 包含微服务开发的必要组件,基于SpringCloud 符合SpringCloud标准,是阿里的微服务的解决方案. 文档:https://github.com/alibaba/spring-cloud-alibaba/blob/master/README-zh.md Nacos的安装与启动 Nacos是