K-Means Hadoop MapReduce

Clustering vs. Classification

Clustering means grouping samples that carry no predefined class labels into different groups, following the principle that "birds of a feather flock together". Each such collection of data objects is called a cluster, and the process also produces a description of every cluster.

In classification, the classes that exist in the target database are known in advance; the task is to label each record with the class it belongs to.

Cluster analysis is also called unsupervised learning: unlike classification, the samples carry no labels, so the clustering algorithm has to determine the groups on its own. Cluster analysis studies how to partition samples into groups without any training data.

The K-Means Algorithm

K-means, also known as the k-means or k-averages algorithm, is the most widely used clustering algorithm. It represents each cluster by the mean of all data samples assigned to it. The main idea is to partition the data set into classes through an iterative process so that the criterion function used to evaluate clustering quality, the sum-of-squared-error criterion E = Σi Σx∈Ci |x − mi|² (where mi is the mean of cluster Ci), is minimized, which makes every resulting cluster internally compact and the clusters well separated from each other.

The most common form of the algorithm is an iterative refinement heuristic known as Lloyd's algorithm. Lloyd's algorithm first splits the input points into k initial groups, either at random or using some heuristic. It then computes the center of each group and reassigns every object to its nearest center, forming new groups. Computing centers and regrouping are repeated until convergence, i.e. until no object changes its group (the centers stop moving). For the two-cluster case the process boils down to the following steps:

1.  Pick k = 2 and initialize the two clusters by choosing two seed points.
2.  Compute the distance from each seed point to all of the remaining (white) points.
3.  For a white point pi, if seed point si is the one closest to pi, assign pi to si.
4.  Recompute each center, e.g. xn = (x1 + x2 + x3) / 3, yn = (y1 + y2 + y3) / 3.
5.  Iterate until the cluster assignments no longer change (see the in-memory sketch below).
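
As a hedged illustration of these five steps (independent of the MapReduce implementation later in this post, and with all class and method names invented for the example), a plain in-memory version of Lloyd's iteration in Java might look like this:

import java.util.Arrays;
import java.util.Random;

// Plain in-memory sketch of Lloyd's iteration (illustrative only).
public class LloydSketch {

    // Squared Euclidean distance between two points of equal dimension.
    static double dist2(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            s += d * d;
        }
        return s;
    }

    // Returns the cluster index assigned to each point.
    static int[] kmeans(double[][] points, int k, int maxIter) {
        int n = points.length, dim = points[0].length;
        Random rnd = new Random(42);
        // step 1: initialize k centers with randomly chosen data points
        // (duplicates are possible; good enough for a sketch)
        double[][] centers = new double[k][];
        for (int c = 0; c < k; c++) {
            centers[c] = points[rnd.nextInt(n)].clone();
        }
        int[] assign = new int[n];
        Arrays.fill(assign, -1);
        for (int iter = 0; iter < maxIter; iter++) {
            boolean changed = false;
            // steps 2-3: assign every point to its nearest center
            for (int i = 0; i < n; i++) {
                int best = 0;
                for (int c = 1; c < k; c++) {
                    if (dist2(points[i], centers[c]) < dist2(points[i], centers[best])) {
                        best = c;
                    }
                }
                if (assign[i] != best) {
                    assign[i] = best;
                    changed = true;
                }
            }
            // step 5: stop when no assignment changed
            if (!changed) {
                break;
            }
            // step 4: recompute every center as the mean of its points
            double[][] sum = new double[k][dim];
            int[] cnt = new int[k];
            for (int i = 0; i < n; i++) {
                cnt[assign[i]]++;
                for (int d = 0; d < dim; d++) {
                    sum[assign[i]][d] += points[i][d];
                }
            }
            for (int c = 0; c < k; c++) {
                if (cnt[c] > 0) {
                    for (int d = 0; d < dim; d++) {
                        centers[c][d] = sum[c][d] / cnt[c];
                    }
                }
            }
        }
        return assign;
    }

    public static void main(String[] args) {
        double[][] data = { {2, 1}, {3, 2}, {10, 9}, {11, 10} };
        System.out.println(Arrays.toString(kmeans(data, 2, 100)));
    }
}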

Distance measures for computing cluster centers

Minkowski Distance

Euclidean Distance

Manhattan Distance

Minkowski Distance

The Minkowski distance is not a single distance but a family of distance definitions.

Definition of the Minkowski distance:

The Minkowski distance between two n-dimensional vectors a(x11, x12, …, x1n) and b(x21, x22, …, x2n) is defined as d12 = ( Σk=1..n |x1k − x2k|^p )^(1/p),

where p is a variable parameter.

When p = 1, it is the Manhattan distance.

When p = 2, it is the Euclidean distance.

When p → ∞, it becomes the Chebyshev distance. Depending on the value of the parameter p, the Minkowski distance therefore represents a whole family of distances.

Euclidean Distance

The Euclidean distance is the easiest distance measure to understand; it comes directly from the formula for the distance between two points in Euclidean space.

The Euclidean distance between two n-dimensional vectors a(x11, x12, …, x1n) and b(x21, x22, …, x2n) is d12 = sqrt( Σk=1..n (x1k − x2k)² ).

Manhattan Distance

The Manhattan distance is the distance between two points along the north–south direction plus the distance along the east–west direction, i.e. d(i,j) = |xi − xj| + |yi − yj|. In a city whose streets are laid out on a strict north–south / east–west grid, the travel distance from one point to another is exactly the north–south distance plus the east–west distance, which is why the Manhattan distance is also called the taxicab distance.

The Manhattan distance between two n-dimensional vectors a(x11, x12, …, x1n) and b(x21, x22, …, x2n) is d12 = Σk=1..n |x1k − x2k|. A short illustrative sketch of all three distance functions follows.
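
The three formulas can be implemented as one parameterized function. The sketch below is purely illustrative; it is not the DistanceUtil class used by the MapReduce code later in this post, and all names here are invented:

// Illustrative Minkowski-family distances; not the DistanceUtil used below.
public class DistanceSketch {

    // Minkowski distance of order p between two equal-length vectors.
    public static double minkowski(double[] a, double[] b, double p) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            sum += Math.pow(Math.abs(a[i] - b[i]), p);
        }
        return Math.pow(sum, 1.0 / p);
    }

    public static double manhattan(double[] a, double[] b) {
        return minkowski(a, b, 1);    // p = 1
    }

    public static double euclidean(double[] a, double[] b) {
        return minkowski(a, b, 2);    // p = 2
    }

    public static void main(String[] args) {
        double[] a = {2, 1, 3, 4, 1, 4};     // first data point of the sample input
        double[] b = {5, 5, 5, 5, 5, 5};     // seed point c2 of the sample input
        System.out.println(manhattan(a, b)); // 15.0
        System.out.println(euclidean(a, b)); // ~6.86
    }
}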

Limitations of the Algorithm

It can only be applied when the mean of a cluster is defined, so it is not suitable for data with symbolic (categorical) attributes.

The number of clusters k must be given in advance, and the algorithm is sensitive to the initial values: different initializations may lead to different results, and it often converges to a sub-optimal partition. A common workaround is to try several different initializations.

It is sensitive to noise and outliers; a small amount of such data can distort the cluster means dramatically.

K-Means++

1.  Pick one point from the data set at random as the first "seed point".

2.  For every point x, compute the distance D(x) to its nearest already-chosen seed point, keep the distances in an array, and add them up to get Sum(D(x)).

3.  Take a random value and choose the next seed point in a distance-weighted way: draw a random number Random that falls within Sum(D(x)), then repeatedly subtract (Random -= D(x)) while walking through the points until Random <= 0; the point at which this happens becomes the next "seed point".

4.  Repeat steps (2) and (3) until all k seed points have been chosen.

5.  Run the standard K-Means algorithm. A minimal sketch of the seeding procedure is shown below.
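
As a hedged, self-contained sketch of the seeding step only (all names are invented; note that the text above weights by D(x), while the published K-Means++ algorithm weights by D(x)², so the code follows the text):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Illustrative K-Means++ seed selection; not the code used later in this post.
public class KMeansPlusPlusSeeding {

    // Euclidean distance between two points of equal dimension.
    static double dist(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            s += d * d;
        }
        return Math.sqrt(s);
    }

    // Chooses k seeds: the first at random, the rest weighted by D(x).
    static List<double[]> chooseSeeds(double[][] points, int k, Random rnd) {
        List<double[]> seeds = new ArrayList<>();
        seeds.add(points[rnd.nextInt(points.length)]);       // step 1
        while (seeds.size() < k) {
            double[] d = new double[points.length];
            double sum = 0;
            for (int i = 0; i < points.length; i++) {        // step 2: D(x) and Sum(D(x))
                double best = Double.MAX_VALUE;
                for (double[] s : seeds) {
                    best = Math.min(best, dist(points[i], s));
                }
                d[i] = best;
                sum += best;
            }
            double random = rnd.nextDouble() * sum;          // step 3: weighted pick
            int next = 0;
            while (next < points.length - 1 && (random -= d[next]) > 0) {
                next++;
            }
            seeds.add(points[next]);
        }
        return seeds;                                        // step 4: until k seeds chosen
    }

    public static void main(String[] args) {
        double[][] data = { {2, 1}, {3, 2}, {10, 9}, {11, 10}, {0, 0} };
        for (double[] s : chooseSeeds(data, 2, new Random(7))) {
            System.out.println(s[0] + "," + s[1]);
        }
    }
}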

package org.bigdata.mapreduce.kmeans;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.bigdata.util.HadoopCfg;

/**
 * Computes the initial seed points: the max, min and midpoint of every column.
 *
 * @author wwhhf
 *
 */
public class ClusterMapReduce {

    /**
     * Input lines look like "2,1,3,4,1,4" (one data point per line, comma-separated);
     * emits (column name, value) pairs such as ("c1", 2.0).
     *
     * @author wwhhf
     *
     */
    public static class ClusterMapper extends
            Mapper<LongWritable, Text, Text, DoubleWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String terms[] = value.toString().split(",");
            for (int i = 0, len = terms.length; i < len; i++) {
                context.write(new Text("c" + (i + 1)), new DoubleWritable(
                        Double.valueOf(terms[i])));
            }
        }

    }

    public static class ClusterReducer extends
            Reducer<Text, DoubleWritable, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            double maxx = -Double.MAX_VALUE;
            double minx = Double.MAX_VALUE;
            for (DoubleWritable value : values) {
                maxx = Math.max(maxx, value.get());
                minx = Math.min(minx, value.get());
            }
            context.write(key, new Text("" + maxx + "," + minx + ","
                    + ((maxx + minx) * 1.0 / 2)));
        }

    }

    /**
     * Reads the previous job's output, one line per column ("c1<TAB>max,min,avg"),
     * and re-emits the values keyed by statistic, e.g. ("max", "c1,5.0").
     *
     * @author wwhhf
     *
     */
    public static class PointMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String terms[] = value.toString().split("\t");
            String values[] = terms[1].split(",");
            double maxx = Double.valueOf(values[0]);
            double minx = Double.valueOf(values[1]);
            double avg = Double.valueOf(values[2]);
            context.write(new Text("max"),
                    new Text(key.toString() + "," + maxx));
            context.write(new Text("min"),
                    new Text(key.toString() + "," + minx));
            context.write(new Text("avg"), new Text(key.toString() + "," + avg));
        }

    }

    /**
     * For each statistic key ("max", "min", "avg") collects one value per column,
     * sorts them by column name and emits one seed point line such as "c1:5.0,4.0,...".
     *
     * @author wwhhf
     *
     */
    public static class PointReducer extends
            Reducer<Text, Text, Text, NullWritable> {

        private static int cnt = 0;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            TreeMap<String, Double> map = new TreeMap<>();
            for (Text value : values) {
                // value looks like "c1,5.0"
                String terms[] = value.toString().split(",");
                map.put(terms[0], Double.valueOf(terms[1]));
            }
            // write
            cnt++;
            StringBuffer sb = new StringBuffer();
            for (Entry<String, Double> entry : map.entrySet()) {
                System.out.println(entry.getKey() + " " + entry.getValue());
                sb.append(entry.getValue()).append(",");
            }
            context.write(
                    new Text("c"
                            + cnt
                            + ":"
                            + sb.toString().substring(0,
                                    sb.toString().length() - 1)),
                    NullWritable.get());
        }

    }

    private static final String JOB_NAME = "cluster";

    public static void solve(String pathin, String pathout)
            throws ClassNotFoundException, InterruptedException {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName(JOB_NAME);
            job.setJarByClass(ClusterMapReduce.class);

            // mapper
            job.setMapperClass(ClusterMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);

            // Reducer
            job.setReducerClass(ClusterReducer.class);
            job.setOutputKeyClass(Text.class);
            // ClusterReducer emits Text values, not NullWritable
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(pathin));
            FileOutputFormat.setOutputPath(job, new Path(pathout));

            job.waitForCompletion(true);

        } catch (IllegalStateException | IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    public static void genPoint(String pathin, String pathout)
            throws ClassNotFoundException, InterruptedException {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName(JOB_NAME);
            job.setJarByClass(ClusterMapReduce.class);

            // mapper
            job.setMapperClass(PointMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // reducer
            job.setReducerClass(PointReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(pathin));
            FileOutputFormat.setOutputPath(job, new Path(pathout));

            job.waitForCompletion(true);

        } catch (IllegalStateException | IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws ClassNotFoundException,
            InterruptedException {
        solve("/kmeans", "/kmeans_out");
        genPoint("/kmeans_out", "/kmeans_out1");
    }

}
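The second source file, KMeansMapReduce, loads the seed points produced above and runs the iterative assign-to-nearest-center / recompute-center rounds: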
package org.bigdata.mapreduce.kmeans;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.bigdata.util.DistanceUtil;
import org.bigdata.util.HadoopCfg;
import org.bigdata.util.HadoopUtil;

/**
 * K-Means
 *
 * @author wwhhf
 *
 */
public class KMeansMapReduce {

    private static final String JOB_NAME = "kmeans";
    private static final String RES_PATH = "/kmeans_res";
    private static final String POINTS = "kmeans.txt";

    // current cluster centers: name -> coordinates.
    // Note: this static map is filled in the driver JVM by initPoint(), so the
    // mappers only see it when the job runs in local mode; on a real cluster the
    // centers would have to be shipped via the Configuration or the distributed cache.
    private static Map<String, Vector<Double>> points = new HashMap<>();

    public static void initPoint(String pathin, String filename)
            throws IOException {
        List<String> lines = HadoopUtil.lslFile(pathin, filename);
        for (String line : lines) {
            String terms[] = line.toString().split(":");
            points.put(terms[0], DistanceUtil.getVector(terms[1].split(",")));
        }
    }

    public static class KMeansMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            if (POINTS.equals(fileName)) {
                String terms[] = value.toString().split(",");
                Vector<Double> p = DistanceUtil.getVector(terms);
                String center = null;
                double minx = Double.MAX_VALUE;
                for (Entry<String, Vector<Double>> entry : points.entrySet()) {
                    try {
                        double dis = DistanceUtil.getEuclideanDisc(
                                entry.getValue(), p);
                        if (dis < minx) {
                            minx = dis;
                            center = entry.getKey();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                context.write(new Text(center), value);
            }

        }

    }

    public static class KMeansReducer extends
            Reducer<Text, Text, Text, NullWritable> {

        // multiple outputs: per-iteration assignment details go to RES_PATH + cnt
        private MultipleOutputs<Text, NullWritable> output = null;

        private static int cnt = 1;

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            // one extra output file per iteration
            output = new MultipleOutputs<Text, NullWritable>(context);
            cnt++;
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
            output.close();
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<Vector<Double>> list = new ArrayList<>();
            int num = 0;
            for (Text value : values) {
                String terms[] = value.toString().split(",");
                num = terms.length;
                Vector<Double> p = DistanceUtil.getVector(terms);
                list.add(p);
                output.write(
                        new Text(value + " belongs to " + key.toString()),
                        NullWritable.get(), RES_PATH + cnt);
            }
            String point = DistanceUtil.getAvg(list, num);
            context.write(new Text(key.toString() + ":" + point),
                    NullWritable.get());
        }

    }

    public static void solve(String pointsin, String pathout)
            throws ClassNotFoundException, InterruptedException {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName(JOB_NAME);
            job.setJarByClass(KMeansMapReduce.class);

            // mapper
            job.setMapperClass(KMeansMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // reducer
            job.setReducerClass(KMeansReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(pointsin));
            FileOutputFormat.setOutputPath(job, new Path(pathout));

            job.waitForCompletion(true);

        } catch (IllegalStateException | IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws ClassNotFoundException,
            InterruptedException, IOException {
        String path = "/kmeans";
        String pathout = "/kmeans_point";
        String tmp_pathin = pathout;
        // the initial seed points are expected in /kmeans_point/part-r-00000
        // (for example, the ClusterMapReduce output copied there)
        String point_filename = "part-r-00000";
        // run three rounds; each round reads the centers produced by the previous one
        for (int i = 1; i <= 3; i++) {
            initPoint(tmp_pathin, point_filename);
            String tmp_pathout = pathout + i;
            solve(path, tmp_pathout);
            tmp_pathin = tmp_pathout;
        }
    }

}
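
The two classes above depend on three helpers from org.bigdata.util (HadoopCfg, HadoopUtil and DistanceUtil) that are not included in the post. The following is only an assumed sketch of what those helpers would need to provide for the code to compile and run; the real implementations may differ, and each class would live in its own file:

package org.bigdata.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Assumed sketch: the real HadoopCfg presumably also sets fs.defaultFS etc.
public class HadoopCfg {
    public static Configuration getConfiguration() {
        return new Configuration();
    }
}

// Assumed sketch: reads all lines of <pathin>/<filename> from HDFS.
public class HadoopUtil {
    public static List<String> lslFile(String pathin, String filename) throws IOException {
        List<String> lines = new ArrayList<>();
        FileSystem fs = FileSystem.get(HadoopCfg.getConfiguration());
        try (BufferedReader in = new BufferedReader(new InputStreamReader(
                fs.open(new Path(pathin, filename))))) {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}

// Assumed sketch of the vector helpers used by the mappers and reducers.
public class DistanceUtil {

    // Parses tokens like "2", "1", "3" into a vector of doubles.
    public static Vector<Double> getVector(String[] terms) {
        Vector<Double> v = new Vector<>();
        for (String t : terms) {
            v.add(Double.valueOf(t));
        }
        return v;
    }

    // Euclidean distance between two vectors of the same length.
    public static double getEuclideanDisc(Vector<Double> a, Vector<Double> b) throws Exception {
        if (a.size() != b.size()) {
            throw new Exception("dimension mismatch");
        }
        double sum = 0;
        for (int i = 0; i < a.size(); i++) {
            double d = a.get(i) - b.get(i);
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // Component-wise mean of the points, formatted as "v1,v2,...".
    public static String getAvg(List<Vector<Double>> list, int num) {
        double[] sum = new double[num];
        for (Vector<Double> p : list) {
            for (int i = 0; i < num; i++) {
                sum[i] += p.get(i);
            }
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < num; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(sum[i] / list.size());
        }
        return sb.toString();
    }
}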

Input (the three lines starting with c1:/c2:/c3: are the initial seed points in the format read by initPoint; the remaining lines are the data points, i.e. the contents of kmeans.txt):

c1:1,1,1,1,0,4
c2:5,5,5,5,5,5
c3:2.5,2.5,2.5,2.5,2.5,2.5
2,1,3,4,1,4
3,2,5,2,3,5
4,4,4,3,1,5
2,3,1,2,0,3
4,0,1,1,1,5
1,2,3,5,0,1
5,3,2,2,1,3
3,4,1,1,2,1
0,2,3,3,1,4
0,2,5,0,2,2
2,1,4,5,4,3
4,1,4,3,3,2
0,3,2,2,0,1
1,3,1,0,3,0
3,3,4,2,1,3
3,5,3,5,3,2
2,3,2,3,0,1
4,3,3,2,2,3
1,4,3,4,3,1
3,2,3,0,2,5
1,0,2,1,0,4
4,4,3,5,5,4
5,1,4,3,5,2
3,4,4,4,1,1
2,2,4,4,5,5
5,2,0,3,1,3
1,1,3,1,1,3
2,4,2,0,3,5
1,1,1,1,0,4
1,1,4,1,3,0
