Skip to content

Commit 63867a9

Browse files
authored
Impose a memory limit on the bookie journal (apache#2710)
* Impose a memory limit on the bookie journal * Fixed checkstyle issues * Fixed more checkstyle issues * Added metrics for journal memory * More checkstyle.. * Unused import
1 parent 87579b0 commit 63867a9

File tree

8 files changed

+445
-2
lines changed

8 files changed

+445
-2
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.common.util;
20+
21+
import java.util.concurrent.atomic.AtomicLong;
22+
import java.util.concurrent.locks.Condition;
23+
import java.util.concurrent.locks.ReentrantLock;
24+
25+
/**
26+
* Controller for tracking the amount of memory used for some task.
27+
*/
28+
public class MemoryLimitController {
29+
30+
private final long memoryLimit;
31+
private final AtomicLong currentUsage = new AtomicLong();
32+
private final ReentrantLock mutex = new ReentrantLock(false);
33+
private final Condition condition = mutex.newCondition();
34+
35+
public MemoryLimitController(long memoryLimitBytes) {
36+
this.memoryLimit = memoryLimitBytes;
37+
}
38+
39+
public boolean tryReserveMemory(long size) {
40+
while (true) {
41+
long current = currentUsage.get();
42+
long newUsage = current + size;
43+
44+
// We allow one request to go over the limit, to make the notification
45+
// path simpler and more efficient
46+
if (current > memoryLimit && memoryLimit > 0) {
47+
return false;
48+
}
49+
50+
if (currentUsage.compareAndSet(current, newUsage)) {
51+
return true;
52+
}
53+
}
54+
}
55+
56+
public void reserveMemory(long size) throws InterruptedException {
57+
if (!tryReserveMemory(size)) {
58+
mutex.lock();
59+
60+
try {
61+
while (!tryReserveMemory(size)) {
62+
condition.await();
63+
}
64+
} finally {
65+
mutex.unlock();
66+
}
67+
}
68+
}
69+
70+
public void releaseMemory(long size) {
71+
long newUsage = currentUsage.addAndGet(-size);
72+
if (newUsage + size > memoryLimit && newUsage <= memoryLimit) {
73+
// We just crossed the limit. Now we have more space
74+
mutex.lock();
75+
try {
76+
condition.signalAll();
77+
} finally {
78+
mutex.unlock();
79+
}
80+
}
81+
}
82+
83+
public long currentUsage() {
84+
return currentUsage.get();
85+
}
86+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.common.util;
20+
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
23+
import static org.junit.Assert.assertTrue;
24+
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.TimeUnit;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
/**
34+
* Tests for {@link MemoryLimitController}.
35+
*/
36+
public class MemoryLimitControllerTest {
37+
38+
private ExecutorService executor;
39+
40+
@Before
41+
public void setup() {
42+
executor = Executors.newCachedThreadPool();
43+
}
44+
45+
@After
46+
public void teardown() {
47+
executor.shutdownNow();
48+
}
49+
50+
@Test
51+
public void testLimit() throws Exception {
52+
MemoryLimitController mlc = new MemoryLimitController(100);
53+
54+
for (int i = 0; i < 101; i++) {
55+
mlc.reserveMemory(1);
56+
}
57+
58+
assertEquals(101, mlc.currentUsage());
59+
assertFalse(mlc.tryReserveMemory(1));
60+
mlc.releaseMemory(1);
61+
assertEquals(100, mlc.currentUsage());
62+
63+
assertTrue(mlc.tryReserveMemory(1));
64+
assertEquals(101, mlc.currentUsage());
65+
}
66+
67+
@Test
68+
public void testBlocking() throws Exception {
69+
MemoryLimitController mlc = new MemoryLimitController(100);
70+
71+
for (int i = 0; i < 101; i++) {
72+
mlc.reserveMemory(1);
73+
}
74+
75+
CountDownLatch l1 = new CountDownLatch(1);
76+
executor.submit(() -> {
77+
try {
78+
mlc.reserveMemory(1);
79+
l1.countDown();
80+
} catch (InterruptedException e) {
81+
}
82+
});
83+
84+
CountDownLatch l2 = new CountDownLatch(1);
85+
executor.submit(() -> {
86+
try {
87+
mlc.reserveMemory(1);
88+
l2.countDown();
89+
} catch (InterruptedException e) {
90+
}
91+
});
92+
93+
CountDownLatch l3 = new CountDownLatch(1);
94+
executor.submit(() -> {
95+
try {
96+
mlc.reserveMemory(1);
97+
l3.countDown();
98+
} catch (InterruptedException e) {
99+
}
100+
});
101+
102+
// The threads are blocked since the quota is full
103+
assertFalse(l1.await(100, TimeUnit.MILLISECONDS));
104+
assertFalse(l2.await(100, TimeUnit.MILLISECONDS));
105+
assertFalse(l3.await(100, TimeUnit.MILLISECONDS));
106+
107+
assertEquals(101, mlc.currentUsage());
108+
mlc.releaseMemory(3);
109+
110+
assertTrue(l1.await(1, TimeUnit.SECONDS));
111+
assertTrue(l2.await(1, TimeUnit.SECONDS));
112+
assertTrue(l3.await(1, TimeUnit.SECONDS));
113+
assertEquals(101, mlc.currentUsage());
114+
}
115+
116+
@Test
117+
public void testStepRelease() throws Exception {
118+
MemoryLimitController mlc = new MemoryLimitController(100);
119+
120+
for (int i = 0; i < 101; i++) {
121+
mlc.reserveMemory(1);
122+
}
123+
124+
CountDownLatch l1 = new CountDownLatch(1);
125+
executor.submit(() -> {
126+
try {
127+
mlc.reserveMemory(1);
128+
l1.countDown();
129+
} catch (InterruptedException e) {
130+
}
131+
});
132+
133+
CountDownLatch l2 = new CountDownLatch(1);
134+
executor.submit(() -> {
135+
try {
136+
mlc.reserveMemory(1);
137+
l2.countDown();
138+
} catch (InterruptedException e) {
139+
}
140+
});
141+
142+
CountDownLatch l3 = new CountDownLatch(1);
143+
executor.submit(() -> {
144+
try {
145+
mlc.reserveMemory(1);
146+
l3.countDown();
147+
} catch (InterruptedException e) {
148+
}
149+
});
150+
151+
// The threads are blocked since the quota is full
152+
assertFalse(l1.await(100, TimeUnit.MILLISECONDS));
153+
assertFalse(l2.await(100, TimeUnit.MILLISECONDS));
154+
assertFalse(l3.await(100, TimeUnit.MILLISECONDS));
155+
156+
assertEquals(101, mlc.currentUsage());
157+
158+
mlc.releaseMemory(1);
159+
mlc.releaseMemory(1);
160+
mlc.releaseMemory(1);
161+
162+
assertTrue(l1.await(1, TimeUnit.SECONDS));
163+
assertTrue(l2.await(1, TimeUnit.SECONDS));
164+
assertTrue(l3.await(1, TimeUnit.SECONDS));
165+
assertEquals(101, mlc.currentUsage());
166+
}
167+
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public interface BookKeeperServerStats {
113113
String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
114114
String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY";
115115
String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY";
116+
String JOURNAL_MEMORY_MAX = "JOURNAL_MEMORY_MAX";
117+
String JOURNAL_MEMORY_USED = "JOURNAL_MEMORY_USED";
116118

117119
// Ledger Storage Stats
118120
String STORAGE_GET_OFFSET = "STORAGE_GET_OFFSET";

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
package org.apache.bookkeeper.bookie;
2323

24+
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_MAX;
25+
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_USED;
2426
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
2527
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
2628
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
@@ -86,8 +88,10 @@
8688
import org.apache.bookkeeper.net.DNS;
8789
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
8890
import org.apache.bookkeeper.proto.SimpleBookieServiceInfoProvider;
91+
import org.apache.bookkeeper.stats.Gauge;
8992
import org.apache.bookkeeper.stats.NullStatsLogger;
9093
import org.apache.bookkeeper.stats.StatsLogger;
94+
import org.apache.bookkeeper.stats.annotations.StatsDoc;
9195
import org.apache.bookkeeper.util.BookKeeperConstants;
9296
import org.apache.bookkeeper.util.DiskChecker;
9397
import org.apache.bookkeeper.util.IOUtils;
@@ -146,6 +150,18 @@ public class Bookie extends BookieCriticalThread {
146150

147151
private final ByteBufAllocator allocator;
148152

153+
@StatsDoc(
154+
name = JOURNAL_MEMORY_MAX,
155+
help = "The max amount of memory in bytes that can be used by the bookie journal"
156+
)
157+
private final Gauge<Long> journalMemoryMaxStats;
158+
159+
@StatsDoc(
160+
name = JOURNAL_MEMORY_USED,
161+
help = "The actual amount of memory in bytes currently used by the bookie journal"
162+
)
163+
private final Gauge<Long> journalMemoryUsedStats;
164+
149165
/**
150166
* Exception is thrown when no such a ledger is found in this bookie.
151167
*/
@@ -812,6 +828,37 @@ public void start() {
812828

813829
// Expose Stats
814830
this.bookieStats = new BookieStats(statsLogger);
831+
journalMemoryMaxStats = new Gauge<Long>() {
832+
final long journalMaxMemory = conf.getJournalMaxMemorySizeMb() * 1024 * 1024;
833+
834+
@Override
835+
public Long getDefaultValue() {
836+
return journalMaxMemory;
837+
}
838+
839+
@Override
840+
public Long getSample() {
841+
return journalMaxMemory;
842+
}
843+
};
844+
statsLogger.scope(JOURNAL_SCOPE).registerGauge(JOURNAL_MEMORY_MAX, journalMemoryMaxStats);
845+
846+
journalMemoryUsedStats = new Gauge<Long>() {
847+
@Override
848+
public Long getDefaultValue() {
849+
return -1L;
850+
}
851+
852+
@Override
853+
public Long getSample() {
854+
long totalMemory = 0L;
855+
for (int i = 0; i < journals.size(); i++) {
856+
totalMemory += journals.get(i).getMemoryUsage();
857+
}
858+
return totalMemory;
859+
}
860+
};
861+
statsLogger.scope(JOURNAL_SCOPE).registerGauge(JOURNAL_MEMORY_USED, journalMemoryUsedStats);
815862
}
816863

817864
StateManager initializeStateManager() throws IOException {

0 commit comments

Comments
 (0)