flink 批量梯度下降算法线性回归参数求解(Linear Regression with BGD(batch gradient descent) )

1、线性回归

假设线性函数如下:

假设我们有10个样本x1,y1),(x2,y2).....(x10,y10),求解目标就是根据多个样本求解theta0和theta1的最优值。

什么样的θ最好的呢?最能反映这些样本数据之间的规律呢?

为了解决这个问题,我们需要引入误差分析预测值与真实值之间的误差为最小。

2、梯度下降算法

梯度下降的场景:

梯度下降法的基本思想可以类比为一个下山的过程。假设这样一个场景:一个人被困在山上,需要从山上下来(i.e. 找到山的最低点,也就是山谷)。
但此时山上的浓雾很大,导致可视度很低。因此,下山的路径就无法确定,他必须利用自己周围的信息去找到下山的路径。这个时候,他就可以利用梯度下降算法来帮助自己下山。
具体来说就是,以他当前的所处的位置为基准,寻找这个位置最陡峭的地方,然后朝着山的高度下降的地方走,同理,如果我们的目标是上山,也就是爬到山顶,那么此时应该是朝着最陡峭的方向往上走。
然后每走一段距离,都反复采用同一个方法,最后就能成功的抵达山谷。

梯度下降实现:原理baidu,这里略过。下图来自internet,解释的非常到位。

α含义
α在梯度下降算法中被称作为学习率或者步长,意味着我们可以通过α来控制每一步走的距离,以保证不要步子跨的太大,就是不要走太快,错过了最低点。同时也要保证不要走的太慢,导致太阳下山了,还没有走到山下。所以α的选择在梯度下降法中往往是很重要的!α不能太大也不能太小,太小的话,可能导致迟迟走不到最低点,太大的话,会导致错过最低点!

梯度要乘以一个负号
梯度前加一个负号,就意味着朝着梯度相反的方向前进!梯度的方向实际就是函数在此点上升最快的方向!而我们需要朝着下降最快的方向走,自然就是负的梯度的方向,所以此处需要加上负号。

实现梯度下降,需要定义一个代价函数,比如:

这是均方误差代价函数

m是数据集中点的个数
二分之一(½)是一个常量,这样是为了在求梯度的时候,二次方乘下来就和这里的½抵消了,自然就没有多余的常数系数,方便后续的计算,同时对结果不会有影响
y 是数据集中每个点的真实y坐标的值

h 是预测函数,根据每一个输入x,根据Θ 计算得到预测的y值

即:

3、最终求解公式,代价函数是j=h(x)-y

4、代码实现

/**
 * @Author: xu.dm
 * @Date: 2019/7/16 21:52
 * @Description: 批量梯度下降算法解决线性回归 y = theta0 + theta1*x 的参数求解。
 * 本例实现一元数据求解二元参数。
 * BGD(批量梯度下降)算法的线性回归是一种迭代聚类算法,其工作原理如下:
 * BGD给出了数据集和目标集,试图找出适合目标集的数据集的最佳参数。
 * 在每次迭代中,算法计算代价函数(cost function)的梯度并使用它来更新所有参数。
 * 算法在固定次数的迭代后终止(如本实现中所示)通过足够的迭代,算法可以最小化成本函数并找到最佳参数。
 * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:
 * Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set.
 * In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters.
 * The algorithm terminates after a fixed number of iterations (as in this implementation)
 * With enough iteration, the algorithm can minimize the cost function and find the best parameters
 *
 * This implementation works on one-dimensional data. And find the two-dimensional theta.
 * It find the best Theta parameter to fit the target.
 *
 * <p>Input files are plain text files and must be formatted as follows:
 * <ul>
 * <li>Data points are represented as two double values separated by a blank character. The first one represent the X(the training data) and the second represent the Y(target).
 * Data points are separated by newline characters.<br>
 * For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
 * </ul>
 */
public class LinearRegression {
    public static void main(String args[]) throws Exception{
        final ParameterTool params = ParameterTool.fromArgs(args);

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(params);

        final int iterations = params.getInt("iterations", 10);

        // get input x data from elements
        DataSet<Data> data;
        if (params.has("input")) {
            // read data from CSV file
            data = env.readCsvFile(params.get("input"))
                    .fieldDelimiter(" ")
                    .includeFields(true, true)
                    .pojoType(Data.class);
        } else {
            System.out.println("Executing LinearRegression example with default input data set.");
            System.out.println("Use --input to specify file input.");
            data = LinearRegressionData.getDefaultDataDataSet(env);
        }

        // get the parameters from elements
        DataSet<Params> parameters = LinearRegressionData.getDefaultParamsDataSet(env);

        // set number of bulk iterations for SGD linear Regression
        IterativeDataSet<Params> loop = parameters.iterate(iterations);

        DataSet<Params> newParameters = data
                // compute a single step using every sample
                .map(new SubUpdate()).withBroadcastSet(loop,"parameters")
                // sum up all the steps
                .reduce(new UpdateAccumulator())
                // average the steps and update all parameters
                .map(new Update());

        // feed new parameters back into next iteration
        DataSet<Params> result = loop.closeWith(newParameters);

        // emit result
        if (params.has("output")) {
            result.writeAsText(params.get("output"));
            // execute program
            env.execute("Linear Regression example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            result.print();
        }

    }

    /**
     * A simple data sample, x means the input, and y means the target.
     */
    public static class Data implements Serializable{
        public double x, y;

        public Data() {}

        public Data(double x, double y) {
            this.x = x;
            this.y = y;
        }

        @Override
        public String toString() {
            return "(" + x + "|" + y + ")";
        }

    }

    /**
     * A set of parameters -- theta0, theta1.
     */
    public static class Params implements Serializable {

        private double theta0, theta1;

        public Params() {}

        public Params(double x0, double x1) {
            this.theta0 = x0;
            this.theta1 = x1;
        }

        @Override
        public String toString() {
            return theta0 + " " + theta1;
        }

        public double getTheta0() {
            return theta0;
        }

        public double getTheta1() {
            return theta1;
        }

        public void setTheta0(double theta0) {
            this.theta0 = theta0;
        }

        public void setTheta1(double theta1) {
            this.theta1 = theta1;
        }

        public Params div(Integer a) {
            this.theta0 = theta0 / a;
            this.theta1 = theta1 / a;
            return this;
        }

    }

    /**
     * Compute a single BGD type update for every parameters.
     * h(x) = theta0*X0 + theta1*X1,假设X0=1,则h(x) = theta0 + theta1*X1,即y = theta0 + theta1*x
     * 代价函数:j=h(x)-y,这里用的是比较简单的cost function
     * theta0 = theta0 - α∑(h(x)-y)
     * theta1 = theta1 - α∑((h(x)-y)*x)
     *
     */
    public static class SubUpdate extends RichMapFunction<Data, Tuple2<Params, Integer>> {

        private Collection<Params> parameters;

        private Params parameter;

        private int count = 1;

        /** Reads the parameters from a broadcast variable into a collection. */
        @Override
        public void open(Configuration parameters) throws Exception {
            this.parameters = getRuntimeContext().getBroadcastVariable("parameters");
        }

        @Override
        public Tuple2<Params, Integer> map(Data in) throws Exception {

            for (Params p : parameters){
                this.parameter = p;
            }
            //核心计算,对于y = theta0 + theta1*x 假定theta0乘以X0=1,所以theta0计算不用乘以in.x
            double theta0 = parameter.theta0 - 0.01 * ((parameter.theta0 + (parameter.theta1 * in.x)) - in.y);
            double theta1 = parameter.theta1 - 0.01 * (((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) * in.x);
            System.out.println("theta0: "+theta0+" , theta1: "+theta1);

            return new Tuple2<>(new Params(theta0, theta1), count);
        }
    }

    /**
     * Accumulator all the update.
     * */
    public static class UpdateAccumulator implements ReduceFunction<Tuple2<Params, Integer>> {

        @Override
        public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> val1, Tuple2<Params, Integer> val2) {

            double newTheta0 = val1.f0.theta0 + val2.f0.theta0;
            double newTheta1 = val1.f0.theta1 + val2.f0.theta1;
            Params result = new Params(newTheta0, newTheta1);
            return new Tuple2<>(result, val1.f1 + val2.f1);

        }
    }

    /**
     * Compute the final update by average them.
     */
    public static class Update implements MapFunction<Tuple2<Params, Integer>, Params> {

        @Override
        public Params map(Tuple2<Params, Integer> arg0) throws Exception {

            return arg0.f0.div(arg0.f1);

        }

    }
}

数据准备:

public class LinearRegressionData {
    // We have the data as object arrays so that we can also generate Scala Data
    // Sources from it.
    public static final Object[][] PARAMS = new Object[][] { new Object[] {
            0.0, 0.0 } };

    public static final Object[][] DATA = new Object[][] {
            new Object[] { 0.5, 1.0 }, new Object[] { 1.0, 2.0 },
            new Object[] { 2.0, 4.0 }, new Object[] { 3.0, 6.0 },
            new Object[] { 4.0, 8.0 }, new Object[] { 5.0, 10.0 },
            new Object[] { 6.0, 12.0 }, new Object[] { 7.0, 14.0 },
            new Object[] { 8.0, 16.0 }, new Object[] { 9.0, 18.0 },
            new Object[] { 10.0, 20.0 }, new Object[] { -0.08, -0.16 },
            new Object[] { 0.13, 0.26 }, new Object[] { -1.17, -2.35 },
            new Object[] { 1.72, 3.45 }, new Object[] { 1.70, 3.41 },
            new Object[] { 1.20, 2.41 }, new Object[] { -0.59, -1.18 },
            new Object[] { 0.28, 0.57 }, new Object[] { 1.65, 3.30 },
            new Object[] { -0.55, -1.08 } };

    public static DataSet<LinearRegression.Params> getDefaultParamsDataSet(ExecutionEnvironment env) {
        List<LinearRegression.Params> paramsList = new LinkedList<>();
        for (Object[] params : PARAMS) {
            paramsList.add(new LinearRegression.Params((Double) params[0], (Double) params[1]));
        }
        return env.fromCollection(paramsList);
    }

    public static DataSet<LinearRegression.Data> getDefaultDataDataSet(ExecutionEnvironment env) {
        List<LinearRegression.Data> dataList = new LinkedList<>();
        for (Object[] data : DATA) {
            dataList.add(new LinearRegression.Data((Double) data[0], (Double) data[1]));
        }
        return env.fromCollection(dataList);
    }
}

原文地址:https://www.cnblogs.com/asker009/p/11202480.html

时间: 2024-10-14 10:37:48

flink 批量梯度下降算法线性回归参数求解(Linear Regression with BGD(batch gradient descent) )的相关文章

线性回归模型(Linear Regression)及Python实现

线性回归模型(Linear Regression)及Python实现 http://www.cnblogs.com/sumai 1.模型 对于一份数据,它有两个变量,分别是Petal.Width和Sepal.Length,画出它们的散点图.我们希望可以构建一个函数去预测Sepal.Length,当我们输入Petal.Width时,可以返回一个预测的Sepal.Length.从散点图可以发现,可以用一条直线去拟合,这时我们可以构建一元线性回归模型:hθ(x) = θ0 + θ1x1 (x1= Pe

贝叶斯线性回归(Bayesian Linear Regression)

贝叶斯线性回归(Bayesian Linear Regression) 标签(空格分隔): 监督学习 @ author : [email protected] @ time : 2015-06-19 原文地址 贝叶斯线性回归Bayesian Linear Regression 原文地址 关于参数估计 极大似然估计 渐进无偏 渐进一致 最大后验估计 贝叶斯估计 贝叶斯估计核心问题 贝叶斯估计第一个重要元素 贝叶斯估计第二个重要元素 贝叶斯估计的增量学习 贝叶斯线性回归 贝叶斯线性回归的学习过程 贝

局部权重线性回归(Locally weighted linear regression)

在线性回归中,由于对参数个数选择的问题是在问题求解之前已经确定好的,因此参数的个数不能很好的确定,如果参数个数过少可能拟合度不好,产生欠拟合(underfitting)问题,或者参数过多,使得函数过于复杂产生过拟合问题(overfitting).因此本节介绍的局部线性回归(LWR)可以减少这样的风险. 欠拟合与过拟合 首先看下面的图 对于图中的一系列样本点,当我们采用y=θ0+θ1x形式的时候,我们可能产生最左边图形式的拟合曲线:如果我们采用y=θ0+θ1x+θ2x2时候,我们就可以产生中间的拟

随机梯度下降(stochastic gradient descent),批梯度下降(batch gradient descent),正规方程组(The normal equations)

对于一个线性回归问题有 为了使得预测值h更加接近实际值y,定义 J越小,预测更加可信,可以通过对梯度的迭代来逼近极值 批梯度下降(batch gradient descent)(the entire training set before taking a single step) 随机梯度下降(stochastic gradient descent)(gets θ "close" to the minimum much faster than batch gradient desce

梯度下降算法&amp;线性回归算法

**机器学习的过程说白了就是让我们编写一个函数使得costfunction最小,并且此时的参数值就是最佳参数值. 定义 假设存在一个代价函数 fun:\(J\left(\theta_{0}, \theta_{1}\right)\) 通过不断地调整\(\theta_{0}\)和\(\theta_{1}\)是函数\(J\left(\theta_{0}, \theta_{1}\right)\)取得最小值 梯度下降就是使J不断通过导数下降的一种算法 \(\theta_{j}:=\theta_{j}-\a

【转】 随机梯度下降(Stochastic gradient descent)和 批量梯度下降(Batch gradient descent )的公式对比、实现对比

梯度下降(GD)是最小化风险函数.损失函数的一种常用方法,随机梯度下降和批量梯度下降是两种迭代求解思路,下面从公式和实现的角度对两者进行分析,如有哪个方面写的不对,希望网友纠正. 下面的h(x)是要拟合的函数,J(theta)损失函数,theta是参数,要迭代求解的值,theta求解出来了那最终要拟合的函数h(theta)就出来了.其中m是训练集的记录条数,j是参数的个数. 1.批量梯度下降的求解思路如下: (1)将J(theta)对theta求偏导,得到每个theta对应的的梯度 (2)由于是

随机梯度下降(Stochastic gradient descent)和 批量梯度下降(Batch gradient descent )的公式对比

转自:here 注意下面说的全局最优是特殊的情况,一般还是梯度下降的方法还是很容易变成局部最优. 梯度下降(GD)是最小化风险函数.损失函数的一种常用方法,随机梯度下降和批量梯度下降是两种迭代求解思路,下面从公式的角度对两者进行分析. 下面的h(x)是要拟合的函数,J(theta)损失函数,theta是参数,要迭代求解的值,theta求解出来了那最终要拟合的函数h(theta)就出来了.其中m是训练集的记录条数,j是参数的个数. 1.批量梯度下降的求解思路如下: (1)将J(theta)对the

批量梯度下降法(Batch Gradient Descent)

批量梯度下降:在梯度下降的每一步中都用到了所有的训练样本. 思想:找能使代价函数减小最大的下降方向(梯度方向). ΔΘ = - α▽J      α:学习速率 梯度下降的线性回归 线性模型的代价函数: 对于更新项,为什么是 - α▽J : Θ如果在极值点右边,偏导大于0,则Θ要减去偏导的值(Θ偏大,减去一个正值) Θ如果在极值点左边,偏导小于0,则Θ要减去偏导的值(Θ偏小,减去一个负值) 实现方法:同步更新每个Θ 特点:梯度下降法即使α取很大也可以收敛到局部最小值. 随着算法的进行,越接近最小值

局部加权线性回归(Locally weighted linear regression)

首先我们来看一个线性回归的问题,在下面的例子中,我们选取不同维度的特征来对我们的数据进行拟合. 对于上面三个图像做如下解释: 选取一个特征,来拟合数据,可以看出来拟合情况并不是很好,有些数据误差还是比较大 针对第一个,我们增加了额外的特征,,这时我们可以看出情况就好了很多. 这个时候可能有疑问,是不是特征选取的越多越好,维度越高越好呢?所以针对这个疑问,如最右边图,我们用5揭多项式使得数据点都在同一条曲线上,为.此时它对于训练集来说做到了很好的拟合效果,但是,我们不认为它是一个好的假设,因为它不