Spark supports the following models:
- K-means
- Gaussian mixture
- Power iteration clustering (PIC)
- Latent Dirichlet allocation (LDA)
- Bisecting k-means
- Streaming k-means
K-means
k-means is the most commonly used clustering algorithm when the number of clusters is known. The spark.mllib implementation also includes kmeans||, a parallelized variant of k-means++. It takes the following parameters:
- k: the number of clusters
- maxIterations: the maximum number of iterations
- initializationMode: whether to choose the initial cluster centers randomly or via kmeans||
- runs: the number of times to run the k-means algorithm (k-means can converge to a local optimum, so multiple runs may find a better clustering)
- initializationSteps: the number of steps in the kmeans|| algorithm
- epsilon: the distance threshold within which we consider k-means to have converged
- initialModel: an optional set of cluster centers used for initialization
The optimal k usually lies at the elbow of the within-set sum of squared errors (WSSSE) curve.
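As a minimal sketch of finding that elbow (assuming parsedData is the RDD[Vector] built in the example below), you can sweep k and watch where the cost stops dropping sharply:

import org.apache.spark.mllib.clustering.KMeans
// Train a model for each candidate k and report its WSSSE;
// the "elbow" is where adding more clusters stops paying off.
val costs = (2 to 10).map { k =>
  (k, KMeans.train(parsedData, k, 20).computeCost(parsedData))
}
costs.foreach { case (k, cost) => println(s"k=$k, WSSSE=$cost") }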
Below is a complete clustering example with k = 2:
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)
// Save and load model
clusters.save(sc, "myModelPath")
val sameModel = KMeansModel.load(sc, "myModelPath")
Gaussian mixture (GMM)
Parameters:
- k: the number of clusters
- convergenceTol: the maximum change in log-likelihood at which we consider convergence achieved
- maxIterations: the maximum number of iterations
- initialModel: an optional starting point for the EM algorithm; if omitted, a random starting point is constructed from the data
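As a small sketch, these parameters map to setters on GaussianMixture (the values below are illustrative, not defaults):

import org.apache.spark.mllib.clustering.GaussianMixture
// Configure the convergence tolerance and iteration cap explicitly
val configuredGmm = new GaussianMixture()
  .setK(2)
  .setConvergenceTol(0.001)
  .setMaxIterations(200)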
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.clustering.GaussianMixtureModel
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "myGMMModel")
val sameModel = GaussianMixtureModel.load(sc, "myGMMModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
Power iteration clustering (PIC)
Parameters:
- k: the number of clusters
- maxIterations: the maximum number of iterations
- initializationMode: either "random" (the default), which uses a random vector as vertex properties, or "degree", which uses the normalized sum of similarities
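As a sketch, initializationMode is set through a setter; here "degree" replaces the default "random" (the k and iteration values are illustrative):

import org.apache.spark.mllib.clustering.PowerIterationClustering
// Same estimator as in the example below, but starting from
// degree-based vertex properties instead of random ones
val picDegree = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10)
  .setInitializationMode("degree")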
import org.apache.spark.mllib.clustering.{PowerIterationClustering, PowerIterationClusteringModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/pic_data.txt")
val similarities = data.map { line =>
val parts = line.split(' ')
(parts(0).toLong, parts(1).toLong, parts(2).toDouble)
}
// Cluster the data into two classes using PowerIterationClustering
val pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10)
val model = pic.run(similarities)
model.assignments.foreach { a =>
println(s"${a.id} -> ${a.cluster}")
}
// Save and load model
model.save(sc, "myModelPath")
val sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")
Latent Dirichlet allocation (LDA)
Parameters:
- k: the number of topics
- optimizer: the optimizer used to learn the LDA model, either EMLDAOptimizer or OnlineLDAOptimizer
- docConcentration: Dirichlet parameter for the prior over documents' distributions over topics
- topicConcentration: Dirichlet parameter for the prior over topics' distributions over terms
- maxIterations: limit on the number of iterations
- checkpointInterval: how often to checkpoint, which can help with recovery on long runs
All LDA models support:
- describeTopics: returns topics as arrays of the most important terms and their weights
- topicsMatrix: returns a vocabSize-by-k matrix where each column is a topic
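For instance, a minimal sketch of describeTopics (assuming ldaModel is the model trained in the example at the end of this section):

// Print the 5 highest-weighted terms of each topic as (termIndex, weight) pairs
ldaModel.describeTopics(maxTermsPerTopic = 5).zipWithIndex.foreach {
  case ((terms, weights), topicId) =>
    println(s"Topic $topicId: " + terms.zip(weights).mkString(" "))
}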
Expectation Maximization
Implemented in EMLDAOptimizer and DistributedLDAModel.
Parameters:
- docConcentration
- topicConcentration
- maxIterations
DistributedLDAModel supports:
- topTopicsPerDocument: the top topics and their weights for each document in the training corpus
- topDocumentsPerTopic: the top documents for each topic and the corresponding weight of the topic in each document
- logPrior: log probability of the estimated topics and document-topic distributions given the hyperparameters docConcentration and topicConcentration
- logLikelihood: log likelihood of the training corpus, given the inferred topics and document-topic distributions
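A small sketch of these calls, assuming distModel is a DistributedLDAModel (for example, one trained with the EM optimizer):

// Top documents per topic and top topics per document,
// plus the model's log prior and training log likelihood
val topDocs = distModel.topDocumentsPerTopic(10)
val topTopics = distModel.topTopicsPerDocument(3)
println(s"logPrior = ${distModel.logPrior}")
println(s"logLikelihood = ${distModel.logLikelihood}")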
Online Variational Bayes
Implemented in OnlineLDAOptimizer and LocalLDAModel.
Parameters:
- docConcentration
- topicConcentration
- maxIterations
In addition, OnlineLDAOptimizer supports the following parameters:
- miniBatchFraction: Fraction of corpus sampled and used at each iteration
- optimizeDocConcentration: If set to true, performs maximum-likelihood estimation of the hyperparameter docConcentration (aka alpha) after each minibatch and sets the optimized docConcentration in the returned LocalLDAModel
- tau0 and kappa: Used for learning-rate decay, which is computed by (tau0 + iter)^(-kappa), where iter is the current number of iterations
OnlineLDAOptimizer produces a LocalLDAModel, which only stores the inferred topics. A LocalLDAModel supports:
- logLikelihood(documents)
- logPerplexity(documents)
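As a sketch, the online optimizer is configured on its own and handed to LDA via setOptimizer (the minibatch fraction below is illustrative):

import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
// Sample 5% of the corpus per minibatch and re-estimate alpha as training runs
val onlineLda = new LDA()
  .setK(3)
  .setOptimizer(new OnlineLDAOptimizer()
    .setMiniBatchFraction(0.05)
    .setOptimizeDocConcentration(true))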
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + "
words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print("Topic " + topic + ":")
for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic));
}
println()
}
// Save and load model.
ldaModel.save(sc, "myLDAModel")
val sameModel = DistributedLDAModel.load(sc, "myLDAModel")
Bisecting k-means
Bisecting k-means can often be much faster than regular k-means. It is a kind of hierarchical clustering. Its parameters are:
- k: the desired number of leaf clusters (default: 4)
- maxIterations: the max number of k-means iterations to split clusters (default: 20)
- minDivisibleClusterSize: the minimum number of points (if >= 1.0) or the minimum proportion of points (if < 1.0) of a divisible cluster (default: 1)
- seed: a random seed (default: hash value of the class name)
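A minimal sketch of setting these optional parameters through the corresponding setters (the values below simply restate the documented defaults, plus a fixed seed):

import org.apache.spark.mllib.clustering.BisectingKMeans
// Explicit configuration; identical in effect to the defaults except for the seed
val configuredBkm = new BisectingKMeans()
  .setK(4)
  .setMaxIterations(20)
  .setMinDivisibleClusterSize(1.0)
  .setSeed(42L)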
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)
// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
println(s"Cluster Center ${idx}: ${center}")
}
Streaming k-means
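Streaming k-means updates the cluster centers as each new batch of data arrives; the decay factor controls how quickly old data is forgotten (1.0 weighs all past data equally, 0.0 uses only the most recent batch). The example below assumes an existing StreamingContext ssc: it trains on one text stream and prints predictions for a second, labeled stream.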
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans
val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
val numDimensions = 3
val numClusters = 2
val model = new StreamingKMeans()
  .setK(numClusters)
  .setDecayFactor(1.0)
  .setRandomCenters(numDimensions, 0.0)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()