聚类与分类
聚类(clustering)是指根据“物以类聚”的原理,将本身没有类别的样本聚集成不同的组,这样的一组数据对象的集合叫做簇,并且对每一个这样的簇进行描述的过程。
在分类( classification )中,对于目标数据库中存在哪些类是知道的,要做的就是将每一条记录分别属于哪一类标记出来。
聚类分析也称无监督学习, 因为和分类学习相比,聚类的样本没有标记,需要由聚类学习算法来自动确定。聚类分析是研究如何在没有训练的条件下把样本划分为若干类。
K-Means 算法
K-means算法, 也被称为k-平均或k-均值算法,是一种得到最广泛使用的聚类算法。 它是将各个聚类子集内的所有数据样本的均值作为该聚类的代表点,算法的主要思想是通过迭代过程把数据集划分为不同的类别,使得评价聚类性能的准则函数达到最优(平均误差准则函数E ),从而使生成的每个聚类(又称簇)内紧凑,类间独立。
, 该算法最常见的形式是采用被称为劳埃德算法(Lloyd algorithm)的迭代式改进探索法。劳埃德算法首先把输入点分成k个初始化分组,可以是随机的或者使用一些启发式数据。然后计算每组的中心点,根据中心点的位置把对象分到离它最近的中心,重新确定分组。继续重复不断地计算中心并重新分组,直到收敛,即对象不再改变分组(中心点位置不再改变)。
1. 计算两个类别,初始化两个类别
2. 每个种子点都计算与另外所有的白色点的距离
3. 对于白点pi,若种子点si距离pi最近,则si属于pi
4. 重新计算求中心点 xn=(x1+x2+x3)/3,yn=(y1+y2+y3)/3
迭代至类别无变化
求点群中心的算法
Minkowski Distance (闵可夫斯基距离)
Euclidean Distance(欧几里得距离)
Manhattan Distance(曼哈顿距离)
闵可夫斯基距离
闵氏距离不是一种距离,而是一组距离的定义
闵氏距离的定义:
两个n维变量a(x11,x12,…,x1n)与b(x21,x22,…,x2n)间的闵可夫斯基距离定义为:
其中p是一个变参数。
当p=1时,就是曼哈顿距离
当p=2时,就是欧氏距离
当p→∞时,就是切比雪夫距离根据变参数的不同,闵氏距离可以表示一类的距离。
欧几里得距离
欧氏距离是最易于理解的一种距离计算方法,源自欧氏空间中两点间的距离公式。
两个n维向量a(x11,x12,…,x1n)与 b(x21,x22,…,x2n)间的欧氏距离
曼哈顿距离
曼哈顿距离——两点在南北方向上的距离加上在东西方向上的距离,即d(i,j)=|xi-xj|+|yi-yj|。对于一个具有正南正北、正东正西方向规则布局的城镇街道,从一点到达另一点的距离正是在南北方向上旅行的距离加上在东西方向上旅行的距离因此曼哈顿距离又称为出租车距离
两个n维向量a(x11,x12,…,x1n)与b(x21,x22,…,x2n)间的曼哈顿距离
算法缺陷
在簇的平均值被定义的情况下才能使用,这对于处理符号属性的数据不适用。
必须事先给出k(要生成的簇的数目),而且对初值敏感,对于不同的初始值,可能会导致不同结果。经常发生得到次优划分的情况。解决方法是多次尝试不同的初始值。
它对于“躁声”和孤立点数据是敏感的,少量的该类数据能够对平均值产生极大的影响
K-Means++
先从我们的数据库随机挑个随机点当“种子点”。
对于每个点,我们都计算其和最近的一个“种子点”的距离D(x)并保存在一个数组里,然后把这些距离加起来得到Sum(D(x))。
然后,再取一个随机值,用权重的方式来取计算下一个“种子点”。这个算法的实现是,**先取一个能落在Sum(D(x))中的随机值Random,然后用Random -= D(x),直到其<=0,此时的点就是下一个“种子点”。
重复第(2)和第(3)步直到所有的K个种子点都被选出来。
进行K-Means算法。**
package org.bigdata.mapreduce.kmeans;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.bigdata.util.HadoopCfg;
/**
* 计算种子点
*
* @author wwhhf
*
*/
public class ClusterMapReduce {
/**
* 2,1,3,4,1,4
*
* @author wwhhf
*
*/
public static class ClusterMapper extends
Mapper<LongWritable, Text, Text, DoubleWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String terms[] = value.toString().split(",");
for (int i = 0, len = terms.length; i < len; i++) {
context.write(new Text("c" + (i + 1)), new DoubleWritable(
Double.valueOf(terms[i])));
}
}
}
public static class ClusterReducer extends
Reducer<Text, DoubleWritable, Text, Text> {
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values,
Context context) throws IOException, InterruptedException {
double maxx = Integer.MIN_VALUE;
double minx = Integer.MAX_VALUE;
for (DoubleWritable value : values) {
maxx = Math.max(maxx, value.get());
minx = Math.min(minx, value.get());
}
context.write(key, new Text("" + maxx + "," + minx + ","
+ ((maxx + minx) * 1.0 / 2)));
}
}
/**
* max c1,5 min c1,6 avg c1,4
*
* @author wwhhf
*
*/
public static class PointMapper extends
Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String terms[] = value.toString().split("\t");
String values[] = terms[1].split(",");
double maxx = Double.valueOf(values[0]);
double minx = Double.valueOf(values[1]);
double avg = Double.valueOf(values[2]);
context.write(new Text("max"),
new Text(key.toString() + "," + maxx));
context.write(new Text("min"),
new Text(key.toString() + "," + minx));
context.write(new Text("avg"), new Text(key.toString() + "," + avg));
}
}
/**
* max c1,5 min c1,6 avg c1,4
*
* @author wwhhf
*
*/
public static class PointReducer extends
Reducer<Text, Text, Text, NullWritable> {
private static int cnt = 0;
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
TreeMap<String, Double> map = new TreeMap<>();
for (Text value : values) {
// value:c1,5
String terms[] = value.toString().split(",");
map.put(terms[0], Double.valueOf(terms[1]));
}
// write
cnt++;
StringBuffer sb = new StringBuffer();
for (Entry<String, Double> entry : map.entrySet()) {
System.out.println(entry.getKey() + " " + entry.getValue());
sb.append(entry.getValue()).append(",");
}
context.write(
new Text("c"
+ cnt
+ ":"
+ sb.toString().substring(0,
sb.toString().length() - 1)),
NullWritable.get());
}
}
private static final String JOB_NAME = "cluster";
public static void solve(String pathin, String pathout)
throws ClassNotFoundException, InterruptedException {
try {
Configuration cfg = HadoopCfg.getConfiguration();
Job job = Job.getInstance(cfg);
job.setJobName(JOB_NAME);
job.setJarByClass(ClusterMapReduce.class);
// mapper
job.setMapperClass(ClusterMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
// Reducer
job.setReducerClass(ClusterReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(pathin));
FileOutputFormat.setOutputPath(job, new Path(pathout));
job.waitForCompletion(true);
} catch (IllegalStateException | IllegalArgumentException | IOException e) {
e.printStackTrace();
}
}
public static void genPoint(String pathin, String pathout)
throws ClassNotFoundException, InterruptedException {
try {
Configuration cfg = HadoopCfg.getConfiguration();
Job job = Job.getInstance(cfg);
job.setJobName(JOB_NAME);
job.setJarByClass(ClusterMapReduce.class);
// mapper
job.setMapperClass(PointMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// reducer
job.setReducerClass(PointReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(pathin));
FileOutputFormat.setOutputPath(job, new Path(pathout));
job.waitForCompletion(true);
} catch (IllegalStateException | IllegalArgumentException | IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws ClassNotFoundException,
InterruptedException {
solve("/kmeans", "/kmeans_out");
genPoint("/kmeans_out", "/kmeans_out1");
}
}
package org.bigdata.mapreduce.kmeans;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.bigdata.util.DistanceUtil;
import org.bigdata.util.HadoopCfg;
import org.bigdata.util.HadoopUtil;
/**
* K-Means
*
* @author wwhhf
*
*/
public class KMeansMapReduce {
private static final String JOB_NAME = "kmeans";
private static final String RES_PATH = "/kmeans_res";
private static final String POINTS = "kmeans.txt";
// 中心点 name -> points
private static Map<String, Vector<Double>> points = new HashMap<>();
public static void initPoint(String pathin, String filename)
throws IOException {
List<String> lines = HadoopUtil.lslFile(pathin, filename);
for (String line : lines) {
String terms[] = line.toString().split(":");
points.put(terms[0], DistanceUtil.getVector(terms[1].split(",")));
}
}
public static class KMeansMapper extends
Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
if (POINTS.equals(fileName)) {
String terms[] = value.toString().split(",");
Vector<Double> p = DistanceUtil.getVector(terms);
String center = null;
double minx = Double.MAX_VALUE;
for (Entry<String, Vector<Double>> entry : points.entrySet()) {
try {
double dis = DistanceUtil.getEuclideanDisc(
entry.getValue(), p);
if (dis < minx) {
minx = dis;
center = entry.getKey();
}
} catch (Exception e) {
e.printStackTrace();
}
}
context.write(new Text(center), value);
}
}
}
public static class KMeansReducer extends
Reducer<Text, Text, Text, NullWritable> {
// 多路输出
private MultipleOutputs<Text, NullWritable> output = null;
private static int cnt = 1;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
//多文件输出
output = new MultipleOutputs<Text, NullWritable>(context);
cnt++;
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
super.cleanup(context);
output.close();
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<Vector<Double>> list = new ArrayList<>();
int num = 0;
for (Text value : values) {
String terms[] = value.toString().split(",");
num = terms.length;
Vector<Double> p = DistanceUtil.getVector(terms);
list.add(p);
output.write(
new Text(value + " is belong to " + key.toString()),
NullWritable.get(), RES_PATH + cnt);
}
String point = DistanceUtil.getAvg(list, num);
context.write(new Text(key.toString() + ":" + point),
NullWritable.get());
}
}
public static void solve(String pointsin, String pathout)
throws ClassNotFoundException, InterruptedException {
try {
Configuration cfg = HadoopCfg.getConfiguration();
Job job = Job.getInstance(cfg);
job.setJobName(JOB_NAME);
job.setJarByClass(ClusterMapReduce.class);
// mapper
job.setMapperClass(KMeansMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// reducer
job.setReducerClass(KMeansReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(pointsin));
FileOutputFormat.setOutputPath(job, new Path(pathout));
FileOutputFormat.setOutputPath(job, new Path(pathout));
job.waitForCompletion(true);
} catch (IllegalStateException | IllegalArgumentException | IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws ClassNotFoundException,
InterruptedException, IOException {
String path = "/kmeans";
String pathout = "/kmeans_point";
String tmp_pathin = pathout;
String point_filename = "part-r-00000";
for (int i = 1; i <= 3; i++) {
initPoint(tmp_pathin, point_filename);
String tmp_pathout = pathout + i;
solve(path, tmp_pathout);
tmp_pathin = tmp_pathout;
}
}
}
输入
c1:1,1,1,1,0,4
c2:5,5,5,5,5,5
c3:2.5,2.5,2.5,2.5,2.5,2.5
2,1,3,4,1,4
3,2,5,2,3,5
4,4,4,3,1,5
2,3,1,2,0,3
4,0,1,1,1,5
1,2,3,5,0,1
5,3,2,2,1,3
3,4,1,1,2,1
0,2,3,3,1,4
0,2,5,0,2,2
2,1,4,5,4,3
4,1,4,3,3,2
0,3,2,2,0,1
1,3,1,0,3,0
3,3,4,2,1,3
3,5,3,5,3,2
2,3,2,3,0,1
4,3,3,2,2,3
1,4,3,4,3,1
3,2,3,0,2,5
1,0,2,1,0,4
4,4,3,5,5,4
5,1,4,3,5,2
3,4,4,4,1,1
2,2,4,4,5,5
5,2,0,3,1,3
1,1,3,1,1,3
2,4,2,0,3,5
1,1,1,1,0,4
1,1,4,1,3,0