Skip to content

Adding dry_run mode for setting data stream settings #128269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 23, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.CountDownActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -107,53 +106,46 @@ protected void masterOperation(
request.indices()
);
List<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> dataStreamSettingsResponse = new ArrayList<>();
CountDownActionListener countDownListener = new CountDownActionListener(dataStreamNames.size() + 1, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
listener.onResponse(new UpdateDataStreamSettingsAction.Response(dataStreamSettingsResponse));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
CountDownActionListener countDownListener = new CountDownActionListener(
dataStreamNames.size() + 1,
listener.delegateFailure(
(responseActionListener, unused) -> responseActionListener.onResponse(
new UpdateDataStreamSettingsAction.Response(dataStreamSettingsResponse)
)
)
);
countDownListener.onResponse(null);
for (String dataStreamName : dataStreamNames) {
updateSingleDataStream(
dataStreamName,
request.getSettings(),
request.isDryRun(),
request.masterNodeTimeout(),
request.ackTimeout(),
new ActionListener<>() {
@Override
public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse dataStreamResponse) {
dataStreamSettingsResponse.add(dataStreamResponse);
countDownListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
dataStreamSettingsResponse.add(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
false,
e.getMessage(),
EMPTY,
EMPTY,
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
)
);
countDownListener.onResponse(null);
}
}
ActionListener.wrap(dataStreamResponse -> {
dataStreamSettingsResponse.add(dataStreamResponse);
countDownListener.onResponse(null);
}, e -> {
dataStreamSettingsResponse.add(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
false,
e.getMessage(),
EMPTY,
EMPTY,
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
)
);
countDownListener.onResponse(null);
})
);
}
}

private void updateSingleDataStream(
String dataStreamName,
Settings settingsOverrides,
boolean dryRun,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> listener
Expand Down Expand Up @@ -198,36 +190,30 @@ private void updateSingleDataStream(
ackTimeout,
dataStreamName,
settingsOverrides,
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
updateSettingsOnIndices(dataStreamName, settingsOverrides, masterNodeTimeout, ackTimeout, listener);
} else {
listener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
false,
"Updating settings not accepted for unknown reasons",
EMPTY,
EMPTY,
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
)
);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
dryRun,
listener.delegateFailure((dataStreamSettingsResponseActionListener, dataStream) -> {
if (dataStream != null) {
updateSettingsOnIndices(dataStream, settingsOverrides, dryRun, masterNodeTimeout, ackTimeout, listener);
} else {
dataStreamSettingsResponseActionListener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
false,
"Updating settings not accepted for unknown reasons",
EMPTY,
EMPTY,
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndicesSettingsResult.EMPTY
)
);
}
}
})
);
}

private void updateSettingsOnIndices(
String dataStreamName,
DataStream dataStream,
Settings requestSettings,
boolean dryRun,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse> listener
Expand All @@ -243,26 +229,15 @@ private void updateSettingsOnIndices(
appliedToDataStreamOnly.add(settingName);
}
}
final List<Index> concreteIndices = clusterService.state()
.projectState(projectResolver.getProjectId())
.metadata()
.dataStreams()
.get(dataStreamName)
.getIndices();
final List<Index> concreteIndices = dataStream.getIndices();
final List<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> indexSettingErrors = new ArrayList<>();

CountDownActionListener indexCountDownListener = new CountDownActionListener(concreteIndices.size() + 1, new ActionListener<>() {
// Called when all indices for all settings are complete
@Override
public void onResponse(Void unused) {
DataStream dataStream = clusterService.state()
.projectState(projectResolver.getProjectId())
.metadata()
.dataStreams()
.get(dataStreamName);
listener.onResponse(
CountDownActionListener indexCountDownListener = new CountDownActionListener(
concreteIndices.size() + 1,
listener.delegateFailure(
(dataStreamSettingsResponseActionListener, unused) -> dataStreamSettingsResponseActionListener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse(
dataStreamName,
dataStream.getName(),
true,
null,
settingsFilter.filter(dataStream.getSettings()),
Expand All @@ -275,37 +250,33 @@ public void onResponse(Void unused) {
indexSettingErrors
)
)
);
}
)
)
);

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
indexCountDownListener.onResponse(null); // handles the case where there were zero indices
Settings applyToIndexSettings = builder().loadFromMap(settingsToApply).build();
for (Index index : concreteIndices) {
updateSettingsOnSingleIndex(index, applyToIndexSettings, masterNodeTimeout, ackTimeout, new ActionListener<>() {
@Override
public void onResponse(UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError indexSettingError) {
updateSettingsOnSingleIndex(
index,
applyToIndexSettings,
dryRun,
masterNodeTimeout,
ackTimeout,
indexCountDownListener.delegateFailure((listener1, indexSettingError) -> {
if (indexSettingError != null) {
indexSettingErrors.add(indexSettingError);
}
indexCountDownListener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
indexCountDownListener.onFailure(e);
}
});
listener1.onResponse(null);
})
);
}
}

private void updateSettingsOnSingleIndex(
Index index,
Settings requestSettings,
boolean dryRun,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError> listener
Expand All @@ -326,19 +297,24 @@ private void updateSettingsOnSingleIndex(
);
return;
}
updateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
projectResolver.getProjectId(),
masterNodeTimeout,
ackTimeout,
requestSettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
index
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
if (dryRun) {
/*
* This is as far as we go with dry run mode. We get the benefit of having checked that all the indices that will be touced
* are not blocked, but there is no value in going beyond this. So just respond to the listener and move on.
*/
listener.onResponse(null);
} else {
updateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
projectResolver.getProjectId(),
masterNodeTimeout,
ackTimeout,
requestSettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
index
),
ActionListener.wrap(response -> {
UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError error;
if (response.isAcknowledged() == false) {
error = new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(
Expand All @@ -349,16 +325,13 @@ public void onResponse(AcknowledgedResponse response) {
error = null;
}
listener.onResponse(error);
}

@Override
public void onFailure(Exception e) {
listener.onResponse(
},
e -> listener.onResponse(
new UpdateDataStreamSettingsAction.DataStreamSettingsResponse.IndexSettingError(index.getName(), e.getMessage())
);
}
}
);
)
)
);
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
try (XContentParser parser = request.contentParser()) {
settings = Settings.fromXContent(parser);
}
boolean dryRun = request.paramAsBoolean("dry_run", false);
UpdateDataStreamSettingsAction.Request putDataStreamRequest = new UpdateDataStreamSettingsAction.Request(
settings,
dryRun,
RestUtils.getMasterNodeTimeout(request),
RestUtils.getAckTimeout(request)
).indices(Strings.splitStringByCommaToArray(request.param("name")));
Expand Down
Loading