Skip to content

Commit 7bab505

Browse files
author
zhangbin
committed
flink sql client support dynamic params
1 parent 2abfd40 commit 7bab505

File tree

3 files changed

+54
-4
lines changed

3 files changed

+54
-4
lines changed

flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.client;
2020

2121
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.configuration.Configuration;
2223
import org.apache.flink.table.client.cli.CliClient;
2324
import org.apache.flink.table.client.cli.CliOptions;
2425
import org.apache.flink.table.client.cli.CliOptionsParser;
@@ -82,6 +83,8 @@ private void start() {
8283

8384
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
8485
final Executor executor = new LocalExecutor(defaultContext);
86+
Configuration flinkConfig = defaultContext.getFlinkConfig();
87+
options.getDynamicConfMap().forEach(flinkConfig::setString);
8588
executor.start();
8689

8790
// Open an new session

flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.net.URL;
2626
import java.util.List;
27+
import java.util.Map;
2728

2829
/**
2930
* Command line options to configure the SQL client. Arguments that have not been specified by the
@@ -40,6 +41,7 @@ public class CliOptions {
4041
private final String updateStatement;
4142
private final String historyFilePath;
4243
private final Configuration pythonConfiguration;
44+
private final Map<String, String> dynamicConfMap;
4345

4446
public CliOptions(
4547
boolean isPrintHelp,
@@ -50,7 +52,8 @@ public CliOptions(
5052
List<URL> libraryDirs,
5153
String updateStatement,
5254
String historyFilePath,
53-
Configuration pythonConfiguration) {
55+
Configuration pythonConfiguration,
56+
Map<String, String> dynamicConfMap) {
5457
this.isPrintHelp = isPrintHelp;
5558
this.sessionId = sessionId;
5659
this.initFile = initFile;
@@ -60,6 +63,7 @@ public CliOptions(
6063
this.updateStatement = updateStatement;
6164
this.historyFilePath = historyFilePath;
6265
this.pythonConfiguration = pythonConfiguration;
66+
this.dynamicConfMap = dynamicConfMap;
6367
}
6468

6569
public boolean isPrintHelp() {
@@ -97,4 +101,8 @@ public String getHistoryFilePath() {
97101
public Configuration getPythonConfiguration() {
98102
return pythonConfiguration;
99103
}
104+
105+
public Map<String, String> getDynamicConfMap() {
106+
return dynamicConfMap;
107+
}
100108
}

flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import java.lang.reflect.Method;
3535
import java.net.URL;
3636
import java.util.Arrays;
37+
import java.util.HashMap;
3738
import java.util.List;
39+
import java.util.Map;
3840
import java.util.stream.Collectors;
3941

4042
import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
@@ -137,6 +139,15 @@ public class CliOptionsParser {
137139
+ "auto-generate one under your user's home directory.")
138140
.build();
139141

142+
public static final Option OPTION_CONF = Option
143+
.builder("c")
144+
.required(false)
145+
.longOpt("conf")
146+
.numberOfArgs(1)
147+
.argName("dynamic configuration")
148+
.desc("The dynamic configuration that the specified key and value are separated by `=`.")
149+
.build();
150+
140151
private static final Options EMBEDDED_MODE_CLIENT_OPTIONS =
141152
getEmbeddedModeClientOptions(new Options());
142153
private static final Options GATEWAY_MODE_CLIENT_OPTIONS =
@@ -162,6 +173,7 @@ public static Options getEmbeddedModeClientOptions(Options options) {
162173
options.addOption(PYARCHIVE_OPTION);
163174
options.addOption(PYEXEC_OPTION);
164175
options.addOption(PYCLIENTEXEC_OPTION);
176+
options.addOption(OPTION_CONF);
165177
return options;
166178
}
167179

@@ -260,6 +272,30 @@ public static void printHelpGatewayModeGateway() {
260272
// Line Parsing
261273
// --------------------------------------------------------------------------------------------
262274

275+
private static Map<String, String> getDynamicConfiguration(CommandLine line) {
276+
String[] optionValues = line.getOptionValues(CliOptionsParser.OPTION_CONF.getOpt());
277+
Map<String, String> dynamicConfMap = new HashMap<>();
278+
if (optionValues != null && optionValues.length > 0) {
279+
for (String conf: optionValues) {
280+
String[] kv = conf.split("=", 2);
281+
if (kv.length == 1) {
282+
continue;
283+
}
284+
285+
String key = kv[0].trim();
286+
String value = kv[1].trim();
287+
288+
// sanity check
289+
if (key.length() == 0 || value.length() == 0) {
290+
continue;
291+
}
292+
293+
dynamicConfMap.put(key, value);
294+
}
295+
}
296+
return dynamicConfMap;
297+
}
298+
263299
public static CliOptions parseEmbeddedModeClient(String[] args) {
264300
try {
265301
DefaultParser parser = new DefaultParser();
@@ -273,7 +309,8 @@ public static CliOptions parseEmbeddedModeClient(String[] args) {
273309
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
274310
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
275311
line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
276-
getPythonConfiguration(line));
312+
getPythonConfiguration(line),
313+
getDynamicConfiguration(line));
277314
} catch (ParseException e) {
278315
throw new SqlClientException(e.getMessage());
279316
}
@@ -292,7 +329,8 @@ public static CliOptions parseGatewayModeClient(String[] args) {
292329
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
293330
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
294331
line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
295-
getPythonConfiguration(line));
332+
getPythonConfiguration(line),
333+
getDynamicConfiguration(line));
296334
} catch (ParseException e) {
297335
throw new SqlClientException(e.getMessage());
298336
}
@@ -311,7 +349,8 @@ public static CliOptions parseGatewayModeGateway(String[] args) {
311349
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
312350
null,
313351
null,
314-
getPythonConfiguration(line));
352+
getPythonConfiguration(line),
353+
getDynamicConfiguration(line));
315354
} catch (ParseException e) {
316355
throw new SqlClientException(e.getMessage());
317356
}

0 commit comments

Comments
 (0)