Skip to content

Commit

Permalink
[Feature] [Flink SQL] Added ADD FILE syntax not only submits JAR to J…
Browse files Browse the repository at this point in the history
…ava ClassPath (DataLinkDC#3205)

Co-authored-by: zackyoungh <zackyoungh@users.noreply.github.com>
  • Loading branch information
zackyoungh and zackyoungh authored Mar 1, 2024
1 parent bbe6965 commit 93c43d1
Show file tree
Hide file tree
Showing 14 changed files with 231 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class PaimonCache extends AbstractValueAdaptingCache {
/**
* TIMEOUT CACHE
*/
private final cn.hutool.cache.Cache<Object, Object> cache = new TimedCache<>(1000 * 60 * 10);
private final cn.hutool.cache.Cache<Object, Object> cache = new TimedCache<>(1000 * 60);

public PaimonCache(String cacheName) {
super(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void uploadFile(Integer pid, String desc, File file) {
* @param size size
*/
@Transactional(rollbackFor = Exception.class)
private void upload(
public void upload(
Integer pid, String desc, Consumer<String> uploadAction, String fileName, Resources pResource, long size) {
Resources currentUploadResource = getOne(
new LambdaQueryWrapper<Resources>().eq(Resources::getPid, pid).eq(Resources::getFileName, fileName));
Expand All @@ -272,7 +272,7 @@ private void upload(
resources.setIsDirectory(false);
resources.setType(0);
String prefixPath = pResource == null ? "" : pResource.getFullName();
fullName = prefixPath + "/" + fileName;
fullName = StrUtil.removePrefix(prefixPath + "/" + fileName, StrUtil.SLASH);

resources.setFullName(fullName);
resources.setSize(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.dinky.resource.BaseResourceManager;
import org.dinky.trans.Operations;
import org.dinky.trans.dml.ExecuteJarOperation;
import org.dinky.trans.parse.AddFileSqlParseStrategy;
import org.dinky.trans.parse.AddJarSqlParseStrategy;
import org.dinky.trans.parse.ExecuteJarParseStrategy;
import org.dinky.url.RsURLStreamHandlerFactory;
Expand Down Expand Up @@ -274,6 +275,12 @@ public static Optional<JobClient> executeJarJob(String type, Executor executor,
if ("kubernetes-application".equals(type)) {
executor.addJar(info);
}
} else if (Operations.getOperationType(sqlStatement) == SqlType.ADD_FILE) {
File[] info = AddFileSqlParseStrategy.getInfo(sqlStatement);
Arrays.stream(info).forEach(executor.getDinkyClassLoader().getUdfPathContextHolder()::addFile);
if ("kubernetes-application".equals(type)) {
executor.addJar(info);
}
}
}
return jobClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public enum SqlType {

ADD_JAR("ADD_JAR", "^ADD\\s+JAR\\s+\\S+"),
ADD("ADD", "^ADD\\s+CUSTOMJAR\\s+\\S+"),
ADD_FILE("ADD_FILE", "^ADD\\s+FILE\\s+\\S+"),

PRINT("PRINT", "^PRINT.*"),

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.trans.ddl;

import org.dinky.executor.CustomTableEnvironment;
import org.dinky.trans.AbstractOperation;
import org.dinky.trans.ExtendOperation;

import org.apache.flink.table.api.TableResult;

import java.util.Optional;

/**
 * Operation backing the {@code ADD FILE} statement.
 *
 * <p>The statement itself carries no table-environment side effects: the referenced
 * files are collected by {@code AddFileSqlParseStrategy} and registered on the
 * class-loader / context-holder elsewhere, so executing this operation is a no-op
 * that simply reports success.
 *
 * @since 0.7.0
 */
public class AddFilerOperation extends AbstractOperation implements ExtendOperation {

    public AddFilerOperation() {}

    public AddFilerOperation(String statement) {
        super(statement);
    }

    @Override
    public Optional<? extends TableResult> execute(CustomTableEnvironment tEnv) {
        // Nothing to run against the table environment; acknowledge with the OK result.
        return Optional.of(TABLE_RESULT_OK);
    }

    @Override
    public String asSummaryString() {
        // The raw SQL text is the best human-readable summary of this operation.
        return statement;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import org.apache.flink.table.api.TableResult;

import java.io.File;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import cn.hutool.core.convert.Convert;
Expand Down Expand Up @@ -63,11 +66,16 @@ public Optional<? extends TableResult> execute(CustomTableEnvironment tEnv) {
}

public StreamGraph getStreamGraph(CustomTableEnvironment tEnv) {
return getStreamGraph(tEnv, Collections.emptyList());
}

public StreamGraph getStreamGraph(CustomTableEnvironment tEnv, List<URL> classpaths) {
JarSubmitParam submitParam = JarSubmitParam.build(statement);
return getStreamGraph(submitParam, tEnv);
return getStreamGraph(submitParam, tEnv, classpaths);
}

public static StreamGraph getStreamGraph(JarSubmitParam submitParam, CustomTableEnvironment tEnv) {
public static StreamGraph getStreamGraph(
JarSubmitParam submitParam, CustomTableEnvironment tEnv, List<URL> classpaths) {
SavepointRestoreSettings savepointRestoreSettings = StrUtil.isBlank(submitParam.getSavepointPath())
? SavepointRestoreSettings.none()
: SavepointRestoreSettings.forPath(
Expand All @@ -85,12 +93,14 @@ public static StreamGraph getStreamGraph(JarSubmitParam submitParam, CustomTable
+ Opt.ofBlankAble(submitParam.getArgs()).orElse(""));
file = null;
}

program = PackagedProgram.newBuilder()
.setJarFile(file)
.setEntryPointClassName(submitParam.getMainClass())
.setConfiguration(configuration)
.setSavepointRestoreSettings(savepointRestoreSettings)
.setArguments(RunTimeUtil.handleCmds(submitParam.getArgs()))
.setUserClassPaths(classpaths)
.build();
int parallelism = StrUtil.isNumeric(submitParam.getParallelism())
? Convert.toInt(submitParam.getParallelism())
Expand All @@ -113,6 +123,10 @@ public StreamGraph explain(CustomTableEnvironment tEnv) {
return getStreamGraph(tEnv);
}

public StreamGraph explain(CustomTableEnvironment tEnv, List<URL> classpaths) {
return getStreamGraph(tEnv, classpaths);
}

@Setter
@Getter
public static class JarSubmitParam {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.trans.parse;

import org.dinky.data.exception.DinkyException;
import org.dinky.trans.ddl.AddFilerOperation;
import org.dinky.utils.URLUtils;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.parse.AbstractRegexParseStrategy;

import java.io.File;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;

/**
 * Parse strategy for the {@code ADD FILE '&lt;path&gt;'} statement.
 *
 * <p>Recognizes {@code ADD FILE} statements case-insensitively, resolves each quoted
 * path to a local {@link File} via {@code URLUtils.toFile}, and converts matching
 * statements into an {@code AddFilerOperation}.
 *
 * @since 0.7.0
 */
public class AddFileSqlParseStrategy extends AbstractRegexParseStrategy {

    // Group 1 captures the "add file" keywords, group 2 the quoted file path.
    private static final String ADD_FILE = "(add\\s+file)\\s+'(.*)'";
    private static final Pattern ADD_FILE_PATTERN = Pattern.compile(ADD_FILE, Pattern.CASE_INSENSITIVE);
    public static final AddFileSqlParseStrategy INSTANCE = new AddFileSqlParseStrategy();

    public AddFileSqlParseStrategy() {
        super(ADD_FILE_PATTERN);
    }

    /**
     * Returns every file referenced by ADD FILE clauses in {@code statement}.
     *
     * @param statement SQL text that may contain one or more ADD FILE clauses
     * @return the resolved files as an array (deduplicated)
     */
    public static File[] getInfo(String statement) {
        return getAllFilePath(statement).toArray(new File[0]);
    }

    /** Keeps only statements matching ADD FILE, normalized to the exact matched text. */
    protected static List<String> patternStatements(String[] statements) {
        return Stream.of(statements)
                .filter(s -> ReUtil.isMatch(ADD_FILE_PATTERN, s))
                .map(x -> ReUtil.findAllGroup0(ADD_FILE_PATTERN, x).get(0))
                .collect(Collectors.toList());
    }

    /**
     * Resolves the quoted path of each matching statement to an existing local file.
     *
     * @param statements candidate SQL statements
     * @return the distinct set of resolved files
     * @throws DinkyException if a path cannot be resolved or the resolved file does not exist
     */
    public static Set<File> getAllFilePath(String... statements) {
        Set<File> fileSet = new HashSet<>();
        patternStatements(statements).stream()
                .map(x -> ReUtil.findAll(ADD_FILE_PATTERN, x, 2).get(0))
                .distinct()
                .forEach(urlPath -> {
                    File file;
                    try {
                        file = URLUtils.toFile(urlPath);
                    } catch (Exception e) {
                        // Preserve the original cause for genuine resolution/download failures.
                        throw new DinkyException(StrUtil.format("url:{} request failed!", urlPath), e);
                    }
                    // Checked outside the try block so this exception is not swallowed and
                    // re-wrapped with a misleading "request failed" message.
                    if (file == null || !file.exists()) {
                        throw new DinkyException(StrUtil.format("file : {} not exists!", urlPath));
                    }
                    fileSet.add(file);
                });
        return fileSet;
    }

    /** Convenience overload for a single statement. */
    public static Set<File> getAllFilePath(String statements) {
        return getAllFilePath(new String[] {statements});
    }

    @Override
    public Operation convert(String statement) {
        return new AddFilerOperation(statement);
    }

    @Override
    public String[] getHints() {
        return new String[0];
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,24 @@
import java.util.HashSet;
import java.util.Set;

/** @since 0.7.0 */
/**
* @since 0.7.0
*/
public class FlinkUdfPathContextHolder {

private final Set<File> UDF_PATH_CONTEXT = new HashSet<>();
private final Set<File> OTHER_PLUGINS_PATH_CONTEXT = new HashSet<>();
private final Set<File> PYTHON_UDF_FILE = new HashSet<>();
private final Set<File> FILES = new HashSet<>();

public void addUdfPath(File file) {
getUdfFile().add(file);
}

public void addFile(File file) {
getFiles().add(file);
}

public void addPyUdfPath(File file) {
getPyUdfFile().add(file);
}
Expand All @@ -53,4 +60,16 @@ public Set<File> getPyUdfFile() {
public Set<File> getOtherPluginsFiles() {
return OTHER_PLUGINS_PATH_CONTEXT;
}

public Set<File> getAllFileSet() {
Set<File> allFileSet = new HashSet<>();
allFileSet.addAll(getUdfFile());
allFileSet.addAll(getOtherPluginsFiles());
allFileSet.addAll(getFiles());
return allFileSet;
}

public Set<File> getFiles() {
return FILES;
}
}
14 changes: 12 additions & 2 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.dinky.parser.SqlType;
import org.dinky.trans.Operations;
import org.dinky.trans.dml.ExecuteJarOperation;
import org.dinky.trans.parse.AddFileSqlParseStrategy;
import org.dinky.trans.parse.AddJarSqlParseStrategy;
import org.dinky.trans.parse.ExecuteJarParseStrategy;
import org.dinky.utils.DinkyClassLoaderUtil;
Expand All @@ -51,6 +52,7 @@
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.net.URL;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -119,6 +121,12 @@ public JobParam pretreatStatements(String[] statements) {
(executor.getDinkyClassLoader())
.addURLs(URLUtils.getURLs(
jobManager.getUdfPathContextHolder().getOtherPluginsFiles()));
} else if (operationType.equals(SqlType.ADD_FILE)) {
AddFileSqlParseStrategy.getAllFilePath(statement)
.forEach(t -> jobManager.getUdfPathContextHolder().addFile(t));
(executor.getDinkyClassLoader())
.addURLs(URLUtils.getURLs(
jobManager.getUdfPathContextHolder().getFiles()));
} else if (operationType.equals(SqlType.ADD_JAR)) {
Configuration combinationConfig = getCombinationConfig();
FileSystem.initialize(combinationConfig, null);
Expand Down Expand Up @@ -285,8 +293,10 @@ public ExplainResult explainSql(String statement) {
if (Asserts.isNull(sqlExplainResult)) {
sqlExplainResult = new SqlExplainResult();
} else if (ExecuteJarParseStrategy.INSTANCE.match(item.getValue())) {
StreamGraph streamGraph =
new ExecuteJarOperation(item.getValue()).explain(executor.getCustomTableEnvironment());

List<URL> allFileByAdd = jobManager.getAllFileSet();
StreamGraph streamGraph = new ExecuteJarOperation(item.getValue())
.explain(executor.getCustomTableEnvironment(), allFileByAdd);
sqlExplainResult.setExplain(streamGraph.getStreamingPlanAsJSON());
} else {
executor.executeSql(item.getValue());
Expand Down
14 changes: 14 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.dinky.job.builder.JobUDFBuilder;
import org.dinky.parser.SqlType;
import org.dinky.trans.Operations;
import org.dinky.trans.parse.AddFileSqlParseStrategy;
import org.dinky.trans.parse.AddJarSqlParseStrategy;
import org.dinky.utils.DinkyClassLoaderUtil;
import org.dinky.utils.JsonUtils;
Expand All @@ -81,15 +82,18 @@
import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.text.StrFormatter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -374,6 +378,9 @@ public IResult executeDDL(String statement) {
} else if (operationType.equals(SqlType.ADD) || operationType.equals(SqlType.ADD_JAR)) {
Set<File> allFilePath = AddJarSqlParseStrategy.getAllFilePath(item);
getExecutor().getDinkyClassLoader().addURLs(allFilePath);
} else if (operationType.equals(SqlType.ADD_FILE)) {
Set<File> allFilePath = AddFileSqlParseStrategy.getAllFilePath(item);
getExecutor().getDinkyClassLoader().addURLs(allFilePath);
}
LocalDateTime startTime = LocalDateTime.now();
TableResult tableResult = executor.executeSql(newStatement);
Expand Down Expand Up @@ -505,4 +512,11 @@ public String exportSql(String sql) {
sb.append(statement);
return sb.toString();
}

public List<URL> getAllFileSet() {
return CollUtil.isEmpty(getUdfPathContextHolder().getAllFileSet())
? Collections.emptyList()
: Arrays.asList(URLUtils.getURLs(
getUdfPathContextHolder().getAllFileSet().toArray(new File[0])));
}
}
Loading

0 comments on commit 93c43d1

Please sign in to comment.