diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 03500050c2..4ad3a0c30a 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -126,6 +126,7 @@ import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; @@ -245,7 +246,19 @@ public JobResult submitTask(Integer id) { JobResult jobResult; if (config.isJarTask()) { jobResult = jobManager.executeJar(); - process.finish("Submit Flink Jar finished."); + if (jobResult.isSuccess()) { + process.finish("Submit Flink SQL finished, JobManager Web Interface: http://" + + jobResult.getJobManagerAddress()); + } else { + // 如果提交失败,则只打印出关键的错误信息 + process.error("Submit Flink SQL " + jobResult.getStatus()); + if (Asserts.isNotNull(jobResult.getError())) { + process.error(jobResult.getError().split("\n")[0]); + Arrays.stream(jobResult.getError().split("\n")) + .filter(row -> row.contains("Caused by")) + .forEach(row -> process.error(row)); + } + } } else { jobResult = jobManager.executeSql(task.getStatement()); process.finish("Submit Flink SQL finished."); diff --git a/dinky-common/src/main/java/org/dinky/utils/ThreadUtil.java b/dinky-common/src/main/java/org/dinky/utils/ThreadUtil.java new file mode 100644 index 0000000000..09a71a6701 --- /dev/null +++ b/dinky-common/src/main/java/org/dinky/utils/ThreadUtil.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** @since 2023/8/9 */ +public class ThreadUtil { + protected static final Logger logger = LoggerFactory.getLogger(ThreadUtil.class); + + /** + * try sleep exception + * + * @param ms + */ + public static void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + logger.warn("sleep exception"); + } + } +} diff --git a/dinky-executor/src/main/java/org/dinky/executor/EnvironmentSetting.java b/dinky-executor/src/main/java/org/dinky/executor/EnvironmentSetting.java index f23757fca3..374a3bf9a6 100644 --- a/dinky-executor/src/main/java/org/dinky/executor/EnvironmentSetting.java +++ b/dinky-executor/src/main/java/org/dinky/executor/EnvironmentSetting.java @@ -60,7 +60,7 @@ public EnvironmentSetting(String host, Integer port, String... jarFiles) { public static EnvironmentSetting build(String address, String... jarFiles) { Asserts.checkNull(address, "Flink 地址不能为空"); - String[] strs = address.split(NetConstant.COLON); + String[] strs = getHostAndPort(address); if (strs.length >= 2) { return new EnvironmentSetting(strs[0], Integer.parseInt(strs[1]), jarFiles); } else { @@ -68,6 +68,31 @@ public static EnvironmentSetting build(String address, String... jarFiles) { } } + /** + * 修复地址,解决无只有域名无端口的情况,具体情况,请看测试用例 + * + * @param address flink地址(前端配置) + * @return String[]{域名,端口} + */ + static String[] getHostAndPort(String address) { + address = address.toLowerCase().replace("://", "//"); + String[] strs = address.split(NetConstant.COLON); + // 兼容直接填写域名情况(走k8s ingress时没有端口号) + if (address.startsWith("http//")) { + String port = strs.length == 1 ? "80" : strs[1]; + strs = new String[] {strs[0].replace("http//", ""), port}; + } + if (address.startsWith("https//")) { + String port = strs.length == 1 ? "443" : strs[1]; + strs = new String[] {strs[0].replace("https//", ""), port}; + } + // 如果没有填端口,则默认80 + if (strs.length == 1) { + strs = new String[] {address, "80"}; + } + return strs; + } + public String getAddress() { if (Asserts.isAllNotNull(host, port)) { return host + NetConstant.COLON + port; diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesApplicationGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesApplicationGateway.java index e888d3e379..33bf2f10ce 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesApplicationGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesApplicationGateway.java @@ -20,12 +20,14 @@ package org.dinky.gateway.kubernetes; import org.dinky.assertion.Asserts; +import org.dinky.data.constant.NetConstant; import org.dinky.data.model.SystemConfiguration; import org.dinky.gateway.config.AppConfig; import org.dinky.gateway.enums.GatewayType; import org.dinky.gateway.result.GatewayResult; import org.dinky.gateway.result.KubernetesResult; import org.dinky.utils.LogUtil; +import org.dinky.utils.ThreadUtil; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.application.ApplicationConfiguration; @@ -41,6 +43,7 @@ import java.util.concurrent.ExecutionException; import cn.hutool.core.util.StrUtil; +import cn.hutool.http.HttpUtil; /** * KubernetesApplicationGateway @@ -115,17 +118,41 @@ public GatewayResult submitJar() { for (JobStatusMessage jobStatusMessage : jobStatusMessages) { jobId = jobStatusMessage.getJobId().toHexString(); } - // if JobStatusMessage not have job id, use timestamp - // and... it`s maybe wrong with submit + // if JobStatusMessage not have job id, it`s maybe wrong with submit,throw exception if (TextUtils.isEmpty(jobId)) { - jobId = "unknown" + System.currentTimeMillis(); + int cost = SystemConfiguration.getInstances().getJobIdWait() - counts; + String clusterId = clusterClient.getClusterId(); + throw new Exception("无法获得jobId请联系管理排查问题,等待时长:" + cost + ",job name:" + clusterId); } result.setId(jobId); result.setWebURL(clusterClient.getWebInterfaceURL()); + waitForTaskManagerToBeReady(result.getWebURL(), jobId); result.success(); } catch (Exception e) { result.fail(LogUtil.getError(e)); } return result; } + + /** + * 等待tm完成,若不等待,则后续步骤可能会查询不到状态,报 NullPointerException + * + * @param apiPath + * @param jobId + */ + static void waitForTaskManagerToBeReady(String apiPath, String jobId) { + int jobIdWait = SystemConfiguration.getInstances().getJobIdWait(); + String fullPath = String.format("http://%s/jobs/%s", apiPath, jobId); + for (int i = 1; i <= jobIdWait; i++) { + try { + // 不抛异常,就为成功 + String result = HttpUtil.get(fullPath, NetConstant.SERVER_TIME_OUT_ACTIVE); + logger.info("get job status success,jobPath:{},result: {}", fullPath, result); + break; + } catch (Exception e) { + logger.info("Unable to connect to Flink JobManager: {},wait count:{}", fullPath, i); + ThreadUtil.sleep(1000); + } + } + } }