Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31518][Runtime / REST] Fix StandaloneHaServices#getClusterRestEndpointLeaderRetriever to return correct rest port #22308

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ public DispatcherResourceManagerComponent create(
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();

configuration.setInteger(RestOptions.PORT, webMonitorEndpoint.getRestPort());
configuration.setString(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to update this address?

"getServerAddress()" may throw an NPE here.

RestOptions.ADDRESS, webMonitorEndpoint.getServerAddress().getHostString());

final String hostname = RpcUtils.getHostname(rpcService);

resourceManagerService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

import java.net.UnknownHostException;

/**
* {@code ClientHighAvailabilityServices} provides the services that are required on the client side. At
* the moment only the REST endpoint leader retriever is required because all communication between
Expand All @@ -32,5 +34,5 @@ public interface ClientHighAvailabilityServices extends AutoCloseable {
*
* @return the leader retriever for cluster's rest endpoint.
*/
LeaderRetrievalService getClusterRestEndpointLeaderRetriever();
LeaderRetrievalService getClusterRestEndpointLeaderRetriever() throws UnknownHostException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.util.concurrent.FutureUtils;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -202,7 +203,8 @@ default LeaderElectionService getClusterRestEndpointLeaderElectionService() {
}

@Override
default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
default LeaderRetrievalService getClusterRestEndpointLeaderRetriever()
throws UnknownHostException {
// for backwards compatibility we delegate to getWebMonitorLeaderRetriever
// all implementations of this interface should override
// getClusterRestEndpointLeaderRetriever, though
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,9 @@ public static HighAvailabilityServices createHighAvailabilityServices(
RpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
addressResolution,
configuration);
final String webMonitorAddress =
getWebMonitorAddress(configuration, addressResolution);

return new StandaloneHaServices(
resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
resourceManagerRpcUrl, dispatcherRpcUrl, configuration);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need these changes if you use ClientHighAvailabilityServices to retrieve the rest address and port?

case ZOOKEEPER:
return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
case KUBERNETES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
package org.apache.flink.runtime.highavailability.nonha.standalone;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.rpc.AddressResolution;

import java.net.UnknownHostException;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -45,23 +50,20 @@ public class StandaloneHaServices extends AbstractNonHaServices {
/** The fixed address of the Dispatcher. */
private final String dispatcherAddress;

private final String clusterRestEndpointAddress;
private final Configuration configuration;

/**
* Creates a new services class for the fixed pre-defined leaders.
*
* @param resourceManagerAddress The fixed address of the ResourceManager
* @param clusterRestEndpointAddress
* @param configuration
*/
public StandaloneHaServices(
String resourceManagerAddress,
String dispatcherAddress,
String clusterRestEndpointAddress) {
String resourceManagerAddress, String dispatcherAddress, Configuration configuration) {
this.resourceManagerAddress =
checkNotNull(resourceManagerAddress, "resourceManagerAddress");
this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
this.clusterRestEndpointAddress =
checkNotNull(clusterRestEndpointAddress, clusterRestEndpointAddress);
this.configuration = configuration;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -134,10 +136,13 @@ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
}

@Override
public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
public LeaderRetrievalService getClusterRestEndpointLeaderRetriever()
throws UnknownHostException {
String clusterRestEndpointAddress =
HighAvailabilityServicesUtils.getWebMonitorAddress(
configuration, AddressResolution.NO_ADDRESS_RESOLUTION);
synchronized (lock) {
checkNotShutdown();

return new StandaloneLeaderRetrievalService(
clusterRestEndpointAddress, DEFAULT_LEADER_ID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.util.Preconditions;
Expand All @@ -41,7 +40,7 @@ public class MiniClusterConfiguration {

static final int DEFAULT_IO_POOL_SIZE = 4;

private final UnmodifiableConfiguration configuration;
private final Configuration configuration;

private final int numTaskManagers;

Expand Down Expand Up @@ -72,7 +71,7 @@ public MiniClusterConfiguration(
this.pluginManager = pluginManager;
}

private UnmodifiableConfiguration generateConfiguration(final Configuration configuration) {
private Configuration generateConfiguration(final Configuration configuration) {
final Configuration modifiedConfig = new Configuration(configuration);

TaskExecutorResourceUtils.adjustForLocalExecution(modifiedConfig);
Expand All @@ -94,7 +93,7 @@ private UnmodifiableConfiguration generateConfiguration(final Configuration conf
modifiedConfig.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(5L));
}

return new UnmodifiableConfiguration(modifiedConfig);
return new Configuration(modifiedConfig);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -146,7 +145,7 @@ public String getTaskManagerBindAddress() {
: configuration.getString(TaskManagerOptions.BIND_HOST, "localhost");
}

public UnmodifiableConfiguration getConfiguration() {
public Configuration getConfiguration() {
return configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.highavailability.nonha.standalone;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
Expand All @@ -39,16 +40,14 @@ public class StandaloneHaServicesTest extends TestLogger {

private final String dispatcherAddress = "dispatcher";
private final String resourceManagerAddress = "resourceManager";
private final String webMonitorAddress = "webMonitor";

private StandaloneHaServices standaloneHaServices;

@Before
public void setupTest() {

Configuration config = new Configuration();
standaloneHaServices =
new StandaloneHaServices(
resourceManagerAddress, dispatcherAddress, webMonitorAddress);
new StandaloneHaServices(resourceManagerAddress, dispatcherAddress, config);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.minicluster;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
Expand Down Expand Up @@ -156,11 +157,13 @@ protected HighAvailabilityServices createHighAvailabilityServices(
final List<DispatcherResourceManagerComponent> result =
new ArrayList<>(numberDispatcherResourceManagerComponents);

String configuredPort = configuration.getString(RestOptions.BIND_PORT);
for (int i = 0; i < numberDispatcherResourceManagerComponents; i++) {
// FLINK-24038 relies on the fact that there is only one leader election instance per
// JVM that is freed when the JobManager stops. This is simulated in the
// TestingMiniCluster by providing individual HighAvailabilityServices per
// DispatcherResourceManagerComponent to allow running more-than-once JobManager tests

final HighAvailabilityServices thisHaServices =
createHighAvailabilityServices(configuration, getIOExecutor());
final DispatcherResourceManagerComponent dispatcherResourceManagerComponent =
Expand Down Expand Up @@ -200,6 +203,10 @@ protected HighAvailabilityServices createHighAvailabilityServices(
});
FutureUtils.assertNoException(shutDownFuture);
result.add(dispatcherResourceManagerComponent);
if (i + 1 < numberDispatcherResourceManagerComponents) {
// Reset the port to original
configuration.setString(RestOptions.BIND_PORT, configuredPort);
}
}

return result;
Expand Down