From 3e8df6866c7fb4456775e18da199db5c84c51659 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Fri, 16 Aug 2024 14:34:25 +0800 Subject: [PATCH] fix disk usage exceed threshold cluster can not spin up issue Signed-off-by: zane-neo --- CHANGELOG.md | 1 + .../cluster/block/ClusterBlocks.java | 22 +--------- .../allocation/DiskThresholdMonitor.java | 44 +++++++++++++++---- .../main/java/org/opensearch/node/Node.java | 4 +- .../allocation/DiskThresholdMonitorTests.java | 43 ++++++++---------- 5 files changed, 58 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 511dd77c24e22..ef971592e5199 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix array_index_out_of_bounds_exception when indexing documents with field name containing only dot ([#15126](https://github.com/opensearch-project/OpenSearch/pull/15126)) - Fixed array field name omission in flat_object function for nested JSON ([#13620](https://github.com/opensearch-project/OpenSearch/pull/13620)) - Fix range aggregation optimization ignoring top level queries ([#15194](https://github.com/opensearch-project/OpenSearch/pull/15194)) +- Fix cluster not able to spin up issue when disk usage exceeds threshold ([#15258](https://github.com/opensearch-project/OpenSearch/pull/15258)) ### Security diff --git a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java index 4fcbbc3a9d135..02a20b7681ba7 100644 --- a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java @@ -67,9 +67,9 @@ public class ClusterBlocks extends AbstractDiffable { private final Set global; - private Map> indicesBlocks; + private final Map> indicesBlocks; - private EnumMap levelHolders; + private final EnumMap levelHolders; ClusterBlocks(Set global, final Map> indicesBlocks) { 
this.global = global; @@ -161,24 +161,6 @@ public boolean hasIndexBlock(String index, ClusterBlock block) { return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block); } - public void removeIndexBlock(String index, ClusterBlock block) { - Map> newIndicesBlocks = new HashMap<>(indicesBlocks); // copy to avoid UnsupportedOperationException> - for (Map.Entry> entry : indicesBlocks.entrySet()) { - String indexName = entry.getKey(); - Set clusterBlockSet = new HashSet<>(entry.getValue()); - if (indexName.equals(index)) { - clusterBlockSet.remove(block); - if (clusterBlockSet.isEmpty()) { - newIndicesBlocks.remove(indexName); - } else { - newIndicesBlocks.put(indexName, Collections.unmodifiableSet(clusterBlockSet)); - } - } - } - this.indicesBlocks = Collections.unmodifiableMap(newIndicesBlocks); - this.levelHolders = generateLevelHolders(global, indicesBlocks); - } - public boolean hasIndexBlockWithId(String index, int blockId) { final Set clusterBlocks = indicesBlocks.get(index); if (clusterBlocks != null) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index 9c784ad289208..aabb1c5b13da3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -39,8 +39,8 @@ import org.opensearch.client.Client; import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.DiskUsage; -import org.opensearch.cluster.block.ClusterBlock; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; @@ -50,8 +50,8 @@ import org.opensearch.cluster.routing.RoutingNodes; import 
org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; @@ -82,6 +82,7 @@ public class DiskThresholdMonitor { private static final Logger logger = LogManager.getLogger(DiskThresholdMonitor.class); private final DiskThresholdSettings diskThresholdSettings; private final Client client; + private final ClusterService clusterService; private final Supplier clusterStateSupplier; private final LongSupplier currentTimeMillisSupplier; private final RerouteService rerouteService; @@ -107,17 +108,16 @@ public class DiskThresholdMonitor { private final Set nodesOverHighThresholdAndRelocating = Sets.newConcurrentHashSet(); public DiskThresholdMonitor( - Settings settings, - Supplier clusterStateSupplier, - ClusterSettings clusterSettings, + ClusterService clusterService, Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService ) { - this.clusterStateSupplier = clusterStateSupplier; + this.clusterService = clusterService; + this.clusterStateSupplier = clusterService::state; this.currentTimeMillisSupplier = currentTimeMillisSupplier; this.rerouteService = rerouteService; - this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); + this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings()); this.client = client; } @@ -158,6 +158,7 @@ public void onNewInfo(ClusterInfo info) { final Set indicesToMarkReadOnly = new HashSet<>(); RoutingNodes routingNodes = state.getRoutingNodes(); Set indicesNotToAutoRelease = new HashSet<>(); + final Set indicesToRemoveReadOnly = new HashSet<>(); 
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease); final List usagesOverHighThreshold = new ArrayList<>(); @@ -255,8 +256,10 @@ public void onNewInfo(ClusterInfo info) { } } else { - for (Map.Entry> indexBlockEntry : state.blocks().indices().entrySet()) { - state.blocks().removeIndexBlock(indexBlockEntry.getKey(), IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK); + if (routingNode != null) { + for (ShardRouting routing : routingNode) { + indicesToRemoveReadOnly.add(routing.getIndexName()); + } } nodesOverHighThresholdAndRelocating.remove(node); @@ -290,6 +293,29 @@ public void onNewInfo(ClusterInfo info) { } } + // remove read-only blocks for indices. + if (!indicesToRemoveReadOnly.isEmpty()) { + clusterService.submitStateUpdateTask("disk-usage-recovered", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(state.blocks()); + for (String index : indicesToRemoveReadOnly) { + clusterBlocksBuilder.removeIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK); + } + final Metadata metadata = Metadata.builder() + .clusterUUID(state.metadata().clusterUUID()) + .coordinationMetadata(state.metadata().coordinationMetadata()) + .build(); + return ClusterState.builder(state).metadata(metadata).blocks(clusterBlocksBuilder.build()).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("failed to update cluster state for disk usage recovery task", e); + } + }); + } + final ActionListener listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 4); if (reroute) { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1a9b233b387b2..19b5097dfc059 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ 
-1225,9 +1225,7 @@ protected Node( ); final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor( - settings, - clusterService::state, - clusterService.getClusterSettings(), + clusterService, client, threadPool::relativeTimeInMillis, rerouteService diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 6ab57d10b05c1..b0c8448752b57 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -47,6 +47,9 @@ import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.service.ClusterApplierService; +import org.opensearch.cluster.service.ClusterManagerService; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -72,6 +75,8 @@ import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class DiskThresholdMonitorTests extends OpenSearchAllocationTestCase { @@ -116,9 +121,7 @@ public void testMarkFloodStageIndicesReadOnly() { AtomicReference> indices = new AtomicReference<>(); AtomicLong currentTime = new AtomicLong(); DiskThresholdMonitor monitor = new DiskThresholdMonitor( - Settings.EMPTY, - () -> clusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + createClusterService(Settings.EMPTY, clusterState), 
null, currentTime::get, (reason, priority, listener) -> { @@ -178,9 +181,7 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2")); monitor = new DiskThresholdMonitor( - Settings.EMPTY, - () -> anotherFinalClusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + createClusterService(Settings.EMPTY, anotherFinalClusterState), null, currentTime::get, (reason, priority, listener) -> { @@ -219,9 +220,7 @@ public void testDoesNotSubmitRerouteTaskTooFrequently() { AtomicLong currentTime = new AtomicLong(); AtomicReference> listenerReference = new AtomicReference<>(); DiskThresholdMonitor monitor = new DiskThresholdMonitor( - Settings.EMPTY, - () -> clusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + createClusterService(Settings.EMPTY, clusterState), null, currentTime::get, (reason, priority, listener) -> { @@ -360,9 +359,7 @@ public void testAutoReleaseIndices() { ); DiskThresholdMonitor monitor = new DiskThresholdMonitor( - Settings.EMPTY, - () -> clusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + createClusterService(Settings.EMPTY, clusterState), null, () -> 0L, (reason, priority, listener) -> { @@ -422,9 +419,7 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2")); monitor = new DiskThresholdMonitor( - Settings.EMPTY, - () -> clusterStateWithBlocks, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + createClusterService(Settings.EMPTY, clusterStateWithBlocks), null, () -> 0L, (reason, priority, listener) -> { @@ -539,9 +534,7 @@ public long getAsLong() { final AtomicLong relocatingShardSizeRef = new AtomicLong(); DiskThresholdMonitor monitor = new DiskThresholdMonitor( - 
Settings.EMPTY, - clusterStateRef::get, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + createClusterService(Settings.EMPTY, clusterState), null, timeSupplier, (reason, priority, listener) -> listener.onResponse(clusterStateRef.get()) @@ -687,9 +680,7 @@ public void testIndexCreateBlockWhenNoDataNodeHealthy() { AtomicLong currentTime = new AtomicLong(); Settings settings = Settings.builder().build(); DiskThresholdMonitor monitor = new DiskThresholdMonitor( - settings, - () -> clusterState, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + createClusterService(settings, clusterState), null, currentTime::get, (reason, priority, listener) -> { @@ -766,9 +757,7 @@ public void testIndexCreateBlockRemovedOnlyWhenAnyNodeAboveHighWatermark() { AtomicLong currentTime = new AtomicLong(); Settings settings = Settings.builder().put(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), true).build(); DiskThresholdMonitor monitor = new DiskThresholdMonitor( - settings, - () -> clusterState, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + createClusterService(settings, clusterState), null, currentTime::get, (reason, priority, listener) -> { @@ -905,4 +894,10 @@ private static ClusterInfo clusterInfo( return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of()); } + private static ClusterService createClusterService(Settings settings, ClusterState clusterState) { + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + when(clusterApplierService.state()).thenReturn(clusterState); + return new ClusterService(settings, clusterSettings, mock(ClusterManagerService.class), clusterApplierService); + } }