Skip to content

Commit

Permalink
feat: could watch table view (DataLinkDC#1810)
Browse files Browse the repository at this point in the history
* feat: could watch table view

* feat: assume watch is insert sql

* refactor: add CTAS sqlTYpe

* feat:add CTAS table

* refactor: remove redundance

* Spotless Apply

---------

Co-authored-by: leechor <leechor@users.noreply.github.com>
  • Loading branch information
leechor and leechor authored Mar 31, 2023
1 parent ac84664 commit 9b7e8b4
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 15 deletions.
8 changes: 2 additions & 6 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,10 @@ public JobParam pretreatStatements(String[] statements) {
WatchStatementExplainer watchStatementExplainer =
new WatchStatementExplainer(statement);
String tableName = watchStatementExplainer.getTableName();
ddl.add(
new StatementParam(
watchStatementExplainer.getCreateStatement(tableName),
SqlType.CREATE));
trans.add(
new StatementParam(
watchStatementExplainer.getInsertStatement(tableName),
SqlType.INSERT));
watchStatementExplainer.getCreateStatement(tableName),
SqlType.CTAS));
} else {
UDF udf = UDFUtil.toUDF(statement);
if (Asserts.isNotNull(udf)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public class WatchStatementExplainer {
public static final String CREATE_SQL_TEMPLATE =
"CREATE TABLE print_{0} WITH (''connector'' = ''printnet'', "
+ "''port''=''{2,number,#}'', ''hostName'' = ''{1}'', ''sink.parallelism''=''{3}'')\n"
+ "LIKE {0};";
public static final String INSERT_SQL_TEMPLATE = "insert into print_{0} select * from {0};";
+ "AS SELECT * FROM {0}";
public static final int PORT = 7125;

private final String statement;
Expand All @@ -56,10 +55,6 @@ public String getCreateStatement(String tableName) {
return MessageFormat.format(CREATE_SQL_TEMPLATE, tableName, ip, PORT, 1);
}

public String getInsertStatement(String tableName) {
return MessageFormat.format(INSERT_SQL_TEMPLATE, tableName);
}

private static Optional<InetAddress> getSystemLocalIp() {
try {
InetAddress ip = InetAddress.getLocalHost();
Expand Down
2 changes: 1 addition & 1 deletion dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ public JobResult executeSql(String statement) {
} else if (useStatementSet && !useGateway) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
if (item.getType().isInsert()) {
if (item.getType().isInsert() || item.getType().equals(SqlType.CTAS)) {
inserts.add(item.getValue());
}
}
Expand Down
5 changes: 3 additions & 2 deletions dinky-executor/src/main/java/org/dinky/parser/SqlType.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public enum SqlType {
RESET("RESET"),
EXECUTE("EXECUTE"),
ADD("ADD"),
UNKNOWN("UNKNOWN"),
WATCH("WATCH");
WATCH("WATCH"),
CTAS("CTAS"),
UNKNOWN("UNKNOWN");

private String type;

Expand Down

0 comments on commit 9b7e8b4

Please sign in to comment.