flink伪分布式搭建及其本地idea测flink连接

下载安装flink:
上传压缩包:flink-1.7.2-bin-scala_2.12.tgz
解压:tar -zxvf /flink-1.7.2-bin-scala_2.12.tgz -C ../hone
复制解压文件到子节点:
scp -r /home/flink-1.7.2/ [email protected]:/home/
scp -r /home/flink-1.7.2/ [email protected]:/home/
修改配置文件:选择一个master节点,配置conf/flink-conf.yaml
vi conf/flink-conf.yaml
设置jobmanager.rpc.address 配置项为该节点的IP 或者主机名
jobmanager.rpc.address: 10.108.4.202
然后添加子节点配置:
在所有的节点中:flink目录下:vi conf/slaves
添加所有子节点ip然后保存
启动本地的flink集群:
cd 到flink目录下
./bin/start-cluster.sh
查看webui:ip:8081
启动监听:nc -lk 9000
当报nc命令不存在时(yum install nc)
然后执行测试jar:
停止flink集群:bin/stop-cluster.sh
以集群方式提交任务:在flink目录下
./bin/flink run -m yarn-cluster -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar --port 9000

新建maven程序
pom.xml依赖如下:
然后新建一个TestSocketWindowWordCount类具体代码如下
然后启动flink集群->新建一个监听:nc -lk 6666
然后启动TestSocketWindowWordCount类
在linux监听页面输入代码
观察在idea控制台就有统计的输出
-------pom.xml开始-----------
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>
-------pom.xml结束-----------
-------TestSocketWindowWordCount开始------------------
package com.gyb;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import javax.xml.soap.Text;

public class TestSocketWindowWordCount {
public static void main(String args[]) {
String hostname = "192.168.198.130";
int port = 6666;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream text = env.socketTextStream(hostname, port, "\n");//获取执行环境
SingleOutputStreamOperator windowCounts = text
.flatMap(new FlatMapFunction<String, SocketWindowWordCount.WordWithCount>() {br/>@Override
public void flatMap(String value, Collector<SocketWindowWordCount.WordWithCount> out) {
for (String word : value.split("\s")) {
out.collect(new SocketWindowWordCount.WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(5))
.reduce(new ReduceFunction<SocketWindowWordCount.WordWithCount>() {br/>@Override
public SocketWindowWordCount.WordWithCount reduce(SocketWindowWordCount.WordWithCount a, SocketWindowWordCount.WordWithCount b) {
return new SocketWindowWordCount.WordWithCount(a.word, a.count + b.count);
}
});

// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
//env.execute("Socket Window WordCount");
try {
    env.execute("Socket Window WordCount");
} catch (Exception e) {
    e.printStackTrace();
}

}

public static class WordWithCount {

    public String word;
    public long count;

    public WordWithCount() {}

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}

}
-------TestSocketWindowWordCount结束------------------

flink伪分布式搭建及其本地idea测flink连接

原文地址:https://blog.51cto.com/13184837/2437778

时间: 2024-10-09 07:51:57

flink伪分布式搭建及其本地idea测flink连接的相关文章

hadoop2.2.0伪分布式搭建

一.准备linux环境 1.更改VMware适配器设置 由于是在单机环境下进行学习的,因此选择适配器模式是host-only模式,如果想要联网,可以选择桥接模式,配置的方式差不多. 点击VMware快捷方式,右键打开文件所在位置 -> 双击vmnetcfg.exe -> VMnet1 host-only ->修改subnet ip 设置网段:192.168.85.0 子网掩码:255.255.255.0 -> apply -> ok 回到windows --> 打开网络

Hadoop - Hadoop伪分布式搭建

进行Hadoop伪分布式搭建时,需要有一定的Linux命令基础, 因为Hadoop是搭建在Linux环境上的开源框架, Hadoop的框架最核心的设计就是:HDFS和MapReduce. 本文也主要讲述HDFS和MapReduce环境的搭建. 搭建环境: 虚拟机: VMware10 操作系统:CentOS-6.5 JDK版本:jdk-8u171-linux-x64 Hadoop版本:hadoop-2.7.3 使用工具:Xshell 6,Xftp 6,Notepad++ 虚拟机ip地址:192.1

redis cluster单机伪分布式搭建--- 3主3从3哨兵集群

redis cluster单机伪分布式搭建--- 3主3从3哨兵集群 最近公司引进微服务框架,之前的一台redis的预存60G已经无法满足现在的260G业务需要,经过一番考虑搭建了这套集群 . 为了方便我就用一台服务器演示,生产环境中不建议这么做(没啥用),只为记录一下过程,至于精细化的配置需要在生产中自行研究 演示环境 [[email protected] ~]# cat /etc/redhat-release CentOS Linux release 7.4.1708 (Core) redi

大数据伪分布式搭建

大数据伪分布式搭建 ***对于大数据这块相信大家对linux有一定的认识,所有对创建虚拟机那块就不给予详细的说明了. 基础环境的搭建 1.系统环境 平台:VMware Workstation pro 系统:centos 7 Hadoop版本: Apache Hadoop 3.0.0 本次实验是搭建一台master和两台node节点.因为我们主要的目的是想让大家了解一下Hadoop伪分布式的搭建流程,如果说大家的电脑小于8G的话,那就每台节点就大概开个1.5G左右,也是为了大家有一个好的体验. 修

Hadoop伪分布式搭建步骤

说明: 搭建环境是VMware10下用的是Linux CENTOS 32位,Hadoop:hadoop-2.4.1  JAVA :jdk7 32位:本文是本人在网络上收集的HADOOP系列视频所附带的课堂笔记(这里直接抄过来的,望原谅,我不是坏人),记录备忘. 开始前让我们简单修改下LINUX系统的JDK:一般情况下,我们都要将linux自带的OPENJDK卸载掉,然后安装SUN的JDK(下面的代码来源网络,请注意真实自己环境) <1> 查看已经安装的JAVA版本信息 # java -vers

Hadoop2.2.0伪分布式搭建简述

简述了自己搭建Hadoop伪分布式的过程,方便以后查看参考. 环境:Vmware10+RedHat6.3+hadoop2.2.0+JDK1.7 Hadoop模式: 本地模式:只能其一个reduce和一个map,用于调试 伪分布式模式:通过一台机器模拟分布式,在学习时使用.验证逻辑是否正确 集群模式:工作的模式,有几百上千台机器. linux环境配 关闭防火墙 若是对外网提供的服务是绝对不能关闭防火墙的.而Hadoop一般是公司内部使用,有多台节点,且之间需要通信,此时若防火前将通信的端口屏蔽则无

Dockerfile完成Hadoop2.6的伪分布式搭建

在 <Docker中搭建Hadoop-2.6单机伪分布式集群>中在容器中操作来搭建伪分布式的Hadoop集群,这一节中将主要通过Dokcerfile 来完成这项工作. 1 获取一个简单的Docker系统镜像,并建立一个容器. 1.1 这里我选择下载CentOS镜像 docker pull centos 1.2 通过docker tag命令将下载的CentOS镜像名称换成centos,并删除老标签 docker tag docker.io/centos centosdocker rmr dock

Hadoop的伪分布式搭建

我们在搭建伪分布式Hadoop环境,需要将一系列的配置文件配置好. 一.配置文件 1. 配置文件hadoop-env.sh export JAVA_HOME=/opt/modules/jdk1.7.0_67 2. 配置core-site.xml dfs.defaultFS hdfs://hostname:8020 hadoop.tmp.dir /opt/modules/hadoop-2.5.0/data/tmp 配置hdfs-site.xml dfs.replication 3 配置yarn-s

Hadoop简单入门之伪分布式搭建

前面两章主要讲解了完全分布式的搭建,这章主要讲解服务器单机完成伪分布的搭建,介绍Hadoop配置,启动,以及简单测试.我的机器:阿里云服务器,64位,Java64,Hadoop2.4.1(64) ? 所有软件下载百度云 密码:uup8 讲在开头:对于笔者的完全分布式环境请见该文:Hadoop完全分布式安装 写文章不易,欢迎大家采我的文章,以及给出有用的评论,当然大家也可以关注一下我的github:多谢: 1,Hadoop简单介绍: Apache Hadoop是一款支持数据密集型分布式应用,并以A