Flink - CoGroup

使用方式,

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});

 

可以看到coGroup只是产生CoGroupedStreams

    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams<>(this, otherStream);
    }

 

而where, equalTo只是添加keySelector,对于两个流需要分别指定

keySelector1,keySelector2

 

window设置双流的窗口,很容易理解

 

apply,

       /**
         * Completes the co-group operation with the user function that is executed
         * for windowed groups.
         *
         * <p>Note: This method‘s return type does not support setting an operator-specific parallelism.
         * Due to binary backwards compatibility, this cannot be altered. Use the
         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific parallelism.
         */
        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

            DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1 //将input1封装成TaggedUnion,很简单,就是赋值到one上
                    .map(new Input1Tagger<T1, T2>())
                    .setParallelism(input1.getParallelism())
                    .returns(unionType);
            DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2 //将input2封装成TaggedUnion
                    .map(new Input2Tagger<T1, T2>())
                    .setParallelism(input2.getParallelism())
                    .returns(unionType);

            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2); //由于现在双流都是TaggedUnion类型,union成一个流,问题被简化

            // we explicitly create the keyed stream to manually pass the key type information in
            WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = //创建窗口
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);

            if (trigger != null) { //如果有trigger,evictor,设置上
                windowOp.trigger(trigger);
            }
            if (evictor != null) {
                windowOp.evictor(evictor);
            }

            return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType); //调用window的apply
        }

关键理解,他要把两个流变成一个流,这样问题域就变得很简单了

最终调用到WindowedStream的apply,apply是需要保留window里面的所有原始数据的,和reduce不一样

apply的逻辑,是CoGroupWindowFunction

 

private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

        private static final long serialVersionUID = 1L;

        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key,
                W window,
                Iterable<TaggedUnion<T1, T2>> values,
                Collector<T> out) throws Exception {

            List<T1> oneValues = new ArrayList<>();
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val: values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }
}

逻辑也非常的简单,就是将该key所在window里面的value,放到oneValues, twoValues两个列表中

最终调用到用户定义的wrappedFunction.coGroup

 

DataStream.join就是用CoGroup实现的

            return input1.coGroup(input2)
                    .where(keySelector1)
                    .equalTo(keySelector2)
                    .window(windowAssigner)
                    .trigger(trigger)
                    .evictor(evictor)
                    .apply(new FlatJoinCoGroupFunction<>(function), resultType);

 

FlatJoinCoGroupFunction

private static class FlatJoinCoGroupFunction<T1, T2, T>
            extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
            implements CoGroupFunction<T1, T2, T> {
        private static final long serialVersionUID = 1L;

        public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
            super(wrappedFunction);
        }

        @Override
        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
            for (T1 val1: first) {
                for (T2 val2: second) {
                    wrappedFunction.join(val1, val2, out);
                }
            }
        }
    }

可以看出当前join是inner join,必须first和second都有的情况下,才会调到用户的join函数

时间: 2024-10-14 05:19:31

Flink - CoGroup的相关文章

Flink Program Guide (2) -- DataStream API编程指导 -- For Java

v\:* {behavior:url(#default#VML);} o\:* {behavior:url(#default#VML);} w\:* {behavior:url(#default#VML);} .shape {behavior:url(#default#VML);} 张安 张安 2 1 2016-08-02T10:56:00Z 2016-08-02T10:56:00Z 1 2945 16790 139 39 19696 16.00 false false false false

&lt;译&gt;Flink编程指南

Flink 的流数据 API 编程指南 Flink 的流数据处理程序是常规的程序 ,通过再流数据上,实现了各种转换 (比如 过滤, 更新中间状态, 定义窗口, 聚合).流数据可以来之多种数据源 (比如, 消息队列, socket 流, 文件). 通过sink组件落地流计算的最终结果,比如可以把数据落地文件系统,标准输出流比如命令行界面, Flink 的程序可以运行在多种上下文环境 ,可以单独只是Flink api,也可以嵌入其他程序. execution可以运行在本地的 JVM里, 也可以 运行

Flink流计算编程--在双流中体会joinedStream与coGroupedStream

一.joinedStream与coGroupedStream简介 在实际的流计算中,我们经常会遇到多个流进行join的情况,Flink提供了2个Transformations来实现. 如下图: 注意:Join(Cogroups) two data streams on a given key and a common window.这里很明确了,我们要在2个DataStream中指定连接的key以及window下来运算. 二.SQL比较 我们最熟悉的SQL语言中,如果想要实现2个表join,可以

Flink Java Demo(Windows)

关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽.本文主要记录一下Java使用Flink的简单例子. 首先,去官网下载Flink的zip包(链接就不提供了,你已经是个成熟的程序员了,该有一定的搜索能力了),解压后放到你想放的地方. 进入主目录后,是这样子的 image.png 你可以简单的看下其目录结构,然后就回到你喜欢的IDE创建一个工程吧. 使用IDEA创建一个maven项目,然后加入相应的依赖即可.也可以按照Flink官网的方式去创建一个maven工程,然后导入你喜欢的IDE

flink Transitive Closure算法,实现寻找新的可达路径

1.Transitive Closure是翻译闭包传递?我觉得直译不准确,意译应该是传递特性直至特性关闭,也符合本例中传递路径,寻找路径可达,直到可达路径不存在(即关闭). 2.代码很简单,里面有些概念直指核心原理,详细看注释. /** * @Author: xu.dm * @Date: 2019/7/3 11:41 * @Version: 1.0 * @Description: 传递闭包算法,本例中就是根据成对路径,查找和生成新的可达路径 * 例如:1-2,2-4这两对数据,可以得出新的可达路

Apache Flink 是什么?

架构 Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算.Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算. 接下来,我们来介绍一下 Flink 架构中的重要方面. 处理无界和有界数据 任何类型的数据都可以形成一种事件流.信用卡交易.传感器测量.机器日志.网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流. 数据可以被作为 无界 或者 有界 流来处理. 无界流 有定义流的开始,但没有定义流的结束.它们会无休止地产生

Flink入门(五)——DataSet Api编程指南

Apache Flink Apache Flink 是一个兼顾高吞吐.低延迟.高性能的分布式处理框架.在实时计算崛起的今天,Flink正在飞速发展.由于性能的优势和兼顾批处理,流处理的特性,Flink可能正在颠覆整个大数据的生态. DataSet API 首先要想运行Flink,我们需要下载并解压Flink的二进制包,下载地址如下:https://flink.apache.org/downloads.html 我们可以选择Flink与Scala结合版本,这里我们选择最新的1.9版本Apache

Flink 更新中

一.介绍 Flink分层组件栈 API支持 对Streaming数据类应用,提供DataStream API 对批处理类应用,提供DataSet API(支持Java/Scala) Libraries支持 支持机器学习(FlinkML) 支持图分析(Gelly) 支持关系数据处理(Table) 支持复杂事件处理(CEP) 整合支持 支持Flink on YARN 支持HDFS 支持来自Kafka的输入数据 支持Apache HBase 支持Hadoop程序 支持Tachyon 支持Elastic

为什么学习Apache Flink

Apache Flink 的简介 Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎.Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序.此外,Flink的运行时本身也支持迭代算法的执行. Flink :是一个数据处理框架.分布式数据处理引擎.有状态计算.支持有界数据计算与无界数据计算 Flink应用开发基础语义.多层API:数据流.数据集合.有状态.无状态.时间