Skip to content

Commit a3bbca9

Browse files
viirya authored and cloud-fan committed
[SPARK-26265][CORE] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager
## What changes were proposed in this pull request? In `BytesToBytesMap.MapIterator.advanceToNextPage`, we first lock this `MapIterator` and then `TaskMemoryManager` when freeing a memory page by calling `freePage`. At the same time, it is possible that another memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it acquires memory and causes spilling on this `MapIterator`. The result is that the `MapIterator` holds the lock on itself and waits for the lock on `TaskMemoryManager`, while the other consumer holds the lock on `TaskMemoryManager` and waits for the lock on the `MapIterator` object. To avoid this deadlock, this patch proposes to keep a reference to the page to free and to free it after releasing the lock on the `MapIterator`. ## How was this patch tested? Added a test, and manually tested by running the test 100 times to make sure there is no deadlock. Closes #23272 from viirya/SPARK-26265. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent bd7df6b commit a3bbca9

File tree

3 files changed

+96
-41
lines changed

3 files changed

+96
-41
lines changed

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -255,11 +255,18 @@ private MapIterator(int numRecords, Location loc, boolean destructive) {
255255
}
256256

257257
private void advanceToNextPage() {
258+
// SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
259+
// to free a memory page by calling `freePage`. At the same time, it is possibly that another
260+
// memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
261+
// acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
262+
// reference to the page to free and free it after releasing the lock of `MapIterator`.
263+
MemoryBlock pageToFree = null;
264+
258265
synchronized (this) {
259266
int nextIdx = dataPages.indexOf(currentPage) + 1;
260267
if (destructive && currentPage != null) {
261268
dataPages.remove(currentPage);
262-
freePage(currentPage);
269+
pageToFree = currentPage;
263270
nextIdx --;
264271
}
265272
if (dataPages.size() > nextIdx) {
@@ -283,6 +290,9 @@ private void advanceToNextPage() {
283290
}
284291
}
285292
}
293+
if (pageToFree != null) {
294+
freePage(pageToFree);
295+
}
286296
}
287297

288298
@Override
@@ -329,52 +339,50 @@ public Location next() {
329339
}
330340
}
331341

332-
public long spill(long numBytes) throws IOException {
333-
synchronized (this) {
334-
if (!destructive || dataPages.size() == 1) {
335-
return 0L;
336-
}
342+
public synchronized long spill(long numBytes) throws IOException {
343+
if (!destructive || dataPages.size() == 1) {
344+
return 0L;
345+
}
337346

338-
updatePeakMemoryUsed();
347+
updatePeakMemoryUsed();
339348

340-
// TODO: use existing ShuffleWriteMetrics
341-
ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
349+
// TODO: use existing ShuffleWriteMetrics
350+
ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
342351

343-
long released = 0L;
344-
while (dataPages.size() > 0) {
345-
MemoryBlock block = dataPages.getLast();
346-
// The currentPage is used, cannot be released
347-
if (block == currentPage) {
348-
break;
349-
}
352+
long released = 0L;
353+
while (dataPages.size() > 0) {
354+
MemoryBlock block = dataPages.getLast();
355+
// The currentPage is used, cannot be released
356+
if (block == currentPage) {
357+
break;
358+
}
350359

351-
Object base = block.getBaseObject();
352-
long offset = block.getBaseOffset();
353-
int numRecords = UnsafeAlignedOffset.getSize(base, offset);
354-
int uaoSize = UnsafeAlignedOffset.getUaoSize();
355-
offset += uaoSize;
356-
final UnsafeSorterSpillWriter writer =
357-
new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
358-
while (numRecords > 0) {
359-
int length = UnsafeAlignedOffset.getSize(base, offset);
360-
writer.write(base, offset + uaoSize, length, 0);
361-
offset += uaoSize + length + 8;
362-
numRecords--;
363-
}
364-
writer.close();
365-
spillWriters.add(writer);
360+
Object base = block.getBaseObject();
361+
long offset = block.getBaseOffset();
362+
int numRecords = UnsafeAlignedOffset.getSize(base, offset);
363+
int uaoSize = UnsafeAlignedOffset.getUaoSize();
364+
offset += uaoSize;
365+
final UnsafeSorterSpillWriter writer =
366+
new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
367+
while (numRecords > 0) {
368+
int length = UnsafeAlignedOffset.getSize(base, offset);
369+
writer.write(base, offset + uaoSize, length, 0);
370+
offset += uaoSize + length + 8;
371+
numRecords--;
372+
}
373+
writer.close();
374+
spillWriters.add(writer);
366375

367-
dataPages.removeLast();
368-
released += block.size();
369-
freePage(block);
376+
dataPages.removeLast();
377+
released += block.size();
378+
freePage(block);
370379

371-
if (released >= numBytes) {
372-
break;
373-
}
380+
if (released >= numBytes) {
381+
break;
374382
}
375-
376-
return released;
377383
}
384+
385+
return released;
378386
}
379387

380388
@Override

core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
3838
return used;
3939
}
4040

41-
void use(long size) {
41+
public void use(long size) {
4242
long got = taskMemoryManager.acquireExecutionMemory(size, this);
4343
used += got;
4444
}
4545

46-
void free(long size) {
46+
public void free(long size) {
4747
used -= size;
4848
taskMemoryManager.releaseExecutionMemory(size, this);
4949
}

core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
import org.apache.spark.SparkConf;
3535
import org.apache.spark.executor.ShuffleWriteMetrics;
36+
import org.apache.spark.memory.MemoryMode;
37+
import org.apache.spark.memory.TestMemoryConsumer;
3638
import org.apache.spark.memory.TaskMemoryManager;
3739
import org.apache.spark.memory.TestMemoryManager;
3840
import org.apache.spark.network.util.JavaUtils;
@@ -678,4 +680,49 @@ public void testPeakMemoryUsed() {
678680
}
679681
}
680682

683+
@Test
684+
public void avoidDeadlock() throws InterruptedException {
685+
memoryManager.limit(PAGE_SIZE_BYTES);
686+
MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP;
687+
TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
688+
BytesToBytesMap map =
689+
new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
690+
691+
Thread thread = new Thread(() -> {
692+
int i = 0;
693+
long used = 0;
694+
while (i < 10) {
695+
c1.use(10000000);
696+
used += 10000000;
697+
i++;
698+
}
699+
c1.free(used);
700+
});
701+
702+
try {
703+
int i;
704+
for (i = 0; i < 1024; i++) {
705+
final long[] arr = new long[]{i};
706+
final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
707+
loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
708+
}
709+
710+
// Starts to require memory at another memory consumer.
711+
thread.start();
712+
713+
BytesToBytesMap.MapIterator iter = map.destructiveIterator();
714+
for (i = 0; i < 1024; i++) {
715+
iter.next();
716+
}
717+
assertFalse(iter.hasNext());
718+
} finally {
719+
map.free();
720+
thread.join();
721+
for (File spillFile : spillFilesCreated) {
722+
assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up",
723+
spillFile.exists());
724+
}
725+
}
726+
}
727+
681728
}

0 commit comments

Comments
 (0)