Skip to content

[8.0] [ML] Fix datafeed preview with remote indices (#81099) #81103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void previewDatafeed(DatafeedConfig datafeedConfig, Job job, ActionListe
new DatafeedTimingStatsReporter(new DatafeedTimingStats(datafeedConfig.getJobId()), (ts, refreshPolicy) -> {}),
listener.delegateFailure((l, dataExtractorFactory) -> {
isDateNanos(
previewDatafeedConfig.getHeaders(),
previewDatafeedConfig,
job.getDataDescription().getTimeField(),
listener.delegateFailure((l2, isDateNanos) -> {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(
Expand Down Expand Up @@ -151,13 +151,16 @@ static DatafeedConfig.Builder buildPreviewDatafeed(DatafeedConfig datafeed) {
return previewDatafeed;
}

private void isDateNanos(Map<String, String> headers, String timeField, ActionListener<Boolean> listener) {
private void isDateNanos(DatafeedConfig datafeed, String timeField, ActionListener<Boolean> listener) {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[0])).indicesOptions(datafeed.getIndicesOptions());
fieldCapabilitiesRequest.fields(timeField);
executeWithHeadersAsync(
headers,
datafeed.getHeaders(),
ML_ORIGIN,
client,
FieldCapabilitiesAction.INSTANCE,
new FieldCapabilitiesRequest().fields(timeField),
fieldCapabilitiesRequest,
ActionListener.wrap(fieldCapsResponse -> {
Map<String, FieldCapabilities> timeFieldCaps = fieldCapsResponse.getField(timeField);
listener.onResponse(timeFieldCaps.keySet().contains(DateFieldMapper.DATE_NANOS_CONTENT_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -51,21 +50,15 @@ public void setUpTests() {
dataExtractor = mock(DataExtractor.class);
actionListener = mock(ActionListener.class);

doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) {
PreviewDatafeedAction.Response response = (PreviewDatafeedAction.Response) invocationOnMock.getArguments()[0];
capturedResponse = response.toString();
return null;
}
doAnswer((Answer<Void>) invocationOnMock -> {
PreviewDatafeedAction.Response response = (PreviewDatafeedAction.Response) invocationOnMock.getArguments()[0];
capturedResponse = response.toString();
return null;
}).when(actionListener).onResponse(any());

doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) {
capturedFailure = (Exception) invocationOnMock.getArguments()[0];
return null;
}
doAnswer((Answer<Void>) invocationOnMock -> {
capturedFailure = (Exception) invocationOnMock.getArguments()[0];
return null;
}).when(actionListener).onFailure(any());
}

Expand Down Expand Up @@ -95,7 +88,7 @@ public void testBuildPreviewDatafeed_GivenAggregations() {
assertThat(previewDatafeed.getChunkingConfig(), equalTo(datafeed.build().getChunkingConfig()));
}

public void testPreviewDatafed_GivenEmptyStream() throws IOException {
public void testPreviewDatafeed_GivenEmptyStream() throws IOException {
when(dataExtractor.next()).thenReturn(Optional.empty());

TransportPreviewDatafeedAction.previewDatafeed(dataExtractor, actionListener);
Expand All @@ -105,7 +98,7 @@ public void testPreviewDatafed_GivenEmptyStream() throws IOException {
verify(dataExtractor).cancel();
}

public void testPreviewDatafed_GivenNonEmptyStream() throws IOException {
public void testPreviewDatafeed_GivenNonEmptyStream() throws IOException {
String streamAsString = "{\"a\":1, \"b\":2} {\"c\":3, \"d\":4}\n{\"e\":5, \"f\":6}";
InputStream stream = new ByteArrayInputStream(streamAsString.getBytes(StandardCharsets.UTF_8));
when(dataExtractor.next()).thenReturn(Optional.of(stream));
Expand All @@ -117,7 +110,7 @@ public void testPreviewDatafed_GivenNonEmptyStream() throws IOException {
verify(dataExtractor).cancel();
}

public void testPreviewDatafed_GivenFailure() throws IOException {
public void testPreviewDatafeed_GivenFailure() throws IOException {
doThrow(new RuntimeException("failed")).when(dataExtractor).next();

TransportPreviewDatafeedAction.previewDatafeed(dataExtractor, actionListener);
Expand Down