-
Notifications
You must be signed in to change notification settings - Fork 9.1k
Ozone:SCM: Add support for registerNode in datanode #151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
import org.apache.hadoop.classification.InterfaceStability; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.net.NetUtils; | ||
import org.apache.hadoop.scm.ScmConfigKeys; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -35,7 +36,17 @@ | |
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; | ||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS; | ||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; | ||
|
||
import static org.apache.hadoop.ozone.OzoneConfigKeys | ||
.OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT; | ||
import static org.apache.hadoop.ozone.OzoneConfigKeys | ||
.OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT; | ||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; | ||
|
||
import static org.apache.hadoop.ozone.OzoneConfigKeys | ||
.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS; | ||
import static org.apache.hadoop.ozone.OzoneConfigKeys | ||
.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS_DEFAULT; | ||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT; | ||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS; | ||
|
||
|
@@ -179,20 +190,51 @@ static Optional<String> getHostNameFromConfigKeys( | |
Configuration conf, String ... keys) { | ||
for (final String key : keys) { | ||
final String value = conf.getTrimmed(key); | ||
if (value != null && !value.isEmpty()) { | ||
String[] splits = value.split(":"); | ||
|
||
if(splits.length < 1 || splits.length > 2) { | ||
throw new IllegalArgumentException( | ||
"Invalid value " + value + " for config key " + key + | ||
". It should be in 'host' or 'host:port' format"); | ||
} | ||
return Optional.of(splits[0]); | ||
Optional<String> splits = getHostName(value); | ||
if (splits.isPresent()) { | ||
return splits; | ||
} | ||
} | ||
return Optional.absent(); | ||
} | ||
|
||
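/** | ||
* Returns one component of a value in 'host' or 'host:port' format. | ||
* @param value - the string to split on ':'; may be null or empty. | ||
* @param pos - index of the component to return: 0 for the host, 1 for the port. | ||
* @return the component at pos, or Optional.absent() if value is null, | ||
* empty, or has no component at that index. | ||
* @throws IllegalArgumentException if value has more than two components. | ||
*/ | ||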
private static Optional<String> getHostComponent(String value, int pos) { | ||
if (value != null && !value.isEmpty()) { | ||
String[] splits = value.split(":"); | ||
|
||
if(splits.length < 1 || splits.length > 2) { | ||
throw new IllegalArgumentException( | ||
"Invalid value " + value + ". It should be in " + | ||
"'host' or 'host:port' format"); | ||
} | ||
|
||
if(splits.length >= pos + 1) { | ||
return Optional.of(splits[pos]); | ||
} | ||
} | ||
return Optional.absent(); | ||
} | ||
|
||
public static Optional<String> getHostName(String value) { | ||
final int hostIndex = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This can be an Enum as commented above. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Replaced this logic with the HostAndPort class. |
||
return getHostComponent(value, hostIndex); //hostname: | ||
} | ||
|
||
public static Optional<Integer> getHostPort(String value) { | ||
final int portIndex = 1; | ||
Optional<String> portString = getHostComponent(value, portIndex); | ||
if(portString.isPresent()){ | ||
return Optional.of(Integer.parseInt(portString.get())); | ||
} | ||
return Optional.absent(); | ||
} | ||
|
||
|
||
/** | ||
* Retrieve the port number, trying the supplied config keys in order. | ||
* Each config value may be absent, or if present in the format | ||
|
@@ -205,23 +247,11 @@ static Optional<String> getHostNameFromConfigKeys( | |
* @throws IllegalArgumentException if any values are not in the 'host' | ||
* or host:port format. | ||
*/ | ||
static Optional<Integer> getPortNumberFromConfigKeys( | ||
public static Optional<Integer> getPortNumberFromConfigKeys( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we add complete Javadoc for the parameters after making this public? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
Configuration conf, String ... keys) { | ||
for (final String key : keys) { | ||
final String value = conf.getTrimmed(key); | ||
if (value != null && !value.isEmpty()) { | ||
String[] splits = value.split(":"); | ||
|
||
if(splits.length < 1 || splits.length > 2) { | ||
throw new IllegalArgumentException( | ||
"Invalid value " + value + " for config key " + key + | ||
". It should be in 'host' or 'host:port' format"); | ||
} | ||
|
||
if (splits.length == 2) { | ||
return Optional.of(Integer.parseInt(splits[1])); | ||
} | ||
} | ||
return getHostPort(value); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How do we handle multiple keys when only the last key has a valid value? The new logic will return Optional.absent() in the first loop iteration instead of the last valid value. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks, fixed. The intention is to return the value for the first key with a valid host:port; it now checks isPresent and returns the value if it is present, or loops until the end of the key space. |
||
} | ||
return Optional.absent(); | ||
} | ||
|
@@ -378,4 +408,38 @@ public static int getMaxHBToProcessPerLoop(Configuration conf){ | |
return conf.getInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, | ||
OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT); | ||
} | ||
|
||
/** | ||
* Timeout value for the RPC from Datanode to SCM, primarily used for | ||
* Heartbeats and container reports. | ||
* | ||
* @param conf - Ozone Config | ||
* @return - Rpc timeout in Milliseconds. | ||
*/ | ||
public static int getScmRpcTimeOutInMilliseconds(Configuration conf) { | ||
return conf.getInt(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would suggest we use Configuration#getTimeDuration() instead of Configuration#getInt() for durations. This will reduce the documentation cost and the chance of misconfiguration in deployment. A sample usage can be found in the datanode for DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS_DEFAULT); | ||
} | ||
|
||
/** | ||
* Log Warn interval. | ||
* | ||
* @param conf - Ozone Config | ||
* @return - Log warn interval. | ||
*/ | ||
public static int getLogWarnInterval(Configuration conf) { | ||
return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT, | ||
OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT); | ||
} | ||
|
||
/** | ||
* returns the Container port. | ||
* @param conf - Conf | ||
* @return port number. | ||
*/ | ||
public static int getContainerPort(Configuration conf) { | ||
return conf.getInt(ScmConfigKeys.DFS_CONTAINER_IPC_PORT, ScmConfigKeys | ||
.DFS_CONTAINER_IPC_PORT_DEFAULT); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,8 +43,8 @@ public final class OzoneConfigKeys { | |
"ozone.trace.enabled"; | ||
public static final boolean OZONE_TRACE_ENABLED_DEFAULT = false; | ||
|
||
public static final String OZONE_METADATA_DIRS = | ||
"ozone.metadata.dirs"; | ||
public static final String OZONE_CONTAINER_METADATA_DIRS = | ||
"ozone.container.metadata.dirs"; | ||
|
||
public static final String OZONE_KEY_CACHE = "ozone.key.cache.size"; | ||
public static final int OZONE_KEY_CACHE_DEFAULT = 1024; | ||
|
@@ -94,6 +94,54 @@ public final class OzoneConfigKeys { | |
public static final long OZONE_SCM_STALENODE_INTERVAL_DEFAULT = | ||
OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L; | ||
|
||
public static final String OZONE_SCM_CONTAINER_THREADS = | ||
"ozone.scm.container.threads"; | ||
public static final int OZONE_SCM_CONTAINER_THREADS_DEFAULT = | ||
Runtime.getRuntime().availableProcessors() * 2; | ||
|
||
public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS = | ||
"ozone.scm.heartbeat.rpc-timeout.ms"; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Suggest using a time duration property with a suffix and removing the .ms from the configuration key. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
public static final int OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS_DEFAULT = | ||
100; | ||
|
||
/** | ||
* Defines how frequently we will log the missing of heartbeat to a specific | ||
* SCM. In the default case we will write a warning message for each 10 | ||
* sequential heart beats that we miss to a specific SCM. This is to avoid | ||
* overrunning the log with lots of HB missed Log statements. | ||
*/ | ||
public static final String OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT = | ||
"ozone.scm.heartbeat.log.warn.interval.count"; | ||
public static final int OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT = | ||
10; | ||
|
||
public static final String OZONE_CONTAINER_TASK_WAIT = | ||
"ozone.container.task.wait.seconds"; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Suggest using a time duration property with a suffix and removing the .seconds from the configuration key. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This key has been removed. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You mean remove the .seconds from the key name? |
||
public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5; | ||
|
||
|
||
// ozone.scm.names key is a set of DNS | DNS:PORT | IP Address | IP:PORT. | ||
// Written as a comma separated string. e.g. scm1, scm2:8020, 7.7.7.7:7777 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we have a unit test for ozone.scm.names values like "scm1, scm2:8020, 7.7.7.7:7777"? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I will post another patch with these negative test cases. I have filed HDFS-11137 to track this issue. |
||
// | ||
// If this key is not specified datanodes will not be able to find | ||
// SCM. The SCM membership can be dynamic, so this key should contain | ||
// all possible SCM names. Once the SCM leader is discovered datanodes will | ||
// get the right list of SCMs to heartbeat to from the leader. | ||
// While it is good for the datanodes to know the names of all SCM nodes, | ||
// it is sufficient to actually know the name of one working SCM. That SCM | ||
// will be able to return the information about other SCMs that are part of | ||
// the SCM replicated Log. | ||
// | ||
// In case of a membership change, any one of the SCM machines will be | ||
// able to send back a new list to the datanodes. | ||
public static final String OZONE_SCM_NAMES = "ozone.scm.names"; | ||
|
||
public static final int OZONE_SCM_DEFAULT_PORT = 9862; | ||
// Full path of the location where the datanode ID is to be written. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: Maybe we should say "file name" instead of "Full path" here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
// if this value is not set then container startup will fail. | ||
public static final String OZONE_SCM_DATANODE_ID = "ozone.scm.datanode.id"; | ||
|
||
|
||
|
||
/** | ||
* There is no need to instantiate this class. | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadoc for the pos parameter is missing. Suggest documenting the pos (0/1) usage or using an Enum like HostComponentType for safer usage, as we don't validate pos inside getHostComponent().
pos(0) -> host
pos(1) -> port
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks for the comment. I have replaced the parsing code with the HostAndPort class, so this code has become simpler, and I have updated the Javadocs. We don't need the pos parameter any longer.