MapReduce实现线性回归

1. 软件版本:

Hadoop2.6.0(IDEA中源码编译使用CDH5.7.3,对应Hadoop2.6.0),集群使用原生Hadoop2.6.4,JDK1.8,Intellij IDEA 14 。源码可以在https://github.com/fansy1990/linear_regression 下载。

2. 实现思路:

本博客实现的是一元一次线性方程,等于是最简单的线性方程了,采用的是Couresa里面的机器学习中的大数据线性方程的方法来更新参数值的(即随机梯度下降方法,当然也可以使用批量梯度下降方法来实现,只是在LinearRegressionJob中实现的不一样而已),如果对随机梯度下降或者批量梯度下降不了解的话,需要先去看看。下面是实现思路:

2.1 Shuffle Data(打乱数据):

如果要采用随机梯度下降的话,那么需要保持原始数据随机,所以这里的第一步就是随机打乱原始数据。采用的思路是:在Mapper端输出随机值作为key,输出当前记录作为value,在Reducer端直接遍历每个key的所有values,直接输出value以及NullWritable.get即可。

在这里添加一个额外的参数randN,这个参数表示在Mapper端随机值时,多少个原始数据使用同一个随机值,如果randN为1,那么每个原始数据都会使用一个随机值作为key,如果randN为2,那么每两个原始数据使用一个随机值,如果randN为0或小于0,那么所有数据都使用同一个随机值(注意,这个时候其实在Reducer端的values其实也是乱序的,请读者思考为什么?)。

其Mapper中map核心实现如下所示

 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if(randN <= 0) { // 如果randN 比0小,那么不再次打乱数据
            context.write(randFloatKey,value);
            return ;
        }
        if(++countI >= randN){// 如果randN等于1,那么每次随机的值都是不一样的
            randFloatKey.set(random.nextFloat());
            countI =0;
        }
        context.write(randFloatKey,value);
    }

2.2 Linear Regression(线性回归):

线性回归采用随机梯度下降的方法来更新theta0和theta1 (只实现了一元一次,所以只有两个参数),每个Mapper都会使用同样的初始化参数(theta0=1和theta1=0),在每个Mapper中使用自己的数据来更新theta0和theta1,更新的公式为:

theta0 = theta0 -alpha*(h(x)-y)x
theta1 = theta1 -alpha*(h(x)-y)x

其中,h(x)= theta0 + theta1 * x ;同时,需要注意这里的更新是同步更新,其核心代码如下所示:

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        float[] xy = Utils.str2float(value.toString().split(splitter));
        float x = xy[0];
        float y = xy[1];
        // 同步更新 theta0 and theta1
        lastTheta0 = theta0;

        theta0 -=  alpha *(theta0+theta1* x - y) * x; // 保持theta0 和theta1 不变
        theta1 -= alpha *(lastTheta0 + theta1 * x -y) * x;// 保持theta0 和theta1 不变
    }

然后在每个Mapper的cleanup函数中直接输出theta的参数值即可

protected void cleanup(Context context) throws IOException, InterruptedException {
        theta0_1.set(theta0 + splitter + theta1);
        context.write(theta0_1,NullWritable.get());
    }

由于在每个mapper中已经更新了theta的各个参数值,所以不需要使用reducer即可;同时,由于测试数据比较小,所以设置mapreduce.input.fileinputformat.split.maxsize的大小,读者需要根据自己实际数据的大小来设置,其Driver类核心代码如下所示:

conf.setLong("mapreduce.input.fileinputformat.split.maxsize",700L);// 获取多个mapper;
job.setNumReduceTasks(0);

2.3 Combine Theta (合并参数值):

在2.2步中已经算得了各个theta值,那么应该如何来合并这些求得得各个theta值呢?可以直接用平均值么?对于一元一次线性回归是可以直接使用平均值来作为最终合并后的theta值的,但是针对其他的线性回归(特指有多个局部最小值的线性回归,这样求得的多个theta值合并就会有问题了)。

如果只是使用平均值的话,那么在2.2步其实加一个Reducer就可以完成了,这里提出了一种另外的方式来合并theta值,即采用各个theta值的全局误差作为参数来进行加权。所以,在Mapper的setup中会读取2.2中的多个输出theta值,在map函数中针对各个原始数据求其误差,输出到reducer的数据为theta值和其误差;其核心代码如下所示:

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        float[] xy = Utils.str2float(value.toString().split(splitter));
        for(int i =0;i<thetas.size() ;i++){
            // error = (theta0 + theta1 * x - y) ^2
            thetaErrors[i] += (thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) *
                    (thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) ;
            thetaNumbers[i]+= 1;
        }
    }
protected void cleanup(Context context) throws IOException, InterruptedException {
        for(int i =0;i<thetas.size() ;i++){
            theta.set(thetas.get(i));
            floatAndLong.set(thetaErrors[i],thetaNumbers[i]);
            context.write(theta,floatAndLong);
        }
    }

在Reducer端,直接针对每个键(也就是theta值)把各个误差加起来,在cleanup函数中采用加权来合并theta值,其核心代码如下所示:

protected void reduce(FloatAndFloat key, Iterable<FloatAndLong> values, Context context) throws IOException, InterruptedException {
        float sumF = 0.0f;
        long sumL = 0L ;
        for(FloatAndLong value:values){
            sumF +=value.getSumFloat();
            sumL += value.getSumLong();
        }
        theta_error.add(new float[]{key.getTheta0(),key.getTheta1(), (float)Math.sqrt((double)sumF / sumL)});
        logger.info("theta:{}, error:{}", new Object[]{key.toString(),Math.sqrt(sumF/sumL)});
    }
protected void cleanup(Context context) throws IOException, InterruptedException {
        // 如何加权?
        // 方式1:如果误差越小,那么说明权重应该越大;
        // 方式2:直接平均值
        float [] theta_all = new float[2];
        if("average".equals(method)){
//            theta_all = theta_error.get(0);
            for(int i=0;i< theta_error.size();i++){
                theta_all[0] += theta_error.get(i)[0];
                theta_all[1] += theta_error.get(i)[1];
            }
            theta_all[0] /= theta_error.size();
            theta_all[1] /= theta_error.size();
        } else {
            float sumErrors = 0.0f;
            for(float[] d:theta_error){
                sumErrors += 1/d[2];
            }
            for(float[] d: theta_error){
                theta_all[0] += d[0] * 1/d[2] /sumErrors;
                theta_all[1] += d[1] * 1/d[2] /sumErrors;
            }
        }
        context.write(new FloatAndFloat(theta_all),NullWritable.get());
    }

2.4 验证

这里的验证指的是使用2.3步求的得合并后的theta值求全局误差,由于在2.3步也求得了各个theta值的全局误差,所以这里可以对比看下哪个theta值最优;其Mapper可以直接使用2.3步骤的mapper,而reducer也类似2.3步骤中的reducer,只是最终输出就不需要cleanup中的合并了。

3. 运行结果:

3.1 shuffle Job

测试类:

public static void main(String[] args) throws Exception {
        args = new String[]{
                "hdfs://master:8020/user/fanzhe/linear_regression.txt",
                "hdfs://master:8020/user/fanzhe/shuffle_out",
                "1"
        }    ;
        ToolRunner.run(Utils.getConf(),new ShuffleDataJob(),args);

    }

原始数据:(可以在源码中的resource目录中下载 linear_regression.txt)

6.1101,17.592
5.5277,9.1302
8.5186,13.662
。。。

Shuffle输出:

每次输出应该都是不一样的(使用了随机数),可以看到数据确实被随机化了。

3.2 Linear Regression

测试类:

public static void main(String[] args) throws Exception {
//        <input> <output> <theta0;theta1;alpha> <splitter> // 注意第三个参数使用分号分割
        args = new String[]{
                "hdfs://master:8020/user/fanzhe/shuffle_out",
                "hdfs://master:8020/user/fanzhe/linear_regression",
                "1;0;0.01",
                ","
        }    ;
        ToolRunner.run(Utils.getConf(),new LinearRegressionJob(),args);
    }

查看输出结果:

从输出结果可以看出,两个结果相差还是很大的,这个主要是因为测试数据比较少的原因,如果数据比较大,并且被很好的shuffle的话,那么这两个值应该是相差不大的;

3.3 Combine Theta

测试类:

public static void main(String[] args) throws Exception {
//        <input> <output> <theta_path> <splitter> <average|weight>
        args = new String[]{
                "hdfs://master:8020/user/fanzhe/shuffle_out",
                "hdfs://master:8020/user/fanzhe/single_linear_regression_error",
                "hdfs://master:8020/user/fanzhe/linear_regression",
                ",",
                "weight"
        }    ;
        ToolRunner.run(Utils.getConf(),new SingleLinearRegressionError(),args);
    }

这里设置的合并theta值的方式使用加权,读者可以设置为average,从而使用平均值;

结果:

根据日志可以看出theta参数值选取下面的一个,其误差会比较小,合并后的参数值为:

看到其结果是在两个theta参数值之间。

如果是平均值,那么其输出结果为:

3.4 验证

验证测试类:

public static void main(String[] args) throws Exception {
//        <input> <output> <theta_path> <splitter>
        args = new String[]{
                "hdfs://master:8020/user/fanzhe/shuffle_out",
                "hdfs://master:8020/user/fanzhe/last_linear_regression_error",
                "hdfs://master:8020/user/fanzhe/single_linear_regression_error",
                ",",
        }    ;
        ToolRunner.run(Utils.getConf(),new LastLinearRegressionError(),args);
    }

输出结果为:

从结果中可以看出,合并后的结果并没有原来其中的一个Theta参数组值的效果好,不过这个也可能和数据量有关,根据输出结果,也可以把合并后的theta值以及合并前的对比,然后使用最优的theta来作为最后的输出。

如果是平均值,那么其输出结果为:

从上面的结果可以看到加权的组合比平均值的组合效果好点;

4. 总结

1. 改算法只针对有一个局部最优解(也就是全局最优解)的情况,否则,在合并阶段会有问题;

2. 通过小量数据验证,使用合并后的效果并没有使用合并前的最优解的效果好,这个可能是数据问题,待验证;

3. 通过很直观的想象,一般情况下使用加权组合要比平均组好效果好;

分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990

时间: 2024-10-01 22:41:08

MapReduce实现线性回归的相关文章

Spark MLlib Linear Regression线性回归算法

1.Spark MLlib Linear Regression线性回归算法 1.1 线性回归算法 1.1.1 基础理论 在统计学中,线性回归(Linear Regression)是利用称为线性回归方程的最小平方函数对一个或多个自变量和因变量之间关系进行建模的一种回归分析.这种函数是一个或多个称为回归系数的模型参数的线性组合. 回归分析中,只包括一个自变量和一个因变量,且二者的关系可用一条直线近似表示,这种回归分析称为一元线性回归分析.如果回归分析中包括两个或两个以上的自变量,且因变量和自变量之间

MapReduce原理及其主要实现平台分析

原文:http://www.infotech.ac.cn/article/2012/1003-3513-28-2-60.html 亢丽芸, 王效岳, 白如江 摘要 关键词: MapReduce; 实现平台; Hadoop; Phoenix; Disco; Mars Analysis of MapReduce Principle and Its Main Implementation Platforms Kang Liyun, Wang Xiaoyue, Bai Rujiang Abstract

用scikit-learn和pandas学习线性回归

对于想深入了解线性回归的童鞋,这里给出一个完整的例子,详细学完这个例子,对用scikit-learn来运行线性回归,评估模型不会有什么问题了. 1. 获取数据,定义问题 没有数据,当然没法研究机器学习啦.:) 这里我们用UCI大学公开的机器学习数据来跑线性回归. 数据的介绍在这: http://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant 数据的下载地址在这: http://archive.ics.uci.edu/ml/ma

MapReduce实现手机上网流量分析

一.问题背景 现在的移动刚一通话就可以在网站上看自己的通话记录,以前是本月只能看上一个月.不过流量仍然是只能看上一月的. 目的就是找到用户在一段时间内的上网流量. 本文并没有对时间分组. 二.数据集分析 可以看出实际数据集并不是每个字段都有值,但是还好,完整地以tab隔开了,数据格式还是不错的,我们需要的上行下行数据都有,没有缺失值.其实这个需要在程序中处理,如果不在的话 该怎么办. 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196

mapreduce和spark的原理及区别

Mapreduce和spark是数据处理层两大核心,了解和学习大数据必须要重点掌握的环节,根据自己的经验和大家做一下知识的分享. 首先了解一下Mapreduce,它最本质的两个过程就是Map和Reduce,Map的应用在于我们需要数据一对一的元素的映射转换,比如说进行截取,进行过滤,或者任何的转换操作,这些一对一的元素转换就称作是Map:Reduce主要就是元素的聚合,就是多个元素对一个元素的聚合,比如求Sum等,这就是Reduce. Mapreduce是Hadoop1.0的核心,Spark出现

基于 Eclipse 的 MapReduce 开发环境搭建

文 / vincentzh 原文连接:http://www.cnblogs.com/vincentzh/p/6055850.html 上周末本来要写这篇的,结果没想到上周末自己环境都没有搭起来,运行起来有问题的呢,拖到周一才将问题解决掉.刚好这周也将之前看的内容复习了下,边复习边码代码理解,印象倒是很深刻,对看过的东西理解也更深入了. 目录 1.概述 2.环境准备 3.插件配置 4.配置文件系统连接 5.测试连接 6.代码编写与执行 7.问题梳理 7.1 console 无日志输出问题 7.2

mongodb aggregate and mapReduce

Aggregate MongoDB中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果.有点类似sql语句中的 count(*) 语法如下: db.collection.aggregate() db.collection.aggregate(pipeline,options) db.runCommand({ aggregate: "<collection>", pipeline: [ <stage>, <...&g

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

我们知道,MapReduce有三层调度模型,即Job-->Task-->TaskAttempt,并且: 1.通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业.JobSetup Task等复杂的情况这里不做考虑): 2.每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次. 而TaskImpl中存在一个成员变

初步掌握MapReduce的架构及原理

目录 1.MapReduce定义 2.MapReduce来源 3.MapReduce特点 4.MapReduce实例 5.MapReduce编程模型 6.MapReduce 内部逻辑 7.MapReduce架构 8.MapReduce框架的容错性 9.MapReduce资源组织方式 1.MapReduce 定义 Hadoop 中的 MapReduce是一个使用简单的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错式并行处理TB级别的数据集 2.MapR