Skip to content

Commit

Permalink
[Feature-2622][core] Local task supports cancel (DataLinkDC#2623)
Browse files Browse the repository at this point in the history
Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
  • Loading branch information
aiwenmo and aiwenmo authored Dec 12, 2023
1 parent 963cb1d commit 7ea8cab
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public boolean success() {
history.setStatus(job.getStatus().getCode());
history.setJobId(job.getJobId());
history.setEndTime(job.getEndTime());
history.setJobManagerAddress(job.isUseGateway() ? job.getJobManagerAddress() : null);
history.setJobManagerAddress(job.getJobManagerAddress());

Integer clusterId = job.getJobConfig().getClusterId();
ClusterInstance clusterInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public String buildEnvironmentAddress(JobConfig config) {
port = Integer.valueOf(flinkConfig.get("rest.port"));
} else {
port = URLUtils.getRandomPort();
while (!IpUtils.isPortAvailable(port)) {
port = URLUtils.getRandomPort();
}
}
return buildLocalEnvironmentAddress(port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,13 @@ public JobConfig buildJobConfig(TaskDTO task) {
config.setClusterId(jobInstance.getClusterId());
}
} else if (GatewayType.LOCAL.equalsValue(task.getType())) {
config.setClusterId(null);
if (task.getJobInstanceId() == null) {
config.setClusterId(null);
} else {
JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId());
config.setClusterId(jobInstance.getClusterId());
config.setUseRemote(true);
}
config.setClusterConfigurationId(null);
} else {
Optional.ofNullable(task.getClusterId()).ifPresent(config::setClusterId);
Expand Down
11 changes: 11 additions & 0 deletions dinky-admin/src/main/java/org/dinky/utils/IpUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;

import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;

import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -171,6 +172,16 @@ public static String getHostIp() {
return "127.0.0.1";
}

public static boolean isPortAvailable(int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
// If the code reaches this point, the port is available
return true;
} catch (Exception e) {
// Port is not available
return false;
}
}

public static String getHostName() {
try {
return InetAddress.getLocalHost().getHostName();
Expand Down
2 changes: 1 addition & 1 deletion dinky-common/src/main/java/org/dinky/utils/URLUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,6 @@ public static String formatAddress(String webURL) {

public static int getRandomPort() {
Random random = new Random();
return random.nextInt(65536);
return 30000 + random.nextInt(35536);
}
}
2 changes: 1 addition & 1 deletion dinky-core/src/main/java/org/dinky/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void addGatewayConfig(Map<String, Object> config) {
}

public boolean isUseRemote() {
return !GatewayType.LOCAL.equalsValue(type);
return useRemote || !GatewayType.LOCAL.equalsValue(type);
}

public void buildLocal() {
Expand Down

0 comments on commit 7ea8cab

Please sign in to comment.