HDDS-1753. Datanode unable to find chunk while replication data using ratis. #1318

Merged: 3 commits, Aug 28, 2019
@@ -242,4 +242,10 @@ static RetryPolicy createRetryPolicy(Configuration conf) {
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
return retryPolicy;
}

static Long getMinReplicatedIndex(
Collection<RaftProtos.CommitInfoProto> commitInfos) {
return commitInfos.stream().map(RaftProtos.CommitInfoProto::getCommitIndex)
.min(Long::compareTo).orElse(null);
}
}
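
The helper added above returns the smallest commit index reported across the peers of a Ratis group, or null when no commit information is available. A minimal, self-contained sketch of the same semantics, using plain Long values in place of RaftProtos.CommitInfoProto purely to keep the example runnable:

import java.util.Collection;
import java.util.List;

public class MinReplicatedIndexDemo {
  // Same stream pipeline as the new RatisHelper#getMinReplicatedIndex:
  // the minimum commit index across all peers, or null if there are none.
  static Long getMinReplicatedIndex(Collection<Long> commitIndexes) {
    return commitIndexes.stream().min(Long::compareTo).orElse(null);
  }

  public static void main(String[] args) {
    System.out.println(getMinReplicatedIndex(List.of(120L, 95L, 110L))); // 95
    System.out.println(getMinReplicatedIndex(List.of()));                // null
  }
}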
@@ -544,4 +544,9 @@ public void computeAndSetChecksum(Yaml yaml) throws IOException {
* @return Protocol Buffer Message
*/
public abstract ContainerProtos.ContainerDataProto getProtoBufMessage();

/**
* Returns the blockCommitSequenceId.
*/
public abstract long getBlockCommitSequenceId();
}
@@ -27,16 +27,15 @@
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common
.interfaces.ContainerDeletionChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -165,6 +164,10 @@ public Map<Long, Container> getContainerMapCopy() {
return ImmutableMap.copyOf(containerMap);
}

public Map<Long, Container> getContainerMap() {
return Collections.unmodifiableMap(containerMap);
}

/**
* A simple interface for container Iterations.
* <p>
@@ -232,18 +235,6 @@ public ContainerReportsProto getContainerReport() throws IOException {
return crBuilder.build();
}

public List<ContainerData> chooseContainerForBlockDeletion(int count,
ContainerDeletionChoosingPolicy deletionPolicy)
throws StorageContainerException {
Map<Long, ContainerData> containerDataMap = containerMap.entrySet().stream()
.filter(e -> deletionPolicy.isValidContainerType(
e.getValue().getContainerType()))
.collect(Collectors.toMap(Map.Entry::getKey,
e -> e.getValue().getContainerData()));
return deletionPolicy
.chooseContainerForBlockDeletion(count, containerDataMap);
}

public Set<Long> getMissingContainerSet() {
return missingContainerSet;
}
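
The new getContainerMap() above returns a live, read-only view of the internal container map, in contrast to getContainerMapCopy(), which returns an immutable snapshot. A small sketch of the difference, using strings in place of Container objects only to keep it self-contained:

import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class ContainerMapViewDemo {
  public static void main(String[] args) {
    ConcurrentSkipListMap<Long, String> containerMap = new ConcurrentSkipListMap<>();
    containerMap.put(1L, "container-1");

    Map<Long, String> copy = ImmutableMap.copyOf(containerMap);          // snapshot
    Map<Long, String> view = Collections.unmodifiableMap(containerMap);  // live view

    containerMap.put(2L, "container-2");

    System.out.println(copy.size());  // 1: the copy does not see later changes
    System.out.println(view.size());  // 2: the view does, but cannot be modified
    // view.put(3L, "container-3") would throw UnsupportedOperationException
  }
}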
@@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
@@ -81,6 +82,11 @@ public CommandHandler getCloseContainerHandler() {
return handlerMap.get(Type.closeContainerCommand);
}

@VisibleForTesting
public CommandHandler getDeleteBlocksCommandHandler() {
return handlerMap.get(Type.deleteBlocksCommand);
}

/**
* Dispatch the command to the correct handler.
*
@@ -171,6 +171,11 @@ public long getNumReadStateMachineMissCount() {
return numReadStateMachineMissCount.value();
}

@VisibleForTesting
public long getNumReadStateMachineOps() {
return numReadStateMachineOps.value();
}

@VisibleForTesting
public long getNumBytesWrittenCount() {
return numBytesWrittenCount.value();
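
getNumReadStateMachineOps() is exposed alongside getNumReadStateMachineMissCount() so tests can observe whether a read was served from the state machine cache or fell through to the dispatcher. A hypothetical test-style usage (the metrics class name CSMMetrics and the step that triggers the read are assumptions, not part of this patch):

// Asserts that one read went through readStateMachineData and missed the cache.
void assertReadMissedCache(CSMMetrics metrics, Runnable triggerRead) {
  long opsBefore = metrics.getNumReadStateMachineOps();
  long missBefore = metrics.getNumReadStateMachineMissCount();
  triggerRead.run();
  assert metrics.getNumReadStateMachineOps() == opsBefore + 1;
  assert metrics.getNumReadStateMachineMissCount() == missBefore + 1;
}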
@@ -18,6 +18,7 @@

package org.apache.hadoop.ozone.container.common.transport.server.ratis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -518,7 +519,8 @@ public CompletableFuture<Message> query(Message request) {
}

private ByteString readStateMachineData(
ContainerCommandRequestProto requestProto, long term, long index) {
ContainerCommandRequestProto requestProto, long term, long index)
throws IOException {
// the stateMachine data is not present in the stateMachine cache,
// increment the stateMachine cache miss count
metrics.incNumReadStateMachineMissCount();
@@ -532,18 +534,24 @@ private ByteString readStateMachineData(
.setChunkData(chunkInfo);
ContainerCommandRequestProto dataContainerCommandProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setCmdType(Type.ReadChunk)
.setReadChunk(readChunkRequestProto)
.setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto)
.build();
DispatcherContext context =
new DispatcherContext.Builder()
.setTerm(term)
.setLogIndex(index)
.setReadFromTmpFile(true)
.build();
new DispatcherContext.Builder().setTerm(term).setLogIndex(index)
.setReadFromTmpFile(true).build();
// read the chunk
ContainerCommandResponseProto response =
dispatchCommand(dataContainerCommandProto, context);
if (response.getResult() != ContainerProtos.Result.SUCCESS) {
StorageContainerException sce =
new StorageContainerException(response.getMessage(),
response.getResult());
LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : "
+ "{} Container Result: {}", gid, response.getCmdType(), index,
response.getMessage(), response.getResult());
throw sce;
}

ReadChunkResponseProto responseProto = response.getReadChunk();

ByteString data = responseProto.getData();
@@ -746,7 +754,8 @@ private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
return future;
}

private void evictStateMachineCache() {
@VisibleForTesting
public void evictStateMachineCache() {
stateMachineDataCache.invalidateAll();
stateMachineDataCache.cleanUp();
}
@@ -22,15 +22,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
Contributor review comment: This would cause a checkstyle issue.
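
For illustration, wrapping the import at the package boundary, as the removed form above did, would likely keep it within the usual 80-column checkstyle limit (a sketch of the reviewer's point, not the change that was committed):

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .ContainerCommandRequestProto;

The same rules would typically also flag the org.apache.ratis.protocol.* wildcard import introduced further down.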

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -50,14 +46,7 @@
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
@@ -74,11 +63,11 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Collections;
import java.util.UUID;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -139,10 +128,10 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
TimeUnit.MILLISECONDS);
this.dispatcher = dispatcher;

RaftServer.Builder builder = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(dd))
.setProperties(serverProperties)
.setStateMachineRegistry(this::getStateMachine);
RaftServer.Builder builder =
RaftServer.newBuilder().setServerId(RatisHelper.toRaftPeerId(dd))
.setProperties(serverProperties)
.setStateMachineRegistry(this::getStateMachine);
if (tlsConfig != null) {
builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig));
}
@@ -507,6 +496,13 @@ private RaftClientRequest createRaftClientRequest(
null);
}

private GroupInfoRequest createGroupInfoRequest(
HddsProtos.PipelineID pipelineID) {
return new GroupInfoRequest(clientId, server.getId(),
RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()),
nextCallId());
}

private void handlePipelineFailure(RaftGroupId groupId,
RoleInfoProto roleInfoProto) {
String msg;
@@ -654,4 +650,12 @@ public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) {
triggerPipelineClose(groupId, msg,
ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true);
}

public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
Long minIndex;
GroupInfoReply reply = getServer()
.getGroupInfo(createGroupInfoRequest(pipelineID.getProtobuf()));
minIndex = RatisHelper.getMinReplicatedIndex(reply.getCommitInfos());
return minIndex == null ? -1 : minIndex.longValue();
}
}
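
The new getMinReplicatedIndex(PipelineID) gives callers the lowest commit index that every peer of the pipeline has reached, with -1 meaning the information was unavailable. A hypothetical caller sketch (the method and parameter names below are assumptions, not part of the patch):

// Only treat entries up to `index` as safe to drop from a local cache once
// every peer in the pipeline has replicated at least that far.
private boolean replicatedByAllPeers(XceiverServerRatis ratisServer,
    PipelineID pipelineId, long index) throws IOException {
  long minReplicated = ratisServer.getMinReplicatedIndex(pipelineId);
  // -1 means commit information was unavailable; be conservative in that case.
  return minReplicated >= 0 && minReplicated >= index;
}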
@@ -25,7 +25,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
@@ -76,8 +75,6 @@
import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
.BlockDeletingService;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.ReflectionUtils;

@@ -86,15 +83,8 @@
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
Result.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -109,7 +99,6 @@ public class KeyValueHandler extends Handler {
private final ContainerType containerType;
private final BlockManager blockManager;
private final ChunkManager chunkManager;
private final BlockDeletingService blockDeletingService;
private final VolumeChoosingPolicy volumeChoosingPolicy;
private final long maxContainerSize;

@@ -126,18 +115,6 @@ public KeyValueHandler(Configuration config, StateContext context,
conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY,
OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT);
chunkManager = new ChunkManagerImpl(doSyncWrite);
long svcInterval = config
.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
long serviceTimeout = config
.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
this.blockDeletingService =
new BlockDeletingService(containerSet, svcInterval, serviceTimeout,
TimeUnit.MILLISECONDS, config);
blockDeletingService.start();
volumeChoosingPolicy = ReflectionUtils.newInstance(conf.getClass(
HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
.class, VolumeChoosingPolicy.class), conf);
@@ -160,7 +137,6 @@ public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() {

@Override
public void stop() {
blockDeletingService.shutdown();
}

@Override
@@ -207,7 +207,8 @@ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,

// In case the chunk file does not exist but tmp chunk file exist,
// read from tmp chunk file if readFromTmpFile is set to true
if (!chunkFile.exists() && dispatcherContext.isReadFromTmpFile()) {
if (!chunkFile.exists() && dispatcherContext != null
&& dispatcherContext.isReadFromTmpFile()) {
chunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
}
data = ChunkUtils.readData(chunkFile, info, volumeIOStats);