Skip to content

Commit

Permalink
fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bharatviswa504 committed Jun 24, 2019
1 parent 56eef3b commit 58a756b
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public enum DummyAction implements AuditAction {

CREATE_VOLUME,
CREATE_BUCKET,
CREATE_KEY,
READ_VOLUME,
READ_BUCKET,
READ_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public enum OMAction implements AuditAction {
COMMIT_KEY,
CREATE_VOLUME,
CREATE_BUCKET,
CREATE_KEY,
DELETE_VOLUME,
DELETE_BUCKET,
DELETE_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ message CreateKeyRequest {
required KeyArgs keyArgs = 1;
// Set in OM HA during preExecute step. This way all OM's use same ID in
// OM HA.
optional uint64 ID = 2;
optional uint64 clientID = 2;
}

message CreateKeyResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,8 @@ public void commitKey(OmKeyArgs args, long clientID) throws IOException {
validateBucket(volumeName, bucketName);
OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
if (keyInfo == null) {
throw new OMException("Commit a key without corresponding entry " +
openKey, KEY_NOT_FOUND);
throw new OMException("Failed to commit key, as " + openKey + "entry " +
"is not found in the openKey table", KEY_NOT_FOUND);
}
keyInfo.setDataSize(args.getDataSize());
keyInfo.setModificationTime(Time.now());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,7 @@ public boolean isVolumeEmpty(String volume) throws IOException {
try (TableIterator<String, ? extends KeyValue<String, OmBucketInfo>>
bucketIter = bucketTable.iterator()) {
KeyValue<String, OmBucketInfo> kv = bucketIter.seek(volumePrefix);
// During iteration from DB, check in mean time if this bucket is not
// marked for delete.
if (kv != null && kv.getKey().startsWith(volumePrefix) &&
bucketTable.get(kv.getKey()) != null) {
if (kv != null && kv.getKey().startsWith(volumePrefix)) {
return false; // we found at least one bucket with this volume prefix.
}
}
Expand Down Expand Up @@ -512,10 +509,7 @@ public boolean isBucketEmpty(String volume, String bucket)
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
keyTable.iterator()) {
KeyValue<String, OmKeyInfo> kv = keyIter.seek(keyPrefix);
// During iteration from DB, check in mean time if this key is not
// marked for delete.
if (kv != null && kv.getKey().startsWith(keyPrefix) &&
keyTable.get(kv.getKey()) != null) {
if (kv != null && kv.getKey().startsWith(keyPrefix)) {
return false; // we found at least one key with this vol/bucket prefix.
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final long scmBlockSize;
private final int preallocateBlocksMax;
private final boolean grpcBlockTokenEnabled;
private final boolean useRatis;
private final boolean useRatisForReplication;


private OzoneManager(OzoneConfiguration conf) throws IOException,
Expand Down Expand Up @@ -431,16 +431,16 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
this.grpcBlockTokenEnabled = conf.getBoolean(
HDDS_BLOCK_TOKEN_ENABLED,
HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY,
DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
this.useRatisForReplication = conf.getBoolean(
DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
}

/**
* Return configuration value of
* {@link OzoneConfigKeys#DFS_CONTAINER_RATIS_ENABLED_KEY}.
*/
public boolean isUseRatis() {
return useRatis;
public boolean shouldUseRatis() {
return useRatisForReplication;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,13 @@ public void updateServerRole() {
} else if (thisNodeRole.equals(RaftPeerRole.FOLLOWER)) {
ByteString leaderNodeId = roleInfoProto.getFollowerInfo()
.getLeaderInfo().getId().getId();
RaftPeerId leaderPeerId = RaftPeerId.valueOf(leaderNodeId);
// There may be a chance, here we get leaderNodeId as null. For
// example, in 3 node OM Ratis, if 2 OM nodes are down, there will
// be no leader.
RaftPeerId leaderPeerId = null;
if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
leaderPeerId = RaftPeerId.valueOf(leaderNodeId);
}

setServerRole(thisNodeRole, leaderPeerId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,

OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
try {
validateBucket(omMetadataManager, volumeName,
validateBucketAndVolume(omMetadataManager, volumeName,
bucketName);
} catch (IOException ex) {
LOG.error("AllocateBlock failed for Key: {} in volume/bucket:{}/{}",
Expand All @@ -210,7 +210,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
try {
omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
if (omKeyInfo == null) {
throw new OMException("Open Key not found", KEY_NOT_FOUND);
throw new OMException("Open Key not found " + openKey, KEY_NOT_FOUND);
}

// Append new block
Expand All @@ -229,11 +229,10 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
exception = ex;
}

// Performing audit logging outside of the lock.
auditLog(auditLogger, buildAuditMessage(OMAction.ALLOCATE_BLOCK, auditMap,
exception, getOmRequest().getUserInfo()));

// return response after releasing lock.

if (exception == null) {
omResponse.setAllocateBlockResponse(AllocateBlockResponse.newBuilder()
.setKeyLocation(blockLocation).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
IOException exception = null;
OmKeyInfo omKeyInfo = null;
try {
validateBucket(omMetadataManager, volumeName, bucketName);
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
omKeyInfo = omMetadataManager.getOpenKeyTable().get(dbOpenKey);
if (omKeyInfo == null) {
throw new OMException("Commit a key without corresponding entry " +
dbOpenKey, KEY_NOT_FOUND);
throw new OMException("Failed to commit key, as " + dbOpenKey +
"entry is not found in the openKey table", KEY_NOT_FOUND);
}
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
// allocateBlock call happen's we shall know type and factor, as we set
// the type and factor read from multipart table, and set the KeyInfo in
// validateAndUpdateCache and return to the client. TODO: See if we can fix
// this.
// this. We do not call allocateBlock in openKey for multipart upload.

CreateKeyRequest.Builder newCreateKeyRequest = null;
KeyArgs.Builder newKeyArgs = null;
Expand All @@ -106,7 +106,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
final long requestedSize = keyArgs.getDataSize() > 0 ?
keyArgs.getDataSize() : scmBlockSize;

boolean useRatis = ozoneManager.isUseRatis();
boolean useRatis = ozoneManager.shouldUseRatis();

HddsProtos.ReplicationFactor factor = keyArgs.getFactor();
if (factor == null) {
Expand Down Expand Up @@ -145,7 +145,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {

newCreateKeyRequest =
createKeyRequest.toBuilder().setKeyArgs(newKeyArgs)
.setID(UniqueId.next());
.setClientID(UniqueId.next());

return getOmRequest().toBuilder()
.setCreateKeyRequest(newCreateKeyRequest).setUserInfo(getUserInfo())
Expand Down Expand Up @@ -196,7 +196,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,

OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
String dbOpenKeyName = omMetadataManager.getOpenKey(volumeName,
bucketName, keyName, createKeyRequest.getID());
bucketName, keyName, createKeyRequest.getClientID());
String dbKeyName = omMetadataManager.getOzoneKey(volumeName, bucketName,
keyName);
String dbBucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
Expand All @@ -208,7 +208,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
IOException exception = null;
omMetadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
validateBucket(omMetadataManager, volumeName, bucketName);
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
//TODO: We can optimize this get here, if getKmsProvider is null, then
// bucket encryptionInfo will be not set. If this assumption holds
// true, we can avoid get from bucket table.
Expand Down Expand Up @@ -265,7 +265,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
auditLog(auditLogger, buildAuditMessage(OMAction.ALLOCATE_KEY, auditMap,
exception, getOmRequest().getUserInfo()));

long clientID = createKeyRequest.getID();
long clientID = createKeyRequest.getClientID();

omResponse.setCreateKeyResponse(CreateKeyResponse.newBuilder()
.setKeyInfo(omKeyInfo.getProtobuf())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class OMKeyDeleteRequest extends OMClientRequest
implements OMKeyRequest {

private static final Logger LOG =
LoggerFactory.getLogger(OMKeyCommitRequest.class);
LoggerFactory.getLogger(OMKeyDeleteRequest.class);

public OMKeyDeleteRequest(OMRequest omRequest) {
super(omRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ default UserGroupInformation getRemoteUser() throws IOException {
* @param bucketName
* @throws IOException
*/
default void validateBucket(OMMetadataManager omMetadataManager,
default void validateBucketAndVolume(OMMetadataManager omMetadataManager,
String volumeName, String bucketName)
throws IOException {
String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,26 @@ public OMResponse submitRequest(RpcController controller,
return submitReadRequestToOM(request);
} else {
// PreExecute if needed.
try {
OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(request);
if (omClientRequest != null) {
request = omClientRequest.preExecute(ozoneManager);
if (omRatisServer.isLeader()) {
try {
OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(request);
if (omClientRequest != null) {
request = omClientRequest.preExecute(ozoneManager);
}
} catch (IOException ex) {
// As some of the preExecute returns error. So handle here.
return createErrorResponse(request, ex);
}
} catch (IOException ex) {
// As some of the preExecute returns error. So handle here.
return createErrorResponse(request, ex);
return submitRequestToRatis(request);
} else {
// throw not leader exception. This is being done, so to avoid
// unnecessary execution of preExecute on follower OM's. This
// will be helpful in the case like where we we reduce the
// chance of allocate blocks on follower OM's. Right now our
// leader status is updated every 1 second.
throw createNotLeaderException();
}
return submitRequestToRatis(request);
}
} else {
return submitRequestDirectlyToOM(request);
Expand Down Expand Up @@ -153,24 +162,28 @@ private OMResponse submitReadRequestToOM(OMRequest request)
if (omRatisServer.isLeader()) {
return handler.handle(request);
} else {
RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
Optional<RaftPeerId> leaderRaftPeerId = omRatisServer
.getCachedLeaderPeerId();
throw createNotLeaderException();
}
}

NotLeaderException notLeaderException;
if (leaderRaftPeerId.isPresent()) {
notLeaderException = new NotLeaderException(raftPeerId.toString());
} else {
notLeaderException = new NotLeaderException(
raftPeerId.toString(), leaderRaftPeerId.toString());
}
private ServiceException createNotLeaderException() {
RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
Optional<RaftPeerId> leaderRaftPeerId = omRatisServer
.getCachedLeaderPeerId();

if (LOG.isDebugEnabled()) {
LOG.debug(notLeaderException.getMessage());
}
NotLeaderException notLeaderException;
if (leaderRaftPeerId.isPresent()) {
notLeaderException = new NotLeaderException(raftPeerId.toString());
} else {
notLeaderException = new NotLeaderException(
raftPeerId.toString(), leaderRaftPeerId.toString());
}

throw new ServiceException(notLeaderException);
if (LOG.isDebugEnabled()) {
LOG.debug(notLeaderException.getMessage());
}

return new ServiceException(notLeaderException);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testValidateAndUpdateCache() throws Exception {
TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager);

long id = modifiedOmRequest.getCreateKeyRequest().getID();
long id = modifiedOmRequest.getCreateKeyRequest().getClientID();

String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, id);
Expand Down Expand Up @@ -128,7 +128,7 @@ public void testValidateAndUpdateCacheWithNoSuchMultipartUploadError()
TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager);

long id = modifiedOmRequest.getCreateKeyRequest().getID();
long id = modifiedOmRequest.getCreateKeyRequest().getClientID();

String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, id);
Expand Down Expand Up @@ -164,7 +164,7 @@ public void testValidateAndUpdateCacheWithVolumeNotFound() throws Exception {
new OMKeyCreateRequest(modifiedOmRequest);


long id = modifiedOmRequest.getCreateKeyRequest().getID();
long id = modifiedOmRequest.getCreateKeyRequest().getClientID();

String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, id);
Expand Down Expand Up @@ -202,7 +202,7 @@ public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
new OMKeyCreateRequest(modifiedOmRequest);


long id = modifiedOmRequest.getCreateKeyRequest().getID();
long id = modifiedOmRequest.getCreateKeyRequest().getClientID();

String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, id);
Expand Down Expand Up @@ -259,9 +259,9 @@ private OMRequest doPreExecute(OMRequest originalOMRequest) throws Exception {
Assert.assertTrue(keyArgs.getModificationTime() > 0);


// ID should be set.
Assert.assertTrue(createKeyRequest.hasID());
Assert.assertTrue(createKeyRequest.getID() > 0);
// Client ID should be set.
Assert.assertTrue(createKeyRequest.hasClientID());
Assert.assertTrue(createKeyRequest.getClientID() > 0);


if (!originalOMRequest.getCreateKeyRequest().getKeyArgs()
Expand Down

0 comments on commit 58a756b

Please sign in to comment.