Skip to content
124 changes: 62 additions & 62 deletions src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,96 +25,96 @@ import com.github.javacliparser._
import org.apache.spark.streamdm.core.specification.ExampleSpecification

/**
* Implements the StreamKM++ algorithm for data streams. StreamKM++ computes a
* small (weighted) sample of the stream by using <i>coresets</i>, and then uses
* it as an input to a k-means++ algorithm. It uses a data structure called
* <tt>BucketManager</tt> to handle the coresets.
*
* <p>It uses the following options:
* <ul>
* <li> Number of microclusters (<b>-m</b>)
* <li> Initial buffer size (<b>-b</b>)
* <li> Size of coresets (<b>-s</b>)
* <li> Learning window (<b>-w</b>) * </ul>
*/
* Implements the StreamKM++ algorithm for data streams. StreamKM++ computes a
* small (weighted) sample of the stream by using <i>coresets</i>, and then uses
* it as an input to a k-means++ algorithm. It uses a data structure called
* <tt>BucketManager</tt> to handle the coresets.
*
* <p>It uses the following options:
* <ul>
* <li> Number of microclusters (<b>-m</b>)
* <li> Initial buffer size (<b>-b</b>)
* <li> Size of coresets (<b>-s</b>)
* <li> Learning window (<b>-w</b>) * </ul>
*/
class StreamKM extends Clusterer {

type T = BucketManager

type T = BucketManager
var bucketmanager: BucketManager = null
var numInstances: Long = 0
var initialBuffer: Array[Example] = Array[Example]()
var clusters: Array[Example] = null

val kOption: IntOption = new IntOption("numClusters", 'k',
"Number of clusters for output", 10, 1, Integer.MAX_VALUE)

val repOption: IntOption = new IntOption("kMeansIters", 'i',
"Number of k-means iterations", 1000, 1, Integer.MAX_VALUE)

val sizeCoresetOption: IntOption = new IntOption("sizeCoreset", 's',
"Size of coreset", 10000, 1, Integer.MAX_VALUE)

val widthOption: IntOption = new IntOption("width",
'w', "Size of window for training learner.", 100000, 1, Integer.MAX_VALUE);
'w', "Size of window for training learner.", 100000, 1, Integer.MAX_VALUE);

var exampleLearnerSpecification: ExampleSpecification = null
/**
* Init the StreamKM++ algorithm.
*/

/**
* Init the StreamKM++ algorithm.
*/
def init(exampleSpecification: ExampleSpecification) : Unit = {
exampleLearnerSpecification = exampleSpecification
bucketmanager = new BucketManager(widthOption.getValue, sizeCoresetOption.getValue)
}

/**
* Maintain the BucketManager for coreset extraction, given an input DStream of Example.
* @param input a stream of instances
*/
def train(input: DStream[Example]): Unit = {
input.foreachRDD(rdd => {
rdd.foreach(ex => {
bucketmanager = bucketmanager.update(ex)
numInstances += 1
})
})
}



def train(input: DStream[Example]): Unit = {}

/**
* Gets the current Model used for the Learner.
* @return the Model object used for training
*/
* Gets the current Model used for the Learner.
* @return the Model object used for training
*/
def getModel: BucketManager = bucketmanager
/**
* Get the currently computed clusters
* @return an Array of Examples representing the clusters
*/

/**
* Get the currently computed clusters
* @return an Array of Examples representing the clusters
*/
def getClusters: Array[Example] = {
if(numInstances <= sizeCoresetOption.getValue) {
bucketmanager.buckets(0).points.toArray
}
}
else {
val streamingCoreset = bucketmanager.getCoreset
KMeans.cluster(streamingCoreset, kOption.getValue, repOption.getValue)
val streamingCoreset = bucketmanager.getCoreset
KMeans.cluster(streamingCoreset, kOption.getValue, repOption.getValue)
}
}

/**
* Assigns examples to clusters, given the current Clusters data structure.
* @param input the DStream of Examples to be assigned a cluster
* @return a DStream of tuples containing the original Example and the
* assigned cluster.
*/
* Maintain the BucketManager for coreset extraction, given an input DStream of Example.
* @param input a stream of instances
* @return a DStream of tuples containing the original Example and the
* assigned cluster.
*/
def assign(input: DStream[Example]): DStream[(Example,Double)] = {
input.map(x => {
val assignedCl = getClusters.foldLeft((0,Double.MaxValue,0))(
(cl,centr) => {
val dist = centr.in.distanceTo(x.in)
if(dist<cl._2) ((cl._3,dist,cl._3+1))
else ((cl._1,cl._2,cl._3+1))
input.map(ex=> {
numInstances += 1
bucketmanager = bucketmanager.update(ex)
if(numInstances <= sizeCoresetOption.getValue){
clusters = KMeans.cluster(bucketmanager.buckets(0).points.toArray,kOption.getValue,repOption.getValue)
}
else
{
val streamingCoreset = bucketmanager.getCoreset
clusters = KMeans.cluster(streamingCoreset,kOption.getValue,repOption.getValue)
}

val assignedCl = clusters.foldLeft((0, Double.MaxValue, 0))(
(cl, centr) => {
val dist = centr.in.distanceTo(ex.in)
if (dist < cl._2) ((cl._3, dist, cl._3 + 1))
else ((cl._1, cl._2, cl._3 + 1))
})._1
(x,assignedCl)
(ex,assignedCl)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.streamdm.clusterers.clusters

import org.apache.spark.streamdm.core._
import scala.collection.mutable.Queue
import scala.util.control.Breaks._


/**
Expand All @@ -31,7 +30,6 @@ import scala.util.control.Breaks._
class BucketManager(val n : Int, val maxsize : Int) extends Clusters {

type T = BucketManager

/**
* Inner class Bucket for new instance management, this class has two buffers for
* recursively computing the coresets.
Expand All @@ -57,7 +55,7 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters {
// Check if there is enough space in the first bucket
if(buckets(0).isFull){
var curbucket : Int = 0
var nextbucket : Int =1
var nextbucket : Int = 1
// Check if the next bucket is empty
if(!buckets(nextbucket).isFull) {
// Copy curbucket points to nextbucket points
Expand Down Expand Up @@ -122,25 +120,29 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters {
* @return the coreset for the examples entered into the buckets.
*/
def getCoreset: Array[Example] = {
if(buckets(L-1).isFull) {
buckets(L-1).points.toArray
}else {
var isFound: Boolean = false
if (buckets(L - 1).isFull) {
buckets(L - 1).points.toArray
} else {
var i = 0
var coreset = Array[Example]()
for(i <- 0 until L) {
if(buckets(i).isFull) {

for (i <- 0 until L) {
if (buckets(i).isFull && isFound == false) {
coreset = buckets(i).points.toArray
break
isFound=true
}
}
val start = i+1
for(j <- start until L) {
val start = i + 1
for (j <- start until L) {
val examples = buckets(j).points.toArray ++ coreset
val tree = new TreeCoreset
coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize),
new Array[Example](0))
new Array[Example](0))
}
coreset
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,16 @@ class TreeCoreset {
* Select a new centre from the leaf node for splitting.
* @return the new centre
*/
def chooseCentre() : Example = {
def chooseCentre() : Example = {
val funcost = this.weightedLeaf().cost
val points = elem.points
var sum = 0.0

for(point <- points) {
sum += costOfPoint(point)/funcost
if(sum >= Random.nextDouble)
return point
}
elem.centre
for (point <- points) {
sum += costOfPoint(point) / funcost
if (sum >= Random.nextDouble)
return point
}
elem.centre
}
}

Expand All @@ -119,34 +118,49 @@ class TreeCoreset {
* @param leaf coreset tree leaf for spliting
* @return a coreset tree node with two leaves
*/
private def splitCoresetTreeLeaf(leaf : CoresetTreeLeaf) : CoresetTreeNode = {
// Select a example from the points associated with the leaf as a new centre
// for one of the new leaf
private def splitCoresetTreeLeaf(leaf : CoresetTreeLeaf) : CoresetTreeNode = {
// Select a example from the points associated with the leaf as a new centre
// for one of the new leaf

val newcentre = leaf.chooseCentre

// The original centre as the other leaf centre
val oldcentre = leaf.elem.centre
// The points associated with the orignial leaf, the points will be assigned the new leaves
val points = leaf.elem.points

// Assign points to leftpoints and rightpoints
var leftpoints = new Array[Example](0)
var rightpoints = new Array[Example](0)
for(point <- points) {
if(squaredDistance(point, newcentre) < squaredDistance(point, oldcentre))
for (point <- points) {
if (squaredDistance(point, newcentre) < squaredDistance(point, oldcentre))
leftpoints = leftpoints :+ point
else
rightpoints = rightpoints :+ point
}

// Create new leaves
//prevent assigning all points to one child
//resplit points to leftpoints and rightpoints
if((leftpoints.length == 0 || rightpoints.length==0 ) && points.length>1){
val newcentre = leaf.chooseCentre
var leftpoints = new Array[Example](0)
var rightpoints = new Array[Example](0)
for (point <- points) {
if (squaredDistance(point, newcentre) < squaredDistance(point, oldcentre))
leftpoints = leftpoints :+ point
else
rightpoints = rightpoints :+ point
}
}

// Create new leaves
val leftElem = new CoresetTreeElem(leftpoints.length, leftpoints, newcentre)
val leftleaf = CoresetTreeLeaf(leftElem, 0.0).weightedLeaf

val rightElem = new CoresetTreeElem(rightpoints.length, rightpoints, oldcentre)
val rightleaf = CoresetTreeLeaf(rightElem, 0.0).weightedLeaf

// Return a coreset tree node with two leaves
new CoresetTreeNode(leaf.elem, leftleaf, rightleaf, leftleaf.cost+rightleaf.cost)
new CoresetTreeNode(leaf.elem, leftleaf, rightleaf, leftleaf.cost + rightleaf.cost)

}

/**
Expand All @@ -159,15 +173,39 @@ class TreeCoreset {
splitCoresetTreeLeaf(CoresetTreeLeaf(e, c))
}
case CoresetTreeNode(e, l, r, c) => {
if (Random.nextDouble > 0.5) {
val lchild = splitCoresetTree(l)
val newcost = lchild.cost + r.cost
CoresetTreeNode(e, lchild, r, newcost)
if(l.cost == 0 && r.cost == 0) {
if (l.elem.n == 0) {
val rchild = splitCoresetTree(r)
val newcost = l.cost + rchild.cost
CoresetTreeNode(e, l, rchild, newcost)
}
if (r.elem.n == 0) {
val lchild = splitCoresetTree(l)
val newcost = lchild.cost + r.cost
CoresetTreeNode(e, lchild, r, newcost)
}
else if (Random.nextDouble > 0.5) {
val lchild = splitCoresetTree(l)
val newcost = lchild.cost + r.cost
CoresetTreeNode(e, lchild, r, newcost)
}
else {
val rchild = splitCoresetTree(r)
val newcost = l.cost + rchild.cost
CoresetTreeNode(e, l, rchild, newcost)
}
}
else {
val rchild = splitCoresetTree(r)
val newcost = l.cost + rchild.cost
CoresetTreeNode(e, l, rchild, newcost)
else
{
if(Random.nextDouble < l.cost/root.cost){
val lchild = splitCoresetTree(l)
val newcost = lchild.cost + r.cost
CoresetTreeNode(e, lchild, r, newcost)
} else {
val rchild = splitCoresetTree(r)
val newcost = l.cost + rchild.cost
CoresetTreeNode(e, l, rchild, newcost)
}
}
}
}
Expand Down