Skip to content

Commit

Permalink
fix_flink14-15_extended (DataLinkDC#2700)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored Dec 20, 2023
1 parent 1f496ae commit bed180d
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.flink.table.planner.delegation;

import org.dinky.parser.DinkyExtendedParser;

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
Expand Down Expand Up @@ -63,7 +65,7 @@ public class ParserImpl implements Parser {
private final Supplier<FlinkPlannerImpl> validatorSupplier;
private final Supplier<CalciteParser> calciteParserSupplier;
private final SqlExprToRexConverterFactory sqlExprToRexConverterFactory;
private static final ExtendedParser EXTENDED_PARSER = ExtendedParser.INSTANCE;
private static final ExtendedParser EXTENDED_PARSER = DinkyExtendedParser.INSTANCE;

public ParserImpl(
CatalogManager catalogManager,
Expand Down Expand Up @@ -122,6 +124,7 @@ public ResolvedExpression parseSqlExpression(
rexNode, TypeConversions.fromLogicalToDataType(logicalType), sqlExpression, sqlExpressionExpanded);
}

@Override
public String[] getCompletionHints(String statement, int cursor) {
List<String> candidates = new ArrayList<>(Arrays.asList(EXTENDED_PARSER.getCompletionHints(statement, cursor)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public TableEnvironment getTableEnvironment() {
return streamTableEnvironment;
}

@Override
public StreamExecutionEnvironment getStreamExecutionEnvironment() {
    // The underlying StreamExecutionEnvironment is only reachable through the
    // internal bridge implementation's execEnv() accessor.
    StreamTableEnvironmentImpl bridgeImpl = (StreamTableEnvironmentImpl) streamTableEnvironment;
    return bridgeImpl.execEnv();
}
Expand All @@ -56,9 +57,11 @@ public ClassLoader getUserClassLoader() {
return userClassLoader;
}

@Override
public Planner getPlanner() {
    // The Planner is not exposed on the public TableEnvironment interface;
    // cast down to the internal bridge implementation to obtain it.
    StreamTableEnvironmentImpl bridgeImpl = (StreamTableEnvironmentImpl) streamTableEnvironment;
    return bridgeImpl.getPlanner();
}

/**
 * Applies a single typed configuration option to the underlying environment.
 * Implementations are version-specific (Flink 14/15 differ in how configuration
 * is injected), hence abstract here.
 *
 * @param option the config option to set
 * @param value  the value to associate with {@code option}
 */
@Override
public abstract <T> void addConfiguration(ConfigOption<T> option, T value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
Expand Down Expand Up @@ -98,6 +99,14 @@
*/
@Slf4j
public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {
// Executor that handles Dinky-specific extended operations before delegating to Flink.
private final CustomExtendedOperationExecutorImpl extendedExecutor = new CustomExtendedOperationExecutorImpl(this);
// Error message for executeSql() when the input is not exactly one supported statement.
// FIX: added the missing ", " after "SHOW PARTITIONS" — the adjacent string literals
// previously concatenated to "SHOW PARTITIONSCREATE VIEW" in the rendered message.
private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
        "Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
                + "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
                + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, "
                + "USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONS, "
                + "CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD "
                + "MODULE, USE MODULES, SHOW [FULL] MODULES.";

public CustomTableEnvironmentImpl(
CatalogManager catalogManager,
Expand All @@ -122,7 +131,7 @@ public CustomTableEnvironmentImpl(
Thread.currentThread().setContextClassLoader(userClassLoader);
this.executor = executor;
injectParser(new CustomParserImpl(getPlanner().getParser()));
injectExtendedExecutor(new CustomExtendedOperationExecutorImpl(this));
injectExtendedExecutor(extendedExecutor);
}

public static CustomTableEnvironmentImpl create(
Expand Down Expand Up @@ -272,6 +281,7 @@ public JobPlanInfo getJobPlanInfo(List<String> statements) {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}

@Override
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for (String statement : statements) {
Expand Down Expand Up @@ -300,10 +310,12 @@ public StreamGraph getStreamGraphFromInserts(List<String> statements) {
}
}

@Override
public JobGraph getJobGraphFromInserts(List<String> statements) {
    // Build the stream graph from the INSERT statements first, then
    // translate that graph into a submittable JobGraph.
    StreamGraph streamGraph = getStreamGraphFromInserts(statements);
    return streamGraph.getJobGraph();
}

@Override
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = getParser().parse(statement);
Expand Down Expand Up @@ -332,6 +344,7 @@ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extr
return record;
}

@Override
public boolean parseAndLoadConfiguration(String statement, Map<String, Object> setMap) {
List<Operation> operations = getParser().parse(statement);
for (Operation operation : operations) {
Expand Down Expand Up @@ -383,6 +396,7 @@ private void callReset(
}
}

@Override
public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
List<Expression> expressions = ExpressionParser.parseExpressionList(fields);
return fromDataStream(dataStream, expressions.toArray(new Expression[0]));
Expand Down Expand Up @@ -437,4 +451,24 @@ && getStreamExecutionEnvironment().getStreamTimeCharacteristic() != TimeCharacte
getStreamExecutionEnvironment().getStreamTimeCharacteristic()));
}
}

@Override
public TableResult executeSql(String statement) {
    // Parse via the (possibly Dinky-extended) parser so custom statements are recognized.
    List<Operation> parsed = getParser().parse(statement);

    // executeSql() accepts exactly one statement, mirroring Flink's own contract.
    if (parsed.size() == 1) {
        return executeInternal(parsed.get(0));
    }
    throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}

@Override
public TableResult executeInternal(Operation operation) {
    // Give the extended executor first crack at the operation (Dinky custom
    // statements); anything it does not handle falls through to Flink's
    // standard execution path.
    Optional<? extends TableResult> extendedResult = extendedExecutor.executeOperation(operation);
    return extendedResult.isPresent() ? extendedResult.get() : super.executeInternal(operation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.flink.table.planner.delegation;

import org.dinky.parser.DinkyExtendedParser;

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
Expand Down Expand Up @@ -65,7 +67,7 @@ public class ParserImpl implements Parser {
private final Supplier<FlinkPlannerImpl> validatorSupplier;
private final Supplier<CalciteParser> calciteParserSupplier;
private final SqlExprToRexConverterFactory sqlExprToRexConverterFactory;
private static final ExtendedParser EXTENDED_PARSER = ExtendedParser.INSTANCE;
private static final ExtendedParser EXTENDED_PARSER = DinkyExtendedParser.INSTANCE;

public ParserImpl(
CatalogManager catalogManager,
Expand Down Expand Up @@ -125,6 +127,7 @@ public ResolvedExpression parseSqlExpression(
rexNode, TypeConversions.fromLogicalToDataType(logicalType), sqlExpression, sqlExpressionExpanded);
}

@Override
public String[] getCompletionHints(String statement, int cursor) {
List<String> candidates = new ArrayList<>(Arrays.asList(EXTENDED_PARSER.getCompletionHints(statement, cursor)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
Expand All @@ -72,6 +75,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -87,6 +91,14 @@
* @since 2022/05/08
*/
public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {
// Executor that handles Dinky-specific extended operations before delegating to Flink.
private final CustomExtendedOperationExecutorImpl extendedExecutor = new CustomExtendedOperationExecutorImpl(this);
// Error message for executeSql() when the input is not exactly one supported statement.
// FIX: added the missing ", " after "SHOW PARTITIONS" — the adjacent string literals
// previously concatenated to "SHOW PARTITIONSCREATE VIEW" in the rendered message.
private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
        "Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
                + "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
                + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, "
                + "USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONS, "
                + "CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD "
                + "MODULE, USE MODULES, SHOW [FULL] MODULES.";

public CustomTableEnvironmentImpl(
CatalogManager catalogManager,
Expand All @@ -110,7 +122,7 @@ public CustomTableEnvironmentImpl(
userClassLoader));
Thread.currentThread().setContextClassLoader(userClassLoader);
injectParser(new CustomParserImpl(getPlanner().getParser()));
injectExtendedExecutor(new CustomExtendedOperationExecutorImpl(this));
injectExtendedExecutor(extendedExecutor);
}

public static CustomTableEnvironmentImpl create(
Expand Down Expand Up @@ -170,6 +182,7 @@ public static CustomTableEnvironmentImpl create(
classLoader);
}

@Override
public ObjectNode getStreamGraph(String statement) {
List<Operation> operations = super.getParser().parse(statement);
if (operations.size() != 1) {
Expand Down Expand Up @@ -203,6 +216,7 @@ public ObjectNode getStreamGraph(String statement) {
}
}

@Override
public void addJar(File... jarPath) {
Configuration configuration =
(Configuration) getStreamExecutionEnvironment().getConfiguration();
Expand Down Expand Up @@ -242,6 +256,7 @@ public JobPlanInfo getJobPlanInfo(List<String> statements) {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}

@Override
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for (String statement : statements) {
Expand All @@ -268,10 +283,12 @@ public StreamGraph getStreamGraphFromInserts(List<String> statements) {
return streamGraph;
}

@Override
public JobGraph getJobGraphFromInserts(List<String> statements) {
    // Build the stream graph from the INSERT statements first, then
    // translate that graph into a submittable JobGraph.
    StreamGraph streamGraph = getStreamGraphFromInserts(statements);
    return streamGraph.getJobGraph();
}

@Override
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = getParser().parse(statement);
Expand Down Expand Up @@ -299,6 +316,7 @@ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extr
return record;
}

@Override
public boolean parseAndLoadConfiguration(String statement, Map<String, Object> setMap) {
List<Operation> operations = getParser().parse(statement);
for (Operation operation : operations) {
Expand Down Expand Up @@ -350,6 +368,7 @@ private void callReset(
}
}

@Override
public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
List<Expression> expressions = ExpressionParser.INSTANCE.parseExpressionList(fields);
return fromDataStream(dataStream, expressions.toArray(new Expression[0]));
Expand All @@ -375,4 +394,29 @@ public <T> void createTemporaryView(String s, DataStream<Row> dataStream, List<S
public <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields) {
    // Convert the stream into a Table with the given field expressions,
    // then register it under the requested path.
    Table view = fromDataStream(dataStream, fields);
    createTemporaryView(path, view);
}

@Override
public TableResult executeSql(String statement) {
    // Parse via the (possibly Dinky-extended) parser so custom statements are recognized.
    List<Operation> parsed = getParser().parse(statement);

    // executeSql() accepts exactly one statement, mirroring Flink's own contract.
    if (parsed.size() == 1) {
        return executeInternal(parsed.get(0));
    }
    throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}

// Routes the operation through Dinky's extended executor first; only operations it
// does not recognize fall through to Flink's standard execution path.
@Override
public TableResultInternal executeInternal(Operation operation) {
Optional<? extends TableResult> tableResult = extendedExecutor.executeOperation(operation);
if (tableResult.isPresent()) {
TableResult result = tableResult.get();
// Flink 15 requires a TableResultInternal return type, so the extended executor's
// plain TableResult is repackaged via TableResultImpl.builder().
return TableResultImpl.builder()
.resultKind(result.getResultKind())
.schema(result.getResolvedSchema())
// Eagerly drains collect() into an in-memory list. NOTE(review): for an
// unbounded/streaming result this would block indefinitely — presumably the
// extended executor only ever returns bounded results; confirm with callers.
.data(CollUtil.newArrayList(result.collect()))
.build();
}
return super.executeInternal(operation);
}
}

0 comments on commit bed180d

Please sign in to comment.