Skip to content

Commit 3ddbb07

Browse files
helloliuxg@gmail.comleonardBang
authored andcommitted
[FLINK-36974][cdc-cli] Support overwrite flink configuration via command line
This closes #3823
1 parent 0f67506 commit 3ddbb07

File tree

3 files changed

+150
-28
lines changed

3 files changed

+150
-28
lines changed

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
2121
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
2222
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
23+
import org.apache.flink.cdc.common.configuration.ConfigOption;
24+
import org.apache.flink.cdc.common.configuration.ConfigOptions;
2325
import org.apache.flink.cdc.common.configuration.Configuration;
26+
import org.apache.flink.cdc.common.utils.StringUtils;
2427
import org.apache.flink.cdc.composer.PipelineExecution;
2528
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
2629
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -39,8 +42,10 @@
3942
import java.util.Arrays;
4043
import java.util.List;
4144
import java.util.Optional;
45+
import java.util.Properties;
4246
import java.util.stream.Collectors;
4347

48+
import static org.apache.flink.cdc.cli.CliFrontendOptions.FLINK_CONFIG;
4449
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
4550
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE;
4651
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION;
@@ -91,6 +96,9 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
9196
Path flinkHome = getFlinkHome(commandLine);
9297
Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
9398

99+
// To override the Flink configuration
100+
overrideFlinkConfiguration(flinkConfig, commandLine);
101+
94102
// Savepoint
95103
SavepointRestoreSettings savepointSettings = createSavepointRestoreSettings(commandLine);
96104

@@ -114,6 +122,25 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
114122
savepointSettings);
115123
}
116124

125+
private static void overrideFlinkConfiguration(
126+
Configuration flinkConfig, CommandLine commandLine) {
127+
Properties properties = commandLine.getOptionProperties(FLINK_CONFIG.getOpt());
128+
LOG.info("Dynamic flink config items found: {}", properties);
129+
for (String key : properties.stringPropertyNames()) {
130+
String value = properties.getProperty(key);
131+
if (StringUtils.isNullOrWhitespaceOnly(key)
132+
|| StringUtils.isNullOrWhitespaceOnly(value)) {
133+
throw new IllegalArgumentException(
134+
String.format(
135+
"null or white space argument for key or value: %s=%s",
136+
key, value));
137+
}
138+
ConfigOption<String> configOption =
139+
ConfigOptions.key(key.trim()).stringType().defaultValue(value.trim());
140+
flinkConfig.set(configOption, value.trim());
141+
}
142+
}
143+
117144
private static SavepointRestoreSettings createSavepointRestoreSettings(
118145
CommandLine commandLine) {
119146
if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,17 @@ public class CliFrontendOptions {
9494
+ "program that was part of the program when the savepoint was triggered.")
9595
.build();
9696

97+
public static final Option FLINK_CONFIG =
98+
Option.builder("D")
99+
.required(false)
100+
.numberOfArgs(2)
101+
.valueSeparator('=')
102+
.argName("Session dynamic flink config key=val")
103+
.desc(
104+
"Allows specifying multiple flink generic configuration options. The available"
105+
+ "options can be found at https://nightlies.apache.org/flink/flink-docs-stable/ops/config.html")
106+
.build();
107+
97108
public static Options initializeOptions() {
98109
return new Options()
99110
.addOption(HELP)
@@ -105,6 +116,7 @@ public static Options initializeOptions() {
105116
.addOption(USE_MINI_CLUSTER)
106117
.addOption(SAVEPOINT_PATH_OPTION)
107118
.addOption(SAVEPOINT_CLAIM_MODE)
108-
.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
119+
.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION)
120+
.addOption(FLINK_CONFIG);
109121
}
110122
}

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java

Lines changed: 110 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import org.apache.commons.cli.CommandLineParser;
2828
import org.apache.commons.cli.DefaultParser;
2929
import org.apache.commons.cli.Options;
30+
import org.assertj.core.api.Assertions;
3031
import org.junit.jupiter.api.BeforeEach;
3132
import org.junit.jupiter.api.Test;
3233

3334
import java.io.ByteArrayOutputStream;
3435
import java.io.PrintStream;
3536
import java.net.URL;
3637
import java.nio.file.Paths;
38+
import java.util.Map;
3739

3840
import static org.assertj.core.api.Assertions.assertThat;
3941
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -145,6 +147,70 @@ void testPipelineExecuting() throws Exception {
145147
assertThat(executionInfo.getDescription()).isEqualTo("fake-description");
146148
}
147149

150+
@Test
151+
void testPipelineExecutingWithFlinkConfig() throws Exception {
152+
// the command line arguments to submit job to exists remote host on yarn session
153+
CliExecutor executor =
154+
createExecutor(
155+
pipelineDef(),
156+
"--flink-home",
157+
flinkHome(),
158+
"--global-config",
159+
globalPipelineConfig(),
160+
"-D",
161+
"execution.target= yarn-session",
162+
"-D",
163+
"rest.bind-port =42689",
164+
"-D",
165+
"yarn.application.id=application_1714009558476_3563",
166+
"-D",
167+
"rest.bind-address=10.1.140.140");
168+
Map<String, String> configMap = executor.getFlinkConfig().toMap();
169+
assertThat(configMap)
170+
.containsEntry("execution.target", "yarn-session")
171+
.containsEntry("rest.bind-port", "42689")
172+
.containsEntry("yarn.application.id", "application_1714009558476_3563")
173+
.containsEntry("rest.bind-address", "10.1.140.140");
174+
}
175+
176+
@Test
177+
void testPipelineExecutingWithUnValidFlinkConfig() throws Exception {
178+
Assertions.assertThatThrownBy(
179+
() ->
180+
createExecutor(
181+
pipelineDef(),
182+
"--flink-home",
183+
flinkHome(),
184+
"-D",
185+
"=execution.target"))
186+
.isInstanceOf(IllegalArgumentException.class)
187+
.hasMessage(
188+
String.format(
189+
"null or white space argument for key or value: %s=%s",
190+
"", "execution.target"));
191+
192+
Assertions.assertThatThrownBy(
193+
() ->
194+
createExecutor(
195+
pipelineDef(),
196+
"--flink-home",
197+
flinkHome(),
198+
"-D",
199+
"execution.target="))
200+
.isInstanceOf(IllegalArgumentException.class)
201+
.hasMessage(
202+
String.format(
203+
"null or white space argument for key or value: %s=%s",
204+
"execution.target", ""));
205+
206+
Assertions.assertThatThrownBy(
207+
() -> createExecutor(pipelineDef(), "--flink-home", flinkHome(), "-D", "="))
208+
.isInstanceOf(IllegalArgumentException.class)
209+
.hasMessage(
210+
String.format(
211+
"null or white space argument for key or value: %s=%s", "", ""));
212+
}
213+
148214
private CliExecutor createExecutor(String... args) throws Exception {
149215
Options cliOptions = CliFrontendOptions.initializeOptions();
150216
CommandLineParser parser = new DefaultParser();
@@ -168,33 +234,50 @@ private String globalPipelineConfig() throws Exception {
168234

169235
private static final String HELP_MESSAGE =
170236
"usage:\n"
171-
+ " -cm,--claim-mode <arg> Defines how should we restore from the given\n"
172-
+ " savepoint. Supported options: [claim - claim\n"
173-
+ " ownership of the savepoint and delete once it\n"
174-
+ " is subsumed, no_claim (default) - do not\n"
175-
+ " claim ownership, the first checkpoint will\n"
176-
+ " not reuse any files from the restored one,\n"
177-
+ " legacy - the old behaviour, do not assume\n"
178-
+ " ownership of the savepoint files, but can\n"
179-
+ " reuse some shared files\n"
180-
+ " --flink-home <arg> Path of Flink home directory\n"
181-
+ " --global-config <arg> Path of the global configuration file for\n"
182-
+ " Flink CDC pipelines\n"
183-
+ " -h,--help Display help message\n"
184-
+ " --jar <arg> JARs to be submitted together with the\n"
185-
+ " pipeline\n"
186-
+ " -n,--allow-nonRestored-state Allow to skip savepoint state that cannot be\n"
187-
+ " restored. You need to allow this if you\n"
188-
+ " removed an operator from your program that\n"
189-
+ " was part of the program when the savepoint\n"
190-
+ " was triggered.\n"
191-
+ " -s,--from-savepoint <arg> Path to a savepoint to restore the job from\n"
192-
+ " (for example hdfs:///flink/savepoint-1537\n"
193-
+ " -t,--target <arg> The deployment target for the execution. This\n"
194-
+ " can take one of the following values\n"
195-
+ " local/remote/yarn-session/yarn-application/ku\n"
196-
+ " bernetes-session/kubernetes-application\n"
197-
+ " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n";
237+
+ " -cm,--claim-mode <arg> Defines how should we restore\n"
238+
+ " from the given savepoint.\n"
239+
+ " Supported options: [claim -\n"
240+
+ " claim ownership of the savepoint\n"
241+
+ " and delete once it is subsumed,\n"
242+
+ " no_claim (default) - do not\n"
243+
+ " claim ownership, the first\n"
244+
+ " checkpoint will not reuse any\n"
245+
+ " files from the restored one,\n"
246+
+ " legacy - the old behaviour, do\n"
247+
+ " not assume ownership of the\n"
248+
+ " savepoint files, but can reuse\n"
249+
+ " some shared files\n"
250+
+ " -D <Session dynamic flink config key=val> Allows specifying multiple flink\n"
251+
+ " generic configuration options.\n"
252+
+ " The availableoptions can be\n"
253+
+ " found at\n"
254+
+ " https://nightlies.apache.org/fli\n"
255+
+ " nk/flink-docs-stable/ops/config.\n"
256+
+ " html\n"
257+
+ " --flink-home <arg> Path of Flink home directory\n"
258+
+ " --global-config <arg> Path of the global configuration\n"
259+
+ " file for Flink CDC pipelines\n"
260+
+ " -h,--help Display help message\n"
261+
+ " --jar <arg> JARs to be submitted together\n"
262+
+ " with the pipeline\n"
263+
+ " -n,--allow-nonRestored-state Allow to skip savepoint state\n"
264+
+ " that cannot be restored. You\n"
265+
+ " need to allow this if you\n"
266+
+ " removed an operator from your\n"
267+
+ " program that was part of the\n"
268+
+ " program when the savepoint was\n"
269+
+ " triggered.\n"
270+
+ " -s,--from-savepoint <arg> Path to a savepoint to restore\n"
271+
+ " the job from (for example\n"
272+
+ " hdfs:///flink/savepoint-1537\n"
273+
+ " -t,--target <arg> The deployment target for the\n"
274+
+ " execution. This can take one of\n"
275+
+ " the following values\n"
276+
+ " local/remote/yarn-session/yarn-a\n"
277+
+ " pplication/kubernetes-session/ku\n"
278+
+ " bernetes-application\n"
279+
+ " --use-mini-cluster Use Flink MiniCluster to run the\n"
280+
+ " pipeline\n";
198281

199282
private static class NoOpComposer implements PipelineComposer {
200283

0 commit comments

Comments
 (0)