Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-16708. RBF: Support transmit state id from client in router. #4666

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface AlignmentContext {
void updateResponseState(RpcResponseHeaderProto.Builder header);

/**
* This is the intended client method call to implement to recieve state info
* This is the intended client method call to implement to receive state info
* during RPC response processing.
*
* @param header The RPC response header.
Expand All @@ -73,7 +73,7 @@ public interface AlignmentContext {
* @return state id required for the server to execute the call.
* @throws IOException raised on errors performing I/O.
*/
long receiveRequestState(RpcRequestHeaderProto header, long threshold)
long receiveRequestState(RpcRequestHeaderProto header, long threshold, boolean isCoordinatedCall)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ipc;

public enum NameServiceStateIdMode {
DISABLE("DISABLE"),
TRANSMISSION("TRANSMISSION"),
PROXY("PROXY");

private String name;

NameServiceStateIdMode(String name) {
this.name = name;
}

@Override
public String toString() {
return name;
}

public boolean isDisable() {
return this == NameServiceStateIdMode.DISABLE;
}

public boolean isTransmission() {
return this == NameServiceStateIdMode.TRANSMISSION;
}

public boolean isProxy() {
return this == NameServiceStateIdMode.PROXY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.NameserviceStateIdContextProto.NameServiceStateIdModeProto;
import org.apache.hadoop.net.NetUtils;

import org.apache.hadoop.thirdparty.protobuf.RpcController;
Expand Down Expand Up @@ -238,4 +240,29 @@ public static String toTraceName(String fullName) {
return fullName.substring(secondLastPeriod + 1, lastPeriod) + "#" +
fullName.substring(lastPeriod + 1);
}

public static NameServiceStateIdMode toStateIdMode(String mode) {
return NameServiceStateIdMode.valueOf(mode.toUpperCase());
}

public static NameServiceStateIdMode toStateIdMode(RpcRequestHeaderProto proto) {
if (proto.hasNameserviceStateIdsContext()) {
return NameServiceStateIdMode.valueOf(proto.getNameserviceStateIdsContext().getMode().name());
}
return null;
}

public static NameServiceStateIdModeProto toNameServiceStateIdModeProto(
NameServiceStateIdMode mode) {
switch(mode) {
case DISABLE:
return NameServiceStateIdModeProto.DISABLE;
case TRANSMISSION:
return NameServiceStateIdModeProto.TRANSMISSION;
case PROXY:
return NameServiceStateIdModeProto.PROXY;
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ public static class Call implements Schedulable,
private volatile String detailedMetricsName = "";
final int callId; // the client's call id
final int retryCount; // the retry count of the call
long timestampNanos; // time the call was received
private final long timestampNanos; // time the call was received
long responseTimestampNanos; // time the call was served
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
Expand Down Expand Up @@ -1110,6 +1110,18 @@ public void setDeferredResponse(Writable response) {

public void setDeferredError(Throwable t) {
}

public long getTimestampNanos() {
return timestampNanos;
}

public int getCallId() {
return callId;
}

public byte[] getClientId() {
return clientId;
}
}

/** A RPC extended call queued for handling. */
Expand Down Expand Up @@ -1190,8 +1202,7 @@ public Void run() throws Exception {
ResponseParams responseParams = new ResponseParams();

try {
value = call(
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
value = call(rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
} catch (Throwable e) {
populateResponseParamsOnError(e, responseParams);
}
Expand Down Expand Up @@ -2884,11 +2895,10 @@ private void processRpcRequest(RpcRequestHeaderProto header,
protoName = req.getRequestHeader().getDeclaringClassProtocolName();
if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
call.markCallCoordinated(true);
long stateId;
stateId = alignmentContext.receiveRequestState(
header, getMaxIdleTime());
call.setClientStateId(stateId);
}
long stateId = alignmentContext.receiveRequestState(header, getMaxIdleTime(),
call.isCallCoordinated());
call.setClientStateId(stateId);
} catch (IOException ioe) {
throw new RpcServerException("Processing RPC request caught ", ioe);
}
Expand Down
17 changes: 17 additions & 0 deletions hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ message RPCCallerContextProto {
optional bytes signature = 2;
}

message NameserviceStateIdContextProto {
enum NameServiceStateIdModeProto {
DISABLE = 0; // NameserviceStateIdContextProto will be ignored.
TRANSMISSION = 1; // NameserviceStateIdContextProto will transparent transmission by router.
PROXY = 2; // State id is proxy by router, NameserviceStateIdContextProto will be ignore.
}
required NameServiceStateIdModeProto mode = 1 [default = DISABLE];
repeated NameserviceStateIdProto nameserviceStateIds = 2; // Last seen state IDs for multiple nameservices.
}

message NameserviceStateIdProto {
required string nsId = 1;
required int64 stateId = 2;
}

message RpcRequestHeaderProto { // the header for the RpcRequest
enum OperationProto {
RPC_FINAL_PACKET = 0; // The final RPC Packet
Expand All @@ -91,6 +106,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
optional RPCTraceInfoProto traceInfo = 6; // tracing info
optional RPCCallerContextProto callerContext = 7; // call context
optional int64 stateId = 8; // The last seen Global State ID
optional NameserviceStateIdContextProto nameserviceStateIdsContext = 9;
}


Expand Down Expand Up @@ -157,6 +173,7 @@ message RpcResponseHeaderProto {
optional bytes clientId = 7; // Globally unique client ID
optional sint32 retryCount = 8 [default = -1];
optional int64 stateId = 9; // The last written Global State ID
optional NameserviceStateIdContextProto nameserviceStateIdsContext = 10;
}

message RpcSaslProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.NameServiceStateIdMode;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;

Expand All @@ -37,18 +38,28 @@
@InterfaceStability.Evolving
public class ClientGSIContext implements AlignmentContext {

private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE);
public final static String DEFAULT_NS = "";
private final FederatedNamespaceIds federatedNamespaceIds;
private final String nsId;

public ClientGSIContext(NameServiceStateIdMode mode) {
this(mode, DEFAULT_NS);
}

public ClientGSIContext(NameServiceStateIdMode mode, String nsId) {
this.federatedNamespaceIds = new FederatedNamespaceIds(mode);
this.nsId = nsId;
}


@Override
public long getLastSeenStateId() {
return lastSeenStateId.get();
return federatedNamespaceIds.getNamespaceId(nsId, true).get();
}

@Override
public boolean isCoordinatedCall(String protocolName, String method) {
throw new UnsupportedOperationException(
"Client should not be checking uncoordinated call");
throw new UnsupportedOperationException("Client should not be checking uncoordinated call");
}

/**
Expand All @@ -66,24 +77,33 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
lastSeenStateId.accumulate(header.getStateId());
if (federatedNamespaceIds.isDisable()) {
federatedNamespaceIds.updateNameserviceState(this.nsId, header.getStateId());
} else {
federatedNamespaceIds.updateStateUsingResponseHeader(header);
}
}

/**
* Client side implementation for providing state alignment info in requests.
*/
@Override
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
header.setStateId(lastSeenStateId.longValue());
if (federatedNamespaceIds.isDisable()) {
header.setStateId(federatedNamespaceIds.getNamespaceId(this.nsId, true).get());
header.clearNameserviceStateIdsContext();
} else {
federatedNamespaceIds.setRequestHeaderState(header);
}
}

/**
* Client side implementation only provides state alignment info in requests.
* Client does not receive RPC requests therefore this does nothing.
*/
@Override
public long receiveRequestState(RpcRequestHeaderProto header, long threshold)
throws IOException {
public long receiveRequestState(RpcRequestHeaderProto header, long threshold,
boolean isCoordinatedCall) throws IOException {
// Do nothing.
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.NameServiceStateIdMode;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.NameserviceStateIdProto;


/** Collection of last-seen namespace state Ids for a set of namespaces. */
public class FederatedNamespaceIds {

private final Map<String, NamespaceStateId> namespaceIdMap = new ConcurrentHashMap<>();
private NameServiceStateIdMode mode;

public FederatedNamespaceIds(NameServiceStateIdMode mode) {
this.mode = mode;
}

public void updateStateUsingRequestHeader(RpcRequestHeaderProto header) {
mode = RpcClientUtil.toStateIdMode(header);
header.getNameserviceStateIdsContext().getNameserviceStateIdsList()
.forEach(this::updateNameserviceState);
}

public void updateStateUsingResponseHeader(RpcResponseHeaderProto header) {
header.getNameserviceStateIdsContext().getNameserviceStateIdsList()
.forEach(this::updateNameserviceState);
}

public void updateNameserviceState(NameserviceStateIdProto proto) {
namespaceIdMap.computeIfAbsent(proto.getNsId(), n -> new NamespaceStateId());
namespaceIdMap.get(proto.getNsId()).update(proto.getStateId());
}

public void updateNameserviceState(String nsId, long stateId) {
namespaceIdMap.computeIfAbsent(nsId, n -> new NamespaceStateId());
namespaceIdMap.get(nsId).update(stateId);
}

public void setRequestHeaderState(RpcRequestHeaderProto.Builder headerBuilder) {
headerBuilder.getNameserviceStateIdsContextBuilder()
.setMode(RpcClientUtil.toNameServiceStateIdModeProto(mode));
namespaceIdMap.forEach((k, v) -> headerBuilder.getNameserviceStateIdsContextBuilder()
.addNameserviceStateIds(
NameserviceStateIdProto.newBuilder()
.setNsId(k)
.setStateId(v.get())
.build())
);
}

public void setRequestHeaderState(RpcRequestHeaderProto.Builder headerBuilder, String nsId) {
NamespaceStateId namespaceStateId = namespaceIdMap.get(nsId);
long stateId = (namespaceStateId == null) ? NamespaceStateId.DEFAULT : namespaceStateId.get();
headerBuilder.setStateId(stateId);
}

public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) {
headerBuilder.getNameserviceStateIdsContextBuilder()
.setMode(RpcClientUtil.toNameServiceStateIdModeProto(mode));
namespaceIdMap.forEach((k, v) -> headerBuilder.getNameserviceStateIdsContextBuilder()
.addNameserviceStateIds(
NameserviceStateIdProto.newBuilder()
.setNsId(k)
.setStateId(v.get())
.build())
);
}

public NamespaceStateId getNamespaceId(String nsId, boolean useDefault) {
if (useDefault) {
namespaceIdMap.computeIfAbsent(nsId, n -> new NamespaceStateId());
}
return namespaceIdMap.get(nsId);
}

public boolean isProxyMode() {
return mode == NameServiceStateIdMode.PROXY;
}

public boolean isTransmissionMode() {
return mode == NameServiceStateIdMode.TRANSMISSION;
}

public boolean isDisable() {
return mode == NameServiceStateIdMode.DISABLE;
}

public NameServiceStateIdMode getMode() {
return mode;
}

public boolean contains(String nsId) {
return this.namespaceIdMap.containsKey(nsId);
}
}
Loading