Skip to content

Commit

Permalink
[Feature #1906][git] support python udf develop & auto upload (#1993)
Browse files Browse the repository at this point in the history
* add py udf

* fix log repeat

* update sql &console

* update sql &console
  • Loading branch information
zackyoungh authored May 30, 2023
1 parent 4473c44 commit 6c516cb
Show file tree
Hide file tree
Showing 27 changed files with 357 additions and 112 deletions.
28 changes: 26 additions & 2 deletions dinky-admin/src/main/java/org/dinky/Dinky.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.freemarker.FreeMarkerAutoConfiguration;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import cn.hutool.system.SystemUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
* Dinky 启动器
*
Expand All @@ -33,9 +39,27 @@
@EnableTransactionManagement
@SpringBootApplication(exclude = FreeMarkerAutoConfiguration.class)
@EnableCaching
@Slf4j
public class Dinky {

@SneakyThrows
public static void main(String[] args) {
SpringApplication.run(Dinky.class, args);
String ipAddress = SystemUtil.getHostInfo().getAddress();
System.setProperty("ipAddr", ipAddress);
SpringApplication app = new SpringApplication(Dinky.class);
ConfigurableApplicationContext application = app.run(args);
Environment env = application.getEnvironment();
String port = env.getProperty("server.port");
log.info(
"\n----------------------------------------------------------\n\t"
+ "Application 'Dinky' is running! Access URLs:\n\t"
+ "Local: \t\thttp://localhost:{}\n\t"
+ "External: \thttp://{}:{}\n\t"
+ "Doc: \thttp://{}:{}/doc.html\n"
+ "----------------------------------------------------------",
port,
ipAddress,
port,
ipAddress,
port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public Object round(ProceedingJoinPoint proceedingJoinPoint) {
if (!(e instanceof DinkyException)) {
throw new DinkyException(e);
}
e.printStackTrace();
throw (DinkyException) e;
} finally {
if (proceed instanceof JobResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.dinky.exception.BusException;
import org.dinky.function.constant.PathConstant;
import org.dinky.function.util.ZipWriter;
import org.dinky.model.FlinkUdfManifest;

import java.io.File;
import java.io.InputStream;
Expand All @@ -40,9 +41,9 @@
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.extra.servlet.ServletUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;

/** @since 0.7.0 */
Expand All @@ -61,34 +62,45 @@ public void downloadJavaUDF(@PathVariable Integer taskId, HttpServletResponse re
if (!depManifestFile.exists()) {
return;
}
JSONObject jsonObject = new JSONObject(FileUtil.readUtf8String(depManifestFile));
JSONArray jars = jsonObject.getJSONArray("jars");
List<String> filePath = jars.stream().map(Convert::toStr).collect(Collectors.toList());
FlinkUdfManifest flinkUdfManifest =
JSONUtil.toBean(FileUtil.readUtf8String(depManifestFile), FlinkUdfManifest.class);
List<String> filePath =
flinkUdfManifest.getJars().stream()
.map(Convert::toStr)
.collect(Collectors.toList());
List<String> pyFilePath =
flinkUdfManifest.getPythonFiles().stream()
.map(Convert::toStr)
.collect(Collectors.toList());
String[] jarNameList =
filePath.stream()
.map(FileUtil::getName)
.map(x -> "jar/" + x)
.toArray(String[]::new);
String[] pyFileNameList =
pyFilePath.stream()
.map(FileUtil::getName)
.map(x -> "py/" + x)
.toArray(String[]::new);

File zipFile = FileUtil.file(udfPackagePath + PathConstant.DEP_ZIP);
InputStream[] inputStreams =
filePath.stream().map(FileUtil::getInputStream).toArray(InputStream[]::new);
InputStream[] pyInputStreams =
pyFilePath.stream().map(FileUtil::getInputStream).toArray(InputStream[]::new);
try (ZipWriter zip = new ZipWriter(zipFile, Charset.defaultCharset())) {
zip.add(jarNameList, inputStreams);
if (ArrayUtil.isNotEmpty(jarNameList)) {
zip.add(jarNameList, inputStreams);
}
if (ArrayUtil.isNotEmpty(pyFileNameList)) {
zip.add(pyFileNameList, pyInputStreams);
}
zip.add(depManifestFile.getName(), FileUtil.getInputStream(depManifestFile));
}
ServletUtil.write(resp, FileUtil.getInputStream(zipFile));
FileUtil.del(zipFile);
}

@GetMapping("downloadPythonUDF/{taskId}")
public void downloadPythonUDF(@PathVariable Integer taskId, HttpServletResponse resp) {
ServletUtil.write(
resp,
FileUtil.file(
PathConstant.getUdfPackagePath(taskId) + PathConstant.UDF_PYTHON_NAME));
}

/**
* 提供docker通过http下载dinky-app.jar
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class GitController {
* @return {@link Result} of {@link Void}
*/
@PutMapping("/saveOrUpdate")
public Result<Void> saveOrUpdate(@Validated @RequestBody GitProject gitProject) {
public Result<Void> saveOrUpdate(@Validated @RequestBody GitProjectDTO gitProject) {
gitProjectService.saveOrUpdate(gitProject);
GitRepository gitRepository =
new GitRepository(BeanUtil.copyProperties(gitProject, GitProjectDTO.class));
Expand Down
2 changes: 2 additions & 0 deletions dinky-admin/src/main/java/org/dinky/dto/GitProjectDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,6 @@ public class GitProjectDTO {

/** */
@NotNull private Boolean enabled;

private Integer orderLine;
}
3 changes: 3 additions & 0 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.dinky.scheduler.config.DolphinSchedulerProperties;
import org.dinky.scheduler.exception.SchedulerException;
import org.dinky.scheduler.model.Project;
import org.dinky.service.GitProjectService;
import org.dinky.service.JobInstanceService;
import org.dinky.service.SysConfigService;
import org.dinky.service.TaskService;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class SystemInit implements ApplicationRunner {
private final TaskService taskService;
private final TenantService tenantService;
private final DolphinSchedulerProperties dolphinSchedulerProperties;
private final GitProjectService gitProjectService;
private static Project project;

@Override
Expand Down Expand Up @@ -125,6 +127,7 @@ public void registerUDF() {
taskService.getAllUDF().stream()
.map(UDFUtils::taskToUDF)
.collect(Collectors.toList()));
UdfCodePool.updateGitPool(gitProjectService.getGitPool());
TenantContextHolder.set(null);
}
}
3 changes: 3 additions & 0 deletions dinky-admin/src/main/java/org/dinky/model/GitProject.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public class GitProject extends SuperEntity<GitProject> {
@TableField(value = "udf_class_map_list")
private String udfClassMapList;

@TableField(value = "order_line")
private Integer orderLine;

@TableField(exist = false)
private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.dinky.model.GitProject;

import java.util.List;
import java.util.Map;

/**
* @author ZackYoung
Expand All @@ -38,6 +39,8 @@ public interface GitProjectService extends ISuperService<GitProject> {
*/
void saveOrUpdate(GitProjectDTO gitProjectDTO);

Map<String, String> getGitPool();

/**
* 更新状态
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.dinky.service.impl;

import org.dinky.db.service.impl.SuperServiceImpl;
import org.dinky.dto.GitAnalysisJarDTO;
import org.dinky.dto.GitProjectDTO;
import org.dinky.dto.JarClassesDTO;
import org.dinky.dto.TreeNodeDTO;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.mapper.GitProjectMapper;
import org.dinky.model.GitProject;
import org.dinky.process.exception.DinkyException;
Expand All @@ -34,7 +36,9 @@
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -43,6 +47,8 @@

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;

/**
Expand All @@ -66,9 +72,55 @@ public void saveOrUpdate(GitProjectDTO gitProjectDTO) {
}

GitProject gitProject = BeanUtil.toBean(gitProjectDTO, GitProject.class);
if (gitProject.getOrderLine() == null) {
Integer maxOrderLine =
Opt.ofNullable(
baseMapper
.selectOne(
new LambdaQueryWrapper<GitProject>()
.orderByAsc(GitProject::getOrderLine)
.last(" limit 1"))
.getOrderLine())
.orElse(999);
gitProject.setOrderLine(maxOrderLine + 1);
}
BeanUtil.copyProperties(gitProjectDTO, gitProject);

gitProject.insertOrUpdate();

ThreadUtil.execAsync(() -> UdfCodePool.updateGitPool(getGitPool()));
}

@Override
public Map<String, String> getGitPool() {
List<GitProject> list = list();
Map<String, String> gitPool = new LinkedHashMap<>();
Opt.ofEmptyAble(list)
.ifPresent(
l -> {
for (GitProject gitProject : list) {
List<GitAnalysisJarDTO> gitAnalysisJarList =
JSONUtil.toList(
gitProject.getUdfClassMapList(),
GitAnalysisJarDTO.class);
for (GitAnalysisJarDTO analysisJarDTO : gitAnalysisJarList) {
analysisJarDTO
.getClassList()
.forEach(
udf -> {
gitPool.computeIfAbsent(
udf,
k -> analysisJarDTO.getJarPath());
});
}
}
});
return gitPool;
}

@Override
public List<GitProject> list() {
return list(new LambdaQueryWrapper<GitProject>().orderByAsc(GitProject::getOrderLine));
}

@Override
Expand All @@ -87,7 +139,8 @@ public void updateState(Integer id) {
@Override
public List<TreeNodeDTO> getProjectCode(Integer id) {
GitProject gitProject = getById(id);
File projectDir = GitRepository.getProjectDir(gitProject.getName());
File projectDir =
new File(GitRepository.getProjectDir(gitProject.getName()), gitProject.getBranch());
return TreeUtil.treeNodeData(projectDir, true);
}

Expand Down
11 changes: 10 additions & 1 deletion dinky-admin/src/main/java/org/dinky/sse/DoneStepSse.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

package org.dinky.sse;

import org.dinky.function.pool.UdfCodePool;
import org.dinky.service.GitProjectService;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import cn.hutool.core.lang.Dict;
import cn.hutool.extra.spring.SpringUtil;

/**
* @author ZackYoung
Expand All @@ -43,5 +47,10 @@ public DoneStepSse(
}

@Override
public void exec() {}
public void exec() {
addFileMsgCusLog("Updating UDF pool");
GitProjectService gitProjectService = SpringUtil.getBean(GitProjectService.class);
UdfCodePool.updateGitPool(gitProjectService.getGitPool());
addFileMsgCusLog("The UDF pool has been updated");
}
}
2 changes: 2 additions & 0 deletions dinky-admin/src/main/resources/mapper/GitProjectMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@
${ew.sqlSegment}
</if>
</where>
order by order_line asc

</select>
</mapper>
Loading

0 comments on commit 6c516cb

Please sign in to comment.