docs/mllib-clustering.md
@@ -34,7 +34,7 @@ a given dataset, the algorithm returns the best clustering result).
* *initializationSteps* determines the number of steps in the k-means\|\| algorithm.
* *epsilon* determines the distance threshold within which we consider k-means to have converged.
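Both knobs are set through the `KMeans` builder in the Scala API. A minimal sketch, assuming `parsedData` is an `RDD[Vector]` of input points like the one built in the examples below:

{% highlight scala %}
import org.apache.spark.mllib.clustering.KMeans

// tune the k-means|| initialization and the convergence threshold
val clusters = new KMeans()
  .setK(2)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)
  .setInitializationSteps(5)   // *initializationSteps*
  .setEpsilon(1e-4)            // *epsilon*
  .run(parsedData)
{% endhighlight %}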
-## Examples
+### Examples

<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -153,3 +153,75 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap
section of the Spark
Quick Start guide. Be sure to also add *spark-mllib* to your build file as
a dependency.
## Streaming clustering

When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming k-means clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch k-means update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using:

`\begin{equation}
    c_{t+1} = \frac{c_t n_t \alpha + x_t m_t}{n_t \alpha + m_t}
\end{equation}`
`\begin{equation}
    n_{t+1} = n_t + m_t
\end{equation}`

Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; with `$\alpha$=0` only the most recent data will be used. This is analogous to an exponentially weighted moving average.
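To make the rule concrete, here is an illustrative one-cluster update in plain Scala: a sketch of the formula above, not MLlib's implementation, with all names ours:

{% highlight scala %}
// Illustrative only: one decayed update for a single cluster.
// c = previous center (c_t), n = points assigned so far (n_t),
// x = mean of this batch's points (x_t), m = batch count (m_t).
def update(c: Array[Double], n: Double, x: Array[Double], m: Double,
           alpha: Double): (Array[Double], Double) = {
  val w = n * alpha + m
  val center = c.zip(x).map { case (ci, xi) => (ci * n * alpha + xi * m) / w }
  (center, n + m)
}
{% endhighlight %}

With `alpha = 0` the term for the previous center vanishes and the cluster jumps to the batch mean `x`; with `alpha = 1` every point ever assigned carries equal weight.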
### Examples

This example shows how to estimate clusters on streaming data.

First we create an input stream of vectors for training, as well as one for testing. We assume a `StreamingContext` `ssc` has already been created; see the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. For this example, we use vector data.
{% highlight scala %}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.StreamingKMeans

val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
val testData = ssc.textFileStream("/testing/data/dir").map(Vectors.parse)
{% endhighlight %}
We create a model with random clusters and specify the number of clusters to find.
{% highlight scala %}
val numDimensions = 3
val numClusters = 2
val model = new StreamingKMeans()
  .setK(numClusters)
  .setDecayFactor(1.0)
  .setRandomWeights(numDimensions)
{% endhighlight %}
Now register the streams for training and testing and start the job, printing the predicted cluster assignments on new data points as they arrive.
{% highlight scala %}
model.trainOn(trainingData)
model.predictOn(testData).print()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}
As you add new text files with data, the cluster centers will update. Each data point should be formatted as `[x1, x2, x3]`. Anytime a text file is placed in `/training/data/dir` the model will update, and anytime a text file is placed in `/testing/data/dir` you will see new predictions.
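To watch the centers move between batches, you can also inspect the model directly; a brief sketch using the `latestModel()` accessor on `StreamingKMeans`:

{% highlight scala %}
// print the cluster centers learned from the stream so far
model.latestModel().clusterCenters.foreach(println)
{% endhighlight %}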