2020import org .elasticsearch .xpack .core .ml .action .PreviewDatafeedAction ;
2121import org .elasticsearch .xpack .core .ml .datafeed .ChunkingConfig ;
2222import org .elasticsearch .xpack .core .ml .datafeed .DatafeedConfig ;
23+ import org .elasticsearch .xpack .core .ml .datafeed .DatafeedTimingStats ;
2324import org .elasticsearch .xpack .core .ml .datafeed .extractor .DataExtractor ;
2425import org .elasticsearch .xpack .core .security .SecurityContext ;
2526import org .elasticsearch .xpack .ml .datafeed .DatafeedTimingStatsReporter ;
2627import org .elasticsearch .xpack .ml .datafeed .extractor .DataExtractorFactory ;
2728import org .elasticsearch .xpack .ml .datafeed .persistence .DatafeedConfigProvider ;
2829import org .elasticsearch .xpack .ml .job .persistence .JobConfigProvider ;
29- import org .elasticsearch .xpack .ml .job .persistence .JobResultsProvider ;
3030
3131import java .io .BufferedReader ;
3232import java .io .InputStream ;
@@ -44,21 +44,19 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
4444 private final Client client ;
4545 private final JobConfigProvider jobConfigProvider ;
4646 private final DatafeedConfigProvider datafeedConfigProvider ;
47- private final JobResultsProvider jobResultsProvider ;
4847 private final NamedXContentRegistry xContentRegistry ;
4948 private final SecurityContext securityContext ;
5049
5150 @ Inject
5251 public TransportPreviewDatafeedAction (Settings settings , ThreadPool threadPool , TransportService transportService ,
5352 ActionFilters actionFilters , Client client , JobConfigProvider jobConfigProvider ,
54- DatafeedConfigProvider datafeedConfigProvider , JobResultsProvider jobResultsProvider ,
53+ DatafeedConfigProvider datafeedConfigProvider ,
5554 NamedXContentRegistry xContentRegistry ) {
5655 super (PreviewDatafeedAction .NAME , transportService , actionFilters , PreviewDatafeedAction .Request ::new );
5756 this .threadPool = threadPool ;
5857 this .client = client ;
5958 this .jobConfigProvider = jobConfigProvider ;
6059 this .datafeedConfigProvider = datafeedConfigProvider ;
61- this .jobResultsProvider = jobResultsProvider ;
6260 this .xContentRegistry = xContentRegistry ;
6361 this .securityContext = XPackSettings .SECURITY_ENABLED .get (settings ) ?
6462 new SecurityContext (settings , threadPool .getThreadContext ()) : null ;
@@ -74,33 +72,30 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio
7472 DatafeedConfig .Builder previewDatafeed = buildPreviewDatafeed (datafeedConfig );
7573 useSecondaryAuthIfAvailable (securityContext , () -> {
7674 previewDatafeed .setHeaders (filterSecurityHeaders (threadPool .getThreadContext ().getHeaders ()));
77- jobResultsProvider .datafeedTimingStats (
78- jobBuilder .getId (),
79- timingStats -> {
80- // NB: this is using the client from the transport layer, NOT the internal client.
81- // This is important because it means the datafeed search will fail if the user
82- // requesting the preview doesn't have permission to search the relevant indices.
83- DataExtractorFactory .create (
84- client ,
85- previewDatafeed .build (),
86- jobBuilder .build (),
87- xContentRegistry ,
88- // Fake DatafeedTimingStatsReporter that does not have access to results index
89- new DatafeedTimingStatsReporter (timingStats , (ts , refreshPolicy ) -> {}),
90- new ActionListener <DataExtractorFactory >() {
91- @ Override
92- public void onResponse (DataExtractorFactory dataExtractorFactory ) {
93- DataExtractor dataExtractor = dataExtractorFactory .newExtractor (0 , Long .MAX_VALUE );
94- threadPool .generic ().execute (() -> previewDatafeed (dataExtractor , listener ));
95- }
9675
97- @ Override
98- public void onFailure (Exception e ) {
99- listener .onFailure (e );
100- }
101- });
102- },
103- listener ::onFailure );
76+ // NB: this is using the client from the transport layer, NOT the internal client.
77+ // This is important because it means the datafeed search will fail if the user
78+ // requesting the preview doesn't have permission to search the relevant indices.
79+ DataExtractorFactory .create (
80+ client ,
81+ previewDatafeed .build (),
82+ jobBuilder .build (),
83+ xContentRegistry ,
84+ // Fake DatafeedTimingStatsReporter that does not have access to results index
85+ new DatafeedTimingStatsReporter (new DatafeedTimingStats (datafeedConfig .getJobId ()), (ts , refreshPolicy ) -> {
86+ }),
87+ new ActionListener <DataExtractorFactory >() {
88+ @ Override
89+ public void onResponse (DataExtractorFactory dataExtractorFactory ) {
90+ DataExtractor dataExtractor = dataExtractorFactory .newExtractor (0 , Long .MAX_VALUE );
91+ threadPool .generic ().execute (() -> previewDatafeed (dataExtractor , listener ));
92+ }
93+
94+ @ Override
95+ public void onFailure (Exception e ) {
96+ listener .onFailure (e );
97+ }
98+ });
10499 });
105100 },
106101 listener ::onFailure ));
@@ -143,7 +138,7 @@ static void previewDatafeed(DataExtractor dataExtractor, ActionListener<PreviewD
143138 }
144139 responseBuilder .append ("]" );
145140 listener .onResponse (new PreviewDatafeedAction .Response (
146- new BytesArray (responseBuilder .toString ().getBytes (StandardCharsets .UTF_8 ))));
141+ new BytesArray (responseBuilder .toString ().getBytes (StandardCharsets .UTF_8 ))));
147142 } catch (Exception e ) {
148143 listener .onFailure (e );
149144 } finally {
0 commit comments