Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2171][fix] 解决on k8s application模式提交后任务经常状态为未知问题 #2173

Merged
merged 11 commits into from
Aug 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -245,7 +246,19 @@ public JobResult submitTask(Integer id) {
JobResult jobResult;
if (config.isJarTask()) {
jobResult = jobManager.executeJar();
process.finish("Submit Flink Jar finished.");
if (jobResult.isSuccess()) {
process.finish("Submit Flink SQL finished, JobManager Web Interface: http://"
+ jobResult.getJobManagerAddress());
} else {
// 如果提交失败,则只打印出关键的错误信息
process.error("Submit Flink SQL " + jobResult.getStatus());
if (Asserts.isNotNull(jobResult.getError())) {
process.error(jobResult.getError().split("\n")[0]);
Arrays.stream(jobResult.getError().split("\n"))
.filter(row -> row.contains("Caused by"))
.forEach(row -> process.error(row));
}
}
} else {
jobResult = jobManager.executeSql(task.getStatement());
process.finish("Submit Flink SQL finished.");
Expand Down
41 changes: 41 additions & 0 deletions dinky-common/src/main/java/org/dinky/utils/ThreadUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** @since 2023/8/9 */
public class ThreadUtil {
protected static final Logger logger = LoggerFactory.getLogger(ThreadUtil.class);

/**
* try sleep exception
*
* @param ms
*/
public static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
logger.warn("sleep exception");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,39 @@ public EnvironmentSetting(String host, Integer port, String... jarFiles) {

public static EnvironmentSetting build(String address, String... jarFiles) {
Asserts.checkNull(address, "Flink 地址不能为空");
String[] strs = address.split(NetConstant.COLON);
String[] strs = getHostAndPort(address);
if (strs.length >= 2) {
return new EnvironmentSetting(strs[0], Integer.parseInt(strs[1]), jarFiles);
} else {
return new EnvironmentSetting(jarFiles);
}
}

/**
* 修复地址,解决无只有域名无端口的情况,具体情况,请看测试用例
*
* @param address flink地址(前端配置)
* @return String[]{域名,端口}
*/
static String[] getHostAndPort(String address) {
address = address.toLowerCase().replace("://", "//");
String[] strs = address.split(NetConstant.COLON);
// 兼容直接填写域名情况(走k8s ingress时没有端口号)
if (address.startsWith("http//")) {
String port = strs.length == 1 ? "80" : strs[1];
strs = new String[] {strs[0].replace("http//", ""), port};
}
if (address.startsWith("https//")) {
String port = strs.length == 1 ? "443" : strs[1];
strs = new String[] {strs[0].replace("https//", ""), port};
}
// 如果没有填端口,则默认80
if (strs.length == 1) {
strs = new String[] {address, "80"};
}
return strs;
}

public String getAddress() {
if (Asserts.isAllNotNull(host, port)) {
return host + NetConstant.COLON + port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
package org.dinky.gateway.kubernetes;

import org.dinky.assertion.Asserts;
import org.dinky.data.constant.NetConstant;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.gateway.config.AppConfig;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.KubernetesResult;
import org.dinky.utils.LogUtil;
import org.dinky.utils.ThreadUtil;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
Expand All @@ -41,6 +43,7 @@
import java.util.concurrent.ExecutionException;

import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;

/**
* KubernetesApplicationGateway
Expand Down Expand Up @@ -115,17 +118,41 @@ public GatewayResult submitJar() {
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jobId = jobStatusMessage.getJobId().toHexString();
}
// if JobStatusMessage not have job id, use timestamp
// and... it`s maybe wrong with submit
// if JobStatusMessage not have job id, it`s maybe wrong with submit,throw exception
if (TextUtils.isEmpty(jobId)) {
jobId = "unknown" + System.currentTimeMillis();
int cost = SystemConfiguration.getInstances().getJobIdWait() - counts;
String clusterId = clusterClient.getClusterId();
throw new Exception("无法获得jobId请联系管理排查问题,等待时长:" + cost + ",job name:" + clusterId);
}
result.setId(jobId);
result.setWebURL(clusterClient.getWebInterfaceURL());
waitForTaskManagerToBeReady(result.getWebURL(), jobId);
result.success();
} catch (Exception e) {
result.fail(LogUtil.getError(e));
}
return result;
}

/**
* 等待tm完成,若不等待,则后续步骤可能会查询不到状态,报 NullPointerException
*
* @param apiPath
* @param jobId
*/
static void waitForTaskManagerToBeReady(String apiPath, String jobId) {
int jobIdWait = SystemConfiguration.getInstances().getJobIdWait();
String fullPath = String.format("http://%s/jobs/%s", apiPath, jobId);
for (int i = 1; i <= jobIdWait; i++) {
try {
// 不抛异常,就为成功
String result = HttpUtil.get(fullPath, NetConstant.SERVER_TIME_OUT_ACTIVE);
logger.info("get job status success,jobPath:{},result: {}", fullPath, result);
break;
} catch (Exception e) {
logger.info("Unable to connect to Flink JobManager: {},wait count:{}", fullPath, i);
ThreadUtil.sleep(1000);
}
}
}
}