Commit 58d4da1

ankurdave authored and Ngone51 committed
[SPARK-35486][CORE] TaskMemoryManager: retry if other task takes memory freed by partial self-spill
### What changes were proposed in this pull request?

When a memory reservation triggers a self-spill, `ExecutionMemoryPool#releaseMemory()` will immediately notify waiting tasks that memory has been freed. If there are any waiting tasks with less than 1/2N of the memory pool, they may acquire the newly-freed memory before the current task has a chance to do so. This will cause the original memory reservation to fail. If the initial spill did not release all available memory, the reservation could have been satisfied by asking it to spill again.

This PR adds logic to TaskMemoryManager to detect this case and retry.

### Why are the changes needed?

This bug affects queries with a MemoryConsumer that can spill part of its memory, such as BytesToBytesMap. If the MemoryConsumer is using all available memory and there is a waiting task, then attempting to acquire more memory on the MemoryConsumer will trigger a partial self-spill. However, because the waiting task gets priority, the attempt to acquire memory will fail even if it could have been satisfied by another spill.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test to MemoryManagerSuite that previously failed and now passes.

Closes #32625 from ankurdave/SPARK-35486.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
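
The race described in the commit message can be illustrated with a small toy model. This is a minimal sketch, not Spark code: `ToyPool`, `ToyConsumer`, `acquireOnce`, and `acquireWithRetry` are hypothetical names, and the pool assumes that a waiting task always wins the race for newly freed bytes, which is a simplification of the real notification and fairness logic.

```java
/** Toy model of the race described above. Every name here is hypothetical; this is not Spark code. */
final class SelfSpillRetrySketch {

  /** Stand-in for the shared pool: a waiting task grabs freed bytes before the releaser can. */
  static final class ToyPool {
    long free;          // bytes currently unassigned
    long waiterDemand;  // bytes a blocked task is still waiting for

    ToyPool(long free, long waiterDemand) {
      this.free = free;
      this.waiterDemand = waiterDemand;
    }

    /** Assumption: the waiter always wins the race for newly freed memory. */
    void release(long bytes) {
      free += bytes;
      long taken = Math.min(free, waiterDemand);
      free -= taken;
      waiterDemand -= taken;
    }

    /** Grants as many of the requested bytes as are currently free. */
    long acquire(long bytes) {
      long granted = Math.min(free, bytes);
      free -= granted;
      return granted;
    }
  }

  /** Stand-in for a consumer that spills only as many bytes as it is asked for. */
  static final class ToyConsumer {
    final ToyPool pool;
    long used;

    ToyConsumer(ToyPool pool, long used) {
      this.pool = pool;
      this.used = used;
    }

    long spill(long size) {
      long released = Math.min(used, size);
      used -= released;
      pool.release(released);
      return released;
    }
  }

  /** Single self-spill attempt, mirroring the `if (got < required)` shape. */
  static long acquireOnce(ToyPool pool, ToyConsumer consumer, long required) {
    long got = pool.acquire(required);
    if (got < required) {
      long released = consumer.spill(required - got);
      if (released > 0) {
        got += pool.acquire(required - got);
      }
    }
    return got;
  }

  /** Repeated self-spill, mirroring the `while (got < required)` shape with a break. */
  static long acquireWithRetry(ToyPool pool, ToyConsumer consumer, long required) {
    long got = pool.acquire(required);
    while (got < required) {
      long released = consumer.spill(required - got);
      if (released == 0) {
        break; // self-spilling could not free any more memory
      }
      got += pool.acquire(required - got);
    }
    return got;
  }
}
```

With a consumer holding 1000 bytes, an empty pool, and a waiter that wants 500 bytes, a request for 500 bytes gets nothing from the single attempt, because the waiter takes the first 500 spilled bytes. The retrying version spills a second time and then obtains its 500 bytes.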
1 parent 631077d commit 58d4da1

3 files changed, +92 -2 lines changed

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 10 additions & 2 deletions
@@ -202,14 +202,22 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
         }
       }

-      // call spill() on itself
-      if (got < required) {
+      // Attempt to free up memory by self-spilling.
+      //
+      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
+      // immediately notify other tasks that memory has been freed, and they may acquire the
+      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
+      // try again in the next loop iteration.
+      while (got < required) {
         try {
           long released = consumer.spill(required - got, consumer);
           if (released > 0) {
             logger.debug("Task {} released {} from itself ({})", taskAttemptId,
               Utils.bytesToString(released), consumer);
             got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
+          } else {
+            // Self-spilling could not free up any more memory.
+            break;
           }
         } catch (ClosedByInterruptException e) {
           // This called by user to kill a task (e.g: speculative task).
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+import java.io.IOException;
+
+/**
+ * A TestMemoryConsumer which, when asked to spill, releases only enough memory to satisfy the
+ * request rather than releasing all its memory.
+ */
+public class TestPartialSpillingMemoryConsumer extends TestMemoryConsumer {
+  private long spilledBytes = 0L;
+
+  public TestPartialSpillingMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) {
+    super(memoryManager, mode);
+  }
+  public TestPartialSpillingMemoryConsumer(TaskMemoryManager memoryManager) {
+    super(memoryManager);
+  }
+
+  @Override
+  public long spill(long size, MemoryConsumer trigger) throws IOException {
+    long used = getUsed();
+    long released = Math.min(used, size);
+    free(released);
+    spilledBytes += released;
+    return released;
+  }
+
+  public long getSpilledBytes() {
+    return spilledBytes;
+  }
+}

core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala

Lines changed: 34 additions & 0 deletions
@@ -240,6 +240,40 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
     assert(ThreadUtils.awaitResult(t2Result2, 200.millis) === 0L)
   }

+  test("SPARK-35486: memory freed by self-spilling is taken by another task") {
+    val memoryManager = createMemoryManager(1000L)
+    val t1MemManager = new TaskMemoryManager(memoryManager, 1)
+    val t2MemManager = new TaskMemoryManager(memoryManager, 2)
+    val c1 = new TestPartialSpillingMemoryConsumer(t1MemManager)
+    val c2 = new TestMemoryConsumer(t2MemManager)
+    val futureTimeout: Duration = 20.seconds
+
+    // t1 acquires 1000 bytes. This should succeed immediately.
+    val t1Result1 = Future { c1.acquireMemory(1000L) }
+    assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L)
+    assert(c1.getUsed() === 1000L)
+    assert(c1.getSpilledBytes() === 0L)
+
+    // t2 attempts to acquire 500 bytes. This should block since there is no memory available.
+    val t2Result1 = Future { c2.acquireMemory(500L) }
+    Thread.sleep(300)
+    assert(!t2Result1.isCompleted)
+    assert(c2.getUsed() === 0L)
+
+    // t1 attempts to acquire 500 bytes, causing its existing reservation to spill partially. After
+    // the spill, t1 is still at its fair share of 500 bytes, so it cannot acquire memory and t2
+    // gets the freed memory instead. t1 must try again, causing the rest of the reservation to
+    // spill.
+    val t1Result2 = Future { c1.acquireMemory(500L) }
+
+    // The spill should release enough memory for both t1's and t2's reservations to be satisfied.
+    assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 500L)
+    assert(ThreadUtils.awaitResult(t1Result2, futureTimeout) === 500L)
+    assert(c1.getSpilledBytes() === 1000L)
+    assert(c1.getUsed() === 500L)
+    assert(c2.getUsed() === 500L)
+  }
+
   test("TaskMemoryManager.cleanUpAllAllocatedMemory") {
     val memoryManager = createMemoryManager(1000L)
     val t1MemManager = new TaskMemoryManager(memoryManager, 1)
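
The fair-share arithmetic in the test comments (t1 capped at 500 bytes, t2 blocked and then served) can be checked with a simplified model of a per-task grant policy. This is a sketch under stated assumptions, not Spark's implementation: `FairShareSketch` and `grant` are hypothetical names, the pool size is treated as fixed, each of N active tasks is capped at 1/N of the pool, and a task blocks only while it is below 1/2N and its request cannot be fully met.

```java
/** Simplified model of a per-task fair-share grant. Names are illustrative, not Spark's API. */
final class FairShareSketch {

  /**
   * Returns the bytes granted to a task right now, or -1 if the task would have to wait.
   *
   * @param poolSize    total pool size in bytes (assumed fixed in this model)
   * @param memoryFree  bytes in the pool not held by any task
   * @param activeTasks number of active tasks, N
   * @param curMem      bytes the requesting task already holds
   * @param requested   bytes the task is asking for
   */
  static long grant(long poolSize, long memoryFree, int activeTasks, long curMem, long requested) {
    long maxPerTask = poolSize / activeTasks;        // cap: 1/N of the pool
    long minPerTask = poolSize / (2L * activeTasks); // floor: 1/2N of the pool
    long maxToGrant = Math.min(requested, Math.max(0L, maxPerTask - curMem));
    long toGrant = Math.min(maxToGrant, memoryFree);
    if (toGrant < requested && curMem + toGrant < minPerTask) {
      return -1L; // below the floor and not fully satisfied: wait for memory to be freed
    }
    return toGrant;
  }
}
```

Under this model, with a 1000-byte pool and two active tasks, t2 (holding 0 bytes) waits while nothing is free and receives 500 bytes once t1's first spill frees them. t1, still holding 500 bytes after that spill, is at its cap and is granted 0 bytes. After spilling the remaining 500 bytes, t1 holds 0 bytes and can be granted 500.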
