Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Use remote publication flag to decide which custom objects to upload #14390

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
Expand All @@ -25,10 +27,9 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
Expand Down Expand Up @@ -126,18 +127,35 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
}

public Map<String, ClusterState.Custom> getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) {
Map<String, ClusterState.Custom> updatedCustoms = new HashMap<>();
Set<String> currentCustoms = new HashSet<>(clusterState.customs().keySet());
for (Map.Entry<String, ClusterState.Custom> entry : previousClusterState.customs().entrySet()) {
if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) {
updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey()));
}
currentCustoms.remove(entry.getKey());
public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> getUpdatedCustoms(
ClusterState clusterState,
ClusterState previousClusterState,
boolean isRemotePublicationEnabled,
boolean isFirstUpload
) {
if (!isRemotePublicationEnabled) {
// When isRemotePublicationEnabled is false, we do not want store any custom objects
return DiffableUtils.diff(
Collections.emptyMap(),
Collections.emptyMap(),
DiffableUtils.getStringKeySerializer(),
NonDiffableValueSerializer.getAbstractInstance()
);
}
for (String custom : currentCustoms) {
updatedCustoms.put(custom, clusterState.customs().get(custom));
if (isFirstUpload) {
// For first upload of ephemeral metadata, we want to upload all customs
return DiffableUtils.diff(
Collections.emptyMap(),
clusterState.customs(),
DiffableUtils.getStringKeySerializer(),
NonDiffableValueSerializer.getAbstractInstance()
);
}
return updatedCustoms;
return DiffableUtils.diff(
previousClusterState.customs(),
clusterState.customs(),
DiffableUtils.getStringKeySerializer(),
NonDiffableValueSerializer.getAbstractInstance()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -88,6 +89,7 @@

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS;
Expand Down Expand Up @@ -159,6 +161,7 @@ public class RemoteClusterStateService implements Closeable {
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private final boolean isPublicationEnabled;

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
Expand Down Expand Up @@ -201,6 +204,9 @@ public RemoteClusterStateService(
threadPool
);
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
}

/**
Expand All @@ -221,15 +227,15 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
clusterState,
new ArrayList<>(clusterState.metadata().indices().values()),
emptyMap(),
clusterState.metadata().customs(),
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled),
true,
true,
true,
true,
true,
true,
clusterState.customs(),
true,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(),
isPublicationEnabled,
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable())
);
final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
Expand Down Expand Up @@ -285,28 +291,17 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
}
assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();

final Map<String, UploadedMetadataAttribute> customsToBeDeletedFromRemote = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, Metadata.Custom> customsToUpload = remoteGlobalMetadataManager.getUpdatedCustoms(
clusterState,
previousClusterState
);
final Map<String, UploadedMetadataAttribute> clusterStateCustomsToBeDeleted = new HashMap<>(
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();

final DiffableUtils.MapDiff<String, Metadata.Custom, Map<String, Metadata.Custom>> customsDiff = remoteGlobalMetadataManager
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled);
final DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> clusterStateCustomsDiff =
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, UploadedMetadataAttribute> allUploadedClusterStateCustomsMap = new HashMap<>(
previousManifest.getClusterStateCustomMap()
);
final Map<String, ClusterState.Custom> clusterStateCustomsToUpload = remoteClusterStateAttributesManager.getUpdatedCustoms(
clusterState,
previousClusterState
);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
for (final String custom : clusterState.metadata().customs().keySet()) {
// remove all the customs which are present currently
customsToBeDeletedFromRemote.remove(custom);
}
final Map<String, IndexMetadata> indicesToBeDeletedFromRemote = new HashMap<>(previousClusterState.metadata().indices());
for (final String custom : clusterState.customs().keySet()) {
// remove all the custom which are present currently
clusterStateCustomsToBeDeleted.remove(custom);
}
int numIndicesUpdated = 0;
int numIndicesUnchanged = 0;
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndexMetadata = previousManifest.getIndices()
Expand Down Expand Up @@ -337,42 +332,44 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName());
}

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = remoteRoutingTableService
final DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = remoteRoutingTableService
.getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable());
List<IndexRoutingTable> indicesRoutingToUpload = new ArrayList<>();
final List<IndexRoutingTable> indicesRoutingToUpload = new ArrayList<>();
routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v));

UploadedMetadataResults uploadedMetadataResults;
// For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files,
// If file is empty and codec is 1 then write global metadata.
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();
boolean updateCoordinationMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isCoordinationMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
;
boolean updateSettingsMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
boolean updateTransientSettingsMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isTransientSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
boolean updateTransientSettingsMetadata = Metadata.isTransientSettingsMetadataEqual(
previousClusterState.metadata(),
clusterState.metadata()
) == false;
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
// ToDo: check if these needs to be updated or not
final boolean updateDiscoveryNodes = clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = firstUploadForSplitGlobalMetadata

final boolean updateDiscoveryNodes = isPublicationEnabled
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
|| Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

uploadedMetadataResults = writeMetadataInParallel(
clusterState,
toUpload,
prevIndexMetadataByName,
firstUploadForSplitGlobalMetadata ? clusterState.metadata().customs() : customsToUpload,
customsDiff.getUpserts(),
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
updateDiscoveryNodes,
updateClusterBlocks,
updateTransientSettingsMetadata,
clusterStateCustomsToUpload,
clusterStateCustomsDiff.getUpserts(),
updateHashesOfConsistentSettings,
indicesRoutingToUpload
);
Expand All @@ -382,10 +379,11 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
);
allUploadedCustomMap.putAll(uploadedMetadataResults.uploadedCustomMetadataMap);
allUploadedClusterStateCustomsMap.putAll(uploadedMetadataResults.uploadedClusterStateCustomMetadataMap);
// remove the data for removed custom/indices
customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove);
customsDiff.getDeletes().forEach(allUploadedCustomMap::remove);
indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove);
clusterStateCustomsToBeDeleted.keySet().forEach(allUploadedCustomMap::remove);
clusterStateCustomsDiff.getDeletes().forEach(allUploadedClusterStateCustomsMap::remove);

if (!updateCoordinationMetadata) {
uploadedMetadataResults.uploadedCoordinationMetadata = previousManifest.getCoordinationMetadata();
Expand All @@ -399,31 +397,24 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
if (!updateTemplatesMetadata) {
uploadedMetadataResults.uploadedTemplatesMetadata = previousManifest.getTemplatesMetadata();
}
if (!updateDiscoveryNodes && !firstUploadForSplitGlobalMetadata) {
if (!updateDiscoveryNodes) {
uploadedMetadataResults.uploadedDiscoveryNodes = previousManifest.getDiscoveryNodesMetadata();
}
if (!updateClusterBlocks && !firstUploadForSplitGlobalMetadata) {
if (!updateClusterBlocks) {
uploadedMetadataResults.uploadedClusterBlocks = previousManifest.getClusterBlocksMetadata();
}
if (!updateHashesOfConsistentSettings && !firstUploadForSplitGlobalMetadata) {
if (!updateHashesOfConsistentSettings) {
uploadedMetadataResults.uploadedHashesOfConsistentSettings = previousManifest.getHashesOfConsistentSettings();
}
if (!firstUploadForSplitGlobalMetadata && customsToUpload.isEmpty()) {
uploadedMetadataResults.uploadedCustomMetadataMap = previousManifest.getCustomMetadataMap();
}
if (!firstUploadForSplitGlobalMetadata && clusterStateCustomsToUpload.isEmpty()) {
uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = previousManifest.getClusterStateCustomMap();
}
uploadedMetadataResults.uploadedCustomMetadataMap = allUploadedCustomMap;
uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = allUploadedClusterStateCustomsMap;
uploadedMetadataResults.uploadedIndexMetadata = new ArrayList<>(allUploadedIndexMetadata.values());

List<ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = new ArrayList<>();
allUploadedIndicesRouting = remoteRoutingTableService.getAllUploadedIndicesRouting(
uploadedMetadataResults.uploadedIndicesRoutingMetadata = remoteRoutingTableService.getAllUploadedIndicesRouting(
previousManifest,
uploadedMetadataResults.uploadedIndicesRoutingMetadata,
routingTableDiff.getDeletes()
);
uploadedMetadataResults.uploadedIndicesRoutingMetadata = allUploadedIndicesRouting;

final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
clusterState,
Expand All @@ -448,7 +439,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
customsToUpload.size(),
customsDiff.getUpserts().size(),
indicesRoutingToUpload.size()
);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
Expand All @@ -464,7 +455,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
customsToUpload.size()
customsDiff.getUpserts().size()
);
} else {
logger.info("{}; {}", clusterStateUploadTimeMessage, metadataUpdateMessage);
Expand All @@ -479,7 +470,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
customsToUpload.size()
customsDiff.getUpserts().size()
);
}
return manifestDetails;
Expand Down
Loading
Loading