Skip to content

Commit e75f43b

Browse files
heary-cao and Jackey Lee
authored and committed
[SPARK-26289][CORE] cleanup enablePerfMetrics parameter from BytesToBytesMap
## What changes were proposed in this pull request? `enablePerfMetrics `was originally designed in `BytesToBytesMap `to control `getNumHashCollisions getTimeSpentResizingNs getAverageProbesPerLookup`. However, as the Spark version gradual progress. this parameter is only used for `getAverageProbesPerLookup ` and always given to true when using `BytesToBytesMap`. it is also dangerous to determine whether `getAverageProbesPerLookup `opens and throws an `IllegalStateException `exception. So this pr will be remove `enablePerfMetrics `parameter from `BytesToBytesMap`. thanks. ## How was this patch tested? the existed test cases. Closes apache#23244 from heary-cao/enablePerfMetrics. Authored-by: caoxuewen <cao.xuewen@zte.com.cn> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 191d3fc commit e75f43b

File tree

4 files changed

+12
-33
lines changed

4 files changed

+12
-33
lines changed

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
159159
*/
160160
private final Location loc;
161161

162-
private final boolean enablePerfMetrics;
162+
private long numProbes = 0L;
163163

164-
private long numProbes = 0;
165-
166-
private long numKeyLookups = 0;
164+
private long numKeyLookups = 0L;
167165

168166
private long peakMemoryUsedBytes = 0L;
169167

@@ -180,16 +178,14 @@ public BytesToBytesMap(
180178
SerializerManager serializerManager,
181179
int initialCapacity,
182180
double loadFactor,
183-
long pageSizeBytes,
184-
boolean enablePerfMetrics) {
181+
long pageSizeBytes) {
185182
super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
186183
this.taskMemoryManager = taskMemoryManager;
187184
this.blockManager = blockManager;
188185
this.serializerManager = serializerManager;
189186
this.loadFactor = loadFactor;
190187
this.loc = new Location();
191188
this.pageSizeBytes = pageSizeBytes;
192-
this.enablePerfMetrics = enablePerfMetrics;
193189
if (initialCapacity <= 0) {
194190
throw new IllegalArgumentException("Initial capacity must be greater than 0");
195191
}
@@ -209,23 +205,14 @@ public BytesToBytesMap(
209205
TaskMemoryManager taskMemoryManager,
210206
int initialCapacity,
211207
long pageSizeBytes) {
212-
this(taskMemoryManager, initialCapacity, pageSizeBytes, false);
213-
}
214-
215-
public BytesToBytesMap(
216-
TaskMemoryManager taskMemoryManager,
217-
int initialCapacity,
218-
long pageSizeBytes,
219-
boolean enablePerfMetrics) {
220208
this(
221209
taskMemoryManager,
222210
SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
223211
SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null,
224212
initialCapacity,
225213
// In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5.
226214
0.5,
227-
pageSizeBytes,
228-
enablePerfMetrics);
215+
pageSizeBytes);
229216
}
230217

231218
/**
@@ -462,15 +449,12 @@ public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash)
462449
public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc, int hash) {
463450
assert(longArray != null);
464451

465-
if (enablePerfMetrics) {
466-
numKeyLookups++;
467-
}
452+
numKeyLookups++;
453+
468454
int pos = hash & mask;
469455
int step = 1;
470456
while (true) {
471-
if (enablePerfMetrics) {
472-
numProbes++;
473-
}
457+
numProbes++;
474458
if (longArray.get(pos * 2) == 0) {
475459
// This is a new key.
476460
loc.with(pos, hash, false);
@@ -860,9 +844,6 @@ public long getPeakMemoryUsedBytes() {
860844
* Returns the average number of probes per key lookup.
861845
*/
862846
public double getAverageProbesPerLookup() {
863-
if (!enablePerfMetrics) {
864-
throw new IllegalStateException();
865-
}
866847
return (1.0 * numProbes) / numKeyLookups;
867848
}
868849

core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ public void failureToGrow() {
530530
@Test
531531
public void spillInIterator() throws IOException {
532532
BytesToBytesMap map = new BytesToBytesMap(
533-
taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false);
533+
taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024);
534534
try {
535535
int i;
536536
for (i = 0; i < 1024; i++) {
@@ -569,7 +569,7 @@ public void spillInIterator() throws IOException {
569569
@Test
570570
public void multipleValuesForSameKey() {
571571
BytesToBytesMap map =
572-
new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false);
572+
new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
573573
try {
574574
int i;
575575
for (i = 0; i < 1024; i++) {

sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public UnsafeFixedWidthAggregationMap(
9898
this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
9999
this.groupingKeySchema = groupingKeySchema;
100100
this.map = new BytesToBytesMap(
101-
taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes, true);
101+
taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes);
102102

103103
// Initialize the buffer for aggregation value
104104
final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema);

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ private[joins] class UnsafeHashedRelation(
248248
binaryMap = new BytesToBytesMap(
249249
taskMemoryManager,
250250
(nKeys * 1.5 + 1).toInt, // reduce hash collision
251-
pageSizeBytes,
252-
true)
251+
pageSizeBytes)
253252

254253
var i = 0
255254
var keyBuffer = new Array[Byte](1024)
@@ -299,8 +298,7 @@ private[joins] object UnsafeHashedRelation {
299298
taskMemoryManager,
300299
// Only 70% of the slots can be used before growing, more capacity help to reduce collision
301300
(sizeEstimate * 1.5 + 1).toInt,
302-
pageSizeBytes,
303-
true)
301+
pageSizeBytes)
304302

305303
// Create a mapping of buildKeys -> rows
306304
val keyGenerator = UnsafeProjection.create(key)

0 commit comments

Comments (0)