Ozone:SCM: Add support for registerNode in datanode #151
anuengineer wants to merge 1 commit into apache:HDFS-7240 from anuengineer:HDFS-7240
| /** | ||
| * Returns a hostname from the hostname:port or hostname. | ||
| * @param value | ||
| * @return |
There was a problem hiding this comment.
Javadoc for the pos parameter is missing. I suggest documenting the pos (0/1) usage, or using an enum such as HostComponentType for safer usage, since pos is not validated inside getHostComponent().
pos(0) -> host
pos(1) -> port
There was a problem hiding this comment.
Fixed. Thanks for the comment. I have replaced the parsing code with the HostAndPort class, so this code has become simpler and we no longer need the pos parameter. I have also updated the Javadocs.
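To illustrate what the simplified accessors in this thread might look like, here is a minimal sketch in plain Java. It is not the patch's code: the class name HostPortSketch is hypothetical, java.util.Optional stands in for the Optional type used in the patch, and the parsing is done by hand rather than with HostAndPort. It assumes values of the form "host" or "host:port" and does not handle IPv6 literals.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not the code from this patch. */
final class HostPortSketch {
  private HostPortSketch() { }

  /** Returns the host part of "host" or "host:port", or empty if there is none. */
  static Optional<String> getHostName(String value) {
    if (value == null || value.trim().isEmpty()) {
      return Optional.empty();
    }
    String trimmed = value.trim();
    int colon = trimmed.lastIndexOf(':');
    String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
    return host.isEmpty() ? Optional.empty() : Optional.of(host);
  }

  /** Returns the port of "host:port", or empty if it is missing or invalid. */
  static Optional<Integer> getHostPort(String value) {
    if (value == null) {
      return Optional.empty();
    }
    String trimmed = value.trim();
    int colon = trimmed.lastIndexOf(':');
    if (colon < 0 || colon == trimmed.length() - 1) {
      return Optional.empty();
    }
    try {
      int port = Integer.parseInt(trimmed.substring(colon + 1));
      // Reject values outside the valid TCP port range.
      return (port >= 0 && port <= 65535) ? Optional.of(port) : Optional.empty();
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    System.out.println(getHostName("scm2:8020"));
    System.out.println(getHostPort("scm2:8020"));
    System.out.println(getHostPort("scm1"));
  }
}
```

With two named accessors there is no positional index for a caller to get wrong, which is the concern the pos parameter raised.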
| } | ||
|
|
||
| public static Optional<String> getHostName(String value) { | ||
| final int hostIndex = 0; |
There was a problem hiding this comment.
This can be an enum, as commented above.
There was a problem hiding this comment.
Replaced this logic with the HostAndPort class.
| } | ||
| } | ||
| return getHostPort(value); | ||
| } |
There was a problem hiding this comment.
How do we handle multiple keys when only the last key has a valid value? The new logic will return Optional.absent() on the first loop iteration instead of the last valid value.
There was a problem hiding this comment.
Thanks, fixed. The intention is to return the value of the first key with a valid host:port. The code now checks isPresent() and returns the value if it is present; otherwise it keeps looping until the end of the key space.
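The behavior described in this exchange (return the first key that yields a valid value, otherwise keep scanning) can be sketched as follows. This is an assumption-laden illustration, not the patch's code: a plain Map stands in for Hadoop's Configuration, java.util.Optional stands in for the Optional type in the patch, and the class name FirstValidKeySketch and the parsePort helper are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of "first key with a valid port wins". */
final class FirstValidKeySketch {
  private FirstValidKeySketch() { }

  /** Scans keys in order and returns the port of the first valid "host:port" value. */
  static Optional<Integer> getPortNumberFromConfigKeys(
      Map<String, String> conf, String... keys) {
    for (String key : keys) {
      Optional<Integer> port = parsePort(conf.get(key));
      if (port.isPresent()) {
        return port; // first usable value wins
      }
      // No usable value for this key: keep scanning instead of returning early.
    }
    return Optional.empty();
  }

  /** Returns the port in "host:port", or empty if the value has no valid port. */
  static Optional<Integer> parsePort(String value) {
    if (value == null) {
      return Optional.empty();
    }
    int colon = value.lastIndexOf(':');
    if (colon < 0) {
      return Optional.empty();
    }
    try {
      return Optional.of(Integer.parseInt(value.substring(colon + 1).trim()));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }
}
```

The bug raised in the review corresponds to returning the empty result inside the loop; here the empty result is only returned after every key has been tried.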
| */ | ||
| static Optional<Integer> getPortNumberFromConfigKeys( | ||
| public static Optional<Integer> getPortNumberFromConfigKeys( | ||
| Configuration conf, String ... keys) { |
There was a problem hiding this comment.
Can we add complete Javadoc for the parameters now that this method is public?
| */ | ||
| public static int getScmRpcTimeOutInMilliseconds(Configuration conf) { | ||
| return conf.getInt(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS, | ||
| OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS_DEFAULT); |
There was a problem hiding this comment.
I would suggest we use Configuration#getTimeDuration() instead of Configuration#getInt() for durations. This will reduce the documentation cost and the chance of misconfiguration in deployment. A sample usage can be found in the datanode for DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY.
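The idea behind a suffixed duration property can be sketched in plain Java as below. This is only an illustration of the concept, not Hadoop's implementation of Configuration#getTimeDuration(): the class TimeDurationSketch, the parseDuration method, and the set of suffixes it accepts are assumptions made for this example.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of parsing values such as "30s" or "500ms". */
final class TimeDurationSketch {
  // Longer suffixes come first so that "ms" is not mistaken for "s" or "m".
  private static final String[] SUFFIXES = {"ms", "s", "m", "h", "d"};
  private static final TimeUnit[] UNITS = {
      TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES,
      TimeUnit.HOURS, TimeUnit.DAYS};

  private TimeDurationSketch() { }

  /**
   * Parses a duration with an optional unit suffix and converts it to returnUnit.
   * A bare number is interpreted in defaultUnit.
   */
  static long parseDuration(String value, TimeUnit defaultUnit, TimeUnit returnUnit) {
    String v = value.trim().toLowerCase(Locale.ROOT);
    for (int i = 0; i < SUFFIXES.length; i++) {
      if (v.endsWith(SUFFIXES[i])) {
        String number = v.substring(0, v.length() - SUFFIXES[i].length()).trim();
        return returnUnit.convert(Long.parseLong(number), UNITS[i]);
      }
    }
    return returnUnit.convert(Long.parseLong(v), defaultUnit);
  }

  public static void main(String[] args) {
    System.out.println(parseDuration("30s", TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS));
  }
}
```

Because the unit travels with the value, the key name no longer has to encode it, and an operator who writes "30s" where milliseconds were expected still gets the intended timeout.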
| Runtime.getRuntime().availableProcessors() * 2; | ||
|
|
||
| public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS = | ||
| "ozone.scm.heartbeat.rpc-timeout.ms"; |
There was a problem hiding this comment.
I suggest using a time duration property with a unit suffix and removing the .ms from the configuration key.
| 10; | ||
|
|
||
| public static final String OZONE_CONTAINER_TASK_WAIT = | ||
| "ozone.container.task.wait.seconds"; |
There was a problem hiding this comment.
I suggest using a time duration property with a unit suffix and removing the .seconds from the configuration key.
There was a problem hiding this comment.
This key has been removed.
There was a problem hiding this comment.
You mean the .seconds has been removed from the key name?
|
|
||
|
|
||
| // ozone.scm.names key is a set of DNS | DNS:PORT | IP Address | IP:PORT. | ||
| // Written as a comma separated string. e.g. scm1, scm2:8020, 7.7.7.7:7777 |
There was a problem hiding this comment.
Do we have a unit test for ozone.scm.names values like "scm1, scm2:8020, 7.7.7.7:7777"?
There was a problem hiding this comment.
I will post another patch with these negative test cases. I have filed HDFS-11137 to track this issue.
| public static final String OZONE_SCM_NAMES = "ozone.scm.names"; | ||
|
|
||
| public static final int OZONE_SCM_DEFAULT_PORT = 9862; | ||
| // Full path the location where datanode ID is to written to. |
There was a problem hiding this comment.
NIT: Maybe we should say "file name" instead of "Full path" here.
| new ThreadFactoryBuilder().setDaemon(true) | ||
| .setNameFormat("Container State Machine Thread - %d").build()); | ||
| connectionManager = new SCMConnectionManager( | ||
| OzoneClientUtils.getScmRpcTimeOutInMilliseconds(conf), |
There was a problem hiding this comment.
OzoneClientUtils.getScmRpcTimeOutInMilliseconds(conf) can be encapsulated inside SCMConnectionManager constructor as we have passed the conf parameter anyway.
| } | ||
| } catch (InterruptedException e) { | ||
| executorService.shutdownNow(); | ||
| Thread.currentThread().interrupt(); |
There was a problem hiding this comment.
Can we add a LOG.error() after the Thread.currentThread().interrupt() call, to help troubleshoot shutdown issues in the future?
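A minimal sketch of the suggested pattern is shown below: restore the interrupt flag, then log the event so that a stalled or aborted shutdown leaves a trace. It uses java.util.logging as a stand-in for the SLF4J logger in the patch, and the class ShutdownLoggingSketch and the method shutdownAndWait are hypothetical names for this example only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of logging an interrupted executor shutdown. */
final class ShutdownLoggingSketch {
  private static final Logger LOG =
      Logger.getLogger(ShutdownLoggingSketch.class.getName());

  private ShutdownLoggingSketch() { }

  /** Returns true if the executor terminated cleanly within the timeout. */
  static boolean shutdownAndWait(ExecutorService executorService, long timeoutMs) {
    executorService.shutdown();
    try {
      if (!executorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
        return false;
      }
      return true;
    } catch (InterruptedException e) {
      executorService.shutdownNow();
      // Preserve the interrupt status for callers further up the stack.
      Thread.currentThread().interrupt();
      // Record the event so shutdown problems can be diagnosed later.
      LOG.log(Level.SEVERE, "Interrupted while waiting for executor shutdown", e);
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    System.out.println(shutdownAndWait(pool, 1000));
  }
}
```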
| static final Logger | ||
| LOG = LoggerFactory.getLogger(EndpointStateMachine.class); | ||
| private final StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint; | ||
| private final AtomicInteger missedCount; |
There was a problem hiding this comment.
AtomicLong instead of AtomicInteger?
| } | ||
|
|
||
| /** | ||
| * Returns the next logical state that endPoint should move to. |
There was a problem hiding this comment.
Can you document the assumption that the state transitions by value + 1?
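The assumption being discussed can be made explicit in code, as in the sketch below. The enum constants and their numeric values are assumptions chosen for illustration (only SHUTDOWN, REGISTER and a version step are visible in the diff), and EndpointStateSketch is a hypothetical wrapper class, not the patch's EndpointStateMachine.

```java
/** Hypothetical sketch of a state enum that advances by value + 1. */
final class EndpointStateSketch {
  private EndpointStateSketch() { }

  enum EndPointStates {
    GETVERSION(1),
    REGISTER(2),
    HEARTBEAT(3),
    SHUTDOWN(4); // terminal state; must keep the highest value

    private final int value;

    EndPointStates(int value) {
      this.value = value;
    }

    int getValue() {
      return value;
    }

    /**
     * Returns the next logical state.
     * Assumption: values are consecutive integers, so the next state is the
     * one whose value is this value + 1. SHUTDOWN stays in SHUTDOWN.
     */
    EndPointStates getNextState() {
      if (this == SHUTDOWN) {
        return this;
      }
      for (EndPointStates state : values()) {
        if (state.value == this.value + 1) {
          return state;
        }
      }
      throw new IllegalStateException("No state with value " + (value + 1));
    }
  }
}
```

Stating the rule in the Javadoc, and failing loudly when it is broken, protects against someone later inserting a state with a non-consecutive value.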
| private final Configuration conf; | ||
|
|
||
|
|
||
| public SCMConnectionManager(int rpcTimeout, Configuration conf) { |
There was a problem hiding this comment.
The rpcTimeout parameter can be removed.
| EndpointStateMachine endPoint = | ||
| new EndpointStateMachine(address, rpcClient, conf); | ||
| scmMachines.put(address, endPoint); | ||
|
|
| * if you need to execute a task without any concurrent execution, please | ||
| * return a single task in this list. | ||
| * | ||
| * @return List of Callables |
There was a problem hiding this comment.
I don't think we return a List here. Can you clarify this here, as well as in "Returns a list of tasks that..."?
|
|
||
| Container has the following states. | ||
|
|
||
| Start - > getVersion -> Register -> Running +-> Upgrade -> Shutdown |
There was a problem hiding this comment.
Should we update the state documentation here to omit the upgrade/decommission states, since they are not implemented yet?
| import java.util.concurrent.TimeoutException; | ||
|
|
||
| /** | ||
| * Init Container Task is the task that gets run when we are in Init State. |
There was a problem hiding this comment.
Does the Javadoc need an update? "Init Container Task" -> "init datanode state".
| OzoneConfigKeys.OZONE_SCM_DATANODE_ID); | ||
|
|
||
| // This is an unrecoverable error. | ||
| this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN); |
There was a problem hiding this comment.
Should we return null after marking DatanodeStates.SHUTDOWN instead of continuing to attempt to read the container ID?
| try { | ||
| nodeID = readPersistedDatanodeID(Paths.get(dataNodeIDPath)); | ||
| if (nodeID != null) { | ||
| return nodeID; |
There was a problem hiding this comment.
Add a LOG.trace() for when the container ID is read from the file successfully.
| // info to SCM. | ||
| try { | ||
| nodeID = createNewContainerID(Paths.get(dataNodeIDPath)); | ||
| return nodeID; |
There was a problem hiding this comment.
Add a trace for ContainerID created?
| * Computes the next state the container state machine must move to by looking | ||
| * at all the state of endpoints. | ||
| * <p> | ||
| * if any endpoint state has moved to Register state, then the Container State |
There was a problem hiding this comment.
"if any endpoint state has moved to Register state,...", this seems not being enforced in the code below.
| return new VersionEndpointTask(endpoint, conf); | ||
| case REGISTER: | ||
| RegisterEndpointTask task = new RegisterEndpointTask(endpoint, conf); | ||
| task.setContainerNodeIDProto(getContainerNodeID()); |
There was a problem hiding this comment.
NIT: maybe use a builder pattern to avoid creating RegisterEndpointTask without ContainerNodeID.
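A builder along the lines suggested here might look like the sketch below. It is a hypothetical illustration: plain strings stand in for the endpoint and the ContainerNodeIDProto, and the class RegisterTaskSketch and its method names are not taken from the patch.

```java
import java.util.Objects;

/** Hypothetical sketch of a task that cannot be built without a node ID. */
final class RegisterTaskSketch {
  private final String endpoint;
  private final String containerNodeId;

  private RegisterTaskSketch(String endpoint, String containerNodeId) {
    this.endpoint = endpoint;
    this.containerNodeId = containerNodeId;
  }

  String getEndpoint() {
    return endpoint;
  }

  String getContainerNodeId() {
    return containerNodeId;
  }

  static Builder newBuilder() {
    return new Builder();
  }

  /** Collects the required fields and validates them in build(). */
  static final class Builder {
    private String endpoint;
    private String containerNodeId;

    Builder setEndpoint(String value) {
      this.endpoint = value;
      return this;
    }

    Builder setContainerNodeId(String value) {
      this.containerNodeId = value;
      return this;
    }

    RegisterTaskSketch build() {
      // Fail at construction time rather than when the task runs.
      Objects.requireNonNull(endpoint, "endpoint must be set");
      Objects.requireNonNull(containerNodeId, "containerNodeId must be set");
      return new RegisterTaskSketch(endpoint, containerNodeId);
    }
  }
}
```

With this shape, the null check and SHUTDOWN transition inside the task's call path become unnecessary, because an instance without a node ID cannot exist.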
| * | ||
| * @return ContainerNodeIDProto | ||
| */ | ||
| public ContainerNodeIDProto getContainerNodeIDProto() { |
There was a problem hiding this comment.
Builder pattern as suggested earlier.
| * @return ContainerNodeIDProto | ||
| */ | ||
| public ContainerNodeIDProto getContainerNodeIDProto() { | ||
| return containerNodeIDProto; |
| if (getContainerNodeIDProto() == null) { | ||
| LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " + | ||
| "shutting down the endpoint."); | ||
| rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN); |
There was a problem hiding this comment.
NIT: Consider returning the new state from rpcEndPoint.setState so that we can consolidate logic here?
| rpcEndpoint.zeroMissedCount(); | ||
| } catch (IOException ex) { | ||
| rpcEndpoint.logIfNeeded(ex, | ||
| OzoneClientUtils.getScmHeartbeatInterval(this.conf)); |
There was a problem hiding this comment.
The second parameter can be removed as ScmHeartbeatInterval is available within rpcEndpoint.logIfNeeded() with RpcEndpoint#conf.
| import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService; | ||
|
|
||
| /** | ||
| * Protocol used from an HDFS node to StorageContainerManager. This extends the |
There was a problem hiding this comment.
Can you elaborate on what "HDFS node" means in this comment?
| datanodeID); | ||
| nodes.put(newDatanodeID.getDatanodeUuid(), newDatanodeID); | ||
|
|
||
| nodes.put(datanodeID.getDatanodeUuid(), datanodeID); |
There was a problem hiding this comment.
How do we handle the case when two datanodes attempt to register with the same datanodeID? I don't think we should allow both to succeed without a warning or an error.
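One way to detect the conflict raised above is to register with putIfAbsent and inspect the previous mapping, as in this sketch. It is an illustration under assumptions: strings stand in for DatanodeID, java.util.logging stands in for the project's logger, NodeRegistrySketch is a hypothetical class, and treating a repeat registration from the same address as idempotent is a design choice made here, not something the review settles.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;

/** Hypothetical sketch of rejecting a duplicate datanode UUID. */
final class NodeRegistrySketch {
  private static final Logger LOG =
      Logger.getLogger(NodeRegistrySketch.class.getName());

  // Maps datanode UUID to the address that registered it.
  private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

  /**
   * Registers a node. Returns true if the UUID is new or is re-registered
   * from the same address; returns false if another address already owns it.
   */
  boolean register(String uuid, String address) {
    String existing = nodes.putIfAbsent(uuid, address);
    if (existing == null || existing.equals(address)) {
      return true;
    }
    LOG.warning("Datanode UUID " + uuid + " is already registered by "
        + existing + "; rejecting registration from " + address);
    return false;
  }

  int size() {
    return nodes.size();
  }
}
```

Unlike an unconditional put, putIfAbsent is atomic, so two datanodes racing to register the same UUID cannot both observe success.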
No description provided.