Skip to content

Commit c7b2704

Browse files
manishamdepwendell
authored andcommitted
SPARK-1544 Add support for deep decision trees.
@etrain and I came with a PR for arbitrarily deep decision trees at the cost of multiple passes over the data at deep tree levels. To summarize: 1) We take a parameter that indicates the amount of memory users want to reserve for computation on each worker (and 2x that at the driver). 2) Using that information, we calculate two things - the maximum depth to which we train as usual (which is, implicitly, the maximum number of nodes we want to train in parallel), and the size of the groups we should use in the case where we exceed this depth. cc: @atalwalkar, @hirakendu, @mengxr Author: Manish Amde <manish9ue@gmail.com> Author: manishamde <manish9ue@gmail.com> Author: Evan Sparks <sparks@cs.berkeley.edu> Closes #475 from manishamde/deep_tree and squashes the following commits: 968ca9d [Manish Amde] merged master 7fc9545 [Manish Amde] added docs ce004a1 [Manish Amde] minor formatting b27ad2c [Manish Amde] formatting 426bb28 [Manish Amde] programming guide blurb 8053fed [Manish Amde] more formatting 5eca9e4 [Manish Amde] grammar 4731cda [Manish Amde] formatting 5e82202 [Manish Amde] added documentation, fixed off by 1 error in max level calculation cbd9f14 [Manish Amde] modified scala.math to math dad9652 [Manish Amde] removed unused imports e0426ee [Manish Amde] renamed parameter 718506b [Manish Amde] added unit test 1517155 [Manish Amde] updated documentation 9dbdabe [Manish Amde] merge from master 719d009 [Manish Amde] updating user documentation fecf89a [manishamde] Merge pull request #6 from etrain/deep_tree 0287772 [Evan Sparks] Fixing scalastyle issue. 2f1e093 [Manish Amde] minor: added doc for maxMemory parameter 2f6072c [manishamde] Merge pull request #5 from etrain/deep_tree abc5a23 [Evan Sparks] Parameterizing max memory. 50b143a [Manish Amde] adding support for very deep trees (cherry picked from commit f269b01) Signed-off-by: Patrick Wendell <pwendell@gmail.com>
1 parent 0972b62 commit c7b2704

File tree

5 files changed

+177
-33
lines changed

5 files changed

+177
-33
lines changed

docs/mllib-decision-tree.md

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,14 @@ The recursive tree construction is stopped at a node when one of the two conditi
9393
1. The node depth is equal to the `maxDepth` training parameter
9494
2. No split candidate leads to an information gain at the node.
9595

96+
### Max memory requirements
97+
98+
For faster processing, the decision tree algorithm performs simultaneous histogram computations for all nodes at each level of the tree. This could lead to high memory requirements at deeper levels of the tree leading to memory overflow errors. To alleviate this problem, a 'maxMemoryInMB' training parameter is provided which specifies the maximum amount of memory at the workers (twice as much at the master) to be allocated to the histogram computation. The default value is conservatively chosen to be 128 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements for a level-wise computation crosses the `maxMemoryInMB` threshold, the node training tasks at each subsequent level is split into smaller tasks.
99+
96100
### Practical limitations
97101

98-
1. The tree implementation stores an `Array[Double]` of size *O(#features \* #splits \* 2^maxDepth)*
99-
in memory for aggregating histograms over partitions. The current implementation might not scale
100-
to very deep trees since the memory requirement grows exponentially with tree depth.
101-
2. The implemented algorithm reads both sparse and dense data. However, it is not optimized for
102-
sparse input.
103-
3. Python is not supported in this release.
104-
105-
We are planning to solve these problems in the near future. Please drop us a line if you encounter
106-
any issues.
102+
1. The implemented algorithm reads both sparse and dense data. However, it is not optimized for sparse input.
103+
2. Python is not supported in this release.
107104

108105
## Examples
109106

examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ object DecisionTreeRunner {
5151
algo: Algo = Classification,
5252
maxDepth: Int = 5,
5353
impurity: ImpurityType = Gini,
54-
maxBins: Int = 20)
54+
maxBins: Int = 100)
5555

5656
def main(args: Array[String]) {
5757
val defaultParams = Params()

mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,13 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo
5454
// Find the splits and the corresponding bins (interval between the splits) using a sample
5555
// of the input data.
5656
val (splits, bins) = DecisionTree.findSplitsBins(input, strategy)
57-
logDebug("numSplits = " + bins(0).length)
57+
val numBins = bins(0).length
58+
logDebug("numBins = " + numBins)
5859

5960
// depth of the decision tree
6061
val maxDepth = strategy.maxDepth
6162
// the max number of nodes possible given the depth of the tree
62-
val maxNumNodes = scala.math.pow(2, maxDepth).toInt - 1
63+
val maxNumNodes = math.pow(2, maxDepth).toInt - 1
6364
// Initialize an array to hold filters applied to points for each node.
6465
val filters = new Array[List[Filter]](maxNumNodes)
6566
// The filter at the top node is an empty list.
@@ -68,7 +69,28 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo
6869
val parentImpurities = new Array[Double](maxNumNodes)
6970
// dummy value for top node (updated during first split calculation)
7071
val nodes = new Array[Node](maxNumNodes)
72+
// num features
73+
val numFeatures = input.take(1)(0).features.size
74+
75+
// Calculate level for single group construction
7176

77+
// Max memory usage for aggregates
78+
val maxMemoryUsage = strategy.maxMemoryInMB * 1024 * 1024
79+
logDebug("max memory usage for aggregates = " + maxMemoryUsage + " bytes.")
80+
val numElementsPerNode =
81+
strategy.algo match {
82+
case Classification => 2 * numBins * numFeatures
83+
case Regression => 3 * numBins * numFeatures
84+
}
85+
86+
logDebug("numElementsPerNode = " + numElementsPerNode)
87+
val arraySizePerNode = 8 * numElementsPerNode // approx. memory usage for bin aggregate array
88+
val maxNumberOfNodesPerGroup = math.max(maxMemoryUsage / arraySizePerNode, 1)
89+
logDebug("maxNumberOfNodesPerGroup = " + maxNumberOfNodesPerGroup)
90+
// nodes at a level is 2^level. level is zero indexed.
91+
val maxLevelForSingleGroup = math.max(
92+
(math.log(maxNumberOfNodesPerGroup) / math.log(2)).floor.toInt, 0)
93+
logDebug("max level for single group = " + maxLevelForSingleGroup)
7294

7395
/*
7496
* The main idea here is to perform level-wise training of the decision tree nodes thus
@@ -88,7 +110,7 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo
88110

89111
// Find best split for all nodes at a level.
90112
val splitsStatsForLevel = DecisionTree.findBestSplits(input, parentImpurities, strategy,
91-
level, filters, splits, bins)
113+
level, filters, splits, bins, maxLevelForSingleGroup)
92114

93115
for ((nodeSplitStats, index) <- splitsStatsForLevel.view.zipWithIndex) {
94116
// Extract info for nodes at the current level.
@@ -98,7 +120,7 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo
98120
filters)
99121
logDebug("final best split = " + nodeSplitStats._1)
100122
}
101-
require(scala.math.pow(2, level) == splitsStatsForLevel.length)
123+
require(math.pow(2, level) == splitsStatsForLevel.length)
102124
// Check whether all the nodes at the current level at leaves.
103125
val allLeaf = splitsStatsForLevel.forall(_._2.gain <= 0)
104126
logDebug("all leaf = " + allLeaf)
@@ -109,6 +131,10 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo
109131
}
110132
}
111133

134+
logDebug("#####################################")
135+
logDebug("Extracting tree model")
136+
logDebug("#####################################")
137+
112138
// Initialize the top or root node of the tree.
113139
val topNode = nodes(0)
114140
// Build the full tree using the node info calculated in the level-wise best split calculations.
@@ -127,7 +153,7 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo
127153
nodes: Array[Node]): Unit = {
128154
val split = nodeSplitStats._1
129155
val stats = nodeSplitStats._2
130-
val nodeIndex = scala.math.pow(2, level).toInt - 1 + index
156+
val nodeIndex = math.pow(2, level).toInt - 1 + index
131157
val isLeaf = (stats.gain <= 0) || (level == strategy.maxDepth - 1)
132158
val node = new Node(nodeIndex, stats.predict, isLeaf, Some(split), None, None, Some(stats))
133159
logDebug("Node = " + node)
@@ -148,7 +174,7 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo
148174
var i = 0
149175
while (i <= 1) {
150176
// Calculate the index of the node from the node level and the index at the current level.
151-
val nodeIndex = scala.math.pow(2, level + 1).toInt - 1 + 2 * index + i
177+
val nodeIndex = math.pow(2, level + 1).toInt - 1 + 2 * index + i
152178
if (level < maxDepth - 1) {
153179
val impurity = if (i == 0) {
154180
nodeSplitStats._2.leftImpurity
@@ -249,7 +275,8 @@ object DecisionTree extends Serializable with Logging {
249275
private val InvalidBinIndex = -1
250276

251277
/**
252-
* Returns an array of optimal splits for all nodes at a given level
278+
* Returns an array of optimal splits for all nodes at a given level. Splits the task into
279+
* multiple groups if the level-wise training task could lead to memory overflow.
253280
*
254281
* @param input RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] used as training data
255282
* for DecisionTree
@@ -260,6 +287,7 @@ object DecisionTree extends Serializable with Logging {
260287
* @param filters Filters for all nodes at a given level
261288
* @param splits possible splits for all features
262289
* @param bins possible bins for all features
290+
* @param maxLevelForSingleGroup the deepest level for single-group level-wise computation.
263291
* @return array of splits with best splits for all nodes at a given level.
264292
*/
265293
protected[tree] def findBestSplits(
@@ -269,7 +297,57 @@ object DecisionTree extends Serializable with Logging {
269297
level: Int,
270298
filters: Array[List[Filter]],
271299
splits: Array[Array[Split]],
272-
bins: Array[Array[Bin]]): Array[(Split, InformationGainStats)] = {
300+
bins: Array[Array[Bin]],
301+
maxLevelForSingleGroup: Int): Array[(Split, InformationGainStats)] = {
302+
// split into groups to avoid memory overflow during aggregation
303+
if (level > maxLevelForSingleGroup) {
304+
// When information for all nodes at a given level cannot be stored in memory,
305+
// the nodes are divided into multiple groups at each level with the number of groups
306+
// increasing exponentially per level. For example, if maxLevelForSingleGroup is 10,
307+
// numGroups is equal to 2 at level 11 and 4 at level 12, respectively.
308+
val numGroups = math.pow(2, (level - maxLevelForSingleGroup)).toInt
309+
logDebug("numGroups = " + numGroups)
310+
var bestSplits = new Array[(Split, InformationGainStats)](0)
311+
// Iterate over each group of nodes at a level.
312+
var groupIndex = 0
313+
while (groupIndex < numGroups) {
314+
val bestSplitsForGroup = findBestSplitsPerGroup(input, parentImpurities, strategy, level,
315+
filters, splits, bins, numGroups, groupIndex)
316+
bestSplits = Array.concat(bestSplits, bestSplitsForGroup)
317+
groupIndex += 1
318+
}
319+
bestSplits
320+
} else {
321+
findBestSplitsPerGroup(input, parentImpurities, strategy, level, filters, splits, bins)
322+
}
323+
}
324+
325+
/**
326+
* Returns an array of optimal splits for a group of nodes at a given level
327+
*
328+
* @param input RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] used as training data
329+
* for DecisionTree
330+
* @param parentImpurities Impurities for all parent nodes for the current level
331+
* @param strategy [[org.apache.spark.mllib.tree.configuration.Strategy]] instance containing
332+
* parameters for construction the DecisionTree
333+
* @param level Level of the tree
334+
* @param filters Filters for all nodes at a given level
335+
* @param splits possible splits for all features
336+
* @param bins possible bins for all features
337+
* @param numGroups total number of node groups at the current level. Default value is set to 1.
338+
* @param groupIndex index of the node group being processed. Default value is set to 0.
339+
* @return array of splits with best splits for all nodes at a given level.
340+
*/
341+
private def findBestSplitsPerGroup(
342+
input: RDD[LabeledPoint],
343+
parentImpurities: Array[Double],
344+
strategy: Strategy,
345+
level: Int,
346+
filters: Array[List[Filter]],
347+
splits: Array[Array[Split]],
348+
bins: Array[Array[Bin]],
349+
numGroups: Int = 1,
350+
groupIndex: Int = 0): Array[(Split, InformationGainStats)] = {
273351

274352
/*
275353
* The high-level description for the best split optimizations are noted here.
@@ -296,20 +374,23 @@ object DecisionTree extends Serializable with Logging {
296374
*/
297375

298376
// common calculations for multiple nested methods
299-
val numNodes = scala.math.pow(2, level).toInt
377+
val numNodes = math.pow(2, level).toInt / numGroups
300378
logDebug("numNodes = " + numNodes)
301379
// Find the number of features by looking at the first sample.
302380
val numFeatures = input.first().features.size
303381
logDebug("numFeatures = " + numFeatures)
304382
val numBins = bins(0).length
305383
logDebug("numBins = " + numBins)
306384

385+
// shift when more than one group is used at deep tree level
386+
val groupShift = numNodes * groupIndex
387+
307388
/** Find the filters used before reaching the current code. */
308389
def findParentFilters(nodeIndex: Int): List[Filter] = {
309390
if (level == 0) {
310391
List[Filter]()
311392
} else {
312-
val nodeFilterIndex = scala.math.pow(2, level).toInt - 1 + nodeIndex
393+
val nodeFilterIndex = math.pow(2, level).toInt - 1 + nodeIndex + groupShift
313394
filters(nodeFilterIndex)
314395
}
315396
}
@@ -878,7 +959,7 @@ object DecisionTree extends Serializable with Logging {
878959
// Iterating over all nodes at this level
879960
var node = 0
880961
while (node < numNodes) {
881-
val nodeImpurityIndex = scala.math.pow(2, level).toInt - 1 + node
962+
val nodeImpurityIndex = math.pow(2, level).toInt - 1 + node + groupShift
882963
val binsForNode: Array[Double] = getBinDataForNode(node)
883964
logDebug("nodeImpurityIndex = " + nodeImpurityIndex)
884965
val parentNodeImpurity = parentImpurities(nodeImpurityIndex)

mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ import org.apache.spark.mllib.tree.configuration.QuantileStrategy._
3535
* k) implies the feature n is categorical with k categories 0,
3636
* 1, 2, ... , k-1. It's important to note that features are
3737
* zero-indexed.
38+
* @param maxMemoryInMB maximum memory in MB allocated to histogram aggregation. Default value is
39+
* 128 MB.
40+
*
3841
*/
3942
@Experimental
4043
class Strategy (
@@ -43,4 +46,5 @@ class Strategy (
4346
val maxDepth: Int,
4447
val maxBins: Int = 100,
4548
val quantileCalculationStrategy: QuantileStrategy = Sort,
46-
val categoricalFeaturesInfo: Map[Int, Int] = Map[Int, Int]()) extends Serializable
49+
val categoricalFeaturesInfo: Map[Int, Int] = Map[Int, Int](),
50+
val maxMemoryInMB: Int = 128) extends Serializable

0 commit comments

Comments
 (0)