Skip to content

Commit

Permalink
[refactor][k8s cluster]K8s doc optimize and refactor code (DataLinkDC…
Browse files Browse the repository at this point in the history
…#1635)

* flink ClusterConfiguration change

* refactor flink ClusterConfiguration code

* refactor flink ClusterConfiguration code

* update sql update file

* change code

* change code

* add private

* add k8s remote desc

* optimize k8s test and change web

* optimize k8s test and change web

* optimize k8s test and change web

* change github action

* changeCode

* changeCode

* changeCode
  • Loading branch information
zackyoungh authored Feb 11, 2023
1 parent 14d9c91 commit e1a5feb
Show file tree
Hide file tree
Showing 18 changed files with 269 additions and 133 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ jobs:
fail-fast: true
matrix:
jdk: [8, 11]
flink: [ 1.12, 1.13, 1.14, 1.15, 1.16 ]
exclude:
- jdk: 11
flink: 1.12

timeout-minutes: 30
env:
Expand All @@ -106,17 +110,25 @@ jobs:
with:
path: |
~/.m2/repository/*/*/*
!~/.m2/repository/org/apache/flink
key: ${{ runner.os }}-maven-${{ hashFiles('pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: Cache local Flink repository
uses: actions/cache@v3
with:
path: ~/.m2/repository/org/apache/flink
key: ${{ runner.os }}-${{ matrix.flink }}-maven-${{ hashFiles('pom.xml') }}
restore-keys: |
${{ runner.os }}-${{ matrix.flink }}-maven-
- name: Build and Package
run: |
./mvnw -B clean install \
-Dmaven.test.skip=false \
-Dspotless.check.skip=true \
-Denforcer.skip=true \
-Dmaven.javadoc.skip=true \
-P prod,scala-2.12,flink-all,maven-central \
-P prod,scala-2.12,flink-${{ matrix.flink }},maven-central \
--no-snapshot-updates
# 检查打包的大小
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.FileUtil;
import cn.hutool.extra.servlet.ServletUtil;
Expand Down Expand Up @@ -90,4 +91,21 @@ public void downloadPythonUDF(@PathVariable Integer taskId, HttpServletResponse
FileUtil.file(
PathConstant.getUdfPackagePath(taskId) + PathConstant.UDF_PYTHON_NAME));
}

/**
* 提供docker通过http下载dinky-app.jar
*
* @param version 版本
* @param resp resp
*/
@GetMapping("downloadAppJar/{version}")
public void downloadAppJar(@PathVariable String version, HttpServletResponse resp) {
List<File> files =
FileUtil.loopFiles(
PathConstant.WORK_DIR + "/jar",
pathname -> pathname.getName().contains("dinky-app-" + version));
if (CollUtil.isNotEmpty(files)) {
ServletUtil.write(resp, files.get(0));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.dinky.config.Docker;
import org.dinky.db.service.impl.SuperServiceImpl;
import org.dinky.function.constant.PathConstant;
import org.dinky.gateway.GatewayType;
import org.dinky.gateway.config.ClusterConfig;
import org.dinky.gateway.config.FlinkConfig;
Expand All @@ -34,9 +33,6 @@
import org.dinky.service.ClusterConfigurationService;
import org.dinky.utils.DockerClientUtils;

import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -48,9 +44,9 @@
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;

/**
* ClusterConfigServiceImpl
Expand Down Expand Up @@ -113,37 +109,24 @@ public TestResult testGateway(ClusterConfiguration clusterConfiguration) {
} else if (config.getType() == FlinkClusterConfiguration.Type.Kubernetes) {
gatewayConfig.setType(GatewayType.KUBERNETES_APPLICATION);

Dict kubernetesConfig = Dict.of(config.getKubernetesConfig());
Map<String, String> kubernetesConfig = config.getKubernetesConfig();

Opt.ofBlankAble(kubernetesConfig.getStr("kubernetes.namespace"))
.ifPresent(v -> flinkConfigMap.put("kubernetes.namespace", v));
// filter str blank value
kubernetesConfig =
MapUtil.filter(kubernetesConfig, entry -> !StrUtil.isBlank(entry.getValue()));

Opt.ofBlankAble(kubernetesConfig.getStr("kubernetes.cluster-id"))
.ifPresentOrElse(
v -> flinkConfigMap.put("kubernetes.cluster-id", v),
() ->
flinkConfigMap.put(
"kubernetes.cluster-id", UUID.randomUUID().toString()));
// set default value
kubernetesConfig.putIfAbsent("kubernetes.cluster-id", UUID.randomUUID().toString());

Opt.ofBlankAble(kubernetesConfig.getStr("kubernetes.container.image"))
.ifPresent(v -> flinkConfigMap.put("kubernetes.container.image", v));
flinkConfigMap.putAll(kubernetesConfig);

String fileDir =
FileUtil.isDirectory(PathConstant.WORK_DIR + "/docker")
? PathConstant.WORK_DIR + "/docker"
: PathConstant.WORK_DIR;
File dockerFile;
try {
dockerFile =
FileUtil.writeUtf8String(
FileUtil.readUtf8String(dockerfileResource.getFile()),
fileDir + "/DinkyFlinkDockerfile");
Docker docker =
Docker.build((Map) clusterConfiguration.getConfig().get("dockerConfig"));
if (docker != null
&& StringUtils.isNotBlank(docker.getInstance())
&& clusterConfiguration.getId() != null) {
new DockerClientUtils(docker, dockerFile).initImage();
Docker.build(
(Map<String, Object>)
clusterConfiguration.getConfig().get("dockerConfig"));
if (docker != null && clusterConfiguration.getId() != null) {
new DockerClientUtils(docker).initImage();
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,12 @@ private void loadDocker(
}

DockerClientUtils dockerClientUtils = new DockerClientUtils(docker);
if (StrUtil.isNotBlank(dockerClientUtils.getImage())) {
String tag = dockerClientUtils.getDocker().getTag();
if (StrUtil.isNotBlank(tag)) {
gatewayConfig
.getFlinkConfig()
.getConfiguration()
.put("kubernetes.container.image", dockerClientUtils.getImage());
.put("kubernetes.container.image", tag);
}
}

Expand Down
5 changes: 4 additions & 1 deletion dinky-admin/src/main/resources/DinkyFlinkDockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# 用来构建dinky环境
ARG FLINK_VERSION=1.14.5
ARG FLINK_BIG_VERSION=1.14

FROM flink:${FLINK_VERSION}

ARG FLINK_VERSION
ARG FLINK_BIG_VERSION
ENV PYTHON_HOME /opt/miniconda3

USER root
Expand All @@ -16,4 +19,4 @@ RUN pip install "apache-flink==${FLINK_VERSION}" -i http://pypi.douban.com/simpl

RUN cp /opt/flink/opt/flink-python_* /opt/flink/lib/

COPY jar/dinky-app-@dinky.flink.version@-@project.version@-jar-with-dependencies.jar plugins/* $FLINK_HOME/lib/
RUN wget -O dinky-app-${FLINK_BIG_VERSION}.jar - ${DINKY_HTTP}/downloadAppJar/${FLINK_BIG_VERSION} | mv dinky-app-${FLINK_BIG_VERSION}.jar
13 changes: 13 additions & 0 deletions dinky-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,25 @@
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-transport-httpclient5</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
32 changes: 25 additions & 7 deletions dinky-common/src/main/java/org/dinky/config/Docker.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import org.dinky.assertion.Asserts;

import java.util.List;
import java.util.Map;

import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -46,25 +48,41 @@ public class Docker {
private String registryPassword;
private String imageNamespace;
private String imageStorehouse;
private String imageDinkyVersion;
private String imageVersion;
private String dockerfile;
private String tag;

public static Docker build(Map configMap) {
public static Docker build(Map<String, Object> configMap) {
if (Asserts.isNullMap(configMap)) {
return null;
}
String instance1 = configMap.getOrDefault("docker.instance", "").toString();
if ("".equals(instance1)) {
// eg tag: docker.io/dinky/flink:1.16.0
String tag = configMap.getOrDefault("docker.image.tag", "").toString();
String dockerfile = configMap.getOrDefault("docker.image.dockerfile", "").toString();
if (StrUtil.hasBlank(instance1, tag)) {
return null;
}
List<String> tagSplit = StrUtil.splitTrim(tag, "/");
List<String> versionSplit = StrUtil.splitTrim(tag, ":");
if (tagSplit.size() < 1 || versionSplit.size() < 1) {
throw new RuntimeException("image tag eg:(docker.io/dinky/flink:1.16.0)");
}
if (tagSplit.size() == 2) {
tagSplit.add(0, "docker.io");
}
tagSplit.set(2, tagSplit.get(2).replace(":" + versionSplit.get(1), ""));

return Docker.builder()
.instance(instance1)
.registryUrl(configMap.getOrDefault("docker.registry.url", "").toString())
.registryUsername(configMap.getOrDefault("docker.registry.username", "").toString())
.registryPassword(configMap.getOrDefault("docker.registry.password", "").toString())
.imageNamespace(configMap.getOrDefault("docker.image.namespace", "").toString())
.imageStorehouse(configMap.getOrDefault("docker.image.storehouse", "").toString())
.imageDinkyVersion(
configMap.getOrDefault("docker.image.dinkyVersion", "").toString())
.imageNamespace(tagSplit.get(1))
.imageStorehouse(tagSplit.get(2))
.imageVersion(versionSplit.get(1))
.tag(tag)
.dockerfile(dockerfile)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class FlinkClusterConfiguration {
private String userJarPath;

Map<String, String> flinkConfig;
Map<String, Object> kubernetesConfig;
Map<String, String> kubernetesConfig;

public static enum Type {
//
Expand Down
65 changes: 30 additions & 35 deletions dinky-common/src/main/java/org/dinky/utils/DockerClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.dinky.docker.DockerClientBuilder;

import java.io.File;
import java.net.URISyntaxException;
import java.util.Arrays;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
Expand All @@ -35,6 +33,8 @@
import com.github.dockerjava.api.model.PushResponseItem;
import com.github.dockerjava.core.DefaultDockerClientConfig;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.UUID;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -49,24 +49,13 @@ public class DockerClientUtils {
private final DockerClient dockerClient;
private final Docker docker;
private final File dockerfile;
private final String image;

public DockerClientUtils(Docker docker) {
this(docker, null);
}

public DockerClientUtils(Docker docker, File dockerfile) {
this.docker = docker;
this.dockerfile = dockerfile;
this.image =
String.join(
"/",
Arrays.asList(
docker.getRegistryUrl(),
docker.getImageNamespace(),
docker.getImageStorehouse()))
+ ":"
+ docker.getImageDinkyVersion();
this.dockerfile =
FileUtil.writeUtf8String(
docker.getDockerfile(),
System.getProperty("user.dir") + "/tmp/dockerfile/" + UUID.randomUUID());
dockerClient =
DockerClientBuilder.getInstance(
DefaultDockerClientConfig.createDefaultConfigBuilder()
Expand All @@ -91,29 +80,37 @@ public DockerClientUtils(Docker docker, File dockerfile) {
}
}

public void initImage() throws URISyntaxException, InterruptedException {
BuildImageResultCallback resultCallback = new BuildImageResultCallback();
dockerClient
.buildImageCmd()
.withRemove(true)
.withDockerfile(dockerfile)
.withTag(image)
.exec(resultCallback);
resultCallback.awaitImageId();
pushImage(image);
cleanNoneImage();
public void initImage() throws InterruptedException {
try {
BuildImageResultCallback resultCallback = new BuildImageResultCallback();

if (FileUtil.readUtf8String(dockerfile).length() > 0) {
dockerClient
.buildImageCmd()
.withRemove(true)
.withDockerfile(dockerfile)
.withTag(docker.getTag())
.exec(resultCallback);
resultCallback.awaitCompletion().onError(new RuntimeException());
}
pushImage(docker.getTag());
cleanNoneImage();
} finally {
FileUtil.del(dockerfile);
}
}

public void pushImage(String tag) throws InterruptedException {
ResultCallback.Adapter<PushResponseItem> resultCallback1 = new ResultCallback.Adapter<>();
dockerClient.pushImageCmd(tag).exec(resultCallback1);
try {
log.info("start push-image: {}", tag);
resultCallback1.awaitCompletion().onError(new RuntimeException());
log.info("push-image finish: {}", tag);
} catch (Exception e) {
log.error("push-image failed: {} , reason: {}", tag, e.getMessage());
throw e;
}
resultCallback1.awaitCompletion();
log.info("push-image finish: {}", tag);
}

/** 清除空容器 */
Expand All @@ -134,14 +131,12 @@ public void cleanNoneImage() {
/**
* 创建容器
*
* @param client
* @return
* @param client client
* @return 创建容器
*/
public CreateContainerResponse createContainers(
DockerClient client, String containerName, String imageName) {
CreateContainerResponse container =
client.createContainerCmd(imageName).withName(containerName).exec();
return container;
return client.createContainerCmd(imageName).withName(containerName).exec();
}

/**
Expand Down
Loading

0 comments on commit e1a5feb

Please sign in to comment.