Skip to content

Commit 3f6db85

Browse files
committed
Read execution mode from configuration. Make table config function have matching flow control.
1 parent 1f52073 commit 3f6db85

File tree

3 files changed

+73
-54
lines changed

3 files changed

+73
-54
lines changed

src/main/java/io/ecraft/SqlRunner.java

Lines changed: 60 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.flink.core.fs.FSDataInputStream;
1313
import org.apache.flink.core.fs.FileSystem;
1414
import org.apache.flink.core.fs.Path;
15+
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.Environment;
1516
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
1617
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
1718
import org.apache.flink.table.api.EnvironmentSettings;
@@ -38,33 +39,6 @@ public class SqlRunner {
3839
private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";
3940

4041
public static void main(String[] args) throws Exception {
41-
42-
EnvironmentSettings settings = EnvironmentSettings
43-
.newInstance()
44-
.inStreamingMode()
45-
.build();
46-
TableEnvironment tableEnv = TableEnvironment.create(settings);
47-
48-
String name = "hive";
49-
String defaultDatabase = "default";
50-
String hiveConfDir = "/conf/hive-conf";
51-
52-
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
53-
tableEnv.registerCatalog(name, hive);
54-
55-
// set the HiveCatalog as the current catalog of the session
56-
tableEnv.useCatalog(name);
57-
58-
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
59-
60-
LOG.debug("Current catalog: {}", tableEnv.getCurrentCatalog());
61-
LOG.debug("Current database: {}", tableEnv.getCurrentDatabase());
62-
LOG.debug("Available tables:");
63-
64-
for (String t: tableEnv.listTables()) {
65-
LOG.debug(" - {}", t);
66-
}
67-
6842
ParameterTool parameters = ParameterTool.fromArgs(args);
6943

7044
// Debug log the keys and values of the parameters
@@ -75,6 +49,14 @@ public static void main(String[] args) throws Exception {
7549
String archiveUri = parameters.getRequired("archiveUri");
7650
String environment = parameters.getRequired("environment");
7751

52+
53+
54+
String name = "hive";
55+
String defaultDatabase = "default";
56+
String hiveConfDir = "/conf/hive-conf";
57+
58+
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
59+
7860
Path remoteArchivePath = new Path(archiveUri);
7961

8062
// Read the tar file from azure blob store to a local file
@@ -114,6 +96,23 @@ public static void main(String[] args) throws Exception {
11496
while ((inputStr = jsonStreamReader.readLine()) != null)
11597
responseStrBuilder.append(inputStr);
11698
JSONObject deployableConfiguration = new JSONObject(responseStrBuilder.toString());
99+
100+
EnvironmentSettings settings = configureEnvironmentSettings(environment, deployableConfiguration, EnvironmentSettings.newInstance()).build();
101+
TableEnvironment tableEnv = TableEnvironment.create(settings);
102+
tableEnv.registerCatalog(name, hive);
103+
104+
// set the HiveCatalog as the current catalog of the session
105+
tableEnv.useCatalog(name);
106+
107+
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
108+
109+
LOG.debug("Current catalog: {}", tableEnv.getCurrentCatalog());
110+
LOG.debug("Current database: {}", tableEnv.getCurrentDatabase());
111+
LOG.debug("Available tables:");
112+
113+
for (String t: tableEnv.listTables()) {
114+
LOG.debug(" - {}", t);
115+
}
117116
configureTableEnvironment(environment, deployableConfiguration, tableEnv);
118117

119118
// Read the sql file
@@ -133,37 +132,44 @@ public static void main(String[] args) throws Exception {
133132
}
134133
}
135134

136-
public static void configureTableEnvironment(String currentEnv, JSONObject deployableConfiguration, TableEnvironment tableEnvironment) {
137-
TableConfig tableConfig = tableEnvironment.getConfig();
138-
139-
if (!deployableConfiguration.has("environments")) {
140-
// If there is no environment config, do nothing
141-
return;
142-
}
143-
144-
// Extract the environment configuration
145-
JSONObject environments = deployableConfiguration.getJSONObject("environments");
146-
if (!environments.has(currentEnv)) {
147-
// If the current environment has no config defined, do nothing
148-
return;
149-
}
150-
151-
JSONObject currentEnvironment = environments.getJSONObject(currentEnv);
152-
if (!currentEnvironment.has("tableConfig")) {
153-
// If the "tableConfig" is not set, do nothing
154-
return;
135+
public static EnvironmentSettings.Builder configureEnvironmentSettings(String currentEnv, JSONObject deployableConfiguration, EnvironmentSettings.Builder builder) {
136+
if (deployableConfiguration.has("environments")) {
137+
JSONObject environments = deployableConfiguration.getJSONObject("environments");
138+
if (environments.has(currentEnv)) {
139+
JSONObject currentEnvironment = environments.getJSONObject(currentEnv);
140+
if (currentEnvironment.has("mode")) {
141+
String mode = currentEnvironment.getString("mode");
142+
if (mode.equals("batch")) {
143+
builder.inBatchMode();
144+
} else if (mode.equals("streaming")) {
145+
builder.inStreamingMode();
146+
} else {
147+
throw new RuntimeException("Invalid deployable configuration: '"+ mode + "' is not a valid mode");
148+
}
149+
}
150+
}
155151
}
156152

157-
// Extract the tableConfig from the current environment
158-
JSONObject tableConfigJson = currentEnvironment.getJSONObject("tableConfig");
153+
return builder;
154+
}
159155

160-
// Iterate over the keys in the tableConfig and set them in the TableConfig object
161-
for (String key : tableConfigJson.keySet()) {
162-
String value = tableConfigJson.getString(key);
163-
tableConfig.getConfiguration().setString(key, value);
156+
public static void configureTableEnvironment(String currentEnv, JSONObject deployableConfiguration, TableEnvironment tableEnvironment) {
157+
TableConfig tableConfig = tableEnvironment.getConfig();
164158

165-
// Log the value that was set
166-
LOG.debug("Setting table config {} to {}", key, value);
159+
if (deployableConfiguration.has("environments")) {
160+
JSONObject environments = deployableConfiguration.getJSONObject("environments");
161+
if (environments.has(currentEnv)) {
162+
JSONObject currentEnvironment = environments.getJSONObject(currentEnv);
163+
if (currentEnvironment.has("tableConfig")) {
164+
JSONObject tableConfigJson = currentEnvironment.getJSONObject("tableConfig");
165+
for (String key : tableConfigJson.keySet()) {
166+
String value = tableConfigJson.getString(key);
167+
tableConfig.getConfiguration().setString(key, value);
168+
169+
LOG.debug("Setting table config {} to {}", key, value);
170+
}
171+
}
172+
}
167173
}
168174
}
169175

src/test/java/io/ecraft/SqlRunnerTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,18 @@ public void testTableConfig() throws Exception {
7171
assertEquals(tableEnv.getConfig().getConfiguration().get(sourceIdleTimeout), "5 min");
7272
}
7373

74+
@Test
75+
public void testEnvironmentConfig() throws Exception {
76+
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
77+
78+
String filePath = "src/test/java/io/ecraft/fixtures/deployableconfig1.json";
79+
JSONObject jsonConfig = readJsonFile(filePath);
80+
81+
EnvironmentSettings settings = SqlRunner.configureEnvironmentSettings("dev", jsonConfig, builder).build();
82+
83+
assertEquals(settings.isStreamingMode(), true);
84+
}
85+
7486
public static JSONObject readJsonFile(String filePath) throws IOException {
7587
String content = new String(Files.readAllBytes(Paths.get(filePath)));
7688
return new JSONObject(content);

src/test/java/io/ecraft/fixtures/deployableconfig1.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"type": "sql",
44
"environments": {
55
"dev": {
6+
"mode": "streaming",
67
"tableConfig": {
78
"table.exec.mini-batch.enabled": true,
89
"table.exec.source.idle-timeout": "5 min"

0 commit comments

Comments
 (0)