Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import org.opensearch.cluster.metadata.Metadata.Custom;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -80,7 +82,8 @@ public TransportClusterStateAction(
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
@Nullable RemoteClusterStateService remoteClusterStateService
) {
super(
ClusterStateAction.NAME,
Expand All @@ -93,6 +96,7 @@ public TransportClusterStateAction(
indexNameExpressionResolver
);
this.localExecuteSupported = true;
this.remoteClusterStateService = remoteClusterStateService;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -63,6 +64,9 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteManifestManager;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -94,6 +98,7 @@ public abstract class TransportClusterManagerNodeAction<Request extends ClusterM
protected final TransportService transportService;
protected final ClusterService clusterService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
protected RemoteClusterStateService remoteClusterStateService;

private final String executor;

Expand Down Expand Up @@ -378,11 +383,15 @@ public void handleResponse(GetTermVersionResponse response) {
response.getClusterStateTermVersion(),
isLatestClusterStatePresentOnLocalNode
);
if (isLatestClusterStatePresentOnLocalNode) {
onLatestLocalState.accept(clusterState);

ClusterState stateFromNode = getStateFromLocalNode(response.getClusterStateTermVersion());
if (stateFromNode != null) {
onLatestLocalState.accept(stateFromNode);
} else {
// fallback to clusterManager
onStaleLocalState.accept(clusterManagerNode, clusterState);
}

}

@Override
Expand All @@ -405,6 +414,37 @@ public GetTermVersionResponse read(StreamInput in) throws IOException {
};
}

public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) {
ClusterState appliedState = clusterService.state();
if (termVersion.equals(new ClusterStateTermVersion(appliedState))) {
return appliedState;
}
ClusterState publishState = clusterService.publishState();
if (publishState != null && termVersion.equals(new ClusterStateTermVersion(publishState))) {
return publishState;
}
if (remoteClusterStateService != null) {
try {
String manifestFile = RemoteManifestManager.getManifestFilePrefixForTermVersion(
termVersion.getTerm(),
termVersion.getVersion()
);
ClusterMetadataManifest clusterMetadataManifestByFileName = remoteClusterStateService
.getClusterMetadataManifestByFileName(appliedState.getClusterName().value(), manifestFile);
ClusterState clusterStateForManifest = remoteClusterStateService.getClusterStateForManifest(
appliedState.getClusterName().value(),
clusterMetadataManifestByFileName,
appliedState.nodes().getLocalNode().getId(),
true
);
return clusterStateForManifest;
} catch (IOException e) {

}
}
return null;
}

private boolean checkForBlock(Request request, ClusterState localClusterState) {
final ClusterBlockException blockException = checkBlock(request, localClusterState);
if (blockException != null) {
Expand Down Expand Up @@ -510,4 +550,9 @@ protected String getMasterActionName(DiscoveryNode node) {
protected boolean localExecuteSupportedByAction() {
return false;
}

public void setRemoteClusterStateService(RemoteClusterStateService remoteClusterStateService) {
this.remoteClusterStateService = remoteClusterStateService;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,12 @@ && getCurrentTerm() == ZEN1_BWC_TERM
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);

ClusterState publishState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
final ClusterState publishClusterState = mode == Mode.CANDIDATE
? clusterStateWithNoClusterManagerBlock(publishState)
: publishState;
clusterApplier.onPublishClusterState(publishRequest.toString(), () -> publishClusterState);

if (sourceNode.equals(getLocalNode())) {
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface ClusterApplier {
*/
void setInitialState(ClusterState initialState);

void onPublishClusterState(String source, Supplier<ClusterState> clusterStateSupplier);

/**
* Method to invoke when a new cluster state is available to be applied
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>();
private final AtomicReference<ClusterState> publishState = new AtomicReference<>(); // last published state

private final AtomicReference<ClusterState> state; // last applied state

Expand Down Expand Up @@ -233,6 +234,10 @@ public ClusterState state() {
return clusterState;
}

public ClusterState publishState() {
return publishState.get();
}

/**
* Returns true if the appliedClusterState is not null
*/
Expand Down Expand Up @@ -368,6 +373,14 @@ public ThreadPool threadPool() {
return threadPool;
}

@Override
public void onPublishClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier) {
ClusterState nextState = clusterStateSupplier.get();
if (nextState != null) {
publishState.set(nextState);
}
}

@Override
public void onNewClusterState(
final String source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ public ClusterState state() {
return clusterApplierService.state();
}

public ClusterState publishState() {
return clusterApplierService.publishState();
}

/**
* Adds a high priority applier of updated cluster states.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.DiffableStringMap;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -32,6 +33,7 @@
import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -114,6 +116,7 @@
*
* @opensearch.internal
*/
@InternalApi
public class RemoteClusterStateService implements Closeable {

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);
Expand Down Expand Up @@ -979,6 +982,8 @@ BlobStore getBlobStore() {
return blobStoreRepository.blobStore();
}

AtomicReference<ClusterState> lastDownloadState = new AtomicReference<>();

/**
* Fetch latest ClusterState from remote, including global metadata, index metadata and cluster state version
*
Expand All @@ -996,8 +1001,16 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID
String.format(Locale.ROOT, "Latest cluster metadata manifest is not present for the provided clusterUUID: %s", clusterUUID)
);
}
ClusterStateTermVersion clusterStateTermVersion = new ClusterStateTermVersion(
new ClusterName(clusterName),
clusterUUID,
clusterMetadataManifest.get().getClusterTerm(),
clusterMetadataManifest.get().getStateVersion()
);

return getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId, includeEphemeral);
ClusterState state = getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId, includeEphemeral);
lastDownloadState.set(state);
return state;
}

// package private for testing
Expand Down Expand Up @@ -1311,8 +1324,29 @@ public ClusterState getClusterStateForManifest(
String localNodeId,
boolean includeEphemeral
) throws IOException {

ClusterStateTermVersion clusterStateTermVersion = new ClusterStateTermVersion(
new ClusterName(clusterName),
manifest.getClusterUUID(),
manifest.getClusterTerm(),
manifest.getStateVersion()
);
ClusterState lastState = lastDownloadState.get();
if (lastState != null) {
ClusterStateTermVersion lastStateTermVersion = new ClusterStateTermVersion(
new ClusterName(clusterName),
lastState.stateUUID(),
lastState.term(),
lastState.version()
);
if (clusterStateTermVersion.equals(lastStateTermVersion)) {
return lastState;
}
}

ClusterState retState = null;
if (manifest.onOrAfterCodecVersion(CODEC_V2)) {
return readClusterStateInParallel(
retState = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
Expand Down Expand Up @@ -1354,9 +1388,23 @@ public ClusterState getClusterStateForManifest(
);
Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest));
mb.indices(clusterState.metadata().indices());
return ClusterState.builder(clusterState).metadata(mb).build();
retState = ClusterState.builder(clusterState).metadata(mb).build();
}
setLastDownloadState(retState);
return retState;
}

private void setLastDownloadState(final ClusterState newState) {
lastDownloadState.getAndUpdate(oldState -> {
if (oldState == null) {
return newState;
}
if (newState.term() > oldState.term() && newState.version() > oldState.version()) {
return newState;
} else {
return oldState;
}
});
}

public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId)
Expand Down Expand Up @@ -1437,11 +1485,14 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
indexRoutingTables.remove(indexName);
}

return clusterStateBuilder.stateUUID(manifest.getStateUUID())
final ClusterState newState = clusterStateBuilder.stateUUID(manifest.getStateUUID())
.version(manifest.getStateVersion())
.metadata(metadataBuilder)
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.build();

setLastDownloadState(newState);
return newState;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private List<BlobMetadata> getManifestFileNames(String clusterName, String clust
}
}

static String getManifestFilePrefixForTermVersion(long term, long version) {
public static String getManifestFilePrefixForTermVersion(long term, long version) {
return String.join(
DELIMITER,
RemoteClusterMetadataManifest.MANIFEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public void setInitialState(ClusterState initialState) {

}

@Override
public void onPublishClusterState(String source, Supplier<ClusterState> clusterStateSupplier) {

}

@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
listener.onSuccess(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2412,7 +2412,8 @@ public void onFailure(final Exception e) {
clusterService,
threadPool,
actionFilters,
indexNameExpressionResolver
indexNameExpressionResolver,
null
)
);
actions.put(
Expand Down