Skip to content

Commit

Permalink
Fix app not throw exception (DataLinkDC#2796)
Browse files Browse the repository at this point in the history
* fix log error

* add k8s doc

* add k8s doc

* fix dinky app not throw exception

* Add a retry limit

* formate code
  • Loading branch information
gaoyan1998 authored Dec 26, 2023
1 parent df6a1ae commit 1a3cc6a
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 51 deletions.
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.15/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.16/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.17/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.18/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
1 change: 1 addition & 0 deletions dinky-app/dinky-app-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,17 @@ public class FlinkAppUtil {
*/
public static void monitorFlinkTask(Executor executor, int taskId) {
boolean isRun = true;
int reTryCount = 0;
try (RestClusterClient<StandaloneClusterId> client = createClient(executor)) {
while (isRun) {
Collection<JobStatusMessage> jobs = client.listJobs().get();
if (jobs.isEmpty()) {
log.error("No Flink task found, try again in 2 seconds.....");
log.error("No Flink task found, try again in 5 seconds.....");
reTryCount++;
if (reTryCount > 10) {
isRun = false;
log.error("No Flink task found, please check the Flink cluster status.");
}
}
for (JobStatusMessage job : jobs) {
if (JobStatus.isDone(job.getJobState().toString())) {
Expand Down Expand Up @@ -101,6 +107,7 @@ private static void sendHook(int taskId, String jobId, int reTryCount) throws In

/**
* Create a REST cluster client for Flink.
*
* @return
* @throws Exception
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

import cn.hutool.core.text.StrFormatter;

/**
* KubernetesSessionGateway
Expand All @@ -46,6 +49,8 @@ public GatewayType getType() {
@Override
public GatewayResult deployCluster(FlinkUdfPathContextHolder udfPathContextHolder) {
if (Asserts.isNull(client)) {
String clusterId = StrFormatter.format("dinky-flink-session-{}", System.currentTimeMillis());
addConfigParas(KubernetesConfigOptions.CLUSTER_ID, clusterId);
init();
}

Expand Down

0 comments on commit 1a3cc6a

Please sign in to comment.