Spark MLlib, Part 6: Clustering

Spark MLlib supports the following clustering models:

  1. K-means
  2. Gaussian mixture
  3. Power iteration clustering(PIC)
  4. Latent Dirichlet allocation(LDA)
  5. Bisecting k-means
  6. Streaming k-means

K-means

K-means is one of the most commonly used clustering algorithms when the number of clusters is known in advance. The spark.mllib implementation also includes a parallelized variant of k-means++ called kmeans||. It takes the following parameters:

  • k: the number of desired clusters
  • maxIterations: the maximum number of iterations to run
  • initializationMode: whether to pick the initial cluster centers at random or via kmeans||
  • runs: the number of times to run the k-means algorithm (k-means is not guaranteed to find the global optimum, so running it several times and keeping the best result helps)
  • initializationSteps: the number of steps in the kmeans|| algorithm
  • epsilon: the distance threshold below which we consider k-means to have converged
  • initialModel: an optional set of cluster centers used for initialization

    The optimal k is usually chosen at the elbow point of the within-set sum of squared errors (WSSSE) curve; a small sketch of this follows the example below.
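
These parameters map onto setters of the KMeans builder, which can be used instead of the KMeans.train shortcut shown in the example below. A minimal sketch with illustrative values (parsedData refers to the RDD built in that example):

import org.apache.spark.mllib.clustering.KMeans

// Configure K-means explicitly through the builder API
val kmeans = new KMeans()
  .setK(2)
  .setMaxIterations(20)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)  // or KMeans.RANDOM
  .setInitializationSteps(5)
  .setEpsilon(1e-4)
// val model = kmeans.run(parsedData)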

Below is an example that clusters the data into k = 2 clusters:

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)
// Save and load model
clusters.save(sc, "myModelPath")
val sameModel = KMeansModel.load(sc, "myModelPath")
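
To choose k via the elbow method mentioned above, one can simply repeat the training for a range of candidate values and compare the WSSSE. A minimal sketch reusing parsedData and numIterations from the example (the range of k values is an arbitrary choice):

// Train K-means for several candidate values of k and print the WSSSE for each;
// the "elbow", where the curve stops dropping sharply, is a reasonable choice of k.
val costs = (2 to 10).map { k =>
  val model = KMeans.train(parsedData, k, numIterations)
  (k, model.computeCost(parsedData))
}
costs.foreach { case (k, wssse) => println(s"k = $k, WSSSE = $wssse") }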
           

Gaussian mixture (GMM)

Parameters:

  • k: the number of desired clusters
  • convergenceTol: the maximum change in log-likelihood at which we consider convergence achieved
  • maxIterations: the maximum number of iterations
  • initialModel: an optional starting point for the EM algorithm; if omitted, a random starting point is constructed from the data

import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.clustering.GaussianMixtureModel
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "myGMMModel")
val sameModel = GaussianMixtureModel.load(sc, "myGMMModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
  println("weight=%f\nmu=%s\nsigma=\n%s\n" format
    (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
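
The trained model can also be used to assign points to mixture components. A minimal sketch reusing gmm and parsedData from above:

// Hard assignments: index of the most likely Gaussian for each point
val hardAssignments = gmm.predict(parsedData)
// Soft assignments: per-point membership probabilities for each of the k Gaussians
val softAssignments = gmm.predictSoft(parsedData)
softAssignments.take(5).foreach(p => println(p.mkString(", ")))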
           

Power iteration clustering(PIC)

Parameters:

  1. k: the number of clusters
  2. maxIterations: the maximum number of iterations
  3. initializationMode: either "random" (the default), which uses a random vector as vertex properties, or "degree", which uses normalized sum similarities

import org.apache.spark.mllib.clustering.{PowerIterationClustering, PowerIterationClusteringModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/pic_data.txt")
val similarities = data.map { line =>
  val parts = line.split(' ')
  (parts(0).toLong, parts(1).toLong, parts(2).toDouble)
}
// Cluster the data into two classes using PowerIterationClustering
val pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10)
val model = pic.run(similarities)
model.assignments.foreach { a =>
  println(s"${a.id} -> ${a.cluster}")
}
// Save and load model
model.save(sc, "myModelPath")
val sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")
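
PIC expects its input as an RDD of (srcId, dstId, similarity) tuples, so raw feature vectors have to be turned into a pairwise affinity graph first. A minimal sketch of one common choice, a Gaussian (RBF) similarity over all pairs of points; the points RDD and sigma are illustrative, and the all-pairs construction is O(n^2), so it is only suitable for small data sets:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Build (srcId, dstId, similarity) tuples from an RDD[Vector] of raw points;
// sigma controls the width of the Gaussian kernel.
def gaussianSimilarities(points: RDD[Vector], sigma: Double): RDD[(Long, Long, Double)] = {
  val indexed = points.zipWithIndex.map(_.swap)   // (id, vector) pairs
  indexed.cartesian(indexed)
    .filter { case ((i, _), (j, _)) => i < j }    // keep each undirected pair once
    .map { case ((i, vi), (j, vj)) =>
      (i, j, math.exp(-Vectors.sqdist(vi, vj) / (2.0 * sigma * sigma)))
    }
}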
           

Latent Dirichlet allocation(LDA)

  • k: the number of topics to infer
  • optimizer: the optimizer used to learn the LDA model, either EMLDAOptimizer or OnlineLDAOptimizer
  • docConcentration: the Dirichlet parameter (aka alpha) for the prior placed on documents' distributions over topics
  • topicConcentration: the Dirichlet parameter (aka beta or eta) for the prior placed on topics' distributions over terms
  • maxIterations: the maximum number of iterations
  • checkpointInterval: how often to checkpoint, which helps with recovery and lineage length on long runs

    All LDA models support:

  • describeTopics: returns the most important terms and their weights for each topic
  • topicsMatrix: returns a vocabSize-by-k matrix in which each column is a topic

Expectation Maximization

Implemented in EMLDAOptimizer and DistributedLDAModel.

Parameters:

  • docConcentration
  • topicConcentration
  • maxIterations

DistributedLDAModel supports:

  • topTopicsPerDocument: the top topics and their weights for each training document
  • topDocumentsPerTopic: the top documents and their weights for each topic
  • logPrior: the log probability of the estimated topics and document-topic distributions given the hyperparameters docConcentration and topicConcentration
  • logLikelihood: the log likelihood of the training corpus given the inferred topics and document-topic distributions

Online Variational Bayes

Implemented in OnlineLDAOptimizer and LocalLDAModel.

Parameters:

  • docConcentration
  • topicConcentration
  • maxIterations

In addition, OnlineLDAOptimizer supports the following parameters:

  • miniBatchFraction: Fraction of corpus sampled and used at each iteration
  • optimizeDocConcentration: if set to true, performs maximum-likelihood estimation of the hyperparameter docConcentration (aka alpha) after each minibatch and uses the optimized docConcentration in the returned LocalLDAModel
  • tau0 and kappa: used for learning-rate decay; the learning rate is computed as (tau0 + iter)^(-kappa), where iter is the current number of iterations

OnlineLDAOptimizer produces a LocalLDAModel, which only stores the inferred topics. A LocalLDAModel supports:

  • logLikelihood(documents)
  • logPerplexity(documents)
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + "
words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(, )) {
print("Topic " + topic + ":")
for (word <- Range(, ldaModel.vocabSize)) { print(" " + topics(word, topic));
}
println()
}
// Save and load model.
ldaModel.save(sc, "myLDAModel")
val sameModel = DistributedLDAModel.load(sc, "myLDAModel")
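
The example above uses the default EM optimizer and therefore returns a DistributedLDAModel. A minimal sketch of the online variational Bayes path described earlier, reusing the corpus RDD (the k and miniBatchFraction values are illustrative):

import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer, LocalLDAModel}

// The online optimizer returns a LocalLDAModel, which only stores the inferred topics
val onlineModel = new LDA()
  .setK(3)
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.05))
  .run(corpus)
  .asInstanceOf[LocalLDAModel]

// Model quality on the training corpus
println("log-likelihood: " + onlineModel.logLikelihood(corpus))
println("log-perplexity: " + onlineModel.logPerplexity(corpus))

// Most important terms (by weight) for each topic
onlineModel.describeTopics(5).zipWithIndex.foreach { case ((termIndices, weights), topic) =>
  println(s"Topic $topic: " + termIndices.zip(weights).mkString(" "))
}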
           

Bisecting k-means

Bisecting k-means can often be much faster than regular k-means.

It is a kind of hierarchical (divisive) clustering: all points start in a single cluster, which is then recursively split using k-means until the desired number of leaf clusters is reached.

Parameters:

  • k: the desired number of leaf clusters (default: 4)
  • maxIterations: the max number of k-means iterations to split clusters (default: 20)
  • minDivisibleClusterSize: the minimum number of points (if >= 1.0) or the minimum proportion of points (if < 1.0) of a divisible cluster (default: 1)

  • seed: a random seed (default: hash value of the class name)
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// Cluster the data into 6 clusters using BisectingKMeans
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)
// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
  println(s"Cluster Center ${idx}: ${center}")
}
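
The trained model can also assign points to the leaf clusters. A minimal sketch reusing model and data from above (the example vector is illustrative):

// Predict the leaf cluster for a single point and for an RDD of points
val point = Vectors.dense(0.1, 0.1, 0.1)
println(s"Point $point belongs to cluster ${model.predict(point)}")
val assignments = model.predict(data)
assignments.take(5).foreach(println)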
           

Streaming k-means

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans

val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
// Example values: 3-dimensional data grouped into 2 clusters
val numDimensions = 3
val numClusters = 2
val model = new StreamingKMeans()
  .setK(numClusters)
  .setDecayFactor(1.0)
  .setRandomCenters(numDimensions, 0.0)

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
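
The decay factor controls how quickly old data is forgotten: 1.0 weights all data from the beginning equally, while smaller values give more weight to recent batches. The same decay can alternatively be expressed as a half-life; a minimal sketch with illustrative values:

// Forget half of the past data every 5 batches (the time unit can be "batches" or "points")
val halfLifeModel = new StreamingKMeans()
  .setK(numClusters)
  .setHalfLife(5.0, "batches")
  .setRandomCenters(numDimensions, 0.0)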
           
