Skip to content

Commit 7b884e8

Browse files
committed
Merge branch 'master' into refresh_table
2 parents fcc31f2 + b4844ee commit 7b884e8

File tree

48 files changed

+2163
-618
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2163
-618
lines changed

core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,65 +2,65 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1
22
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
33
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
44
------------------------------------------------------------------------------------------------------------------------
5-
Serialization 609 631 22 0.3 3043.8 1.0X
6-
Deserialization 840 897 67 0.2 4201.2 0.7X
5+
Serialization 205 213 13 1.0 1023.6 1.0X
6+
Deserialization 908 939 27 0.2 4540.2 0.2X
77

8-
Compressed Serialized MapStatus sizes: 393 bytes
9-
Compressed Serialized Broadcast MapStatus sizes: 3 MB
8+
Compressed Serialized MapStatus sizes: 400 bytes
9+
Compressed Serialized Broadcast MapStatus sizes: 2 MB
1010

1111

1212
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
1313
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
1414
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
1515
------------------------------------------------------------------------------------------------------------------------
16-
Serialization 591 599 8 0.3 2955.3 1.0X
17-
Deserialization 878 913 31 0.2 4392.2 0.7X
16+
Serialization 195 204 24 1.0 976.9 1.0X
17+
Deserialization 913 940 33 0.2 4566.7 0.2X
1818

19-
Compressed Serialized MapStatus sizes: 3 MB
19+
Compressed Serialized MapStatus sizes: 2 MB
2020
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
2121

2222

2323
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
2424
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
2525
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
2626
------------------------------------------------------------------------------------------------------------------------
27-
Serialization 1776 1778 2 0.1 8880.5 1.0X
28-
Deserialization 1086 1086 0 0.2 5427.9 1.6X
27+
Serialization 616 619 3 0.3 3079.1 1.0X
28+
Deserialization 936 954 22 0.2 4680.5 0.7X
2929

30-
Compressed Serialized MapStatus sizes: 411 bytes
31-
Compressed Serialized Broadcast MapStatus sizes: 15 MB
30+
Compressed Serialized MapStatus sizes: 418 bytes
31+
Compressed Serialized Broadcast MapStatus sizes: 14 MB
3232

3333

3434
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
3535
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
3636
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
3737
------------------------------------------------------------------------------------------------------------------------
38-
Serialization 1725 1726 1 0.1 8624.9 1.0X
39-
Deserialization 1093 1094 2 0.2 5463.6 1.6X
38+
Serialization 586 588 3 0.3 2928.8 1.0X
39+
Deserialization 929 933 4 0.2 4647.0 0.6X
4040

41-
Compressed Serialized MapStatus sizes: 15 MB
41+
Compressed Serialized MapStatus sizes: 14 MB
4242
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
4343

4444

4545
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
4646
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
4747
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
4848
------------------------------------------------------------------------------------------------------------------------
49-
Serialization 12421 12522 142 0.0 62104.4 1.0X
50-
Deserialization 3020 3043 32 0.1 15102.0 4.1X
49+
Serialization 4740 4916 249 0.0 23698.5 1.0X
50+
Deserialization 1578 1597 27 0.1 7890.6 3.0X
5151

52-
Compressed Serialized MapStatus sizes: 544 bytes
53-
Compressed Serialized Broadcast MapStatus sizes: 131 MB
52+
Compressed Serialized MapStatus sizes: 546 bytes
53+
Compressed Serialized Broadcast MapStatus sizes: 123 MB
5454

5555

5656
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
5757
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
5858
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
5959
------------------------------------------------------------------------------------------------------------------------
60-
Serialization 11719 11737 26 0.0 58595.3 1.0X
61-
Deserialization 3018 3051 46 0.1 15091.7 3.9X
60+
Serialization 4492 4573 115 0.0 22458.3 1.0X
61+
Deserialization 1533 1547 20 0.1 7664.8 2.9X
6262

63-
Compressed Serialized MapStatus sizes: 131 MB
63+
Compressed Serialized MapStatus sizes: 123 MB
6464
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
6565

6666

core/benchmarks/MapStatusesSerDeserBenchmark-results.txt

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,65 +2,65 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.
22
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
33
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
44
------------------------------------------------------------------------------------------------------------------------
5-
Serialization 625 639 9 0.3 3127.2 1.0X
6-
Deserialization 875 931 49 0.2 4376.2 0.7X
5+
Serialization 236 245 18 0.8 1179.1 1.0X
6+
Deserialization 842 885 37 0.2 4211.4 0.3X
77

8-
Compressed Serialized MapStatus sizes: 393 bytes
9-
Compressed Serialized Broadcast MapStatus sizes: 3 MB
8+
Compressed Serialized MapStatus sizes: 400 bytes
9+
Compressed Serialized Broadcast MapStatus sizes: 2 MB
1010

1111

1212
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
1313
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
1414
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
1515
------------------------------------------------------------------------------------------------------------------------
16-
Serialization 604 640 71 0.3 3018.4 1.0X
17-
Deserialization 889 903 17 0.2 4443.8 0.7X
16+
Serialization 213 219 8 0.9 1065.1 1.0X
17+
Deserialization 846 870 33 0.2 4228.6 0.3X
1818

19-
Compressed Serialized MapStatus sizes: 3 MB
19+
Compressed Serialized MapStatus sizes: 2 MB
2020
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
2121

2222

2323
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
2424
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
2525
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
2626
------------------------------------------------------------------------------------------------------------------------
27-
Serialization 1879 1880 2 0.1 9394.9 1.0X
28-
Deserialization 1147 1150 5 0.2 5733.8 1.6X
27+
Serialization 624 709 167 0.3 3121.1 1.0X
28+
Deserialization 885 908 22 0.2 4427.0 0.7X
2929

30-
Compressed Serialized MapStatus sizes: 411 bytes
31-
Compressed Serialized Broadcast MapStatus sizes: 15 MB
30+
Compressed Serialized MapStatus sizes: 418 bytes
31+
Compressed Serialized Broadcast MapStatus sizes: 14 MB
3232

3333

3434
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
3535
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
3636
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
3737
------------------------------------------------------------------------------------------------------------------------
38-
Serialization 1825 1826 1 0.1 9123.3 1.0X
39-
Deserialization 1147 1281 189 0.2 5735.7 1.6X
38+
Serialization 603 604 2 0.3 3014.9 1.0X
39+
Deserialization 892 895 5 0.2 4458.7 0.7X
4040

41-
Compressed Serialized MapStatus sizes: 15 MB
41+
Compressed Serialized MapStatus sizes: 14 MB
4242
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
4343

4444

4545
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
4646
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
4747
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
4848
------------------------------------------------------------------------------------------------------------------------
49-
Serialization 12327 12518 270 0.0 61634.3 1.0X
50-
Deserialization 3120 3133 18 0.1 15600.8 4.0X
49+
Serialization 4612 4945 471 0.0 23061.0 1.0X
50+
Deserialization 1493 1495 2 0.1 7466.3 3.1X
5151

52-
Compressed Serialized MapStatus sizes: 544 bytes
53-
Compressed Serialized Broadcast MapStatus sizes: 131 MB
52+
Compressed Serialized MapStatus sizes: 546 bytes
53+
Compressed Serialized Broadcast MapStatus sizes: 123 MB
5454

5555

5656
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
5757
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
5858
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
5959
------------------------------------------------------------------------------------------------------------------------
60-
Serialization 11928 11986 82 0.0 59642.2 1.0X
61-
Deserialization 3137 3138 2 0.1 15683.3 3.8X
60+
Serialization 4452 4595 202 0.0 22261.4 1.0X
61+
Deserialization 1464 1477 18 0.1 7321.4 3.0X
6262

63-
Compressed Serialized MapStatus sizes: 131 MB
63+
Compressed Serialized MapStatus sizes: 123 MB
6464
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
6565

6666

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
package org.apache.spark
1919

20-
import java.io._
20+
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
2121
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
2222
import java.util.concurrent.locks.ReentrantReadWriteLock
23-
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
2423

2524
import scala.collection.JavaConverters._
2625
import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -29,6 +28,10 @@ import scala.concurrent.duration.Duration
2928
import scala.reflect.ClassTag
3029
import scala.util.control.NonFatal
3130

31+
import com.github.luben.zstd.ZstdInputStream
32+
import com.github.luben.zstd.ZstdOutputStream
33+
import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream}
34+
3235
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
3336
import org.apache.spark.internal.Logging
3437
import org.apache.spark.internal.config._
@@ -885,13 +888,18 @@ private[spark] object MapOutputTracker extends Logging {
885888
private val BROADCAST = 1
886889

887890
// Serialize an array of map output locations into an efficient byte format so that we can send
888-
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
891+
// it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will
889892
// generally be pretty compressible because many map outputs will be on the same hostname.
890893
def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
891894
isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
892-
val out = new ByteArrayOutputStream
893-
out.write(DIRECT)
894-
val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
895+
// Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
896+
// This implementation doesn't reallocate the whole memory block but allocates
897+
// additional buffers. This way no buffers need to be garbage collected and
898+
// the contents don't have to be copied to the new buffer.
899+
val out = new ApacheByteArrayOutputStream()
900+
val compressedOut = new ApacheByteArrayOutputStream()
901+
902+
val objOut = new ObjectOutputStream(out)
895903
Utils.tryWithSafeFinally {
896904
// Since statuses can be modified in parallel, sync on it
897905
statuses.synchronized {
@@ -900,18 +908,42 @@ private[spark] object MapOutputTracker extends Logging {
900908
} {
901909
objOut.close()
902910
}
903-
val arr = out.toByteArray
911+
912+
val arr: Array[Byte] = {
913+
val zos = new ZstdOutputStream(compressedOut)
914+
Utils.tryWithSafeFinally {
915+
compressedOut.write(DIRECT)
916+
// `out.writeTo(zos)` will write the uncompressed data from `out` to `zos`
917+
// without copying to avoid unnecessary allocation and copy of byte[].
918+
out.writeTo(zos)
919+
} {
920+
zos.close()
921+
}
922+
compressedOut.toByteArray
923+
}
904924
if (arr.length >= minBroadcastSize) {
905925
// Use broadcast instead.
906926
// Important arr(0) is the tag == DIRECT, ignore that while deserializing !
907927
val bcast = broadcastManager.newBroadcast(arr, isLocal)
908928
// toByteArray creates copy, so we can reuse out
909929
out.reset()
910-
out.write(BROADCAST)
911-
val oos = new ObjectOutputStream(new GZIPOutputStream(out))
912-
oos.writeObject(bcast)
913-
oos.close()
914-
val outArr = out.toByteArray
930+
val oos = new ObjectOutputStream(out)
931+
Utils.tryWithSafeFinally {
932+
oos.writeObject(bcast)
933+
} {
934+
oos.close()
935+
}
936+
val outArr = {
937+
compressedOut.reset()
938+
val zos = new ZstdOutputStream(compressedOut)
939+
Utils.tryWithSafeFinally {
940+
compressedOut.write(BROADCAST)
941+
out.writeTo(zos)
942+
} {
943+
zos.close()
944+
}
945+
compressedOut.toByteArray
946+
}
915947
logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
916948
(outArr, bcast)
917949
} else {
@@ -924,7 +956,7 @@ private[spark] object MapOutputTracker extends Logging {
924956
assert (bytes.length > 0)
925957

926958
def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
927-
val objIn = new ObjectInputStream(new GZIPInputStream(
959+
val objIn = new ObjectInputStream(new ZstdInputStream(
928960
new ByteArrayInputStream(arr, off, len)))
929961
Utils.tryWithSafeFinally {
930962
objIn.readObject()

docs/sql-data-sources-orc.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also
3131
<tr>
3232
<td><code>spark.sql.orc.impl</code></td>
3333
<td><code>native</code></td>
34-
<td>The name of ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4. `hive` means the ORC library in Hive 1.2.1.</td>
34+
<td>The name of ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support. <code>hive</code> means the ORC library in Hive.</td>
3535
</tr>
3636
<tr>
3737
<td><code>spark.sql.orc.enableVectorizedReader</code></td>

mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ class GBTClassifier @Since("1.4.0") (
203203
} else {
204204
GradientBoostedTrees.run(trainDataset, boostingStrategy, $(seed), $(featureSubsetStrategy))
205205
}
206+
baseLearners.foreach(copyValues(_))
206207

207208
val numFeatures = baseLearners.head.numFeatures
208209
instr.logNumFeatures(numFeatures)

mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ class RandomForestClassifier @Since("1.4.0") (
143143
val trees = RandomForest
144144
.run(instances, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
145145
.map(_.asInstanceOf[DecisionTreeClassificationModel])
146+
trees.foreach(copyValues(_))
146147

147148
val numFeatures = trees.head.numFeatures
148149
instr.logNumClasses(numClasses)

mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
181181
GradientBoostedTrees.run(trainDataset, boostingStrategy,
182182
$(seed), $(featureSubsetStrategy))
183183
}
184+
baseLearners.foreach(copyValues(_))
184185

185186
val numFeatures = baseLearners.head.numFeatures
186187
instr.logNumFeatures(numFeatures)

mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
130130
val trees = RandomForest
131131
.run(instances, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
132132
.map(_.asInstanceOf[DecisionTreeRegressionModel])
133+
trees.foreach(copyValues(_))
133134

134135
val numFeatures = trees.head.numFeatures
135136
instr.logNamedValue(Instrumentation.loggerTags.numFeatures, numFeatures)

mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,22 @@ class GBTClassifierSuite extends MLTest with DefaultReadWriteTest {
456456
}
457457
}
458458

459+
test("tree params") {
460+
val categoricalFeatures = Map.empty[Int, Int]
461+
val df: DataFrame = TreeTests.setMetadata(data, categoricalFeatures, numClasses = 2)
462+
val gbt = new GBTClassifier()
463+
.setMaxDepth(2)
464+
.setCheckpointInterval(5)
465+
.setSeed(123)
466+
val model = gbt.fit(df)
467+
468+
model.trees.foreach (i => {
469+
assert(i.getMaxDepth === model.getMaxDepth)
470+
assert(i.getCheckpointInterval === model.getCheckpointInterval)
471+
assert(i.getSeed === model.getSeed)
472+
})
473+
}
474+
459475
/////////////////////////////////////////////////////////////////////////////
460476
// Tests of model save/load
461477
/////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)