Skip to content

Commit 4aee7df

Browse files
Support shared RocksDB rate limiter in Fluss
1 parent 872554b commit 4aee7df

File tree

12 files changed

+555
-14
lines changed

12 files changed

+555
-14
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,6 +1524,16 @@ public class ConfigOptions {
15241524
"The max size of the consumed memory for RocksDB batch write, "
15251525
+ "will flush just based on item count if this config set to 0.");
15261526

1527+
public static final ConfigOption<MemorySize> KV_SHARED_RATE_LIMITER_BYTES_PER_SEC =
1528+
key("kv.rocksdb.shared-rate-limiter.bytes-per-sec")
1529+
.memoryType()
1530+
.defaultValue(MemorySize.parse("100mb"))
1531+
.withDescription(
1532+
"The shared rate limit in bytes per second for RocksDB flush and compaction operations "
1533+
+ "across all RocksDB instances in the TabletServer. "
1534+
+ "All KV tablets share a single global RateLimiter to prevent disk IO from being saturated. "
1535+
+ "The default value is `100MB/s`. Set to 0 to disable rate limiting.");
1536+
15271537
// --------------------------------------------------------------------------
15281538
// Provided configurable ColumnFamilyOptions within Fluss
15291539
// --------------------------------------------------------------------------
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.ConfigOptions;
21+
import org.apache.fluss.config.MemorySize;
22+
import org.apache.fluss.config.cluster.ConfigEntry;
23+
24+
import org.apache.flink.table.annotation.ProcedureHint;
25+
import org.apache.flink.table.procedure.ProcedureContext;
26+
27+
import java.util.Collection;
28+
29+
/**
30+
* Procedure to get current RocksDB rate limiter configuration from cluster config.
31+
*
32+
* <p>Usage:
33+
*
34+
* <pre>
35+
* CALL sys.get_shared_rocksdb_rate_limiter();
36+
* </pre>
37+
*/
38+
public class GetSharedRocksDBRateLimiterProcedure extends ProcedureBase {
39+
40+
@ProcedureHint(argument = {})
41+
public String[] call(ProcedureContext context) throws Exception {
42+
43+
try {
44+
// Get cluster configuration
45+
Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();
46+
47+
// Find shared rate limiter configuration
48+
String rateLimiterKey = ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key();
49+
50+
for (ConfigEntry entry : configs) {
51+
if (entry.key().equals(rateLimiterKey)) {
52+
String value = entry.value();
53+
long bytesPerSec = MemorySize.parseBytes(value);
54+
55+
if (bytesPerSec == 0) {
56+
return new String[] {
57+
"Shared RocksDB rate limiter is disabled (0 bytes/sec)"
58+
};
59+
}
60+
61+
String source =
62+
entry.source() != null ? " [source: " + entry.source() + "]" : "";
63+
return new String[] {
64+
String.format(
65+
"Shared RocksDB rate limiter: %s%s",
66+
new MemorySize(bytesPerSec).toHumanReadableString(), source)
67+
};
68+
}
69+
}
70+
71+
// Not found, return default value
72+
MemorySize defaultValue =
73+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.defaultValue();
74+
75+
return new String[] {
76+
String.format(
77+
"Shared RocksDB rate limiter: %s (default)",
78+
defaultValue.toHumanReadableString())
79+
};
80+
81+
} catch (Exception e) {
82+
throw new RuntimeException(
83+
"Failed to get shared RocksDB rate limiter config: " + e.getMessage(), e);
84+
}
85+
}
86+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ private static Map<String, Class<? extends ProcedureBase>> initProcedureMap() {
6969
private enum ProcedureEnum {
7070
ADD_ACL("sys.add_acl", AddAclProcedure.class),
7171
DROP_ACL("sys.drop_acl", DropAclProcedure.class),
72-
List_ACL("sys.list_acl", ListAclProcedure.class);
72+
List_ACL("sys.list_acl", ListAclProcedure.class),
73+
SET_SHARED_ROCKSDB_RATE_LIMITER(
74+
"sys.set_shared_rocksdb_rate_limiter", SetSharedRocksDBRateLimiterProcedure.class),
75+
GET_SHARED_ROCKSDB_RATE_LIMITER(
76+
"sys.get_shared_rocksdb_rate_limiter", GetSharedRocksDBRateLimiterProcedure.class);
7377

7478
private final String path;
7579
private final Class<? extends ProcedureBase> procedureClass;
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.ConfigOptions;
21+
import org.apache.fluss.config.MemorySize;
22+
import org.apache.fluss.config.cluster.AlterConfig;
23+
import org.apache.fluss.config.cluster.AlterConfigOpType;
24+
import org.apache.fluss.config.cluster.ConfigEntry;
25+
26+
import org.apache.flink.table.annotation.ArgumentHint;
27+
import org.apache.flink.table.annotation.DataTypeHint;
28+
import org.apache.flink.table.annotation.ProcedureHint;
29+
import org.apache.flink.table.procedure.ProcedureContext;
30+
31+
import java.util.Collection;
32+
import java.util.Collections;
33+
34+
/**
35+
* Procedure to set RocksDB rate limiter dynamically via cluster configuration. The configuration
36+
* will be persisted in ZooKeeper and applied to all TabletServers.
37+
*
38+
* <p>Usage examples:
39+
*
40+
* <pre>
41+
* -- Set rate limiter to 200MB/s
42+
* CALL sys.set_shared_rocksdb_rate_limiter('200MB');
43+
*
44+
* -- Set rate limiter to 1GB/s
45+
* CALL sys.set_shared_rocksdb_rate_limiter('1GB');
46+
* </pre>
47+
*/
48+
public class SetSharedRocksDBRateLimiterProcedure extends ProcedureBase {
49+
50+
@ProcedureHint(
51+
argument = {@ArgumentHint(name = "bytes_per_second", type = @DataTypeHint("STRING"))})
52+
public String[] call(ProcedureContext context, String bytesPerSecondStr) throws Exception {
53+
54+
try {
55+
// Parse and validate input format first
56+
long bytesPerSecond = MemorySize.parse(bytesPerSecondStr).getBytes();
57+
58+
// Get cluster configuration
59+
Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();
60+
61+
ConfigEntry configEntry = null;
62+
for (ConfigEntry entry : configs) {
63+
if (entry.key().equals(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) {
64+
configEntry = entry;
65+
}
66+
}
67+
68+
if (null == configEntry) {
69+
throw new IllegalStateException(
70+
"Failed to set shared RocksDB rate limiter: previous config not found, server does not support rate limiter setting.");
71+
}
72+
73+
long oldValue = MemorySize.parseBytes(configEntry.value());
74+
75+
if (oldValue < 0 && bytesPerSecond >= 0) {
76+
throw new IllegalArgumentException(
77+
"Failed to set shared RocksDB rate limiter: rate limiter is disabled, cannot enable dynamically.");
78+
}
79+
80+
if (oldValue >= 0 && bytesPerSecond < 0) {
81+
throw new IllegalArgumentException(
82+
"Failed to set shared RocksDB rate limiter: rate limiter is enabled, cannot disable dynamically.");
83+
}
84+
85+
// Construct configuration modification operation
86+
AlterConfig alterConfig =
87+
new AlterConfig(
88+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
89+
bytesPerSecondStr,
90+
AlterConfigOpType.SET);
91+
92+
// Call Admin API to modify cluster configuration
93+
admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get();
94+
95+
return new String[] {
96+
String.format(
97+
"Successfully set shared RocksDB rate limiter to %s (%d bytes/sec) for all TabletServers. "
98+
+ "The configuration is persisted in ZooKeeper and will survive server restarts.",
99+
bytesPerSecondStr, bytesPerSecond)
100+
};
101+
102+
} catch (Exception e) {
103+
// Wrap other exceptions with more context
104+
throw new RuntimeException(
105+
"Failed to set shared RocksDB rate limiter: " + e.getMessage(), e);
106+
}
107+
}
108+
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,12 @@ void testShowProcedures() throws Exception {
9191
try (CloseableIterator<Row> showProceduresIterator =
9292
tEnv.executeSql("show procedures").collect()) {
9393
List<String> expectedShowProceduresResult =
94-
Arrays.asList("+I[sys.add_acl]", "+I[sys.drop_acl]", "+I[sys.list_acl]");
94+
Arrays.asList(
95+
"+I[sys.add_acl]",
96+
"+I[sys.drop_acl]",
97+
"+I[sys.get_shared_rocksdb_rate_limiter]",
98+
"+I[sys.list_acl]",
99+
"+I[sys.set_shared_rocksdb_rate_limiter]");
95100
// make sure no more results is unread.
96101
assertResultsIgnoreOrder(showProceduresIterator, expectedShowProceduresResult, true);
97102
}
@@ -221,6 +226,99 @@ void testUnsupportedPermissionType() {
221226
"No enum constant org.apache.fluss.security.acl.PermissionType.UNKNOWN");
222227
}
223228

229+
@Test
230+
void testGetAndSetSharedRocksDBRateLimiter() throws Exception {
231+
// Test get current shared rate limiter configuration
232+
try (CloseableIterator<Row> getRateLimiterIterator =
233+
tEnv.executeSql(
234+
String.format(
235+
"CALL %s.sys.get_shared_rocksdb_rate_limiter()",
236+
CATALOG_NAME))
237+
.collect()) {
238+
assertThat(getRateLimiterIterator.hasNext()).isTrue();
239+
Row row = getRateLimiterIterator.next();
240+
String result = row.getField(0).toString();
241+
// Should contain the initial value (100MB from config)
242+
assertThat(result).contains("100");
243+
assertThat(getRateLimiterIterator.hasNext()).isFalse();
244+
}
245+
246+
// Test set shared rate limiter to 200MB
247+
try (CloseableIterator<Row> setRateLimiterIterator =
248+
tEnv.executeSql(
249+
String.format(
250+
"CALL %s.sys.set_shared_rocksdb_rate_limiter('200MB')",
251+
CATALOG_NAME))
252+
.collect()) {
253+
assertThat(setRateLimiterIterator.hasNext()).isTrue();
254+
Row row = setRateLimiterIterator.next();
255+
String result = row.getField(0).toString();
256+
assertThat(result).contains("Successfully");
257+
assertThat(result).contains("200MB");
258+
assertThat(setRateLimiterIterator.hasNext()).isFalse();
259+
}
260+
261+
// Wait a moment for configuration to propagate
262+
Thread.sleep(2000);
263+
264+
// Verify the change
265+
try (CloseableIterator<Row> verifyIterator =
266+
tEnv.executeSql(
267+
String.format(
268+
"CALL %s.sys.get_shared_rocksdb_rate_limiter()",
269+
CATALOG_NAME))
270+
.collect()) {
271+
assertThat(verifyIterator.hasNext()).isTrue();
272+
Row row = verifyIterator.next();
273+
String result = row.getField(0).toString();
274+
// Should now show 200MB
275+
assertThat(result).contains("200");
276+
assertThat(verifyIterator.hasNext()).isFalse();
277+
}
278+
279+
// Test set shared rate limiter to 500MB
280+
try (CloseableIterator<Row> set500MBIterator =
281+
tEnv.executeSql(
282+
String.format(
283+
"CALL %s.sys.set_shared_rocksdb_rate_limiter('500MB')",
284+
CATALOG_NAME))
285+
.collect()) {
286+
assertThat(set500MBIterator.hasNext()).isTrue();
287+
Row row = set500MBIterator.next();
288+
String result = row.getField(0).toString();
289+
assertThat(result).contains("Successfully");
290+
assertThat(result).contains("500MB");
291+
}
292+
293+
Thread.sleep(2000);
294+
295+
// Final verification
296+
try (CloseableIterator<Row> finalVerifyIterator =
297+
tEnv.executeSql(
298+
String.format(
299+
"CALL %s.sys.get_shared_rocksdb_rate_limiter()",
300+
CATALOG_NAME))
301+
.collect()) {
302+
assertThat(finalVerifyIterator.hasNext()).isTrue();
303+
Row row = finalVerifyIterator.next();
304+
String result = row.getField(0).toString();
305+
assertThat(result).contains("500");
306+
}
307+
}
308+
309+
@Test
310+
void testSetSharedRocksDBRateLimiterInvalidFormat() {
311+
// Test invalid memory format
312+
assertThatThrownBy(
313+
() ->
314+
tEnv.executeSql(
315+
String.format(
316+
"CALL %s.sys.set_shared_rocksdb_rate_limiter('invalid')",
317+
CATALOG_NAME))
318+
.collect())
319+
.hasMessageContaining("Failed to set shared RocksDB rate limiter");
320+
}
321+
224322
@Test
225323
void testDisableAuthorization() throws Exception {
226324
String catalogName = "disable_acl_catalog";
@@ -272,6 +370,9 @@ private static Configuration initConfig() {
272370
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb"));
273371
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
274372

373+
// Enable shared RocksDB rate limiter for testing
374+
conf.set(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC, MemorySize.parse("100mb"));
375+
275376
// set security information.
276377
conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
277378
conf.setString("security.sasl.enabled.mechanisms", "plain");

fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29+
import java.util.Arrays;
2930
import java.util.Collections;
3031
import java.util.HashMap;
3132
import java.util.HashSet;
@@ -35,6 +36,7 @@
3536
import java.util.concurrent.locks.ReentrantReadWriteLock;
3637

3738
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
39+
import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
3840
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
3941
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
4042

@@ -49,7 +51,9 @@ class DynamicServerConfig {
4951

5052
private static final Logger LOG = LoggerFactory.getLogger(DynamicServerConfig.class);
5153
private static final Set<String> ALLOWED_CONFIG_KEYS =
52-
Collections.singleton(DATALAKE_FORMAT.key());
54+
new HashSet<>(
55+
Arrays.asList(
56+
DATALAKE_FORMAT.key(), KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()));
5357
private static final Set<String> ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake.");
5458

5559
private final ReadWriteLock lock = new ReentrantReadWriteLock();

0 commit comments

Comments
 (0)