Skip to content

Commit 399b146

Browse files
support generic flink procedure
1 parent d44ea65 commit 399b146

File tree

14 files changed

+937
-391
lines changed

14 files changed

+937
-391
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.config.cluster;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.exception.ConfigException;
22+
23+
import javax.annotation.Nullable;
24+
25+
/**
26+
* Validator for a single dynamic configuration key.
27+
*
28+
* <p>Unlike {@link ServerReconfigurable}, validators are stateless and only perform validation
29+
* logic without requiring component instances. This allows coordinators to validate configurations
30+
* for components they don't run (e.g., KvManager).
31+
*
32+
* <p>Example use case: CoordinatorServer needs to validate KV-related configurations even though it
33+
* doesn't have a KvManager instance. A {@link ConfigValidator} can be registered on both
34+
* CoordinatorServer (for validation) and TabletServer (for both validation and actual
35+
* reconfiguration via {@link ServerReconfigurable}).
36+
*
37+
* <p>Each validator monitors a single configuration key. The validator will only be invoked when
38+
* that specific key changes, improving validation efficiency.
39+
*
40+
* <p>This interface is designed to be stateless and thread-safe. Implementations should not rely on
41+
* any mutable component state.
42+
*/
43+
@PublicEvolving
44+
public interface ConfigValidator {
45+
46+
/**
47+
* Returns the configuration key this validator monitors.
48+
*
49+
* <p>The validator will only be invoked when this specific configuration key changes. This
50+
* allows efficient filtering of validators and clear declaration of dependencies.
51+
*
52+
* @return the configuration key to monitor, must not be null or empty
53+
*/
54+
String configKey();
55+
56+
/**
57+
* Validates a configuration value change.
58+
*
59+
* <p>This method is called when the monitored configuration key changes. It should check
60+
* whether the new value is valid, potentially considering the old value and validation rules.
61+
*
62+
* <p>The method should be stateless and deterministic - given the same old and new values, it
63+
* should always produce the same validation result.
64+
*
65+
* @param oldValue the previous value of the configuration key, null if the key was not set
66+
* before
67+
* @param newValue the new value of the configuration key, null if the key is being deleted
68+
* @throws ConfigException if the configuration change is invalid, with a descriptive error
69+
* message explaining why the change cannot be applied
70+
*/
71+
void validate(@Nullable String oldValue, @Nullable String newValue) throws ConfigException;
72+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.cluster.ConfigEntry;
21+
22+
import org.apache.flink.table.annotation.ArgumentHint;
23+
import org.apache.flink.table.annotation.DataTypeHint;
24+
import org.apache.flink.table.annotation.ProcedureHint;
25+
import org.apache.flink.table.procedure.ProcedureContext;
26+
import org.apache.flink.types.Row;
27+
28+
import javax.annotation.Nullable;
29+
30+
import java.util.ArrayList;
31+
import java.util.Collection;
32+
import java.util.List;
33+
34+
/**
35+
* Procedure to get cluster configuration(s).
36+
*
37+
* <p>This procedure allows querying dynamic cluster configurations. It can retrieve:
38+
*
39+
* <ul>
40+
* <li>A specific configuration key
41+
* <li>All configurations (when key parameter is null or empty)
42+
* </ul>
43+
*
44+
* <p>Usage examples:
45+
*
46+
* <pre>
47+
* -- Get a specific configuration
48+
* CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');
49+
*
50+
* -- Get all cluster configurations
51+
* CALL sys.get_cluster_config();
52+
* </pre>
53+
*/
54+
public class GetClusterConfigProcedure extends ProcedureBase {
55+
56+
@ProcedureHint(
57+
output =
58+
@DataTypeHint(
59+
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
60+
public Row[] call(ProcedureContext context) throws Exception {
61+
return getConfigs(null);
62+
}
63+
64+
@ProcedureHint(
65+
argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))},
66+
output =
67+
@DataTypeHint(
68+
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
69+
public Row[] call(ProcedureContext context, String configKey) throws Exception {
70+
return getConfigs(configKey);
71+
}
72+
73+
private Row[] getConfigs(@Nullable String configKey) throws Exception {
74+
try {
75+
// Get all cluster configurations
76+
Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();
77+
78+
List<Row> results = new ArrayList<>();
79+
80+
if (configKey == null || configKey.isEmpty()) {
81+
// Return all configurations
82+
for (ConfigEntry entry : configs) {
83+
results.add(
84+
Row.of(
85+
entry.key(),
86+
entry.value(),
87+
entry.source() != null ? entry.source().name() : "UNKNOWN"));
88+
}
89+
90+
if (results.isEmpty()) {
91+
return new Row[] {Row.of("No cluster configurations found", null, null)};
92+
}
93+
} else {
94+
// Find specific configuration
95+
for (ConfigEntry entry : configs) {
96+
if (entry.key().equals(configKey)) {
97+
results.add(
98+
Row.of(
99+
entry.key(),
100+
entry.value(),
101+
entry.source() != null
102+
? entry.source().name()
103+
: "UNKNOWN"));
104+
break;
105+
}
106+
}
107+
108+
if (results.isEmpty()) {
109+
return new Row[] {
110+
Row.of(
111+
String.format("Configuration key '%s' not found", configKey),
112+
null,
113+
null)
114+
};
115+
}
116+
}
117+
118+
return results.toArray(new Row[0]);
119+
120+
} catch (Exception e) {
121+
throw new RuntimeException("Failed to get cluster config: " + e.getMessage(), e);
122+
}
123+
}
124+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetSharedRocksDBRateLimiterProcedure.java

Lines changed: 0 additions & 86 deletions
This file was deleted.

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,8 @@ private enum ProcedureEnum {
7070
ADD_ACL("sys.add_acl", AddAclProcedure.class),
7171
DROP_ACL("sys.drop_acl", DropAclProcedure.class),
7272
List_ACL("sys.list_acl", ListAclProcedure.class),
73-
SET_SHARED_ROCKSDB_RATE_LIMITER(
74-
"sys.set_shared_rocksdb_rate_limiter", SetSharedRocksDBRateLimiterProcedure.class),
75-
GET_SHARED_ROCKSDB_RATE_LIMITER(
76-
"sys.get_shared_rocksdb_rate_limiter", GetSharedRocksDBRateLimiterProcedure.class);
73+
SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class),
74+
GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class);
7775

7876
private final String path;
7977
private final Class<? extends ProcedureBase> procedureClass;

0 commit comments

Comments
 (0)