[SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager #23272

Closed · wants to merge 5 commits
86 changes: 47 additions & 39 deletions core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -255,11 +255,18 @@ private MapIterator(int numRecords, Location loc, boolean destructive) {
     }

     private void advanceToNextPage() {
+      // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
+      // to free a memory page by calling `freePage`. At the same time, it is possible that another
+      // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
+      // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
+      // a reference to the page to free and free it only after releasing the `MapIterator` lock.
+      MemoryBlock pageToFree = null;
+
       synchronized (this) {
         int nextIdx = dataPages.indexOf(currentPage) + 1;
         if (destructive && currentPage != null) {
           dataPages.remove(currentPage);
-          freePage(currentPage);
+          pageToFree = currentPage;
           nextIdx --;
         }
         if (dataPages.size() > nextIdx) {
@@ -283,6 +290,9 @@ private void advanceToNextPage() {
           }
         }
       }
+      if (pageToFree != null) {
+        freePage(pageToFree);
+      }
     }
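The locking comment above describes a classic lock-ordering deadlock and its standard cure. A minimal standalone sketch (hypothetical names, not Spark code) of the "record inside the lock, act after releasing it" pattern:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch (hypothetical names, not Spark code) of the fix pattern:
// work that needs a second lock is only *recorded* while holding the first
// lock, and performed after that lock is released, so this thread never
// holds both locks at once.
class DeferredFreeDemo {
    private final List<String> pages = new ArrayList<>();
    private final Object managerLock = new Object(); // stands in for TaskMemoryManager

    DeferredFreeDemo() {
        pages.add("page0");
        pages.add("page1");
    }

    // In the real deadlock, freePage() ends up taking the memory-manager lock.
    private void freePage(String page) {
        synchronized (managerLock) {
            // bookkeeping for the released page would go here
        }
    }

    // Before the fix, freePage() was called inside `synchronized (this)`,
    // creating the lock order [iterator -> manager]; a spilling consumer
    // takes [manager -> iterator], and the opposite orders can deadlock.
    String advance() {
        String pageToFree;
        synchronized (this) {          // iterator lock only
            pageToFree = pages.isEmpty() ? null : pages.remove(0);
        }
        if (pageToFree != null) {      // manager lock only; iterator lock released
            freePage(pageToFree);
        }
        return pageToFree;
    }
}
```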

@Override
@@ -329,52 +339,50 @@ public Location next() {
}
}

-    public long spill(long numBytes) throws IOException {
-      synchronized (this) {
-        if (!destructive || dataPages.size() == 1) {
-          return 0L;
-        }
+    public synchronized long spill(long numBytes) throws IOException {
+      if (!destructive || dataPages.size() == 1) {
+        return 0L;
+      }

-        updatePeakMemoryUsed();
+      updatePeakMemoryUsed();

-        // TODO: use existing ShuffleWriteMetrics
-        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+      // TODO: use existing ShuffleWriteMetrics
+      ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();

-        long released = 0L;
-        while (dataPages.size() > 0) {
-          MemoryBlock block = dataPages.getLast();
-          // The currentPage is used, cannot be released
-          if (block == currentPage) {
-            break;
-          }
+      long released = 0L;
+      while (dataPages.size() > 0) {
+        MemoryBlock block = dataPages.getLast();
+        // The currentPage is used, cannot be released
+        if (block == currentPage) {
+          break;
+        }

-          Object base = block.getBaseObject();
-          long offset = block.getBaseOffset();
-          int numRecords = UnsafeAlignedOffset.getSize(base, offset);
-          int uaoSize = UnsafeAlignedOffset.getUaoSize();
-          offset += uaoSize;
-          final UnsafeSorterSpillWriter writer =
-            new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
-          while (numRecords > 0) {
-            int length = UnsafeAlignedOffset.getSize(base, offset);
-            writer.write(base, offset + uaoSize, length, 0);
-            offset += uaoSize + length + 8;
-            numRecords--;
-          }
-          writer.close();
-          spillWriters.add(writer);
+        Object base = block.getBaseObject();
+        long offset = block.getBaseOffset();
+        int numRecords = UnsafeAlignedOffset.getSize(base, offset);
+        int uaoSize = UnsafeAlignedOffset.getUaoSize();
+        offset += uaoSize;
+        final UnsafeSorterSpillWriter writer =
+          new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
+        while (numRecords > 0) {
+          int length = UnsafeAlignedOffset.getSize(base, offset);
+          writer.write(base, offset + uaoSize, length, 0);
+          offset += uaoSize + length + 8;
+          numRecords--;
+        }
+        writer.close();
+        spillWriters.add(writer);

-          dataPages.removeLast();
-          released += block.size();
-          freePage(block);
+        dataPages.removeLast();
+        released += block.size();
+        freePage(block);

-          if (released >= numBytes) {
-            break;
-          }
-        }
+        if (released >= numBytes) {
+          break;
+        }
+      }

-        return released;
-      }
-    }
+      return released;
+    }
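The spill() rewrite above is a pure refactor: a `synchronized` instance method and a method whose whole body sits in `synchronized (this)` acquire the same monitor. A small standalone sketch (hypothetical class, not Spark code) of the equivalence:

```java
// Minimal sketch (hypothetical class, not Spark code) showing that a
// `synchronized` method and a body wrapped in `synchronized (this)` lock
// the same monitor: the rewrite only removes a level of nesting, it does
// not change locking semantics (an early return still exits the monitor).
class SyncEquivalence {
    private long released = 0L;

    // Old shape: explicit synchronized block around the entire body.
    public long spillBlock(long numBytes) {
        synchronized (this) {
            if (numBytes <= 0) {
                return 0L;            // monitor is released on this path too
            }
            released += numBytes;
            return released;
        }
    }

    // New shape: the `synchronized` modifier acquires the same `this` monitor.
    public synchronized long spillMethod(long numBytes) {
        if (numBytes <= 0) {
            return 0L;
        }
        released += numBytes;
        return released;
    }
}
```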

@Override
@@ -38,12 +38,12 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
return used;
}

-  void use(long size) {
+  public void use(long size) {
     long got = taskMemoryManager.acquireExecutionMemory(size, this);
     used += got;
   }

-  void free(long size) {
+  public void free(long size) {
     used -= size;
     taskMemoryManager.releaseExecutionMemory(size, this);
   }
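The only change in this file is widening `use`/`free` from package-private to public, so the map test suite (which lives in a different package) can drive memory pressure from a second thread. A hypothetical stand-in (not the real TestMemoryConsumer) for what the two methods do:

```java
// Hypothetical stand-in (not Spark's TestMemoryConsumer) for the use/free
// contract: `use` acquires execution memory, possibly less than requested
// when the manager is near its limit, and tracks it; `free` returns it.
class FakeMemoryConsumer {
    private final long limit;   // total memory the fake manager will grant
    private long used = 0L;

    FakeMemoryConsumer(long limit) {
        this.limit = limit;
    }

    public long use(long size) {
        long got = Math.min(size, limit - used);  // grant what remains under the limit
        used += got;
        return got;
    }

    public void free(long size) {
        used -= Math.min(size, used);             // never go below zero
    }

    public long getUsed() {
        return used;
    }
}
```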
@@ -33,6 +33,8 @@

 import org.apache.spark.SparkConf;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.network.util.JavaUtils;
@@ -667,4 +669,49 @@ public void testPeakMemoryUsed() {
}
}

@Test
public void avoidDeadlock() throws InterruptedException {
[Review comment — Member:]
Hi, @viirya. Since this test case reproduces the deadlock situation, we need timeout logic. Otherwise, it will hang (instead of failing) if we hit this issue again later.

[viirya (Member, Author) replied, Dec 10, 2018:]
I've tried several ways to set up timeout logic, but they don't work. The deadlock always hangs both the test and the timeout logic.

    memoryManager.limit(PAGE_SIZE_BYTES);
    MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
    TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
    BytesToBytesMap map =
      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);

    Thread thread = new Thread(() -> {
      int i = 0;
      long used = 0;
      while (i < 10) {
        c1.use(10000000);
        used += 10000000;
        i++;
      }
      c1.free(used);
    });

    try {
      int i;
      for (i = 0; i < 1024; i++) {
[dongjoon-hyun (Member) commented, Dec 11, 2018:]
Let's use for (int i = 0; ... here and at line 708, because int i is not referenced outside of the for loop.
Never mind — I found that this is the convention in this test suite.

        final long[] arr = new long[]{i};
        final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
        loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
      }

      // Start acquiring memory from another memory consumer.
      thread.start();

      BytesToBytesMap.MapIterator iter = map.destructiveIterator();
      for (i = 0; i < 1024; i++) {
        iter.next();
      }
      assertFalse(iter.hasNext());
    } finally {
      map.free();
      thread.join();
[Contributor commented:]
Is this the line where the test hangs without the fix?

[viirya (Member, Author) replied:]
Even without this line, the test still hangs. The test thread deadlocks with the other thread running the memory consumer.

[viirya (Member, Author) replied, Dec 10, 2018:]
This line just makes sure the memory consumer ends and frees its acquired memory.
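The role of `thread.join()` described above can be illustrated with a toy example (hypothetical, not the Spark test): `join()` is not a deadlock guard; it only sequences cleanup after the worker has finished and released what it acquired.

```java
// Toy illustration (hypothetical, not the Spark test) of the point above:
// join() does not protect against the deadlock itself — it only guarantees
// the worker thread has completed, and therefore freed what it acquired,
// before any cleanup checks run. join() also establishes a happens-before
// edge, so the main thread safely sees the worker's final writes.
class JoinDemo {
    static long held = 0L;

    static boolean runOnce() throws InterruptedException {
        Thread worker = new Thread(() -> {
            held += 100L;   // "acquire" memory
            held -= 100L;   // "free" it before exiting, like c1.free(used)
        });
        worker.start();
        worker.join();      // cleanup must not start until the worker is done
        return held == 0L;  // everything the worker took has been released
    }
}
```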

      for (File spillFile : spillFilesCreated) {
        assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up",
          spillFile.exists());
      }
    }
  }

}