@@ -43,12 +43,22 @@ class PipelineWithMetrics extends Pipeline {
43
43
Integer version ,
44
44
List <SearchRequestProcessor > requestProcessors ,
45
45
List <SearchResponseProcessor > responseProcessors ,
46
+ List <SearchPhaseResultsProcessor > phaseResultsProcessors ,
46
47
NamedWriteableRegistry namedWriteableRegistry ,
47
48
OperationMetrics totalRequestMetrics ,
48
49
OperationMetrics totalResponseMetrics ,
49
50
LongSupplier relativeTimeSupplier
50
51
) {
51
- super (id , description , version , requestProcessors , responseProcessors , namedWriteableRegistry , relativeTimeSupplier );
52
+ super (
53
+ id ,
54
+ description ,
55
+ version ,
56
+ requestProcessors ,
57
+ responseProcessors ,
58
+ phaseResultsProcessors ,
59
+ namedWriteableRegistry ,
60
+ relativeTimeSupplier
61
+ );
52
62
this .totalRequestMetrics = totalRequestMetrics ;
53
63
this .totalResponseMetrics = totalResponseMetrics ;
54
64
for (Processor requestProcessor : getSearchRequestProcessors ()) {
@@ -64,6 +74,7 @@ static PipelineWithMetrics create(
64
74
Map <String , Object > config ,
65
75
Map <String , Processor .Factory <SearchRequestProcessor >> requestProcessorFactories ,
66
76
Map <String , Processor .Factory <SearchResponseProcessor >> responseProcessorFactories ,
77
+ Map <String , Processor .Factory <SearchPhaseResultsProcessor >> phaseResultsProcessorFactories ,
67
78
NamedWriteableRegistry namedWriteableRegistry ,
68
79
OperationMetrics totalRequestProcessingMetrics ,
69
80
OperationMetrics totalResponseProcessingMetrics
@@ -79,6 +90,16 @@ static PipelineWithMetrics create(
79
90
RESPONSE_PROCESSORS_KEY
80
91
);
81
92
List <SearchResponseProcessor > responseProcessors = readProcessors (responseProcessorFactories , responseProcessorConfigs );
93
+ List <Map <String , Object >> phaseResultsProcessorConfigs = ConfigurationUtils .readOptionalList (
94
+ null ,
95
+ null ,
96
+ config ,
97
+ PHASE_PROCESSORS_KEY
98
+ );
99
+ List <SearchPhaseResultsProcessor > phaseResultsProcessors = readProcessors (
100
+ phaseResultsProcessorFactories ,
101
+ phaseResultsProcessorConfigs
102
+ );
82
103
if (config .isEmpty () == false ) {
83
104
throw new OpenSearchParseException (
84
105
"pipeline ["
@@ -93,6 +114,7 @@ static PipelineWithMetrics create(
93
114
version ,
94
115
requestProcessors ,
95
116
responseProcessors ,
117
+ phaseResultsProcessors ,
96
118
namedWriteableRegistry ,
97
119
totalRequestProcessingMetrics ,
98
120
totalResponseProcessingMetrics ,
0 commit comments