Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-23861][sql-client]flink sql client support dynamic params #16877

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/content.zh/docs/dev/table/sqlClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,13 @@ SET 'table.exec.spill-compression.enabled' = 'true';
SET 'table.exec.spill-compression.block-size' = '128kb';
```

```
-- flink SQL Client with -c/--conf k=v, like
./bin/sql-client.sh -i conf/init.sql -c execution.runtime-mode=batch \
-c yarn.application.name="test" -f sql/q1.sql -c sql-client.execution.result-mode=TABLEAU \
-c execution.target=yarn-per-job
```

This configuration:

- connects to Hive catalogs and uses `MyCatalog` as the current catalog with `MyDatabase` as the current database of the catalog,
Expand Down
7 changes: 7 additions & 0 deletions docs/content/docs/dev/table/sqlClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,13 @@ SET 'table.exec.spill-compression.enabled' = 'true';
SET 'table.exec.spill-compression.block-size' = '128kb';
```

```
-- Users can use flink SQL Client with -c/--conf k=v, like
./bin/sql-client.sh -i conf/init.sql -c execution.runtime-mode=batch \
-c yarn.application.name="test" -f sql/q1.sql -c sql-client.execution.result-mode=TABLEAU \
-c execution.target=yarn-per-job
```

This configuration:

- connects to Hive catalogs and uses `MyCatalog` as the current catalog with `MyDatabase` as the current database of the catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.client;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.client.cli.CliClient;
import org.apache.flink.table.client.cli.CliOptions;
import org.apache.flink.table.client.cli.CliOptionsParser;
Expand Down Expand Up @@ -82,6 +83,8 @@ private void start() {

DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
final Executor executor = new LocalExecutor(defaultContext);
Configuration flinkConfig = defaultContext.getFlinkConfig();
options.getDynamicConfMap().forEach(flinkConfig::setString);
executor.start();

// Open an new session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.net.URL;
import java.util.List;
import java.util.Map;

/**
* Command line options to configure the SQL client. Arguments that have not been specified by the
Expand All @@ -40,6 +41,7 @@ public class CliOptions {
private final String updateStatement;
private final String historyFilePath;
private final Configuration pythonConfiguration;
private final Map<String, String> dynamicConfMap;

public CliOptions(
boolean isPrintHelp,
Expand All @@ -50,7 +52,8 @@ public CliOptions(
List<URL> libraryDirs,
String updateStatement,
String historyFilePath,
Configuration pythonConfiguration) {
Configuration pythonConfiguration,
Map<String, String> dynamicConfMap) {
this.isPrintHelp = isPrintHelp;
this.sessionId = sessionId;
this.initFile = initFile;
Expand All @@ -60,6 +63,7 @@ public CliOptions(
this.updateStatement = updateStatement;
this.historyFilePath = historyFilePath;
this.pythonConfiguration = pythonConfiguration;
this.dynamicConfMap = dynamicConfMap;
}

public boolean isPrintHelp() {
Expand Down Expand Up @@ -97,4 +101,8 @@ public String getHistoryFilePath() {
public Configuration getPythonConfiguration() {
return pythonConfiguration;
}

public Map<String, String> getDynamicConfMap() {
return dynamicConfMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import java.lang.reflect.Method;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
Expand Down Expand Up @@ -137,6 +139,15 @@ public class CliOptionsParser {
+ "auto-generate one under your user's home directory.")
.build();

public static final Option OPTION_CONF = Option
.builder("c")
.required(false)
.longOpt("conf")
.numberOfArgs(1)
.argName("dynamic configuration")
.desc("The dynamic configuration that the specified key and value are separated by `=`.")
.build();

private static final Options EMBEDDED_MODE_CLIENT_OPTIONS =
getEmbeddedModeClientOptions(new Options());
private static final Options GATEWAY_MODE_CLIENT_OPTIONS =
Expand All @@ -162,6 +173,7 @@ public static Options getEmbeddedModeClientOptions(Options options) {
options.addOption(PYARCHIVE_OPTION);
options.addOption(PYEXEC_OPTION);
options.addOption(PYCLIENTEXEC_OPTION);
options.addOption(OPTION_CONF);
return options;
}

Expand Down Expand Up @@ -260,6 +272,30 @@ public static void printHelpGatewayModeGateway() {
// Line Parsing
// --------------------------------------------------------------------------------------------

private static Map<String, String> getDynamicConfiguration(CommandLine line) {
String[] optionValues = line.getOptionValues(CliOptionsParser.OPTION_CONF.getOpt());
Map<String, String> dynamicConfMap = new HashMap<>();
if (optionValues != null && optionValues.length > 0) {
for (String conf: optionValues) {
String[] kv = conf.split("=", 2);
if (kv.length == 1) {
continue;
}

String key = kv[0].trim();
String value = kv[1].trim();

// sanity check
if (key.length() == 0 || value.length() == 0) {
continue;
}

dynamicConfMap.put(key, value);
}
}
return dynamicConfMap;
}

public static CliOptions parseEmbeddedModeClient(String[] args) {
try {
DefaultParser parser = new DefaultParser();
Expand All @@ -273,7 +309,8 @@ public static CliOptions parseEmbeddedModeClient(String[] args) {
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
getPythonConfiguration(line));
getPythonConfiguration(line),
getDynamicConfiguration(line));
} catch (ParseException e) {
throw new SqlClientException(e.getMessage());
}
Expand All @@ -292,7 +329,8 @@ public static CliOptions parseGatewayModeClient(String[] args) {
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
getPythonConfiguration(line));
getPythonConfiguration(line),
getDynamicConfiguration(line));
} catch (ParseException e) {
throw new SqlClientException(e.getMessage());
}
Expand All @@ -311,7 +349,8 @@ public static CliOptions parseGatewayModeGateway(String[] args) {
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
null,
null,
getPythonConfiguration(line));
getPythonConfiguration(line),
getDynamicConfiguration(line));
} catch (ParseException e) {
throw new SqlClientException(e.getMessage());
}
Expand Down