Skip to content

Commit

Permalink
Save
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Oct 7, 2024
1 parent 34596c5 commit d216e50
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.spark;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.spark.internal.Logging;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;
Expand All @@ -36,6 +38,7 @@ public class CometTaskMemoryManager {
private final TaskMemoryManager internal;
private final NativeMemoryConsumer nativeMemoryConsumer;
private final boolean unifiedMemory;
private static AtomicBoolean initialized = new AtomicBoolean(false);
private static long available = 0;

public CometTaskMemoryManager(long id, boolean unifiedMemory, long available) {
Expand All @@ -44,11 +47,11 @@ public CometTaskMemoryManager(long id, boolean unifiedMemory, long available) {
this.nativeMemoryConsumer = new NativeMemoryConsumer();
this.unifiedMemory = unifiedMemory;

synchronized (CometTaskMemoryManager.class) {
if (CometTaskMemoryManager.available == 0) {
if (CometTaskMemoryManager.initialized.compareAndSet(false, true)) {
synchronized (CometTaskMemoryManager.class) {
// TODO use Spark logger
System.out.println("Initializing Comet memory pool to " + available + " bytes");
CometTaskMemoryManager.available = available;
} else {
assert (CometTaskMemoryManager.available == available);
}
}
}
Expand All @@ -59,8 +62,8 @@ public long acquireMemory(long size) {
if (unifiedMemory) {
return internal.acquireExecutionMemory(size, nativeMemoryConsumer);
} else {
synchronized (this) {
if (size <= available) {
synchronized (CometTaskMemoryManager.class) {
if (size <= CometTaskMemoryManager.available) {
available -= size;
return size;
} else {
Expand All @@ -77,7 +80,7 @@ public void releaseMemory(long size) {
if (unifiedMemory) {
internal.releaseExecutionMemory(size, nativeMemoryConsumer);
} else {
synchronized (this) {
synchronized (CometTaskMemoryManager.class) {
available += size;
}
}
Expand Down

0 comments on commit d216e50

Please sign in to comment.