19
19
20
20
import static org .apache .hadoop .util .curator .ZKCuratorManager .getNodePath ;
21
21
22
+ import java .io .ByteArrayOutputStream ;
23
+ import java .io .DataOutputStream ;
22
24
import java .io .IOException ;
23
25
import java .util .ArrayList ;
24
26
import java .util .Calendar ;
25
27
import java .util .List ;
26
28
import java .util .TimeZone ;
27
29
import java .util .Comparator ;
28
30
import java .util .stream .Collectors ;
31
+ import java .util .Map ;
32
+ import java .util .HashMap ;
33
+ import java .nio .ByteBuffer ;
29
34
30
35
import org .apache .commons .lang3 .NotImplementedException ;
31
36
import org .apache .hadoop .classification .VisibleForTesting ;
32
37
import org .apache .hadoop .conf .Configuration ;
38
+ import org .apache .hadoop .security .token .delegation .DelegationKey ;
33
39
import org .apache .hadoop .util .curator .ZKCuratorManager ;
34
40
import org .apache .hadoop .yarn .api .records .ApplicationId ;
35
41
import org .apache .hadoop .yarn .conf .YarnConfiguration ;
84
90
import org .apache .hadoop .yarn .server .federation .store .records .RouterMasterKeyRequest ;
85
91
import org .apache .hadoop .yarn .server .federation .store .records .RouterRMTokenResponse ;
86
92
import org .apache .hadoop .yarn .server .federation .store .records .RouterRMTokenRequest ;
93
+ import org .apache .hadoop .yarn .server .federation .store .records .RouterMasterKey ;
87
94
import org .apache .hadoop .yarn .server .federation .store .records .impl .pb .SubClusterIdPBImpl ;
88
95
import org .apache .hadoop .yarn .server .federation .store .records .impl .pb .SubClusterInfoPBImpl ;
89
96
import org .apache .hadoop .yarn .server .federation .store .records .impl .pb .SubClusterPolicyConfigurationPBImpl ;
96
103
import org .apache .hadoop .yarn .api .records .ReservationId ;
97
104
import org .apache .hadoop .yarn .util .Clock ;
98
105
import org .apache .hadoop .yarn .util .SystemClock ;
106
+ import org .apache .zookeeper .KeeperException ;
99
107
import org .apache .zookeeper .data .ACL ;
100
108
import org .slf4j .Logger ;
101
109
import org .slf4j .LoggerFactory ;
@@ -131,6 +139,12 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
131
139
private final static String ROOT_ZNODE_NAME_APPLICATION = "applications" ;
132
140
private final static String ROOT_ZNODE_NAME_POLICY = "policies" ;
133
141
private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation" ;
142
+ private final static String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot" ;
143
+ private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = "RMDTMasterKeysRoot" ;
144
+ private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot" ;
145
+ private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = "RMDTSequentialNumber" ;
146
+ private static final String DELEGATION_KEY_PREFIX = "DelegationKey_" ;
147
+ private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_" ;
134
148
135
149
/** Interface to Zookeeper. */
136
150
private ZKCuratorManager zkManager ;
@@ -143,6 +157,14 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
143
157
private String policiesZNode ;
144
158
private String reservationsZNode ;
145
159
private int maxAppsInStateStore ;
160
+ private int delegationTokenNodeSplitIndex = 0 ;
161
+
162
+ /** **/
163
+ private String rmDTSecretManagerRoot ;
164
+ private String dtMasterKeysRootPath ;
165
+ private String delegationTokensRootPath ;
166
+ private String dtSequenceNumberPath ;
167
+ private Map <Integer , String > rmDelegationTokenHierarchies ;
146
168
147
169
private volatile Clock clock = SystemClock .getInstance ();
148
170
@@ -173,6 +195,33 @@ public void init(Configuration conf) throws YarnException {
173
195
appsZNode = getNodePath (baseZNode , ROOT_ZNODE_NAME_APPLICATION );
174
196
policiesZNode = getNodePath (baseZNode , ROOT_ZNODE_NAME_POLICY );
175
197
reservationsZNode = getNodePath (baseZNode , ROOT_ZNODE_NAME_RESERVATION );
198
+ rmDTSecretManagerRoot = getNodePath (baseZNode , RM_DT_SECRET_MANAGER_ROOT );
199
+ dtMasterKeysRootPath = getNodePath (rmDTSecretManagerRoot ,
200
+ RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME );
201
+ delegationTokensRootPath = getNodePath (rmDTSecretManagerRoot ,
202
+ RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME );
203
+ rmDelegationTokenHierarchies = new HashMap <>(5 );
204
+ rmDelegationTokenHierarchies .put (0 , delegationTokensRootPath );
205
+ for (int splitIndex = 1 ; splitIndex <= 4 ; splitIndex ++) {
206
+ rmDelegationTokenHierarchies .put (splitIndex ,
207
+ getNodePath (delegationTokensRootPath , Integer .toString (splitIndex )));
208
+ }
209
+ dtSequenceNumberPath = getNodePath (rmDTSecretManagerRoot ,
210
+ RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME );
211
+
212
+ delegationTokenNodeSplitIndex =
213
+ conf .getInt (YarnConfiguration .ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX ,
214
+ YarnConfiguration .DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX );
215
+
216
+ if (delegationTokenNodeSplitIndex < 0
217
+ || delegationTokenNodeSplitIndex > 4 ) {
218
+ LOG .info ("Invalid value " + delegationTokenNodeSplitIndex + " for config "
219
+ + YarnConfiguration .ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX
220
+ + " specified. Resetting it to " +
221
+ YarnConfiguration .DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX );
222
+ delegationTokenNodeSplitIndex =
223
+ YarnConfiguration .DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX ;
224
+ }
176
225
177
226
// Create base znode for each entity
178
227
try {
@@ -181,6 +230,14 @@ public void init(Configuration conf) throws YarnException {
181
230
zkManager .createRootDirRecursively (appsZNode , zkAcl );
182
231
zkManager .createRootDirRecursively (policiesZNode , zkAcl );
183
232
zkManager .createRootDirRecursively (reservationsZNode , zkAcl );
233
+ zkManager .createRootDirRecursively (rmDTSecretManagerRoot , zkAcl );
234
+ zkManager .createRootDirRecursively (dtMasterKeysRootPath , zkAcl );
235
+ zkManager .createRootDirRecursively (delegationTokensRootPath , zkAcl );
236
+ for (int splitIndex = 1 ; splitIndex <= 4 ; splitIndex ++) {
237
+ zkManager .createRootDirRecursively (rmDelegationTokenHierarchies .get (splitIndex ), zkAcl );
238
+ }
239
+ zkManager .createRootDirRecursively (dtSequenceNumberPath , zkAcl );
240
+
184
241
} catch (Exception e ) {
185
242
String errMsg = "Cannot create base directories: " + e .getMessage ();
186
243
FederationStateStoreUtils .logAndThrowStoreException (LOG , errMsg );
@@ -887,15 +944,35 @@ public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
887
944
}
888
945
889
946
@ Override
890
- public RouterMasterKeyResponse storeNewMasterKey (RouterMasterKeyRequest request )
947
+ public synchronized RouterMasterKeyResponse storeNewMasterKey (RouterMasterKeyRequest request )
891
948
throws YarnException , IOException {
892
- throw new NotImplementedException ("Code is not implemented" );
949
+ RouterMasterKey masterKey = request .getRouterMasterKey ();
950
+ DelegationKey delegationKey = getDelegationKeyByMasterKey (masterKey );
951
+ String nodeCreatePath = getNodePath (dtMasterKeysRootPath ,
952
+ DELEGATION_KEY_PREFIX + delegationKey .getKeyId ());
953
+ LOG .debug ("Storing RMDelegationKey_{}" , delegationKey .getKeyId ());
954
+ ByteArrayOutputStream os = new ByteArrayOutputStream ();
955
+ try (DataOutputStream fsOut = new DataOutputStream (os )) {
956
+ delegationKey .write (fsOut );
957
+ put (nodeCreatePath , os .toByteArray (), false );
958
+ }
959
+ return RouterMasterKeyResponse .newInstance (masterKey );
893
960
}
894
961
895
962
@ Override
896
963
public RouterMasterKeyResponse removeStoredMasterKey (RouterMasterKeyRequest request )
897
964
throws YarnException , IOException {
898
- throw new NotImplementedException ("Code is not implemented" );
965
+ RouterMasterKey masterKey = request .getRouterMasterKey ();
966
+ DelegationKey delegationKey = getDelegationKeyByMasterKey (masterKey );
967
+ String nodeRemovePath = getNodePath (dtMasterKeysRootPath , DELEGATION_KEY_PREFIX
968
+ + delegationKey .getKeyId ());
969
+ LOG .debug ("Removing RMDelegationKey_{}" , delegationKey .getKeyId ());
970
+ try {
971
+ zkManager .delete (nodeRemovePath );
972
+ }catch (Exception e ){
973
+ throw new YarnException (e );
974
+ }
975
+ return RouterMasterKeyResponse .newInstance (masterKey );
899
976
}
900
977
901
978
@ Override
@@ -927,4 +1004,53 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
927
1004
throws YarnException , IOException {
928
1005
throw new NotImplementedException ("Code is not implemented" );
929
1006
}
1007
+
1008
+ private static DelegationKey getDelegationKeyByMasterKey (RouterMasterKey masterKey ) {
1009
+ ByteBuffer keyByteBuf = masterKey .getKeyBytes ();
1010
+ byte [] keyBytes = new byte [keyByteBuf .remaining ()];
1011
+ keyByteBuf .get (keyBytes );
1012
+ return new DelegationKey (masterKey .getKeyId (), masterKey .getExpiryDate (), keyBytes );
1013
+ }
1014
+
1015
+ private String getLeafDelegationTokenNodePath (int rmDTSequenceNumber ,
1016
+ boolean createParentIfNotExists ) throws Exception {
1017
+ return getLeafDelegationTokenNodePath (rmDTSequenceNumber ,
1018
+ createParentIfNotExists , delegationTokenNodeSplitIndex );
1019
+ }
1020
+
1021
+ private String getLeafDelegationTokenNodePath (int rmDTSequenceNumber ,
1022
+ boolean createParentIfNotExists , int split ) throws Exception {
1023
+ String nodeName = DELEGATION_TOKEN_PREFIX ;
1024
+ if (split == 0 ) {
1025
+ nodeName += rmDTSequenceNumber ;
1026
+ } else {
1027
+ nodeName += String .format ("%04d" , rmDTSequenceNumber );
1028
+ }
1029
+ return getLeafZnodePath (nodeName , rmDelegationTokenHierarchies .get (split ),
1030
+ split , createParentIfNotExists );
1031
+ }
1032
+
1033
+ private String getLeafZnodePath (String nodeName , String rootNode ,
1034
+ int splitIdx , boolean createParentIfNotExists ) throws Exception {
1035
+ if (splitIdx == 0 ) {
1036
+ return getNodePath (rootNode , nodeName );
1037
+ }
1038
+ int split = nodeName .length () - splitIdx ;
1039
+ String rootNodePath =
1040
+ getNodePath (rootNode , nodeName .substring (0 , split ));
1041
+ if (createParentIfNotExists && !exists (rootNodePath )) {
1042
+ try {
1043
+ zkManager .create (rootNodePath );
1044
+ } catch (KeeperException .NodeExistsException e ) {
1045
+ LOG .debug ("Unable to create app parent node {} as it already exists." ,
1046
+ rootNodePath );
1047
+ }
1048
+ }
1049
+ return getNodePath (rootNodePath , nodeName .substring (split ));
1050
+ }
1051
+
1052
+ @ VisibleForTesting
1053
+ boolean exists (final String path ) throws Exception {
1054
+ return zkManager .exists (path );
1055
+ }
930
1056
}
0 commit comments