import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.metrics.CounterMetric;
-import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -79,8 +77,7 @@ public class IngestService implements ClusterStateApplier {
    // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
    private volatile Map<String, Pipeline> pipelines = new HashMap<>();
    private final ThreadPool threadPool;
-    private final StatsHolder totalStats = new StatsHolder();
-    private volatile Map<String, StatsHolder> statsHolderPerPipeline = Collections.emptyMap();
+    private final IngestMetric totalMetrics = new IngestMetric();

    public IngestService(ClusterService clusterService, ThreadPool threadPool,
                         Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
@@ -257,10 +254,16 @@ Map<String, Pipeline> pipelines() {
    @Override
    public void applyClusterState(final ClusterChangedEvent event) {
        ClusterState state = event.state();
+        Map<String, Pipeline> originalPipelines = pipelines;
        innerUpdatePipelines(event.previousState(), state);
-        IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
-        if (ingestMetadata != null) {
-            updatePipelineStats(ingestMetadata);
+        //pipelines changed, so add the old metrics to the new metrics
+        if (originalPipelines != pipelines) {
+            pipelines.forEach((id, pipeline) -> {
+                Pipeline originalPipeline = originalPipelines.get(id);
+                if (originalPipeline != null) {
+                    pipeline.getMetrics().add(originalPipeline.getMetrics());
+                }
+            });
        }
    }

@@ -325,6 +328,7 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
    public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
        BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler,
        Consumer<IndexRequest> itemDroppedHandler) {
+
        threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {

            @Override
@@ -367,37 +371,11 @@ protected void doRun() {
    }

    public IngestStats stats() {
-        Map<String, StatsHolder> statsHolderPerPipeline = this.statsHolderPerPipeline;

-        Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size());
-        for (Map.Entry<String, StatsHolder> entry : statsHolderPerPipeline.entrySet()) {
-            statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
-        }
+        Map<String, IngestStats.Stats> statsPerPipeline =
+            pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));

-        return new IngestStats(totalStats.createStats(), statsPerPipeline);
-    }
-
-    void updatePipelineStats(IngestMetadata ingestMetadata) {
-        boolean changed = false;
-        Map<String, StatsHolder> newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline);
-        Iterator<String> iterator = newStatsPerPipeline.keySet().iterator();
-        while (iterator.hasNext()) {
-            String pipeline = iterator.next();
-            if (ingestMetadata.getPipelines().containsKey(pipeline) == false) {
-                iterator.remove();
-                changed = true;
-            }
-        }
-        for (String pipeline : ingestMetadata.getPipelines().keySet()) {
-            if (newStatsPerPipeline.containsKey(pipeline) == false) {
-                newStatsPerPipeline.put(pipeline, new StatsHolder());
-                changed = true;
-            }
-        }
-
-        if (changed) {
-            statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline);
-        }
+        return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
    }

    private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {
@@ -408,10 +386,8 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
        long startTimeInNanos = System.nanoTime();
        // the pipeline specific stat holder may not exist and that is fine:
        // (e.g. the pipeline may have been removed while we're ingesting a document
-        Optional<StatsHolder> pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId()));
        try {
-            totalStats.preIngest();
-            pipelineStats.ifPresent(StatsHolder::preIngest);
+            totalMetrics.preIngest();
            String index = indexRequest.index();
            String type = indexRequest.type();
            String id = indexRequest.id();
@@ -437,13 +413,11 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
                indexRequest.source(ingestDocument.getSourceAndMetadata());
            }
        } catch (Exception e) {
-            totalStats.ingestFailed();
-            pipelineStats.ifPresent(StatsHolder::ingestFailed);
+            totalMetrics.ingestFailed();
            throw e;
        } finally {
            long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
-            totalStats.postIngest(ingestTimeInMillis);
-            pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis));
+            totalMetrics.postIngest(ingestTimeInMillis);
        }
    }

@@ -480,27 +454,4 @@ private void innerUpdatePipelines(ClusterState previousState, ClusterState state
        ExceptionsHelper.rethrowAndSuppress(exceptions);
    }

-    private static class StatsHolder {
-
-        private final MeanMetric ingestMetric = new MeanMetric();
-        private final CounterMetric ingestCurrent = new CounterMetric();
-        private final CounterMetric ingestFailed = new CounterMetric();
-
-        void preIngest() {
-            ingestCurrent.inc();
-        }
-
-        void postIngest(long ingestTimeInMillis) {
-            ingestCurrent.dec();
-            ingestMetric.inc(ingestTimeInMillis);
-        }
-
-        void ingestFailed() {
-            ingestFailed.inc();
-        }
-
-        IngestStats.Stats createStats() {
-            return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count());
-        }
-    }
}
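
The IngestMetric class that replaces StatsHolder is introduced elsewhere in this change and does not appear in the hunks above. The following is a rough sketch only, inferred from the removed StatsHolder and from the calls the new code makes (preIngest, postIngest, ingestFailed, createStats, and the add used in applyClusterState); the class name IngestMetricSketch, the field names, and the choice of CounterMetric for every counter are assumptions, not the actual implementation in this PR.

// Sketch of the assumed shape of the new per-pipeline metric holder.
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.ingest.IngestStats;

class IngestMetricSketch {
    private final CounterMetric ingestCount = new CounterMetric();        // documents that completed ingestion
    private final CounterMetric ingestTimeInMillis = new CounterMetric(); // total time spent ingesting
    private final CounterMetric ingestCurrent = new CounterMetric();      // documents currently being ingested
    private final CounterMetric ingestFailed = new CounterMetric();       // documents that failed ingestion

    void preIngest() {
        ingestCurrent.inc();
    }

    void postIngest(long timeInMillis) {
        ingestCurrent.dec();
        ingestCount.inc();
        ingestTimeInMillis.inc(timeInMillis);
    }

    void ingestFailed() {
        ingestFailed.inc();
    }

    // Assumed carry-over used by applyClusterState above: copy the old pipeline's
    // counters into the freshly built pipeline. The in-flight gauge (ingestCurrent)
    // is not copied here; whether the real implementation carries it over is not
    // shown in this diff.
    void add(IngestMetricSketch other) {
        ingestCount.inc(other.ingestCount.count());
        ingestTimeInMillis.inc(other.ingestTimeInMillis.count());
        ingestFailed.inc(other.ingestFailed.count());
    }

    IngestStats.Stats createStats() {
        // Same Stats constructor the removed StatsHolder used: count, time, current, failed.
        return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis.count(), ingestCurrent.count(), ingestFailed.count());
    }
}

Storing the metrics on the Pipeline itself (pipeline.getMetrics()) is what lets applyClusterState simply copy the old counters into the rebuilt pipeline, instead of maintaining the separate statsHolderPerPipeline map that the removed updatePipelineStats had to keep in sync with cluster state.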