Merged
@@ -209,7 +209,7 @@ public void reinitialize(
* leader.
*/
public void onBecomeLeader() {
- transactionStatusManager.clear();
+ transactionStatusManager.onBecomeLeader();
}

/**
@@ -419,11 +419,19 @@ void recordTransactionCreated(DatanodeID dnId, long scmCmdId, Set<Long> dnTxSet)
.putIfAbsent(txId, new LinkedHashSet<>()));
}

- public void clear() {
+ public void onBecomeLeader() {
transactionToRetryCountMap.clear();
scmDeleteBlocksCommandStatusManager.clear();
transactionToDNsCommitMap.clear();
txSizeMap.clear();
+ try {
+ initDataDistributionData();
Contributor:
We can move this code to another method and call it on leader transfer, instead of putting it inside clear(), which is logically meant for clearing all the in-memory maps; alternatively, just update the method name.

Contributor (Author):
Agree. Rename clear() to onBecomeLeader().
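
As an aside, a minimal sketch of the first alternative suggested above (the name onLeaderTransfer is hypothetical, not from this PR): keep clear() strictly as in-memory cleanup and re-initialize the summary from a dedicated leader-transfer hook.

    // Hypothetical split; the PR chose the simpler rename instead.
    public void clear() {
      // purely in-memory cleanup
      transactionToRetryCountMap.clear();
      scmDeleteBlocksCommandStatusManager.clear();
      transactionToDNsCommitMap.clear();
      txSizeMap.clear();
    }

    public void onLeaderTransfer() throws IOException {
      clear();
      initDataDistributionData(); // reload the persisted summary from RocksDB
    }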

+ } catch (IOException e) {
+ LOG.warn("Failed to initialize Storage space distribution data. The feature will continue with the current " +
+ "totalTxCount {}, totalBlockCount {}, totalBlocksSize {} and totalReplicatedBlocksSize {}. " +
+ "There is a high chance that the real data and the current data have a gap.",
+ totalTxCount.get(), totalBlockCount.get(), totalBlocksSize.get(), totalReplicatedBlocksSize.get());
+ }
}

public void cleanAllTimeoutSCMCommand(long timeoutMs) {
@@ -672,8 +680,9 @@ private void initDataDistributionData() throws IOException {
totalBlockCount.set(summary.getTotalBlockCount());
totalBlocksSize.set(summary.getTotalBlockSize());
totalReplicatedBlocksSize.set(summary.getTotalBlockReplicatedSize());
LOG.info("Data distribution is enabled with totalBlockCount {} totalBlocksSize {}",
totalBlockCount.get(), totalBlocksSize.get());
LOG.info("Storage space distribution is initialized with totalTxCount {}, totalBlockCount {}, " +
"totalBlocksSize {} and totalReplicatedBlocksSize {}", totalTxCount.get(),
totalBlockCount.get(), totalBlocksSize.get(), totalReplicatedBlocksSize.get());
}
}

@@ -688,8 +697,7 @@ private DeletedBlocksTransactionSummary loadDeletedBlocksSummary() throws IOException
}
return DeletedBlocksTransactionSummary.parseFrom(byteString);
} catch (IOException e) {
LOG.error("Failed to get property {} for service {}. DataDistribution function will be disabled.",
propertyName, SERVICE_NAME, e);
LOG.error("Failed to get property {} for service {}.", propertyName, SERVICE_NAME, e);
throw new IOException("Failed to get property " + propertyName, e);
}
}
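
Note: the summary travels through the SCM metadata store as raw bytes, so the persist/load pair is a plain protobuf round-trip. A minimal sketch of that round-trip (the builder setter names are assumptions inferred from the getters used elsewhere in this PR):

    // Persist: serialize the current counters into the summary message.
    DeletedBlocksTransactionSummary summary = DeletedBlocksTransactionSummary.newBuilder()
        .setTotalTransactionCount(totalTxCount.get())
        .setTotalBlockCount(totalBlockCount.get())
        .setTotalBlockSize(totalBlocksSize.get())
        .setTotalBlockReplicatedSize(totalReplicatedBlocksSize.get())
        .build();
    ByteString bytes = summary.toByteString();

    // Load: parse the bytes back, as loadDeletedBlocksSummary() does above.
    DeletedBlocksTransactionSummary loaded = DeletedBlocksTransactionSummary.parseFrom(bytes);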
@@ -296,6 +296,12 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
currentLeaderTerm.get());
scm.getSequenceIdGen().invalidateBatch();

+ try {
+ transactionBuffer.flush();
+ } catch (Exception ex) {
+ ExitUtils.terminate(1, "Failed to flush transactionBuffer", ex, StateMachine.LOG);
+ }
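
Note: the flush added here is an ordering guarantee. Buffered deleted-block transactions must reach RocksDB before leadership moves; otherwise the incoming leader would rebuild its counters from a stale persisted summary. Roughly, the intended sequence (illustrative only, not actual SCM code):

    // 1. Outgoing leader: persist everything buffered so far; a failed flush is fatal.
    transactionBuffer.flush();
    // 2. Ratis hands leadership to the new SCM.
    // 3. Incoming leader: clear the in-memory maps and reload the persisted summary,
    //    via DeletedBlockLog#onBecomeLeader() -> statusManager.onBecomeLeader()
    //    -> initDataDistributionData().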

DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
.getDeletedBlockLog();
Preconditions.checkArgument(
@@ -18,6 +18,11 @@
package org.apache.hadoop.hdds.upgrade;

import static java.nio.charset.StandardCharsets.UTF_8;
+ import static java.util.concurrent.TimeUnit.MILLISECONDS;
+ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
@@ -109,12 +114,14 @@ public void init(OzoneConfiguration conf,
configurator.setUpgradeFinalizationExecutor(executor);

+ conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY, HDDSLayoutFeature.HBASE_SUPPORT.layoutVersion());
- conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
- TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
- TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
- 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100, MILLISECONDS);
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 100, TimeUnit.MILLISECONDS);

ScmConfig scmConfig = conf.getObject(ScmConfig.class);
scmConfig.setBlockDeletionInterval(Duration.ofMillis(100));
conf.setFromObject(scmConfig);
@@ -123,6 +130,7 @@ public void init(OzoneConfiguration conf,
DatanodeConfiguration dnConf =
conf.getObject(DatanodeConfiguration.class);
dnConf.setBlockDeletionInterval(Duration.ofMillis(100));
+ dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(100));
conf.setFromObject(dnConf);

MiniOzoneHAClusterImpl.Builder clusterBuilder = MiniOzoneCluster.newHABuilder(conf);
@@ -344,13 +352,39 @@ public void testFinalizationNonEmptyClusterDataDistribution() throws Exception {
assertEquals(value.getBytes(UTF_8).length, summary.getTotalBlockSize());
assertEquals(value.getBytes(UTF_8).length * 3, summary.getTotalBlockReplicatedSize());

+ // transfer SCM leader
+ String newLeaderScmId = null;
+ for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) {
+ if (scm != activeSCM) {
+ newLeaderScmId = scm.getScmId();
+ break;
+ }
+ }
+ cluster.getStorageContainerLocationClient().transferLeadership(newLeaderScmId);
+ StorageContainerManager newActiveSCM = cluster.getActiveSCM();
+ deletedBlockLog = (DeletedBlockLogImpl) newActiveSCM.getScmBlockManager().getDeletedBlockLog();
+ SCMDeletedBlockTransactionStatusManager newStatusManager =
+ deletedBlockLog.getSCMDeletedBlockTransactionStatusManager();
+ // new leader SCM should have the right deletion tx summary
+ summary = newStatusManager.getTransactionSummary();
+ assertEquals(1, summary.getTotalTransactionCount());
+ assertEquals(1, summary.getTotalBlockCount());
+ assertEquals(value.getBytes(UTF_8).length, summary.getTotalBlockSize());
+ assertEquals(value.getBytes(UTF_8).length * 3, summary.getTotalBlockReplicatedSize());
+
+ // flush buffer and start SCMBlockDeletingService
+ for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) {
+ flushDBTransactionBuffer(scm);
+ scm.getScmBlockManager().getSCMBlockDeletingService().start();
+ }
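
Note: flushDBTransactionBuffer() is a helper defined elsewhere in this test class; it presumably looks something like the following (an assumption, shown only so the loop above reads standalone):

    private static void flushDBTransactionBuffer(StorageContainerManager scm)
        throws IOException {
      // Force buffered Ratis transactions into RocksDB so the deleting service
      // and the transaction summary see every applied transaction.
      scm.getScmHAManager().asSCMHADBTransactionBuffer().flush();
    }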

// force close the container so that block can be deleted
- activeSCM.getClientProtocolServer().closeContainer(
+ newActiveSCM.getClientProtocolServer().closeContainer(
keyDetails.getOzoneKeyLocations().get(0).getContainerID());
// wait for container to be closed
GenericTestUtils.waitFor(() -> {
try {
- return activeSCM.getClientProtocolServer().getContainer(
+ return newActiveSCM.getClientProtocolServer().getContainer(
keyDetails.getOzoneKeyLocations().get(0).getContainerID())
.getState() == HddsProtos.LifeCycleState.CLOSED;
} catch (IOException e) {
@@ -359,15 +393,15 @@ public void testFinalizationNonEmptyClusterDataDistribution() throws Exception {
}
}, 100, 5000);

- // flush buffer and start SCMBlockDeletingService
- for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) {
- flushDBTransactionBuffer(scm);
- scm.getScmBlockManager().getSCMBlockDeletingService().start();
- }

// wait for block deletion transactions to be confirmed by DN
GenericTestUtils.waitFor(
- () -> statusManager.getTransactionSummary().getTotalTransactionCount() == 0, 100, 30000);
+ () -> newStatusManager.getTransactionSummary().getTotalTransactionCount() == 0, 100, 30000);

+ // transfer leader back to old SCM and verify
+ cluster.getStorageContainerLocationClient().transferLeadership(activeSCM.getScmId());
+ deletedBlockLog = (DeletedBlockLogImpl) activeSCM.getScmBlockManager().getDeletedBlockLog();
+ summary = deletedBlockLog.getSCMDeletedBlockTransactionStatusManager().getTransactionSummary();
+ assertEquals(EMPTY_SUMMARY, summary);
}
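
Note: EMPTY_SUMMARY is the all-zero baseline the old leader must report once every transaction has been confirmed and leadership is transferred back; presumably a constant along these lines (an assumption; it is defined elsewhere in the test class, and the setter names are inferred from the getters used above):

    private static final DeletedBlocksTransactionSummary EMPTY_SUMMARY =
        DeletedBlocksTransactionSummary.newBuilder()
            .setTotalTransactionCount(0)
            .setTotalBlockCount(0)
            .setTotalBlockSize(0)
            .setTotalBlockReplicatedSize(0)
            .build();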

private Map<Long, List<DeletedBlock>> generateDeletedBlocks(int dataSize, boolean withSize) {