flink Transitive Closure算法,实现寻找新的可达路径

1、Transitive Closure是翻译闭包传递?我觉得直译不准确,意译应该是传递特性直至特性关闭,也符合本例中传递路径,寻找路径可达,直到可达路径不存在(即关闭)。

2、代码很简单,里面有些概念直指核心原理,详细看注释。

/**
 * @Author: xu.dm
 * @Date: 2019/7/3 11:41
 * @Version: 1.0
 * @Description: 传递闭包算法,本例中就是根据成对路径,查找和生成新的可达路径
 * 例如:1-2,2-4这两对数据,可以得出新的可达路径1-4。
 *
 * 迭代算法步骤:
 * 1、获取成对数据集edges,里面包括路径对,比如 1->2,2->4,2->5等,如果是无向边,还可以反转数据集union之前的数据。本例按有向边处理
 * 2、生成迭代头paths可迭代数据集
 * 3、用paths和原始数据集edges做join连接,找出头尾相连的数据nextPaths,即类似1->2,2->4这种,然后生成新的路径1->4。
 * 4、新的路径集nextPaths和迭代头数据集paths进行并集操作,即union操作,生成新的nextPaths,这个时候它包含了新旧两种数据
 *    在这里总是nextPaths>=paths
 * 5、去重操作,第一次迭代不会重复,但是第二次迭代开始就会有重复数据,通过groupBy全字段,去分组第一条即可达到去重效果
 * 6、以上核心迭代体完成,后面需要形成迭代闭环,确定迭代退出条件
 * 7、退出原理:每次迭代完成后,需要检查是否新的路径产生,如果没有则表示迭代可以结束
 * 8、可达寻路步骤完成后,通过对比nextPaths和paths,如果nextPaths>paths,表示有新路径生成,需要继续迭代,直到nextPaths=paths
 * 9、这里有一个迭代重要的概念,paths和nextPaths是通过迭代闭环不断更新的
 * 10、本例中迭代头和迭代尾的数据流向:paths->nextPaths->paths.
 * 11、本例通过bulk迭代方式实现了delta迭代的效果
 **/
public class TransitiveClosureNaive {
    public static void main(String args[]) throws Exception {
        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        final int maxIterations = params.getInt("iterations", 10);

        DataSet<Tuple2<Long, Long>> edges;
        if(params.has("edges")){
            edges = env.readCsvFile(params.get("edges")).fieldDelimiter(" ").types(Long.class, Long.class);
        }else {
            System.out.println("Executing TransitiveClosureNaive example with default edges data set.");
            System.out.println("Use --edges to specify file input.");
            edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);
        }

        IterativeDataSet<Tuple2<Long,Long>> paths = edges.iterate(maxIterations);

        DataSet<Tuple2<Long,Long>> nextPaths = paths
                .join(edges)
                .where(1)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    /**
                     left: Path (z,x) - 通过z可达x
                     right: Edge (x,y) - 通过x可达y
                     out: Path (z,y) - 最终输出z可达y
                     */
                    @Override
                    public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
                        return new Tuple2<>(left.f0,right.f1);
                    }
                })
                //类似withForwardedFieldsFirst这种无损转发语义声明,是可选项,有助于提高flink优化器生成更高效的执行计划
                //转发第一个输入Tuple2<Long, Long>中的第一个字段,转发第二个输入Tuple2<Long, Long>中的第二个字段
                .withForwardedFieldsFirst("0").withForwardedFieldsSecond("1")
                //合并原有的路径
                .union(paths)
                //这里的groupBy两个fields是打算给reduceGroup去重使用
                .groupBy(0,1)
                .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
                        out.collect(values.iterator().next());
                    }
                })
                .withForwardedFields("0;1");

        //对比paths以及新生成的nextPaths,获取nextPaths中比paths多的路径
        //从上面的算子可以得知,nextPaths总是大于或等于paths
        DataSet<Tuple2<Long,Long>> newPaths = paths
                .coGroup(nextPaths)
                .where(0).equalTo(0)
                .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    Set<Tuple2<Long, Long>> prevSet = new HashSet<>();
                    @Override
                    public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
                        for(Tuple2<Long,Long> prev:prevPaths){
                            prevSet.add(prev);
                        }
                        //检查有没有新的数据产生,如果有就继续迭代,否则迭代终止
                        for(Tuple2<Long,Long> next:nextPaths){
                            if(!prevSet.contains(next)){
                               out.collect(next);
                            }
                        }
                    }
                }).withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");

        //迭代尾,在这里形成闭环,nextPaths是反馈通道,nextPaths数据集被重新传递到迭代头paths里,然后通过迭代算子不断执行。
        //newPaths为空或者迭代达到最大次数,迭代结束。newPaths这里表示是否有新的路径。
        //数据集迭代环:paths->nextPaths->paths
        DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths);

        // emit result
        if (params.has("output")) {
            transitiveClosure.writeAsCsv(params.get("output"), "\n", " ");

            // execute program explicitly, because file sinks are lazy
            env.execute("Transitive Closure Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            transitiveClosure.print();
        }
    }
}

3、原始数据

public class ConnectedComponentsData {
    public static final long[] VERTICES  = new long[] {
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};

    public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
        List<Long> verticesList = new LinkedList<Long>();
        for (long vertexId : VERTICES) {
            verticesList.add(vertexId);
        }
        return env.fromCollection(verticesList);
    }

    public static final Object[][] EDGES = new Object[][] {
            new Object[]{1L, 2L},
            new Object[]{2L, 3L},
            new Object[]{2L, 4L},
            new Object[]{3L, 5L},
            new Object[]{6L, 7L},
            new Object[]{8L, 9L},
            new Object[]{8L, 10L},
            new Object[]{5L, 11L},
            new Object[]{11L, 12L},
            new Object[]{10L, 13L},
            new Object[]{9L, 14L},
            new Object[]{13L, 14L},
            new Object[]{1L, 15L},
            new Object[]{16L, 1L}
    };

    public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {

        List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>();
        for (Object[] edge : EDGES) {
            edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1]));
        }
        return env.fromCollection(edgeList);
    }

}

原文地址:https://www.cnblogs.com/asker009/p/11131069.html

时间: 2024-10-10 16:46:27

flink Transitive Closure算法,实现寻找新的可达路径的相关文章

STL_算法(21)_ STL_算法_填充新值

STL_算法_填充新值 fill(b, e, v) fill(b, n, v) generate(b, n, p) generate_n(b, n, p) #include<iostream> #include<algorithm> #include<vector> #include<list> // #include<string> using namespace std; int main() { list<string> sli

【经典算法】寻找最长01字串(转自待字闺中)

这两天在微信公众号“待字闺中”中看到一个经典的面试算法,寻找最长01字串,原题目是这么说的: 给定一个数组,数组中只包含0和1.请找到一个最长的子序列,其中0和1的数量是相同的. 例1:10101010 结果就是其本身.例2:1101000 结果是110100 这个题目,看起来比较简单,一些同学可能认为题目的描述符合动态规划的特征,然后就开始用动态规划解,努力找状态转移方程.这些同学的感觉,是很正确的.但,找状态转移方程,我们要对原来的数组进行变换一下. 原来是0和1的串,我们将0都换为-1.这

1124: 零起点学算法31——开始新的起程

1124: 零起点学算法31--开始新的起程 Time Limit: 1 Sec  Memory Limit: 64 MB   64bit IO Format: %lldSubmitted: 2861  Accepted: 1381[Submit][Status][Web Board] Description 很开心大家学到现在. 从今天开始大家要有一个新的起程. 我们开始接触循环了! 你能让计算机计算1+2+3+...+10吗? 你能让计算机计算1+2+3+.._100吗? 只要告诉计算机,你

关于Retinex图像增强算法的一些新学习。

最近再次看了一下IPOL网站,有一篇最近发表的文章,名字就是Multiscale Retinex,感觉自己对这个已经基本了解了,但还是进去看了看,也有一些收获,于是抽空把他们稍微整理了下,原始文章及其配套代码详见:http://www.ipol.im/pub/art/2014/107/. 之前在我的 带色彩恢复的多尺度视网膜增强算法(MSRCR)的原理.实现及应用 一文中已经较为详细的描述了Multiscale Retinex的基本原理和应用,这里就不再做过多的说明.为表述方便,还是贴出其基本的

十大基础实用算法之寻找最小(最大)的k个数-线性查找算法

例如:输入1,2,3,4,5,6,7,8这8个数字,则最小的4个数字为1,2,3,4. 思路1:最容易想到的方法:先对这个序列从小到大排序,然后输出前面的最小的k个数即可.如果选择快速排序法来进行排序,则时间复杂度:O(n*logn) 注:针对不同问题我们应该给出不同的思路,如果在应用中这个问题的规模不大,或者求解前k个元素的频率很高,或者k是不固定的.那么我们花费较多的时间对问题排序,在以后是使用中可以线性时间找到问题的解,总体来说,那么思路一的解法是最优的. 思路2:在思路1的基础上更进一步

靠“互联网+”寻找新活路 一双袜子里的传统产业转型升级

儿子PK父亲,父亲输得心服口服! 儿子陈嘉楠,今年24岁,大学毕业才两年:父亲陈军华,已经从事了20多年的袜子生产经营. 2013年,大学毕业的陈嘉楠辞去了银行的工作,回到老家浙江省诸暨市大唐镇在网上开设了专卖袜子的店铺. “家里人都反对,不看好.”陈嘉楠告诉记者.为此,陈嘉楠与父亲较上了劲. 两年过去,从一个人的单枪匹马到现在拥有三四十人的团队,陈嘉楠的楠森贸易有限公司不仅包揽销售父亲工厂生产的全部袜子,还自己设计款式,向外派发订单.陈嘉楠告诉记者,今年公司销售总额预计将超过2500万元,利润

[经典算法题]寻找数组中第K大的数的方法总结

[经典算法题]寻找数组中第K大的数的方法总结 责任编辑:admin 日期:2012-11-26 字体:[大 中 小] 打印复制链接我要评论 今天看算法分析是,看到一个这样的问题,就是在一堆数据中查找到第k个大的值. 名称是:设计一组N个数,确定其中第k个最大值,这是一个选择问题,当然,解决这个问题的方法很多,本人在网上搜索了一番,查找到以下的方式,决定很好,推荐给大家. 所谓"第(前)k大数问题"指的是在长度为n(n>=k)的乱序数组中S找出从大到小顺序的第(前)k个数的问题.

大数据落地的大挑战,明略数据在老环境中寻找新路径

"大数据"一词进入了十九大报告中,报告提出要"推动互联网.大数据.人工智能和实体经济深度融合".换句话说,就是要把大数据这样的先进技术,落地到实际的行业应用和业务场景中,对实体经济发挥真正的作用,创造实在的价值. 从2015年起,"大数据"一词就被移出了Gartner的新兴技术炒作曲线.然而,据有关统计,截至2017年8月初,我国大数据领域有183家企业获得融资,其中A轮81家.天使轮51家,也就是72%的大数据企业仍处于创业初期,商业模式仍有待

php glob()函数实现目录文件遍历与寻找与模式匹配的文件路径

采用PHP函数glob实现寻找与模式匹配的文件路径,主要讨论glob()函数的作用和用法,利用glob函数读取目录比其它的要快N倍,因为glob函数是内置函数处理起来自然要快. 一,函数原型 array glob ( string pattern [, int flags] ) 注意:glob()函数获取的是一个数组,其返回一个包含有匹配文件/目录的数组.如果出错返回FALSE 二,版本兼容 PHP 4 >= 4.3.0, PHP 5 三,函数的基础用法与实例 1,匹配目录下.txt后缀的文件