Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-19519] [Runtime/Configuration] Support port range for Taskmanager data port configuration #15704

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
<td>Long</td>
<td>Time we wait for the timers in milliseconds to finish all pending timer threads when the stream task is cancelled.</td>
</tr>
<tr>
<td><h5>taskmanager.data.bind-port</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The task manager's bind port used for data exchange operations. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine. If not configured, 'taskmanager.data.port' will be used.</td>
</tr>
<tr>
<td><h5>taskmanager.data.port</h5></td>
<td style="word-wrap: break-word;">0</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>Integer</td>
<td>The port that the client connects to. If rest.bind-port has not been specified, then the REST server will bind to this port. Attention: This option is respected only if the high-availability configuration is NONE.</td>
</tr>
<tr>
<td><h5>taskmanager.data.bind-port</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The task manager's bind port used for data exchange operations. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine. If not configured, 'taskmanager.data.port' will be used.</td>
</tr>
<tr>
<td><h5>taskmanager.data.port</h5></td>
<td style="word-wrap: break-word;">0</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
<tr>
<td><h5>taskmanager.data.bind-port</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The task manager's bind port used for data exchange operations. If not configured, 'taskmanager.data.port' will be used.</td>
<td>String</td>
<td>The task manager's bind port used for data exchange operations. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine. If not configured, 'taskmanager.data.port' will be used.</td>
</tr>
<tr>
<td><h5>taskmanager.data.port</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,19 @@ public class NettyShuffleEnvironmentOptions {
"The task manager’s external port used for data exchange operations.");

/** The local network port (or list/range of candidate ports) that the task manager listens on for data exchange. */
public static final ConfigOption<Integer> DATA_BIND_PORT =
@Documentation.Section({
Documentation.Sections.COMMON_HOST_PORT,
Documentation.Sections.ALL_TASK_MANAGER
})
public static final ConfigOption<String> DATA_BIND_PORT =
key("taskmanager.data.bind-port")
.intType()
.stringType()
.noDefaultValue()
.withDescription(
"The task manager's bind port used for data exchange operations. If not configured, '"
"The task manager's bind port used for data exchange operations."
+ " Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both."
+ " It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine."
+ " If not configured, '"
+ DATA_PORT.key()
+ "' will be used.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ void shutdown() {
private void initNioBootstrap() {
// Add the server port number to the name in order to distinguish
// multiple clients running on the same host.
String name = NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
String name =
NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")";

NioEventLoopGroup nioGroup =
new NioEventLoopGroup(
Expand All @@ -157,7 +158,8 @@ private void initNioBootstrap() {
private void initEpollBootstrap() {
// Add the server port number to the name in order to distinguish
// multiple clients running on the same host.
String name = NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
String name =
NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")";

EpollEventLoopGroup epollGroup =
new EpollEventLoopGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.NetUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.net.InetAddress;
import java.util.Iterator;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -49,7 +49,8 @@ enum TransportType {

private final InetAddress serverAddress;

private final int serverPort;
private final String serverPortRange;
private final Iterator<Integer> serverPortRangeIterator;
Comment on lines +52 to +53
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like serverPortRangeIterator can be created using serverPortRange, but is there a reason to keep both fields?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, serverPort was used to generate the Netty thread group name. So when I replaced serverPort with serverPortRangeIterator, I also added serverPortRange to keep serving that purpose.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of my question is, isn't one field enough? For example, if there is only the serverPortRange field, can't the iterator be created at the desired time using a getter method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because NetUtils.getPortRangeFromString also performs validation, the iterator is created during the configuration parsing phase, so that the user is notified of any configuration error as early as possible.


private final int memorySegmentSize;

Expand All @@ -59,15 +60,16 @@ enum TransportType {

public NettyConfig(
InetAddress serverAddress,
int serverPort,
Iterator<Integer> serverPortRangeIterator,
String serverPortRange,
int memorySegmentSize,
int numberOfSlots,
Configuration config) {

this.serverAddress = checkNotNull(serverAddress);

checkArgument(NetUtils.isValidHostPort(serverPort), "Invalid port number.");
this.serverPort = serverPort;
this.serverPortRangeIterator = serverPortRangeIterator;
this.serverPortRange = serverPortRange;

checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
this.memorySegmentSize = memorySegmentSize;
Expand All @@ -84,8 +86,12 @@ InetAddress getServerAddress() {
return serverAddress;
}

int getServerPort() {
return serverPort;
String getServerPortRange() {
return serverPortRange;
}

Iterator<Integer> getServerPortRangeIterator() {
return serverPortRangeIterator;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -165,7 +171,7 @@ public String toString() {
String format =
"NettyConfig ["
+ "server address: %s, "
+ "server port: %d, "
+ "server port range: %s, "
+ "ssl enabled: %s, "
+ "memory segment size (bytes): %d, "
+ "transport type: %s, "
Expand All @@ -181,7 +187,7 @@ public String toString() {
return String.format(
format,
serverAddress,
serverPort,
serverPortRange,
getSSLEnabled() ? "true" : "false",
memorySegmentSize,
getTransportType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;

Expand All @@ -64,6 +65,7 @@ class NettyServer {
NettyServer(NettyConfig config) {
this.config = checkNotNull(config);
localAddress = null;
bindFuture = null;
}

int init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException {
Expand Down Expand Up @@ -109,9 +111,6 @@ int init(
// Configuration
// --------------------------------------------------------------------

// Server bind address
bootstrap.localAddress(config.getServerAddress(), config.getServerPort());

// Pooled allocators for Netty's ByteBuf instances
bootstrap.option(ChannelOption.ALLOCATOR, nettyBufferPool);
bootstrap.childOption(ChannelOption.ALLOCATOR, nettyBufferPool);
Expand Down Expand Up @@ -145,17 +144,37 @@ int init(
// Start Server
// --------------------------------------------------------------------

bindFuture = bootstrap.bind().syncUninterruptibly();

localAddress = (InetSocketAddress) bindFuture.channel().localAddress();

final long duration = (System.nanoTime() - start) / 1_000_000;
LOG.info(
"Successful initialization (took {} ms). Listening on SocketAddress {}.",
duration,
localAddress);
Iterator<Integer> portRangeIterator = config.getServerPortRangeIterator();
while (bindFuture == null && portRangeIterator.hasNext()) {
Integer port = portRangeIterator.next();
LOG.info("Trying to bind Task Manager data port to: {}", port);
bootstrap.localAddress(config.getServerAddress(), port);

try {
bindFuture = bootstrap.bind().syncUninterruptibly();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to bind port {}", port, e);
} else {
LOG.info("Unable to bind port {}, due to error: {}", port, e.getMessage());
}
}
}

return localAddress.getPort();
if (bindFuture != null) {
localAddress = (InetSocketAddress) bindFuture.channel().localAddress();

final long duration = (System.nanoTime() - start) / 1_000_000;
LOG.info(
"Successful initialization (took {} ms). Listening on SocketAddress {}.",
duration,
localAddress);
return localAddress.getPort();
} else {
throw new IOException(
"Unable to bind Task Manager Netty Server to any ports in "
+ config.getServerPortRange());
}
}

NettyConfig getConfig() {
Expand Down Expand Up @@ -186,7 +205,8 @@ void shutdown() {
private void initNioBootstrap() {
// Add the server port number to the name in order to distinguish
// multiple servers running on the same host.
String name = NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
String name =
NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")";

NioEventLoopGroup nioGroup =
new NioEventLoopGroup(config.getServerNumThreads(), getNamedThreadFactory(name));
Expand All @@ -196,7 +216,8 @@ private void initNioBootstrap() {
private void initEpollBootstrap() {
// Add the server port number to the name in order to distinguish
// multiple servers running on the same host.
String name = NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
String name =
NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")";

EpollEventLoopGroup epollGroup =
new EpollEventLoopGroup(config.getServerNumThreads(), getNamedThreadFactory(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand All @@ -34,9 +36,10 @@
import javax.annotation.Nullable;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;

/** Configuration object for the network stack. */
Expand Down Expand Up @@ -218,7 +221,7 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
boolean localTaskManagerCommunication,
InetAddress taskManagerAddress) {

final int dataBindPort = getDataBindPort(configuration);
final Iterator<Integer> dataBindPortRange = getDataBindPortRange(configuration);

final int pageSize = ConfigurationParserUtils.getPageSize(configuration);

Expand All @@ -227,7 +230,7 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
configuration,
localTaskManagerCommunication,
taskManagerAddress,
dataBindPort);
dataBindPortRange);

final int numberOfNetworkBuffers =
calculateNumberOfNetworkBuffers(configuration, networkMemorySize, pageSize);
Expand Down Expand Up @@ -306,26 +309,35 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
* @param configuration configuration object
* @return an iterator over the candidate data bind ports
*/
private static int getDataBindPort(Configuration configuration) {
final int dataBindPort;
private static Iterator<Integer> getDataBindPortRange(Configuration configuration) {
final Iterator<Integer> portRangeIterator;
final String dataBindPortRange;
if (configuration.contains(NettyShuffleEnvironmentOptions.DATA_BIND_PORT)) {
dataBindPort = configuration.getInteger(NettyShuffleEnvironmentOptions.DATA_BIND_PORT);
ConfigurationParserUtils.checkConfigParameter(
dataBindPort >= 0,
dataBindPort,
NettyShuffleEnvironmentOptions.DATA_BIND_PORT.key(),
"Leave config parameter empty to fallback to '"
+ NettyShuffleEnvironmentOptions.DATA_PORT.key()
+ "' automatically.");
dataBindPortRange =
configuration.getString(NettyShuffleEnvironmentOptions.DATA_BIND_PORT);
try {
portRangeIterator = NetUtils.getPortRangeFromString(dataBindPortRange);
} catch (Exception e) {
throw new IllegalConfigurationException(
"Invalid value for '"
+ NettyShuffleEnvironmentOptions.DATA_BIND_PORT.key()
+ "' (port for the TaskManager data transfer): "
+ dataBindPortRange
+ " Leave config parameter empty to fallback to '"
+ NettyShuffleEnvironmentOptions.DATA_PORT.key()
+ "' automatically.");
}
} else {
dataBindPort = configuration.getInteger(NettyShuffleEnvironmentOptions.DATA_PORT);
final int dataBindPort =
configuration.getInteger(NettyShuffleEnvironmentOptions.DATA_PORT);
ConfigurationParserUtils.checkConfigParameter(
dataBindPort >= 0,
dataBindPort,
NettyShuffleEnvironmentOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");
portRangeIterator = Collections.singletonList(dataBindPort).iterator();
}
return dataBindPort;
return portRangeIterator;
}

/**
Expand Down Expand Up @@ -370,25 +382,34 @@ private static void logIfIgnoringOldConfigs(Configuration configuration) {
* @param localTaskManagerCommunication true, to skip initializing the network stack
* @param taskManagerAddress identifying the IP address under which the TaskManager will be
* accessible
* @param dataport data port for communication and data exchange
* @param dataBindPortRangeIterator An iterator for data ports for communication and data
* exchange
* @return the netty configuration or {@code null} if communication is in the same task manager
*/
@Nullable
private static NettyConfig createNettyConfig(
Configuration configuration,
boolean localTaskManagerCommunication,
InetAddress taskManagerAddress,
int dataport) {
Iterator<Integer> dataBindPortRangeIterator) {

final NettyConfig nettyConfig;
if (!localTaskManagerCommunication) {
final InetSocketAddress taskManagerInetSocketAddress =
new InetSocketAddress(taskManagerAddress, dataport);
final String dataBindPortRange;
if (configuration.contains(NettyShuffleEnvironmentOptions.DATA_BIND_PORT)) {
dataBindPortRange =
configuration.getString(NettyShuffleEnvironmentOptions.DATA_BIND_PORT);
} else {
dataBindPortRange =
String.valueOf(
configuration.getInteger(NettyShuffleEnvironmentOptions.DATA_PORT));
}

nettyConfig =
new NettyConfig(
taskManagerInetSocketAddress.getAddress(),
taskManagerInetSocketAddress.getPort(),
taskManagerAddress,
dataBindPortRangeIterator,
dataBindPortRange,
ConfigurationParserUtils.getPageSize(configuration),
ConfigurationParserUtils.getSlot(configuration),
configuration);
Expand Down
Loading