hadoop下实现kmeans算法——一个mapreduce的实现方法

写mapreduce程序实现kmeans算法,我们的思路可能是这样的

1. 用一个全局变量存放上一次迭代后的质心

2. map里,计算每个质心与样本之间的距离,得到与样本距离最短的质心,以这个质心作为key,样本作为value,输出

3. reduce里,输入的key是质心,value是其他的样本,这时重新计算聚类中心,将聚类中心put到一个全部变量t中。

4. 在main里比较前一次的质心和本次的质心是否发生变化,如果变化,则继续迭代,否则退出。

本文的思路基本上是按照上面的步骤来做的,只不过有几个问题需要解决

1. hadoop是不存在自定义的全局变量的,所以上面定义一个全局变量存放质心的想法是实现不了的,所以一个替代的思路是将质心存放在文件中

2. 存放质心的文件在什么地方读取,如果在map中读取,那么可以肯定我们是不能用一个mapreduce实现一次迭代,所以我们选择在main函数里读取质心,然后将质心set到configuration中,configuration在map和reduce都是可读

3. 如何比较质心是否发生变化,是在main里比较么,读取本次质心和上一次质心的文件然后进行比较,这种方法是可以实现的,但是显得不够高富帅,这个时候我们用到了自定义的counter,counter是全局变量,在map和reduce中可读可写,在上面的思路中,我们看到reduce是有上次迭代的质心和刚刚计算出来的质心的,所以直接在reduce中进行比较就完全可以,如果没发生变化,counter加1。只要在main里比较获取counter的值就行了。

梳理一下,具体的步骤如下

1. main函数读取质心文件

2. 将质心的字符串放到configuration中

3. 在mapper类重写setup方法,获取到configuration的质心内容,解析成二维数组的形式,代表质心

4. mapper类中的map方法读取样本文件,跟所有的质心比较,得出每个样本跟哪个质心最近,然后输出<质心,样本>

5. reducer类中重新计算质心,如果重新计算出来的质心跟进来时的质心一致,那么自定义的counter加1

6. main中获取counter的值,看是否等于质心,如果不相等,那么继续迭代,否在退出

具体的实现如下

1. pom依赖

这个要跟集群的一致,因为如果不一致在计算其他问题的时候没有问题,但是在使用counter的时候会出现问题

java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.Counter, but class was expected

原因是:其实从2.0开始,org.apache.hadoop.mapreduce.Counter从1.0版本的class改为interface,可以看一下你导入的这个类是class还是interface,如果是class那么就是导包导入的不对,需要修改

2.
样本

实例样本如下

1,1
2,2
3,3
-3,-3
-4,-4
-5,-5

3.
质心

这个质心是从样本中随机找的

1,1
2,2

4. 代码实现

首先定义一个Center类,这个类主要存放了质心的个数k,还有两个从hdfs上读取质心文件的方法,一个用来读取初始的质心,这个实在文件中,还有一个是用来读取每次迭代后的质心文件夹,这个是在文件夹中的,代码如下

Center类

public class Center {

	protected static int k = 2;		//质心的个数

	/**
	 * 从初始的质心文件中加载质心,并返回字符串,质心之间用tab分割
	 * @param path
	 * @return
	 * @throws IOException
	 */
	public String loadInitCenter(Path path) throws IOException {

		StringBuffer sb = new StringBuffer();

		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
		FSDataInputStream dis = hdfs.open(path);
		LineReader in = new LineReader(dis, conf);
		Text line = new Text();
		while(in.readLine(line) > 0) {
			sb.append(line.toString().trim());
			sb.append("\t");
		}

		return sb.toString().trim();
	}

	/**
	 * 从每次迭代的质心文件中读取质心,并返回字符串
	 * @param path
	 * @return
	 * @throws IOException
	 */
	public String loadCenter(Path path) throws IOException {

		StringBuffer sb = new StringBuffer();

		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
		FileStatus[] files = hdfs.listStatus(path);

		for(int i = 0; i < files.length; i++) {

			Path filePath = files[i].getPath();
			if(!filePath.getName().contains("part")) continue;
			FSDataInputStream dis = hdfs.open(filePath);
			LineReader in = new LineReader(dis, conf);
			Text line = new Text();
			while(in.readLine(line) > 0) {
				sb.append(line.toString().trim());
				sb.append("\t");
			}
		}

		return sb.toString().trim();
	}
}

KmeansMR类

public class KmeansMR {

	private static String FLAG = "KCLUSTER";

	public static class TokenizerMapper
    extends Mapper<Object, Text, Text, Text>{

		double[][] centers = new double[Center.k][];
		String[] centerstrArray = null;

		@Override
		public void setup(Context context) {

			//将放在context中的聚类中心转换为数组的形式,方便使用
			String kmeansS = context.getConfiguration().get(FLAG);
			centerstrArray = kmeansS.split("\t");
			for(int i = 0; i < centerstrArray.length; i++) {
				String[] segs = centerstrArray[i].split(",");
				centers[i] = new double[segs.length];
				for(int j = 0; j < segs.length; j++) {
					centers[i][j] = Double.parseDouble(segs[j]);
				}
			}
		}

		public void map(Object key, Text value, Context context
                 ) throws IOException, InterruptedException {

			String line = value.toString();
			String[] segs = line.split(",");
			double[] sample = new double[segs.length];
			for(int i = 0; i < segs.length; i++) {
				sample[i] = Float.parseFloat(segs[i]);
			}
			//求得距离最近的质心
			double min = Double.MAX_VALUE;
			int index = 0;
			for(int i = 0; i < centers.length; i++) {
				double dis = distance(centers[i], sample);
				if(dis < min) {
					min = dis;
					index = i;
				}
			}

			context.write(new Text(centerstrArray[index]), new Text(line));
		}
	}

	public static class IntSumReducer
    extends Reducer<Text,Text,NullWritable,Text> {

		Counter counter = null;

		public void reduce(Text key, Iterable<Text> values,
                    Context context
                    ) throws IOException, InterruptedException {

			double[] sum = new double[Center.k];
			int size = 0;
			//计算对应维度上值的加和,存放在sum数组中
			for(Text text : values) {
				String[] segs = text.toString().split(",");
				for(int i = 0; i < segs.length; i++) {
					sum[i] += Double.parseDouble(segs[i]);
				}
				size ++;
			}

			//求sum数组中每个维度的平均值,也就是新的质心
			StringBuffer sb = new StringBuffer();
			for(int i = 0; i < sum.length; i++) {
				sum[i] /= size;
				sb.append(sum[i]);
				sb.append(",");
			}

			/**判断新的质心跟老的质心是否是一样的*/
			boolean flag = true;
			String[] centerStrArray = key.toString().split(",");
			for(int i = 0; i < centerStrArray.length; i++) {
				if(Math.abs(Double.parseDouble(centerStrArray[i]) - sum[i]) > 0.00000000001) {
					flag = false;
					break;
				}
			}
			//如果新的质心跟老的质心是一样的,那么相应的计数器加1
			if(flag) {
				counter = context.getCounter("myCounter", "kmenasCounter");
				counter.increment(1l);
			}
			context.write(null, new Text(sb.toString()));
		}
	}

	public static void main(String[] args) throws Exception {

		Path kMeansPath = new Path("/dsap/middata/kmeans/kMeans");	//初始的质心文件
		Path samplePath = new Path("/dsap/middata/kmeans/sample");	//样本文件
		//加载聚类中心文件
		Center center = new Center();
		String centerString = center.loadInitCenter(kMeansPath);

		int index = 0;	//迭代的次数
		while(index < 5) {

			Configuration conf = new Configuration();
			conf.set(FLAG, centerString);	//将聚类中心的字符串放到configuration中

			kMeansPath = new Path("/dsap/middata/kmeans/kMeans" + index);	//本次迭代的输出路径,也是下一次质心的读取路径

			/**判断输出路径是否存在,如果存在,则删除*/
			FileSystem hdfs = FileSystem.get(conf);
			if(hdfs.exists(kMeansPath)) hdfs.delete(kMeansPath);

			Job job = new Job(conf, "kmeans" + index);
			job.setJarByClass(KmeansMR.class);
			job.setMapperClass(TokenizerMapper.class);
			job.setReducerClass(IntSumReducer.class);
			job.setOutputKeyClass(NullWritable.class);
			job.setOutputValueClass(Text.class);
			job.setMapOutputKeyClass(Text.class);
		    job.setMapOutputValueClass(Text.class);
		    FileInputFormat.addInputPath(job, samplePath);
		    FileOutputFormat.setOutputPath(job, kMeansPath);
			job.waitForCompletion(true);

			/**获取自定义counter的大小,如果等于质心的大小,说明质心已经不会发生变化了,则程序停止迭代*/
			long counter = job.getCounters().getGroup("myCounter").findCounter("kmenasCounter").getValue();
			if(counter == Center.k)	System.exit(0);
			/**重新加载质心*/
			center = new Center();
			centerString = center.loadCenter(kMeansPath);

			index ++;
		}
		System.exit(0);
	}

	public static double distance(double[] a, double[] b) {

		if(a == null || b == null || a.length != b.length) return Double.MAX_VALUE;
		double dis = 0;
		for(int i = 0; i < a.length; i++) {
			dis += Math.pow(a[i] - b[i], 2);
		}
		return Math.sqrt(dis);
	}
}	

5. 结果

产生了两个文件夹,分别是第一次、第二次迭代后的聚类中心

最后的聚类中心的内容如下

hadoop下实现kmeans算法——一个mapreduce的实现方法

时间: 2024-11-12 10:19:54

hadoop下实现kmeans算法——一个mapreduce的实现方法的相关文章

hadoop在实现kmeans算法——一个mapreduce实施

写mapreduce程序实现kmeans算法.我们的想法可能是 1. 次迭代后的质心 2. map里.计算每一个质心与样本之间的距离,得到与样本距离最短的质心,以这个质心作为key,样本作为value,输出 3. reduce里,输入的key是质心,value是其它的样本,这时又一次计算聚类中心,将聚类中心put到一个所有变量t中. 4. 在main里比較前一次的质心和本次的质心是否发生变化,假设变化,则继续迭代,否则退出. 本文的思路基本上是依照上面的步骤来做的,仅仅只是有几个问题须要解决 1

机器学习实战ByMatlab(三)K-means算法

K-means算法属于无监督学习聚类算法,其计算步骤还是挺简单的,思想也挺容易理解,而且还可以在思想中体会到EM算法的思想. K-means 算法的优缺点: 1.优点:容易实现 2.缺点:可能收敛到局部最小值,在大规模数据集上收敛较慢 使用数据类型:数值型数据 以往的回归算法.朴素贝叶斯.SVM等都是有类别标签y的,因此属于有监督学习,而K-means聚类算法只有x,没有y 在聚类问题中,我们的训练样本是 其中每个Xi都是n维实数. 样本数据中没有了y,K-means算法是将样本聚类成k个簇,具

Hadoop:统计文本中单词熟练MapReduce程序

这是搭建hadoop环境后的第一个MapReduce程序: 基于python的脚本: 1 map.py文件,把文本的内容划分成单词: #!/bin/pythonimport sys for line in sys.stdin:    data_list = line.strip().split()    for i in range(0, len(data_list)):        print data_list[i]         2 reduce文件,把统计单词出现的次数: #!/bi

mahout下的Hadoop平台上的Kmeans算法实现

Mahout主要有协同过滤.聚类和分类三种算法的实现.现在我们就用Mahout来实现经典的Kmeans聚类算法. 首先,下载Hadoop和Mahout.因为Mahout有很多实现是运行在Hadoop上的,所以要先安装Hadoop. 具体怎么安装?简单地说一下: 1. 先安装SSH. ufw disable 关闭防火墙 cd .ssh/   进入ssh文件夹,没有的话,下面生产密钥的时候自动生成 ssh-keygen -t rsa 生成ssh密钥 cp id_rsa.pub authorized_

K-Means 算法的 Hadoop 实现

K-Means 算法的 Hadoop 实现 K-Means 算法简介 k-Means是一种聚类分析算法,它是一种无监督学习算法.它主要用来计算数据的聚集,将数据相近的点归到同一数据蔟.学习聚类时我们需要了解聚类与分类的区别,分类的类别是我们实现设定好的,而聚类的类别是通过计算得到的. 算法原理 维基百科的算法描述如下: 已知观测集 (x1,x2,x3,...,xn) ,其中每个观测都是一个d-维实向量,k-平均聚类要把这n个观测划分到k个集合中 (k≤n) ,使得组内平方和(WCSS withi

在Hadoop上运行基于RMM中文分词算法的MapReduce程序

原文:http://xiaoxia.org/2011/12/18/map-reduce-program-of-rmm-word-count-on-hadoop/ 在Hadoop上运行基于RMM中文分词算法的MapReduce程序 23条回复 我知道这个文章标题很“学术”化,很俗,让人看起来是一篇很牛B或者很装逼的论文!其实不然,只是一份普通的实验报告,同时本文也不对RMM中文分词算法进行研究.这个实验报告是我做高性能计算课程的实验里提交的.所以,下面的内容是从我的实验报告里摘录出来的,当作是我学

Hadoop学习---第三篇Hadoop的第一个Mapreduce程序

Mapreducer程序写了好几个了,但是之前一直都没有仔细的测试过本地运行和集群上运行的区别,今天写了一个Mapreduce程序,在此记录下来. 本地运行注意事项有以下几点: 1.本地必须配置好Hadoop的开发环境 2.在src里不加入配置文件运行,或者如果本地的src里有mapred-site.xml和yarn-site.xml配置文件,那么mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local 测试说明:sr

剑指offer系列——二维数组中,每行从左到右递增,每列从上到下递增,设计一个算法,找其中的一个数

题目:二维数组中,每行从左到右递增,每列从上到下递增,设计一个算法,找其中的一个数 分析: 二维数组这里把它看作一个矩形结构,如图所示: 1 2 8 9 2 4 9 12 4 7 10 13 6 8 11 15 在做这道题的时候我最先考虑的是每次比较对角线上的元素可能可以取得较好的效果, 以查找9为例, 从1(0,0)开始,1<10,可以得出结论,10在1的右侧或下侧: 1 2 8 9 2 4 9 12 4 7 10 13 6 8 11 15 然后看4(1,1),4<9, 1 2 8 9 2

mahout运行测试与kmeans算法解析

在使用mahout之前要安装并启动hadoop集群 将mahout的包上传至linux中并解压即可 mahout下载地址: 点击打开链接 mahout中的算法大致可以分为三大类: 聚类,协同过滤和分类 其中 常用聚类算法有:canopy聚类,k均值算法(kmeans),模糊k均值,层次聚类,LDA聚类等 常用分类算法有:贝叶斯,逻辑回归,支持向量机,感知器,神经网络等 下面将运行mahout中自带的example例子jar包来查看mahou是否能正确运行 练习数据下载地址: 点击打开链接 上面的