hadoop 大矩阵相乘

package org.bigdata.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 大矩阵相乘
 *
 * @author wwhhf
 *
 */
public class MatrixMapReduce {

    public static class Node {
        private Integer i = null;
        private Integer j = null;
        private Long val = null;

        public Node(Integer i, Integer j, Long val) {
            super();
            this.i = i;
            this.j = j;
            this.val = val;
        }

        public Integer getI() {
            return i;
        }

        public Integer getJ() {
            return j;
        }

        public Long getVal() {
            return val;
        }

        @Override
        public String toString() {
            return "Node [i=" + i + ", j=" + j + ", val=" + val + "]";
        }
    }

    public static class MatrixComparator implements Comparator<Node> {

        @Override
        public int compare(Node o1, Node o2) {
            if (o1.getI() == o2.getI()) {
                return (int) (o1.getJ() - o2.getJ());
            } else {
                return (int) (o1.getI() - o2.getI());
            }
        }

    }

    public static class MatrixMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        private int M = 0;
        private int N = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();

            String terms[] = value.toString().split(" ");
            String xy[] = terms[0].split(",");
            int x = Integer.valueOf(xy[0]);
            int y = Integer.valueOf(xy[1]);

            // 矩阵M*N
            if (fileName.startsWith("M")) {
                // 矩阵M
                for (int i = 1; i <= N; i++) {
                    context.write(new Text(x + "," + i),
                            new Text("M" + value.toString()));
                }
            } else {
                // 矩阵N
                for (int i = 1; i <= M; i++) {
                    context.write(new Text(i + "," + y),
                            new Text("N" + value.toString()));
                }
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            Configuration config = context.getConfiguration();
            M = config.getInt("M", 0);
            N = config.getInt("N", 0);
        }

    }

    public static class MatrixReducer extends
            Reducer<Text, Text, Text, LongWritable> {

        private int M = 0;
        private int N = 0;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<Node> MMatrix = new ArrayList<>();
            List<Node> NMatrix = new ArrayList<>();
            for (Text value : values) {
                String record = value.toString();
                String terms[] = record.substring(1).split(" ");
                String xy[] = terms[0].split(",");
                int x = Integer.valueOf(xy[0]);
                int y = Integer.valueOf(xy[1]);
                long val = Integer.valueOf(terms[1]);
                if (record.startsWith("M")) {
                    // 矩阵M
                    MMatrix.add(new Node(x, y, val));
                } else {
                    NMatrix.add(new Node(x, y, val));
                }
            }
            Comparator<Node> cmp = new MatrixComparator();
            Collections.sort(MMatrix, cmp);
            Collections.sort(NMatrix, cmp);
            System.out.println(MMatrix);
            System.out.println(NMatrix);
            if (NMatrix.size() == MMatrix.size()) {
                long sum = 0L;
                for (Node a : MMatrix) {
                    for (Node b : NMatrix) {
                        sum = sum + (a.getVal() * b.getVal());
                    }
                }
                context.write(key, new LongWritable(sum));
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            Configuration config = context.getConfiguration();
            M = config.getInt("M", 0);
            N = config.getInt("N", 0);
        }
    }

    public static void main(String[] args) {
        Scanner cin = new Scanner(System.in);
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            cfg.setInt("M", cin.nextInt());
            cfg.setInt("K", cin.nextInt());
            cfg.setInt("N", cin.nextInt());

            Job job = Job.getInstance(cfg);
            job.setJobName("Matrix");
            job.setJarByClass(MatrixMapReduce.class);

            // mapper
            job.setMapperClass(MatrixMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // reducer
            job.setReducerClass(MatrixReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            FileInputFormat.addInputPath(job, new Path("/matrix"));
            FileOutputFormat.setOutputPath(job, new Path("/matrix_out/"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (IllegalStateException | IllegalArgumentException
                | ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
时间: 2024-08-05 03:50:12

hadoop 大矩阵相乘的相关文章

MapReduce实现矩阵相乘

前言 MapReduce打开了并行计算的大门,让我们个人开发者有了处理大数据的能力.但想用好MapReduce,把原来单机算法并行化,也不是一件容易事情.很多的时候,我们需要从单机算法能否矩阵化去思考,所以矩阵操作就变成了算法并行化的基础. 矩阵介绍 为了方便说明,举两个矩阵作为示例: , 容易看出,是一个矩阵,是一个矩阵,我们能够算出: 这三个矩阵当然不大,但作为示例,它们将暂时享受大矩阵的待遇. 矩阵稀疏存储 理论上,在一个文件中存储4000万*4000万的矩阵当然是可以的,但非常失之优雅,

【CUDA并行编程之四】矩阵相乘

前面介绍了基本的Cuda编程的相关知识,那么这一篇在此基础之上来看看GPU在处理数据计算上的高效能,我们拿矩阵相乘来作为例子. 1.CPU上执行矩阵相乘以及性能. 在CPU上进行矩阵相乘运算的代码: mat_mul.cc: <span style="font-family:Microsoft YaHei;font-size:18px;">//a[i]*b[i] + c[i] = d[i] #include<iostream> #include<vector

稀疏矩阵的三元组顺序表存储及矩阵相乘算法小结

稀疏矩阵的三元组顺序表存储及矩阵相乘算法小结 巧若拙(欢迎转载,但请注明出处:http://blog.csdn.net/qiaoruozhuo) 一:稀疏矩阵的三元组顺序表数据结构 typedef int ElemType; typedef struct { intx, y;  //该非零元素的行下标和列下标 ElemTypee; //该非零元素的值 } Triple; typedef struct { Tripledata[MAXSIZE]; //非零元素三元组顺序表 intmu, nu, t

矩阵相乘优化算法实现讲解

矩阵相乘      什么是矩阵? 在数学中,矩阵(Matrix)是指纵横排列的二维数据表格,最早来自于方程组的系数及常数所构成的方阵.这一概念由19世纪英国数学家凯利首先提出. 矩阵是高等代数学中的常见工具,也常见于统计分析等应用数学学科中.并且在ACM竞赛,有很多涉及到矩阵知识的题.许多算法都会结合矩阵来处理,而比较具有代表性的矩阵算法有:矩阵快速幂.高斯消元等等. 例如下面的图片就是一个矩阵: 上述矩阵是一个 4 × 3 矩阵: 某矩阵 A 的第 i 行第 j 列,或 I , j位,通常记为

MapReduce实现二阶矩阵相乘

二阶矩阵相乘公式 上例中的C11=A11*B11+A12*B21+A13*B31=1*3+0*2+2*1=5.C12=A11*B12+A12*B22+A13*B32=1*1+0*1+2*0=1 分析 因为分布式计算的特点,需要找到相互独立的计算过程,以便能够在不同的节点上进行计算而不会彼此影响.根据矩 阵乘法的公式,C中各个元素的计算都是相互独立的,即各个cij在计算过程中彼此不影响.这样的话,在Map阶段可 以把计算所需要的元素都集中到同一个key中,然后,在Reduce阶段就可以从中解析出各

超人学院Hadoop大数据资源分享

超人学院Hadoop大数据资源分享-----数据结构与算法(java解密版) http://yunpan.cn/cw5avckz8fByJ  访问密码 b0f8 更多精彩内容请关注:http://bbs.superwu.cn 关注超人学院微信二维码:  关注超人学院java免费学习交流群: 

Hadoop实战视频教程完整版 完整的Hadoop大数据视频教程

分享一套迪伦老师的完整的Hadoop实战视频教程,教程从学习Hadoop需要的数据库.Java和Linux基础讲起,非常适合零基础的学员,课程最后结合了实战项目演练,理论结合实战,深入浅出,绝对是当前最为完整.实战的Hadoop教程. <Hadoop大数据零基础高端实战培训系列配文本挖掘项目(七大亮点.十大目标)> 课程讲师:迪伦 课程分类:大数据 适合人群:初级 课时数量:230课时 用到技术:部署Hadoop集群 涉及项目:京东商城.百度.阿里巴巴 咨询QQ:1337192913(小公子)

ambari 搭建hadoop大数据平台系列4-配置ambari-server

ambari 搭建hadoop大数据平台系列4-配置ambari-server,分为三部分: 官网:  https://docs.hortonworks.com/HDPDocuments/Ambari-2.4.2.0/bk_ambari-installation/content/download_the_ambari_repo_lnx7.html 安装ambari-server  配置ambari-server  命令;ambari-server setup 启动ambari-server 命令

Hadoop大数据赵强老师免费公开课招募啦~~~~

Hadoop大数据公开课招募啦~~~ 赵强老师免费公开课 l  时间:2017年03月14号晚19:30-21:00 n  19:30-20:30讲述Hadoop的背景知识,包括:大数据背景.数据仓库.Hadoop的思想来源(Google的三大思想) n  20:30-21:00答疑 l  讲师简介 13年IT行业从业经历,清华大学计算机硕士,曾在BEA.甲骨文.摩托罗拉等世界500强公司担任高级软件架构师或咨询顾问等要职,精通大数据.数据库.中间件技术和Java技术. 讲师详情链接:http: