diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java index 9ddb8070ee..5e812393e0 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java +++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java @@ -29,6 +29,7 @@ import org.dinky.gateway.enums.SavePointStrategy; import org.dinky.gateway.model.FlinkClusterConfig; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.RestOptions; @@ -42,12 +43,14 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.extern.slf4j.Slf4j; /** * JobConfig * * @since 2021/6/27 18:45 */ +@Slf4j @Data @Builder @AllArgsConstructor @@ -257,9 +260,18 @@ public void buildGatewayConfig(FlinkClusterConfig config) { Assert.notNull(customConfig.getValue(), "Custom flink config has null value"); flinkConfig.getConfiguration().put(customConfig.getName(), customConfig.getValue()); } + + Map configuration = flinkConfig.getConfiguration(); + + // In Kubernetes mode, must set jobmanager.memory.process.size. + if (StringUtils.isBlank(configuration.get("jobmanager.memory.process.size"))) { + log.warn("In Kubernetes mode, please configure 'jobmanager.memory.process.size', default 2048m"); + configuration.put("jobmanager.memory.process.size", "2048m"); + } + // Load job configuration content afterwards - flinkConfig.getConfiguration().putAll(getConfigJson()); - flinkConfig.getConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism)); + configuration.putAll(getConfigJson()); + configuration.put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism)); flinkConfig.setJobName(getJobName()); gatewayConfig = GatewayConfig.build(config);