Skip to content

Commit be320f5

Browse files
authored
HADOOP-19612 Add RPC header for access token (apache#7803)
Add a new auth header to the RPC header proto for access token support. This allows different access tokens to be used within the same connection. Contributed-by: Tom McCormick <tmccormi@linkedin.com>
1 parent b5cfc7c commit be320f5

File tree

6 files changed

+265
-41
lines changed

6 files changed

+265
-41
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 67 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
import org.apache.hadoop.thirdparty.protobuf.Message;
140140
import org.slf4j.Logger;
141141
import org.slf4j.LoggerFactory;
142+
import org.apache.hadoop.security.AuthorizationContext;
142143

143144
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
144145
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -1004,6 +1005,7 @@ public static class Call implements Schedulable,
10041005
final byte[] clientId;
10051006
private final Span span; // the trace span on the server side
10061007
private final CallerContext callerContext; // the call context
1008+
private final byte[] authHeader; // the auth header
10071009
private boolean deferredResponse = false;
10081010
private int priorityLevel;
10091011
// the priority level assigned by scheduler, 0 by default
@@ -1035,6 +1037,11 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,
10351037

10361038
Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId,
10371039
Span span, CallerContext callerContext) {
1040+
this(id, retryCount, kind, clientId, span, callerContext, null);
1041+
}
1042+
1043+
Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId,
1044+
Span span, CallerContext callerContext, byte[] authHeader) {
10381045
this.callId = id;
10391046
this.retryCount = retryCount;
10401047
this.timestampNanos = Time.monotonicNowNanos();
@@ -1043,6 +1050,7 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,
10431050
this.clientId = clientId;
10441051
this.span = span;
10451052
this.callerContext = callerContext;
1053+
this.authHeader = authHeader;
10461054
this.clientStateId = Long.MIN_VALUE;
10471055
this.isCallCoordinated = false;
10481056
}
@@ -1243,7 +1251,14 @@ private class RpcCall extends Call {
12431251
RpcCall(Connection connection, int id, int retryCount,
12441252
Writable param, RPC.RpcKind kind, byte[] clientId,
12451253
Span span, CallerContext context) {
1246-
super(id, retryCount, kind, clientId, span, context);
1254+
this(connection, id, retryCount, param, kind, clientId,
1255+
span, context, new byte[0]);
1256+
}
1257+
1258+
RpcCall(Connection connection, int id, int retryCount,
1259+
Writable param, RPC.RpcKind kind, byte[] clientId,
1260+
Span span, CallerContext context, byte[] authHeader) {
1261+
super(id, retryCount, kind, clientId, span, context, authHeader);
12471262
this.connection = connection;
12481263
this.rpcRequest = param;
12491264
}
@@ -2975,51 +2990,61 @@ private void processRpcRequest(RpcRequestHeaderProto header,
29752990
.build();
29762991
}
29772992

2978-
RpcCall call = new RpcCall(this, header.getCallId(),
2979-
header.getRetryCount(), rpcRequest,
2980-
ProtoUtil.convert(header.getRpcKind()),
2981-
header.getClientId().toByteArray(), span, callerContext);
2982-
2983-
// Save the priority level assignment by the scheduler
2984-
call.setPriorityLevel(callQueue.getPriorityLevel(call));
2985-
call.markCallCoordinated(false);
2986-
if(alignmentContext != null && call.rpcRequest != null &&
2987-
(call.rpcRequest instanceof ProtobufRpcEngine2.RpcProtobufRequest)) {
2988-
// if call.rpcRequest is not RpcProtobufRequest, will skip the following
2989-
// step and treat the call as uncoordinated. As currently only certain
2990-
// ClientProtocol methods request made through RPC protobuf needs to be
2991-
// coordinated.
2992-
String methodName;
2993-
String protoName;
2994-
ProtobufRpcEngine2.RpcProtobufRequest req =
2995-
(ProtobufRpcEngine2.RpcProtobufRequest) call.rpcRequest;
2996-
try {
2997-
methodName = req.getRequestHeader().getMethodName();
2998-
protoName = req.getRequestHeader().getDeclaringClassProtocolName();
2999-
if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
3000-
call.markCallCoordinated(true);
3001-
long stateId;
3002-
stateId = alignmentContext.receiveRequestState(
3003-
header, getMaxIdleTime());
3004-
call.setClientStateId(stateId);
3005-
if (header.hasRouterFederatedState()) {
3006-
call.setFederatedNamespaceState(header.getRouterFederatedState());
2993+
// Set AuthorizationContext for this thread if present
2994+
byte[] authHeader = null;
2995+
try {
2996+
if (header.hasAuthorizationHeader()) {
2997+
authHeader = header.getAuthorizationHeader().toByteArray();
2998+
}
2999+
3000+
RpcCall call = new RpcCall(this, header.getCallId(),
3001+
header.getRetryCount(), rpcRequest,
3002+
ProtoUtil.convert(header.getRpcKind()),
3003+
header.getClientId().toByteArray(), span, callerContext, authHeader);
3004+
3005+
// Save the priority level assignment by the scheduler
3006+
call.setPriorityLevel(callQueue.getPriorityLevel(call));
3007+
call.markCallCoordinated(false);
3008+
if (alignmentContext != null && call.rpcRequest != null &&
3009+
(call.rpcRequest instanceof ProtobufRpcEngine2.RpcProtobufRequest)) {
3010+
// if call.rpcRequest is not RpcProtobufRequest, will skip the following
3011+
// step and treat the call as uncoordinated. As currently only certain
3012+
// ClientProtocol methods request made through RPC protobuf needs to be
3013+
// coordinated.
3014+
String methodName;
3015+
String protoName;
3016+
ProtobufRpcEngine2.RpcProtobufRequest req =
3017+
(ProtobufRpcEngine2.RpcProtobufRequest) call.rpcRequest;
3018+
try {
3019+
methodName = req.getRequestHeader().getMethodName();
3020+
protoName = req.getRequestHeader().getDeclaringClassProtocolName();
3021+
if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
3022+
call.markCallCoordinated(true);
3023+
long stateId;
3024+
stateId = alignmentContext.receiveRequestState(
3025+
header, getMaxIdleTime());
3026+
call.setClientStateId(stateId);
3027+
if (header.hasRouterFederatedState()) {
3028+
call.setFederatedNamespaceState(header.getRouterFederatedState());
3029+
}
30073030
}
3031+
} catch (IOException ioe) {
3032+
throw new RpcServerException("Processing RPC request caught ", ioe);
30083033
}
3009-
} catch (IOException ioe) {
3010-
throw new RpcServerException("Processing RPC request caught ", ioe);
30113034
}
3012-
}
30133035

3014-
try {
3015-
internalQueueCall(call);
3016-
} catch (RpcServerException rse) {
3017-
throw rse;
3018-
} catch (IOException ioe) {
3019-
throw new FatalRpcServerException(
3020-
RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
3036+
try {
3037+
internalQueueCall(call);
3038+
} catch (RpcServerException rse) {
3039+
throw rse;
3040+
} catch (IOException ioe) {
3041+
throw new FatalRpcServerException(
3042+
RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
3043+
}
3044+
incRpcCount(); // Increment the rpc count
3045+
} finally {
3046+
AuthorizationContext.clear();
30213047
}
3022-
incRpcCount(); // Increment the rpc count
30233048
}
30243049

30253050
/**
@@ -3245,6 +3270,7 @@ public void run() {
32453270
}
32463271
// always update the current call context
32473272
CallerContext.setCurrent(call.callerContext);
3273+
AuthorizationContext.setCurrentAuthorizationHeader(call.authHeader);
32483274
UserGroupInformation remoteUser = call.getRemoteUser();
32493275
connDropped = !call.isOpen();
32503276
if (remoteUser != null) {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.security;
19+
20+
/**
 * Holds a per-thread, opaque authorization header that accompanies RPC calls.
 *
 * <p>The header bytes are kept in a {@link ThreadLocal}; callers are expected
 * to invoke {@link #clear()} once the associated request has been processed so
 * a stale value does not leak into unrelated work on the same thread.
 */
public final class AuthorizationContext {

  /** Per-thread storage for the raw authorization header bytes. */
  private static final ThreadLocal<byte[]> HEADER_HOLDER = new ThreadLocal<>();

  /** Static-only utility; never instantiated. */
  private AuthorizationContext() {}

  /**
   * Associates the given header with the current thread.
   *
   * @param header raw authorization header bytes; may be {@code null}.
   */
  public static void setCurrentAuthorizationHeader(byte[] header) {
    HEADER_HOLDER.set(header);
  }

  /**
   * @return the header previously set on this thread, or {@code null} if none.
   */
  public static byte[] getCurrentAuthorizationHeader() {
    return HEADER_HOLDER.get();
  }

  /** Removes the current thread's header entry entirely. */
  public static void clear() {
    HEADER_HOLDER.remove();
  }
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.tracing.Span;
3333
import org.apache.hadoop.tracing.Tracer;
3434
import org.apache.hadoop.tracing.TraceUtils;
35+
import org.apache.hadoop.security.AuthorizationContext;
3536

3637
import org.apache.hadoop.thirdparty.protobuf.ByteString;
3738

@@ -203,6 +204,12 @@ public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
203204
result.setCallerContext(contextBuilder);
204205
}
205206

207+
// Add authorization header if present
208+
byte[] authzHeader = AuthorizationContext.getCurrentAuthorizationHeader();
209+
if (authzHeader != null) {
210+
result.setAuthorizationHeader(ByteString.copyFrom(authzHeader));
211+
}
212+
206213
// Add alignment context if it is not null
207214
if (alignmentContext != null) {
208215
alignmentContext.updateRequestState(result);

hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
9595
// The client should not interpret these bytes, but only forward bytes
9696
// received from RpcResponseHeaderProto.routerFederatedState.
9797
optional bytes routerFederatedState = 9;
98+
// Authorization header for passing opaque credentials or tokens
99+
optional bytes authorizationHeader = 10;
98100
}
99101

100102

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.security;
19+
20+
import org.junit.jupiter.api.Assertions;
21+
import org.junit.jupiter.api.Test;
22+
23+
public class TestAuthorizationContext {
24+
25+
@Test
26+
public void testSetAndGetAuthorizationHeader() {
27+
byte[] header = "my-auth-header".getBytes();
28+
AuthorizationContext.setCurrentAuthorizationHeader(header);
29+
Assertions.assertArrayEquals(header, AuthorizationContext.getCurrentAuthorizationHeader());
30+
AuthorizationContext.clear();
31+
}
32+
33+
@Test
34+
public void testClearAuthorizationHeader() {
35+
byte[] header = "clear-me".getBytes();
36+
AuthorizationContext.setCurrentAuthorizationHeader(header);
37+
AuthorizationContext.clear();
38+
Assertions.assertNull(AuthorizationContext.getCurrentAuthorizationHeader());
39+
}
40+
41+
@Test
42+
public void testThreadLocalIsolation() throws Exception {
43+
byte[] mainHeader = "main-thread".getBytes();
44+
AuthorizationContext.setCurrentAuthorizationHeader(mainHeader);
45+
Thread t = new Thread(() -> {
46+
Assertions.assertNull(AuthorizationContext.getCurrentAuthorizationHeader());
47+
byte[] threadHeader = "other-thread".getBytes();
48+
AuthorizationContext.setCurrentAuthorizationHeader(threadHeader);
49+
Assertions.assertArrayEquals(threadHeader, AuthorizationContext.getCurrentAuthorizationHeader());
50+
AuthorizationContext.clear();
51+
Assertions.assertNull(AuthorizationContext.getCurrentAuthorizationHeader());
52+
});
53+
t.start();
54+
t.join();
55+
// Main thread should still have its header
56+
Assertions.assertArrayEquals(mainHeader, AuthorizationContext.getCurrentAuthorizationHeader());
57+
AuthorizationContext.clear();
58+
}
59+
60+
@Test
61+
public void testNullAndEmptyHeader() {
62+
AuthorizationContext.setCurrentAuthorizationHeader(null);
63+
Assertions.assertNull(AuthorizationContext.getCurrentAuthorizationHeader());
64+
byte[] empty = new byte[0];
65+
AuthorizationContext.setCurrentAuthorizationHeader(empty);
66+
Assertions.assertArrayEquals(empty, AuthorizationContext.getCurrentAuthorizationHeader());
67+
AuthorizationContext.clear();
68+
}
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.namenode;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.FileSystem;
22+
import org.apache.hadoop.fs.Path;
23+
import org.apache.hadoop.fs.FileStatus;
24+
import org.apache.hadoop.hdfs.HdfsConfiguration;
25+
import org.apache.hadoop.hdfs.MiniDFSCluster;
26+
import org.apache.hadoop.security.AuthorizationContext;
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.net.InetAddress;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.List;
33+
34+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
35+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
36+
import static org.junit.jupiter.api.Assertions.assertNull;
37+
38+
public class TestAuthorizationHeaderPropagation {
39+
40+
public static class HeaderCapturingAuditLogger implements AuditLogger {
41+
public static final List<byte[]> capturedHeaders = new ArrayList<>();
42+
@Override
43+
public void initialize(Configuration conf) {}
44+
@Override
45+
public void logAuditEvent(boolean succeeded, String userName, InetAddress addr,
46+
String cmd, String src, String dst, FileStatus stat) {
47+
byte[] header = AuthorizationContext.getCurrentAuthorizationHeader();
48+
capturedHeaders.add(header == null ? null : Arrays.copyOf(header, header.length));
49+
}
50+
}
51+
52+
@Test
53+
public void testAuthorizationHeaderPerRpc() throws Exception {
54+
Configuration conf = new HdfsConfiguration();
55+
conf.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY, HeaderCapturingAuditLogger.class.getName());
56+
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
57+
try {
58+
cluster.waitClusterUp();
59+
HeaderCapturingAuditLogger.capturedHeaders.clear();
60+
FileSystem fs = cluster.getFileSystem();
61+
// First RPC with header1
62+
byte[] header1 = "header-one".getBytes();
63+
AuthorizationContext.setCurrentAuthorizationHeader(header1);
64+
fs.mkdirs(new Path("/authz1"));
65+
AuthorizationContext.clear();
66+
// Second RPC with header2
67+
byte[] header2 = "header-two".getBytes();
68+
AuthorizationContext.setCurrentAuthorizationHeader(header2);
69+
fs.mkdirs(new Path("/authz2"));
70+
AuthorizationContext.clear();
71+
// Third RPC with no header
72+
fs.mkdirs(new Path("/authz3"));
73+
// Now assert
74+
assertArrayEquals(header1, HeaderCapturingAuditLogger.capturedHeaders.get(0));
75+
assertArrayEquals(header2, HeaderCapturingAuditLogger.capturedHeaders.get(1));
76+
assertNull(HeaderCapturingAuditLogger.capturedHeaders.get(2));
77+
} finally {
78+
cluster.shutdown();
79+
}
80+
}
81+
}

0 commit comments

Comments
 (0)