Skip to content

Commit

Permalink
YARN-9708. Yarn Router Support DelegationToken.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Aug 14, 2022
1 parent d383cc4 commit 72d9d5d
Show file tree
Hide file tree
Showing 26 changed files with 1,487 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@
<additionalProtoPathElement>
${basedir}/../../hadoop-yarn-api/src/main/proto
</additionalProtoPathElement>
<additionalProtoPathElement>
${basedir}/../../hadoop-yarn-common/src/main/proto
</additionalProtoPathElement>
</additionalProtoPathElements>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.records.*;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface FederationDelegationTokenStateStore {

StoreNewMasterKeyResponse storeNewMasterKey(StoreNewMasterKeyRequest request) throws Exception;

RemoveStoredMasterKeyResponse removeStoredMasterKey(RemoveStoredMasterKeyRequest request)
throws Exception;

RouterStoreNewTokenResponse storeNewToken(RouterStoreNewTokenRequest request) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
*/
public interface FederationStateStore extends
FederationApplicationHomeSubClusterStore, FederationMembershipStateStore,
FederationPolicyStore, FederationReservationHomeSubClusterStore {
FederationPolicyStore, FederationReservationHomeSubClusterStore,
FederationDelegationTokenStateStore {

/**
* Initialize the FederationStore.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@

package org.apache.hadoop.yarn.server.federation.store.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
Expand Down Expand Up @@ -66,7 +71,16 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RemoveStoredMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RemoveStoredMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.StoreNewMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.StoreNewMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreNewTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreNewTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
Expand All @@ -86,6 +100,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
private Map<ApplicationId, SubClusterId> applications;
private Map<ReservationId, SubClusterId> reservations;
private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState;

private final MonotonicClock clock = new MonotonicClock();

Expand All @@ -98,6 +113,7 @@ public void init(Configuration conf) {
applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
routerRMSecretManagerState = new RouterRMDTSecretManagerState();
}

@Override
Expand Down Expand Up @@ -365,4 +381,74 @@ public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(

return GetReservationsHomeSubClusterResponse.newInstance(result);
}

@Override
public StoreNewMasterKeyResponse storeNewMasterKey(
StoreNewMasterKeyRequest request) throws Exception {

// Restore the DelegationKey from the request
RouterMasterKey masterKey = request.getRouterMasterKey();
ByteBuffer keyByteBuf = masterKey.getKeyBytes();
byte[] keyBytes = new byte[keyByteBuf.remaining()];
keyByteBuf.get(keyBytes);
DelegationKey delegationKey =
new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes);

Set<DelegationKey> rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState();
if (rmDTMasterKeyState.contains(delegationKey)) {
LOG.info("Error storing info for RMDTMasterKey with keyID: {}.", delegationKey.getKeyId());
throw new IOException("RMDTMasterKey with keyID: " +
delegationKey.getKeyId() + " is already stored");
}

routerRMSecretManagerState.getMasterKeyState().add(delegationKey);
LOG.info("Store Router-RMDT master key with key id: {}. Currently rmDTMasterKeyState size: {}",
delegationKey.getKeyId(), rmDTMasterKeyState.size());

return StoreNewMasterKeyResponse.newInstance(masterKey);
}

@Override
public RemoveStoredMasterKeyResponse removeStoredMasterKey(
RemoveStoredMasterKeyRequest request) throws Exception {

// Restore the DelegationKey from the request
RouterMasterKey masterKey = request.getRouterMasterKey();
ByteBuffer keyByteBuf = masterKey.getKeyBytes();
byte[] keyBytes = new byte[keyByteBuf.remaining()];
keyByteBuf.get(keyBytes);
DelegationKey delegationKey =
new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes);

LOG.info("Remove Router-RMDT master key with key id: {}.", delegationKey.getKeyId());
Set<DelegationKey> rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState();
rmDTMasterKeyState.remove(delegationKey);

return RemoveStoredMasterKeyResponse.newInstance(masterKey);
}

@Override
public RouterStoreNewTokenResponse storeNewToken(
RouterStoreNewTokenRequest request) throws Exception {
RouterStoreToken storeToken = request.getRouterStoreToken();
RMDelegationTokenIdentifier tokenIdentifier = storeToken.getTokenIdentifier();
Long renewDate = storeToken.getRenewDate();
storeOrUpdateRouterRMDT(tokenIdentifier, renewDate, false);
return RouterStoreNewTokenResponse.newInstance(storeToken);
}

private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
Long renewDate, boolean isUpdate) throws Exception {
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();
if (rmDTState.containsKey(rmDTIdentifier)) {
LOG.info("Error storing info for RMDelegationToken: {}.", rmDTIdentifier);
throw new IOException("RMDelegationToken: " + rmDTIdentifier + "is already stored.");
}
rmDTState.put(rmDTIdentifier, renewDate);
if(!isUpdate) {
routerRMSecretManagerState.setDtSequenceNumber(rmDTIdentifier.getSequenceNumber());
}
LOG.info("Store RM-RMDT with sequence number {}.", rmDTIdentifier.getSequenceNumber());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RemoveStoredMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RemoveStoredMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.StoreNewMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.StoreNewMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreNewTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreNewTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
Expand Down Expand Up @@ -1027,4 +1033,22 @@ public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}

@Override
public StoreNewMasterKeyResponse storeNewMasterKey(
StoreNewMasterKeyRequest request) throws Exception{
throw new NotImplementedException("Code is not implemented");
}

@Override
public RemoveStoredMasterKeyResponse removeStoredMasterKey(
RemoveStoredMasterKeyRequest request) throws Exception {
throw new NotImplementedException("Code is not implemented");
}

@Override
public RouterStoreNewTokenResponse storeNewToken(
RouterStoreNewTokenRequest request) throws Exception {
throw new NotImplementedException("Code is not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.federation.store.records.RemoveStoredMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RemoveStoredMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.StoreNewMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.StoreNewMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreNewTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreNewTokenResponse;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
Expand Down Expand Up @@ -662,4 +668,22 @@ public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}

@Override
public StoreNewMasterKeyResponse storeNewMasterKey(
StoreNewMasterKeyRequest request) throws Exception{
throw new NotImplementedException("Code is not implemented");
}

@Override
public RemoveStoredMasterKeyResponse removeStoredMasterKey(
RemoveStoredMasterKeyRequest request) throws Exception {
throw new NotImplementedException("Code is not implemented");
}

@Override
public RouterStoreNewTokenResponse storeNewToken(
RouterStoreNewTokenRequest request) throws Exception {
throw new NotImplementedException("Code is not implemented");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.federation.store.records;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.util.Records;

/**
* The request issued by the client to get a delegation token from
* the {@code ResourceManager}.
* for more information.
*/
@Public
@Stable
public abstract class GetRouterDelegationTokenRequest {

@Public
@Stable
public static GetRouterDelegationTokenRequest newInstance(String renewer) {
GetRouterDelegationTokenRequest request =
Records.newRecord(GetRouterDelegationTokenRequest.class);
request.setRenewer(renewer);
return request;
}

@Public
@Stable
public abstract String getRenewer();

@Public
@Stable
public abstract void setRenewer(String renewer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.federation.store.records;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.util.Records;


/**
* Response to a {@link GetDelegationTokenRequest} request
* from the client. The response contains the token that
* can be used by the containers to talk to ClientRMService.
*
*/
@Public
@Stable
public abstract class GetRouterDelegationTokenResponse {

@Private
@Unstable
public static GetRouterDelegationTokenResponse newInstance(Token rmDTToken) {
GetRouterDelegationTokenResponse response =
Records.newRecord(GetRouterDelegationTokenResponse.class);
response.setRMDelegationToken(rmDTToken);
return response;
}

/**
* The Delegation tokens have a identifier which maps to
* {@link AbstractDelegationTokenIdentifier}.
* @return the delegation tokens
*/
@Public
@Stable
public abstract Token getRMDelegationToken();

@Private
@Unstable
public abstract void setRMDelegationToken(Token rmDTToken);
}
Loading

0 comments on commit 72d9d5d

Please sign in to comment.