Skip to content

Commit

Permalink
[2171][fix] 解决on k8s application模式提交后任务经常状态为未知问题 (#2173)
Browse files Browse the repository at this point in the history
* [2171][fix] 解决on k8s application模式提交后任务经常状态为未知问题

* [2175][fix] 1.如果on k8s application模式提交失败时控制台无任何信息,2.兼容flink地址没有端口号的情况

* [2175][fix] 1.解决现存的一些bug,和一些优化

* [2175][fix] 1.fix code style

* [2175][fix] 1.fix code style

* [2175][fix] 1.delete test class because code format error

* [2175][fix] 1.fix test class...

* [2175][fix] delete test class because code format error

* [2175][fix] delete test class because code format error

* [2175][fix] fix code format

---------

Co-authored-by: chengsheng <chengsheng@travelsky.com.cn>
  • Loading branch information
seawenc and chengsheng authored Aug 12, 2023
1 parent d5b597c commit 09a984a
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -245,7 +246,19 @@ public JobResult submitTask(Integer id) {
JobResult jobResult;
if (config.isJarTask()) {
jobResult = jobManager.executeJar();
process.finish("Submit Flink Jar finished.");
if (jobResult.isSuccess()) {
process.finish("Submit Flink SQL finished, JobManager Web Interface: http://"
+ jobResult.getJobManagerAddress());
} else {
// 如果提交失败,则只打印出关键的错误信息
process.error("Submit Flink SQL " + jobResult.getStatus());
if (Asserts.isNotNull(jobResult.getError())) {
process.error(jobResult.getError().split("\n")[0]);
Arrays.stream(jobResult.getError().split("\n"))
.filter(row -> row.contains("Caused by"))
.forEach(row -> process.error(row));
}
}
} else {
jobResult = jobManager.executeSql(task.getStatement());
process.finish("Submit Flink SQL finished.");
Expand Down
41 changes: 41 additions & 0 deletions dinky-common/src/main/java/org/dinky/utils/ThreadUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** @since 2023/8/9 */
public class ThreadUtil {
protected static final Logger logger = LoggerFactory.getLogger(ThreadUtil.class);

/**
* try sleep exception
*
* @param ms
*/
public static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
logger.warn("sleep exception");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,39 @@ public EnvironmentSetting(String host, Integer port, String... jarFiles) {

public static EnvironmentSetting build(String address, String... jarFiles) {
Asserts.checkNull(address, "Flink 地址不能为空");
String[] strs = address.split(NetConstant.COLON);
String[] strs = getHostAndPort(address);
if (strs.length >= 2) {
return new EnvironmentSetting(strs[0], Integer.parseInt(strs[1]), jarFiles);
} else {
return new EnvironmentSetting(jarFiles);
}
}

/**
* 修复地址,解决无只有域名无端口的情况,具体情况,请看测试用例
*
* @param address flink地址(前端配置)
* @return String[]{域名,端口}
*/
static String[] getHostAndPort(String address) {
address = address.toLowerCase().replace("://", "//");
String[] strs = address.split(NetConstant.COLON);
// 兼容直接填写域名情况(走k8s ingress时没有端口号)
if (address.startsWith("http//")) {
String port = strs.length == 1 ? "80" : strs[1];
strs = new String[] {strs[0].replace("http//", ""), port};
}
if (address.startsWith("https//")) {
String port = strs.length == 1 ? "443" : strs[1];
strs = new String[] {strs[0].replace("https//", ""), port};
}
// 如果没有填端口,则默认80
if (strs.length == 1) {
strs = new String[] {address, "80"};
}
return strs;
}

public String getAddress() {
if (Asserts.isAllNotNull(host, port)) {
return host + NetConstant.COLON + port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
package org.dinky.gateway.kubernetes;

import org.dinky.assertion.Asserts;
import org.dinky.data.constant.NetConstant;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.gateway.config.AppConfig;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.KubernetesResult;
import org.dinky.utils.LogUtil;
import org.dinky.utils.ThreadUtil;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
Expand All @@ -41,6 +43,7 @@
import java.util.concurrent.ExecutionException;

import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;

/**
* KubernetesApplicationGateway
Expand Down Expand Up @@ -115,17 +118,41 @@ public GatewayResult submitJar() {
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jobId = jobStatusMessage.getJobId().toHexString();
}
// if JobStatusMessage not have job id, use timestamp
// and... it`s maybe wrong with submit
// if JobStatusMessage not have job id, it`s maybe wrong with submit,throw exception
if (TextUtils.isEmpty(jobId)) {
jobId = "unknown" + System.currentTimeMillis();
int cost = SystemConfiguration.getInstances().getJobIdWait() - counts;
String clusterId = clusterClient.getClusterId();
throw new Exception("无法获得jobId请联系管理排查问题,等待时长:" + cost + ",job name:" + clusterId);
}
result.setId(jobId);
result.setWebURL(clusterClient.getWebInterfaceURL());
waitForTaskManagerToBeReady(result.getWebURL(), jobId);
result.success();
} catch (Exception e) {
result.fail(LogUtil.getError(e));
}
return result;
}

/**
* 等待tm完成,若不等待,则后续步骤可能会查询不到状态,报 NullPointerException
*
* @param apiPath
* @param jobId
*/
static void waitForTaskManagerToBeReady(String apiPath, String jobId) {
int jobIdWait = SystemConfiguration.getInstances().getJobIdWait();
String fullPath = String.format("http://%s/jobs/%s", apiPath, jobId);
for (int i = 1; i <= jobIdWait; i++) {
try {
// 不抛异常,就为成功
String result = HttpUtil.get(fullPath, NetConstant.SERVER_TIME_OUT_ACTIVE);
logger.info("get job status success,jobPath:{},result: {}", fullPath, result);
break;
} catch (Exception e) {
logger.info("Unable to connect to Flink JobManager: {},wait count:{}", fullPath, i);
ThreadUtil.sleep(1000);
}
}
}
}

0 comments on commit 09a984a

Please sign in to comment.