数据价值-DataValues

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 954|回复: 0

[hadoop] Spark之K-Means聚类算法实现_Spark K-Means聚类算法

[复制链接]

1万

主题

1万

帖子

3万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
39797
发表于 2017-5-15 20:52:35 | 显示全部楼层 |阅读模式
初试Spark之K-Means聚类算法实现,
关键词:kmeans文本聚类spark、spark kmeans 聚类、spark实现kmeans算法、kmeans聚类算法原理、k means聚类算法,
自学Spark有将近一个月了,一直想找一个稍微复杂点的例子练练手,K均值聚类算法实现是个不错的例子,于是有了这篇博客。,
K均值聚类算法的原理本身很简单,大概思想就是:选取初始质心,根据这些质心将样本点聚类,聚类之后计算新的质心,然后重新将样本点聚类,不断循环重复“产生质心,重新聚类”这一过程,直至聚类效果不再发生明显变换。Hadoop的MapReduce计算框架虽然也能够实现这一算法,但是代码的实现过程实在是太恶心了,认识到Spark的简洁之后,义无反顾地投入到Spark的怀抱。,
写代码时没想太多,测试数据的样本点都是一维的,32个样本点分散在三个区间中,分别是0.2至0.8,1.8至2.4,3.4至4,如下图所示

,
下面是代码:,
,
[java] view plain copy

package kmeans_spark
import java.util.Random
import java.lang.Math._
import org.apache.spark.rdd.RDD
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vector
object KMeans {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(“kmeans in Spark”)
    val sc = new SparkContext(conf)
    val input = args(0) //输入数据
    val output = args(1) //输出路径
    val k = args(2).toInt //聚类个数
    var s = 0d //聚类效果评价标准
    val shold = 0.1 //收敛阀值
    var s1 = Double.MaxValue
    var times = 0
    var readyForIteration = true
    val func1 = (x: (newVector, Int, Double), y: (newVector, Int, Double)) => {
      (x._1 + y._1, x._2 + y._2, x._3 + y._3)}
    val points = sc.textFile(input).map(line => {
          val temp = line.split(“\t”).map(ele => ele.toDouble)
          Vectors.dense(temp)}).cache() //将输入数据转换成RDD
    var centers = points.takeSample(false, k, new Random().nextLong()) //生成随机初始质心
    print(“————————————————\n”)
    print(“Print the centers for the next iteration: \n”)
    printCenters(centers)
    print(“Ready for the next iteration ? “+readyForIteration+“\n”)
    while (readyForIteration) {
      times += 1
      print(“Print the result of the clustering in iteration “+times+“\n”)
      val reClusteringResult = points.map(v => {
        val (centerId, minDistance) = getClosestCenter(centers, v)
        print(“Cluster id: “+centerId+“, “)
        print(“The point in the cluster “+centerId+“: “)
        v.toArray.foreach(x => print(x+“,”));print(“\n”)
        (centerId, (newVector(v), 1, minDistance))})
      val NewCentersRdd = reClusteringResult.reduceByKey(func1(_,_))
        .map(ele => {
        val centerId = ele._1
        val newCenter = (ele._2)._1 * (1d / ele._2._2)
        val sumOfDistance = (ele._2)._3
        (newCenter.point, sumOfDistance)})
      var s2 = getNewCenters(NewCentersRdd, centers)
      s = abs(s2 – s1)
      print(“s = “+s+“\n”)
      print(“————————————————\n”)
      print(“Print the centers for the next iteration: \n”)
      printCenters(centers)
      if (s
        readyForIteration = false
        reClusteringResult.map(ele => {
          var centerId = ele._1.toString()+“\t”
          val array = ele._2._1.point.toArray
          for (i 0 until array.length) {
            if (i == array.length – 1) {centerId = centerId + array(i).toString()}
            else {centerId = centerId + array(i).toString() + “\t”}
          }
          centerId
        }).saveAsTextFile(output) //如果算法收敛,输出结果
      }
      print(“to the next iteration ? “+readyForIteration+“\n”)
      s1 = s2
    }
    sc.stop()
  }
  case class newVector(point: Vector) {
    def *(a: Double): newVector = {
      var res = new Array[Double](point.size)
      for (i 0 until point.size) {
        res(i) = a*point.toArray.apply(i)
      }
      newVector(Vectors.dense(res))
    }
    def +(that: newVector): newVector = {
      var res = new Array[Double](point.size)
      for (i 0 until point.size) {
        res(i) = point.toArray.apply(i) + that.point.toArray.apply(i)
      }
      newVector(Vectors.dense(res))
    }
    def -(that: newVector): newVector = {
      this + (that * –1)
    }
    def pointLength(): Double = {
      var res = 0d
      for (i 0 until point.size) {
        res = res + pow(point.toArray.apply(i), 2)
      }
      res
    }
    def distanceTo(that: newVector): Double = {
      (this – that).pointLength()
    }
  }
  implicit def toNewVector(point: Vector) = newVector(point)
  def getClosestCenter(centers: Array[Vector], point: Vector): (Int, Double) = {
    var minDistance = Double.MaxValue
    var centerId = 0
    for (i 0 until centers.length) {
      if (point.distanceTo(centers(i))
        minDistance = point.distanceTo(centers(i))
        centerId = i
      }
    }
    (centerId, minDistance)
  }
  def getNewCenters(rdd: RDD[(Vector, Double)], centers: Array[Vector]): Double ={
    val res = rdd.take(centers.length)
    var sumOfDistance = 0d
    for (i 0 until centers.length) {
      centers(i) = res.apply(i)._1
      sumOfDistance += res.apply(i)._2
    }
    sumOfDistance
  }
  def printCenters(centers: Array[Vector]) {
    for (v
      v.toArray.foreach(x => print(x+“,”));print(“\n”)
    }
  }
}
[/ol]
,
将代码编译并打包成jar文件,启动Spark之后,在命令行环境下运行下图所示命令:,


以下是运行期间的部分截图:,

,
,

,
,

,
,

,
,

,
可以看出在示例代码上,算法收敛得很快,经过4次迭代之后就停止了,以下是聚类结果:,

,
可以看出在测试数据上,聚类结果很好,第一个字段是类别编号,第二个字段是点的坐标,可以看出点所处的区间和相应的类别是一致的,从代码量上看,确实要比用MapReduce框架实现要节省很多,主要还是得益于RDD上丰富的算子带来的强大的表达能力。,
转载请注明:数据分析 » Spark之K-Means聚类算法实现_Spark K-Means聚类算法
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|Archiver|手机版|DataValues ( 赣ICP备16006919号-3 点击这里给我发消息 DataValues

GMT+8, 2019-2-19 21:39 , Processed in 0.105429 second(s), 32 queries .

Powered by Discuz! X3.2

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表