Skip to content

Commit

Permalink
[Bug] [UDF] fix udf some bug (DataLinkDC#3045)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored Jan 24, 2024
1 parent 3de42d7 commit 78a179b
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 100 deletions.
3 changes: 1 addition & 2 deletions dinky-admin/src/main/java/org/dinky/configure/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ public void addInterceptors(InterceptorRegistry registry) {
}))
.addPathPatterns("/api/**")
.excludePathPatterns(
"/api/login", "/api/ldap/ldapEnableStatus",
"/druid/**", "/openapi/**");
"/api/login", "/api/ldap/ldapEnableStatus", "/download/**", "/druid/**", "/openapi/**");

registry.addInterceptor(new TenantInterceptor())
.addPathPatterns("/api/**")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,17 +596,22 @@ public boolean saveOrUpdateTask(Task task) {
task.setStatement(code);
}
}
String className = "";
// to compiler udf
if (Asserts.isNotNullString(task.getDialect())
&& Dialect.JAVA.isDialect(task.getDialect())
&& Asserts.isNotNullString(task.getStatement())) {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement());
task.setSavePointPath(compiler.getFullClassName());
className = compiler.getFullClassName();
} else if (Dialect.PYTHON.isDialect(task.getDialect())) {
task.setSavePointPath(task.getName() + "." + UDFUtil.getPyUDFAttr(task.getStatement()));
className = task.getName() + "." + UDFUtil.getPyUDFAttr(task.getStatement());
} else if (Dialect.SCALA.isDialect(task.getDialect())) {
task.setSavePointPath(UDFUtil.getScalaFullClassName(task.getStatement()));
className = UDFUtil.getScalaFullClassName(task.getStatement());
}
if (!task.getConfigJson().getUdfConfig().getClassName().equals(className)) {
UdfCodePool.remove(task.getConfigJson().getUdfConfig().getClassName());
}
task.getConfigJson().getUdfConfig().setClassName(className);
UdfCodePool.addOrUpdate(UDFUtils.taskToUDF(task));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.python.PythonOptions;
Expand All @@ -56,13 +55,10 @@
import org.apache.flink.table.api.TableResult;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.SQLException;
Expand All @@ -82,6 +78,7 @@
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.http.HttpUtil;
import lombok.SneakyThrows;

/**
Expand Down Expand Up @@ -126,8 +123,7 @@ public static void submit(AppParamConfig config) throws SQLException {
executorConfig, new WeakReference<>(DinkyClassLoader.build()).get());

// 加载第三方jar //TODO 这里有问题,需要修一修
// loadDep(appTask.getType(),
// config.getTaskId(),DBUtil.getSysConfig(Status.SYS_ENV_SETTINGS_DINKYADDR.getKey()), executorConfig);
loadDep(appTask.getType(), config.getTaskId(), executorConfig);
log.info("The job configuration is as follows: {}", executorConfig);

String[] statements =
Expand Down Expand Up @@ -169,7 +165,8 @@ public static String buildSql(AppTask appTask) throws SQLException {
return sb.toString();
}

private static void loadDep(String type, Integer taskId, String dinkyAddr, ExecutorConfig executorConfig) {
private static void loadDep(String type, Integer taskId, ExecutorConfig executorConfig) {
String dinkyAddr = SystemConfiguration.getInstances().getDinkyAddr().getValue();
if (StringUtils.isBlank(dinkyAddr)) {
return;
}
Expand All @@ -187,44 +184,47 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu
if (exists) {
String depPath = flinkHome + "/dep";
ZipUtils.unzip(depZip, depPath);
log.info(
"download dep success, include :{}",
Arrays.stream(FileUtil.file(depPath).listFiles())
.map(File::getName)
.collect(Collectors.joining(",\n")));
// move all jar
FileUtil.listFileNames(depPath + "/jar").forEach(f -> {
FileUtil.moveContent(
FileUtil.file(depPath + "/jar/" + f), FileUtil.file(usrlib + "/" + f), true);
FileUtil.move(FileUtil.file(depPath + "/jar/" + f), FileUtil.file(usrlib + "/" + f), true);
});
URL[] jarUrls = FileUtil.listFileNames(usrlib).stream()
.map(f -> URLUtil.getURL(FileUtil.file(usrlib, f)))
.toArray(URL[]::new);
URL[] pyUrls = FileUtil.listFileNames(depPath + "/py/").stream()
.map(f -> URLUtil.getURL(FileUtil.file(depPath + "/py/", f)))
.toArray(URL[]::new);

addURLs(jarUrls);
executorConfig
.getConfig()
.put(
PipelineOptions.JARS.key(),
Arrays.stream(jarUrls).map(URL::toString).collect(Collectors.joining(";")));
if (ArrayUtil.isNotEmpty(pyUrls)) {
executorConfig
.getConfig()
.put(
PythonOptions.PYTHON_FILES.key(),
Arrays.stream(jarUrls)
.map(URL::toString)
.collect(Collectors.joining(",")));
if (FileUtil.isDirectory(usrlib)) {
URL[] jarUrls = FileUtil.listFileNames(usrlib).stream()
.map(f -> URLUtil.getURL(FileUtil.file(usrlib, f)))
.toArray(URL[]::new);
addURLs(jarUrls);
executor.getCustomTableEnvironment()
.addJar(FileUtil.file(usrlib).listFiles());
}
if (FileUtil.isDirectory(depPath + "/py/")) {
URL[] pyUrls = FileUtil.listFileNames(depPath + "/py/").stream()
.map(f -> URLUtil.getURL(FileUtil.file(depPath + "/py/", f)))
.toArray(URL[]::new);
if (ArrayUtil.isNotEmpty(pyUrls)) {
executor.getCustomTableEnvironment()
.addConfiguration(
PythonOptions.PYTHON_FILES,
Arrays.stream(pyUrls)
.map(URL::toString)
.collect(Collectors.joining(",")));
}
}
}
} catch (IOException e) {
log.error("");
throw new RuntimeException(e);
}
}
executorConfig.getConfig().put("python.files", "./python_udf.zip");
}

private static void addURLs(URL[] jarUrls) {
URLClassLoader urlClassLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
Thread.currentThread().setContextClassLoader(new DinkyClassLoader(new URL[] {}));
URLClassLoader urlClassLoader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
try {
Method add = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
add.setAccessible(true);
Expand All @@ -238,23 +238,10 @@ private static void addURLs(URL[] jarUrls) {

public static boolean downloadFile(String url, String path) throws IOException {
try {
HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
// 设置超时间为3秒
conn.setConnectTimeout(3 * 1000);
// 获取输入流
try (InputStream inputStream = conn.getInputStream()) {
// 获取输出流
try (FileOutputStream outputStream = new FileOutputStream(path)) {
// 每次下载1024位
byte[] b = new byte[1024];
int len = -1;
while ((len = inputStream.read(b)) != -1) {
outputStream.write(b, 0, len);
}
return true;
}
}
HttpUtil.downloadFile(url, path);
return true;
} catch (Exception e) {
log.error("download failed, Reason:", e);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,17 @@
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.DefaultExecutor;

import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.URLUtil;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -230,23 +224,10 @@ public ObjectNode getStreamGraph(String statement) {
}
}

@Override
public void addJar(File... jarPath) {
Configuration configuration =
(Configuration) getStreamExecutionEnvironment().getConfiguration();
List<String> pathList =
Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList());
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars != null) {
CollUtil.addAll(jars, pathList);
}
Map<String, Object> flinkConfigurationMap = getFlinkConfigurationMap();
flinkConfigurationMap.put(PipelineOptions.JARS.key(), jars);
}

@Override
public <T> void addConfiguration(ConfigOption<T> option, T value) {
Map<String, Object> flinkConfigurationMap = getFlinkConfigurationMap();
getConfig().addConfiguration(new Configuration().set(option, value));
flinkConfigurationMap.put(option.key(), value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,17 @@
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;

import java.io.File;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.URLUtil;

/**
* CustomTableEnvironmentImpl
Expand Down Expand Up @@ -210,32 +205,16 @@ public ObjectNode getStreamGraph(String statement) {
}
}

@Override
public void addJar(File... jarPath) {
Configuration configuration =
(Configuration) getStreamExecutionEnvironment().getConfiguration();
List<String> pathList =
Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList());
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars != null) {
CollUtil.addAll(jars, pathList);
}
Map<String, Object> flinkConfigurationMap = getFlinkConfigurationMap();
flinkConfigurationMap.put(PipelineOptions.JARS.key(), jars);
}

@Override
public <T> void addConfiguration(ConfigOption<T> option, T value) {
Map<String, Object> flinkConfigurationMap = getFlinkConfigurationMap();
getConfig().set(option, value);
flinkConfigurationMap.put(option.key(), value);
}

private Map<String, Object> getFlinkConfigurationMap() {
Field configuration = null;
try {
configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration) configuration.get(getStreamExecutionEnvironment());
Configuration o = getRootConfiguration();
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map<String, Object> temp = (Map<String, Object>) confData.get(o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ default void addJar(File... jarPath) {
Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList());
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars == null) {
configuration.set(PipelineOptions.JARS, pathList);
addConfiguration(PipelineOptions.JARS, pathList);
} else {
CollUtil.addAll(jars, pathList);
}
Expand Down
18 changes: 18 additions & 0 deletions dinky-core/src/main/java/org/dinky/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,16 @@
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.URLUtil;
import lombok.extern.slf4j.Slf4j;

/**
* Executor
*
* @since 2021/11/17
*/
@Slf4j
public abstract class Executor {

private static final Logger logger = LoggerFactory.getLogger(Executor.class);
Expand Down Expand Up @@ -133,7 +136,22 @@ public String getTimeZone() {
return getTableConfig().getLocalTimeZone().getId();
}

private void initClassloader(DinkyClassLoader classLoader) {
if (classLoader != null) {
try {
StreamExecutionEnvironment env = this.environment;
// Fix the Classloader in the env above to appClassLoader, causing ckp to fail to compile
ReflectUtil.setFieldValue(env, "userClassloader", classLoader);
env.configure(env.getConfiguration(), classLoader);
} catch (Throwable e) {
log.warn(
"The version of flink does not have a Classloader field and the classloader cannot be set.", e);
}
}
}

protected void init(DinkyClassLoader classLoader) {
initClassloader(classLoader);
this.dinkyClassLoader = classLoader;
if (executorConfig.isValidParallelism()) {
environment.setParallelism(executorConfig.getParallelism());
Expand Down
6 changes: 6 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.time.LocalDateTime;
import java.util.ArrayList;
Expand Down Expand Up @@ -235,6 +236,11 @@ private boolean failed() {
public boolean close() {
CustomTableEnvironmentContext.clear();
RowLevelPermissionsContext.clear();
try {
getExecutor().getDinkyClassLoader().close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public static void addOrUpdate(UDF udf) {
CODE_POOL.put(udf.getClassName(), udf);
}

public static void remove(String className) {
CODE_POOL.remove(className);
}

public static UDF getUDF(String className) {
UDF udf = CODE_POOL.get(className);
if (udf == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
public class UDFUtil {

public static final String FUNCTION_SQL_REGEX =
"^CREATE\\s+(?:(?:TEMPORARY|TEMPORARY\\s+SYSTEM)\\s+)?FUNCTION\\s+(?:IF\\s+NOT\\s+EXISTS\\s+)?(\\S+)\\s+AS\\s+'(\\S+)'\\s*(?:LANGUAGE\\s+(?:JAVA|SCALA|PYTHON)\\s+)?(?:USING\\s+JAR\\s+'(\\S+)'\\s*(?:,\\s*JAR\\s+'(\\S+)'\\s*)*)?";
"^CREATE\\s+(?:(?:TEMPORARY|TEMPORARY\\s+SYSTEM)\\s+)?FUNCTION\\s+(?:IF\\s+NOT\\s+EXISTS\\s+)?(\\S+)\\s+AS\\s+'(\\S+)'\\s*(?:LANGUAGE\\s+(?:JAVA|SCALA|PYTHON)\\s*)?(?:USING\\s+JAR\\s+'(\\S+)'\\s*(?:,\\s*JAR\\s+'(\\S+)'\\s*)*)?";
public static final Pattern PATTERN = Pattern.compile(FUNCTION_SQL_REGEX, Pattern.CASE_INSENSITIVE);

public static final String SESSION = "SESSION";
Expand All @@ -118,7 +118,7 @@ public class UDFUtil {
*/
protected static final Map<String, Integer> UDF_MD5_MAP = new HashMap<>();

public static final String PYTHON_UDF_ATTR = "(\\S)\\s+=\\s+ud(?:f|tf|af|taf)";
public static final String PYTHON_UDF_ATTR = "(\\S+)\\s*=\\s*ud(?:f|tf|af|taf)";
public static final String PYTHON_UDF_DEF = "@ud(?:f|tf|af|taf).*\\n+def\\s+(.*)\\(.*\\):";
public static final String SCALA_UDF_CLASS = "class\\s+(\\w+)(\\s*\\(.*\\)){0,1}\\s+extends";
public static final String SCALA_UDF_PACKAGE = "package\\s+(.*);";
Expand Down Expand Up @@ -343,7 +343,7 @@ public static UDF toUDF(String statement, DinkyClassLoader classLoader) {
String gitPackage = UdfCodePool.getGitPackage(className);

if (StrUtil.isNotBlank(gitPackage) && FileUtil.exist(gitPackage)) {
if (FileUtil.getSuffix(gitPackage).equals("jar")) {
if ("jar".equals(FileUtil.getSuffix(gitPackage))) {
udfPathContextHolder.addUdfPath(new File(gitPackage));
} else {
udfPathContextHolder.addPyUdfPath(new File(gitPackage));
Expand Down
Loading

0 comments on commit 78a179b

Please sign in to comment.