Skip to content

Commit

Permalink
feat: add operation execute compatible for flink 1.18 (DataLinkDC#2637)
Browse files Browse the repository at this point in the history
* feat: add operation execute compatible for flink 1.18

Signed-off-by: licho <lecho.sun@gmail.com>

* Spotless Apply

* Spotless Apply

---------

Signed-off-by: licho <lecho.sun@gmail.com>
Co-authored-by: leechor <leechor@users.noreply.github.com>
  • Loading branch information
leechor and leechor authored Dec 13, 2023
1 parent 5267fb5 commit 7655efe
Show file tree
Hide file tree
Showing 19 changed files with 267 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.parser.CustomParserImpl;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.LineageContext;

Expand Down Expand Up @@ -119,6 +120,8 @@ public CustomTableEnvironmentImpl(
isStreamingMode,
userClassLoader));
this.executor = executor;
injectParser(new CustomParserImpl(getPlanner().getParser()));
injectExtendedExecutor(new CustomExtendedOperationExecutorImpl(this));
}

public static CustomTableEnvironmentImpl create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.parser.CustomParserImpl;
import org.dinky.utils.LineageContext;

import org.apache.flink.api.common.RuntimeExecutionMode;
Expand Down Expand Up @@ -108,6 +109,8 @@ public CustomTableEnvironmentImpl(
executor,
isStreamingMode,
userClassLoader));
injectParser(new CustomParserImpl(getPlanner().getParser()));
injectExtendedExecutor(new CustomExtendedOperationExecutorImpl(this));
}

public static CustomTableEnvironmentImpl create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.parser.CustomParserImpl;
import org.dinky.trans.ddl.CustomSetOperation;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.LineageContext;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {

public CustomTableEnvironmentImpl(StreamTableEnvironment streamTableEnvironment) {
super(streamTableEnvironment);
injectParser(new CustomParserImpl(getPlanner().getParser()));
injectExtendedExecutor(new CustomExtendedOperationExecutorImpl(this));
}

public static CustomTableEnvironmentImpl create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.parser.CustomParserImpl;
import org.dinky.utils.LineageContext;

import org.apache.flink.api.dag.Transformation;
Expand Down Expand Up @@ -79,6 +80,8 @@ public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {

public CustomTableEnvironmentImpl(StreamTableEnvironment streamTableEnvironment) {
super(streamTableEnvironment);
injectParser(new CustomParserImpl(getPlanner().getParser()));
injectExtendedExecutor(new CustomExtendedOperationExecutorImpl(this));
}

public static CustomTableEnvironmentImpl create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.operations.CustomNewParserImpl;
import org.dinky.utils.LineageContext;

import org.apache.flink.api.dag.Transformation;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {

public CustomTableEnvironmentImpl(StreamTableEnvironment streamTableEnvironment) {
super(streamTableEnvironment);
injectParser(new CustomNewParserImpl(this, getPlanner().getParser()));
}

public static CustomTableEnvironmentImpl create(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.operations;

import org.dinky.parser.CustomParserImpl;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.planner.parse.ExtendedParser;

public class CustomNewParserImpl extends CustomParserImpl {

private final DinkyParser dinkyParser;

public CustomNewParserImpl(TableEnvironment tableEnvironment, Parser parser) {
super(parser);
this.dinkyParser = new DinkyParser(tableEnvironment);
}

@Override
public ExtendedParser getDinkyParser() {
return this.dinkyParser;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.operations;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.operations.ExecutableOperation;
import org.apache.flink.table.operations.Operation;

public class DinkyExecutableOperation implements ExecutableOperation {

private final Operation innerOperation;
private final TableEnvironment tableEnvironment;

public DinkyExecutableOperation(TableEnvironment tableEnvironment, Operation innerOperation) {
this.tableEnvironment = tableEnvironment;
this.innerOperation = innerOperation;
}

@Override
public TableResultInternal execute(Context ctx) {
DinkyOperationExecutor operationExecutor = new DinkyOperationExecutor(tableEnvironment, ctx);
return operationExecutor.executeOperation(innerOperation).get();
}

public Operation getInnerOperation() {
return innerOperation;
}

@Override
public String asSummaryString() {
return innerOperation.asSummaryString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.operations;

import org.dinky.executor.CustomTableEnvironment;
import org.dinky.trans.ExtendOperation;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.operations.ExecutableOperation;
import org.apache.flink.table.operations.Operation;

import java.util.Optional;

public class DinkyOperationExecutor {
private final ExecutableOperation.Context context;

private final TableEnvironment tableEnvironment;

public DinkyOperationExecutor(TableEnvironment tableEnvironment, ExecutableOperation.Context context) {
this.tableEnvironment = tableEnvironment;
this.context = context;
}

public Optional<TableResultInternal> executeOperation(Operation operation) {
ExtendOperation extendOperation = (ExtendOperation) operation;
return Optional.of((TableResultInternal) extendOperation
.execute((CustomTableEnvironment) tableEnvironment)
.get());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.operations;

import org.dinky.parser.DinkyExtendedParser;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.parse.ExtendedParseStrategy;

import java.util.Optional;

public class DinkyParser extends DinkyExtendedParser {
private final TableEnvironment tableEnvironment;

public DinkyParser(TableEnvironment tableEnvironment) {
this.tableEnvironment = tableEnvironment;
}

@Override
public Optional<Operation> parse(String statement) {
for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) {
if (strategy.match(statement)) {
return Optional.of(new DinkyExecutableOperation(this.tableEnvironment, strategy.convert(statement)));
}
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.parse.ExtendedParser;

import java.util.List;

Expand All @@ -30,6 +31,8 @@ public interface CustomParser {

Parser getParser();

ExtendedParser getDinkyParser();

SqlNode parseExpression(String sqlExpression);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,15 @@
package org.dinky.parser;

import org.dinky.executor.CustomParser;
import org.dinky.trans.parse.AddJarSqlParseStrategy;
import org.dinky.trans.parse.CreateAggTableSelectSqlParseStrategy;
import org.dinky.trans.parse.CreateTemporalTableFunctionParseStrategy;
import org.dinky.trans.parse.SetSqlParseStrategy;

import org.apache.calcite.sql.SqlNode;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.parse.ExtendedParseStrategy;
import org.apache.flink.table.planner.parse.ExtendedParser;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand All @@ -44,7 +38,6 @@

public class CustomParserImpl implements CustomParser {

private static final DinkyExtendedParser DINKY_EXTENDED_PARSER = DinkyExtendedParser.INSTANCE;
private final Parser parser;
private final Supplier<FlinkPlannerImpl> validatorSupplier;
private final Supplier<CalciteParser> calciteParserSupplier;
Expand Down Expand Up @@ -77,7 +70,7 @@ public static Supplier<FlinkPlannerImpl> getValidatorSupplier(Parser parser) {

@Override
public List<Operation> parse(String statement) {
Optional<Operation> command = DINKY_EXTENDED_PARSER.parse(statement);
Optional<Operation> command = getDinkyParser().parse(statement);

// note: null represent not custom parser;
return command.map(Collections::singletonList).orElse(null);
Expand All @@ -88,6 +81,11 @@ public Parser getParser() {
return parser;
}

@Override
public ExtendedParser getDinkyParser() {
return DinkyExtendedParser.INSTANCE;
}

@Override
public SqlNode parseExpression(String sqlExpression) {
return calciteParserSupplier.get().parseExpression(sqlExpression);
Expand All @@ -103,24 +101,4 @@ public SqlNode validate(SqlNode sqlNode) {
FlinkPlannerImpl flinkPlanner = validatorSupplier.get();
return flinkPlanner.validate(sqlNode);
}

public static class DinkyExtendedParser extends ExtendedParser {
public static final DinkyExtendedParser INSTANCE = new DinkyExtendedParser();

private static final List<ExtendedParseStrategy> PARSE_STRATEGIES = Arrays.asList(
AddJarSqlParseStrategy.INSTANCE,
CreateAggTableSelectSqlParseStrategy.INSTANCE,
SetSqlParseStrategy.INSTANCE,
CreateTemporalTableFunctionParseStrategy.INSTANCE);

@Override
public Optional<Operation> parse(String statement) {
for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) {
if (strategy.match(statement)) {
return Optional.of(strategy.convert(statement));
}
}
return Optional.empty();
}
}
}
Loading

0 comments on commit 7655efe

Please sign in to comment.