Skip to content

Commit 4adcd77

Browse files
committed
ingest: correctly measure chained pipeline stats (#33912)
Prior to this change when a pipeline processor called another pipeline, only the stats for the first processor were recorded. The stats for the subsequent pipelines were ignored. This change properly accounts for pipelines irregardless if they are the first or subsequently called pipelines. This change moves the state of the stats from the IngestService to the pipeline itself. Cluster updates are safe since the pipelines map is atomically swapped, and if a cluster update happens while iterating over stats (now read directly from the pipeline) a slightly stale view of stats may be shown.
1 parent 025251e commit 4adcd77

File tree

5 files changed

+253
-110
lines changed

5 files changed

+253
-110
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest;
21+
22+
import org.elasticsearch.common.metrics.CounterMetric;
23+
import org.elasticsearch.common.metrics.MeanMetric;
24+
25+
/**
26+
* <p>Metrics to measure ingest actions.
27+
* <p>This counts measure documents and timings for a given scope.
28+
* The scope is determined by the calling code. For example you can use this class to count all documents across all pipeline,
29+
* or you can use this class to count documents for a given pipeline or a specific processor.
30+
* This class does not make assumptions about it's given scope.
31+
*/
32+
class IngestMetric {
33+
34+
/**
35+
* The time it takes to complete the measured item.
36+
*/
37+
private final MeanMetric ingestTime = new MeanMetric();
38+
/**
39+
* The current count of things being measure. Should most likely ever be 0 or 1.
40+
* Useful when aggregating multiple metrics to see how many things are in flight.
41+
*/
42+
private final CounterMetric ingestCurrent = new CounterMetric();
43+
/**
44+
* The ever increasing count of things being measured
45+
*/
46+
private final CounterMetric ingestCount = new CounterMetric();
47+
/**
48+
* The only increasing count of failures
49+
*/
50+
private final CounterMetric ingestFailed = new CounterMetric();
51+
52+
/**
53+
* Call this prior to the ingest action.
54+
*/
55+
void preIngest() {
56+
ingestCurrent.inc();
57+
}
58+
59+
/**
60+
* Call this after the performing the ingest action, even if the action failed.
61+
* @param ingestTimeInMillis The time it took to perform the action.
62+
*/
63+
void postIngest(long ingestTimeInMillis) {
64+
ingestCurrent.dec();
65+
ingestTime.inc(ingestTimeInMillis);
66+
ingestCount.inc();
67+
}
68+
69+
/**
70+
* Call this if the ingest action failed.
71+
*/
72+
void ingestFailed() {
73+
ingestFailed.inc();
74+
}
75+
76+
/**
77+
* <p>Add two sets of metrics together.
78+
* <p><strong>Note -</strong> this method does <strong>not</strong> add the current count values.
79+
* The current count value is ephemeral and requires a increase/decrease operation pairs to keep the value correct.
80+
*
81+
* @param metrics The metric to add.
82+
*/
83+
void add(IngestMetric metrics) {
84+
ingestCount.inc(metrics.ingestCount.count());
85+
ingestTime.inc(metrics.ingestTime.sum());
86+
ingestFailed.inc(metrics.ingestFailed.count());
87+
}
88+
89+
/**
90+
* Creates a serializable representation for these metrics.
91+
*/
92+
IngestStats.Stats createStats() {
93+
return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.count(), ingestFailed.count());
94+
}
95+
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 19 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@
2323
import java.util.Collections;
2424
import java.util.HashMap;
2525
import java.util.HashSet;
26-
import java.util.Iterator;
2726
import java.util.List;
2827
import java.util.Map;
2928

3029
import java.util.Objects;
31-
import java.util.Optional;
3230
import java.util.Set;
3331
import java.util.concurrent.TimeUnit;
3432
import java.util.function.BiConsumer;
3533
import java.util.function.Consumer;
34+
import java.util.stream.Collectors;
35+
3636
import org.elasticsearch.ElasticsearchParseException;
3737
import org.elasticsearch.ExceptionsHelper;
3838
import org.elasticsearch.ResourceNotFoundException;
@@ -50,8 +50,6 @@
5050
import org.elasticsearch.cluster.metadata.MetaData;
5151
import org.elasticsearch.cluster.node.DiscoveryNode;
5252
import org.elasticsearch.cluster.service.ClusterService;
53-
import org.elasticsearch.common.metrics.CounterMetric;
54-
import org.elasticsearch.common.metrics.MeanMetric;
5553
import org.elasticsearch.common.regex.Regex;
5654
import org.elasticsearch.common.unit.TimeValue;
5755
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -80,8 +78,7 @@ public class IngestService implements ClusterStateApplier {
8078
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
8179
private volatile Map<String, Pipeline> pipelines = new HashMap<>();
8280
private final ThreadPool threadPool;
83-
private final StatsHolder totalStats = new StatsHolder();
84-
private volatile Map<String, StatsHolder> statsHolderPerPipeline = Collections.emptyMap();
81+
private final IngestMetric totalMetrics = new IngestMetric();
8582

8683
public IngestService(ClusterService clusterService, ThreadPool threadPool,
8784
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
@@ -258,10 +255,16 @@ Map<String, Pipeline> pipelines() {
258255
@Override
259256
public void applyClusterState(final ClusterChangedEvent event) {
260257
ClusterState state = event.state();
258+
Map<String, Pipeline> originalPipelines = pipelines;
261259
innerUpdatePipelines(event.previousState(), state);
262-
IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
263-
if (ingestMetadata != null) {
264-
updatePipelineStats(ingestMetadata);
260+
//pipelines changed, so add the old metrics to the new metrics
261+
if (originalPipelines != pipelines) {
262+
pipelines.forEach((id, pipeline) -> {
263+
Pipeline originalPipeline = originalPipelines.get(id);
264+
if (originalPipeline != null) {
265+
pipeline.getMetrics().add(originalPipeline.getMetrics());
266+
}
267+
});
265268
}
266269
}
267270

@@ -326,6 +329,7 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
326329
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
327330
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler,
328331
Consumer<IndexRequest> itemDroppedHandler) {
332+
329333
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
330334

331335
@Override
@@ -368,37 +372,11 @@ protected void doRun() {
368372
}
369373

370374
public IngestStats stats() {
371-
Map<String, StatsHolder> statsHolderPerPipeline = this.statsHolderPerPipeline;
372375

373-
Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size());
374-
for (Map.Entry<String, StatsHolder> entry : statsHolderPerPipeline.entrySet()) {
375-
statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
376-
}
376+
Map<String, IngestStats.Stats> statsPerPipeline =
377+
pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));
377378

378-
return new IngestStats(totalStats.createStats(), statsPerPipeline);
379-
}
380-
381-
void updatePipelineStats(IngestMetadata ingestMetadata) {
382-
boolean changed = false;
383-
Map<String, StatsHolder> newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline);
384-
Iterator<String> iterator = newStatsPerPipeline.keySet().iterator();
385-
while (iterator.hasNext()) {
386-
String pipeline = iterator.next();
387-
if (ingestMetadata.getPipelines().containsKey(pipeline) == false) {
388-
iterator.remove();
389-
changed = true;
390-
}
391-
}
392-
for (String pipeline : ingestMetadata.getPipelines().keySet()) {
393-
if (newStatsPerPipeline.containsKey(pipeline) == false) {
394-
newStatsPerPipeline.put(pipeline, new StatsHolder());
395-
changed = true;
396-
}
397-
}
398-
399-
if (changed) {
400-
statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline);
401-
}
379+
return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
402380
}
403381

404382
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {
@@ -409,10 +387,8 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
409387
long startTimeInNanos = System.nanoTime();
410388
// the pipeline specific stat holder may not exist and that is fine:
411389
// (e.g. the pipeline may have been removed while we're ingesting a document
412-
Optional<StatsHolder> pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId()));
413390
try {
414-
totalStats.preIngest();
415-
pipelineStats.ifPresent(StatsHolder::preIngest);
391+
totalMetrics.preIngest();
416392
String index = indexRequest.index();
417393
String type = indexRequest.type();
418394
String id = indexRequest.id();
@@ -438,13 +414,11 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
438414
indexRequest.source(ingestDocument.getSourceAndMetadata());
439415
}
440416
} catch (Exception e) {
441-
totalStats.ingestFailed();
442-
pipelineStats.ifPresent(StatsHolder::ingestFailed);
417+
totalMetrics.ingestFailed();
443418
throw e;
444419
} finally {
445420
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
446-
totalStats.postIngest(ingestTimeInMillis);
447-
pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis));
421+
totalMetrics.postIngest(ingestTimeInMillis);
448422
}
449423
}
450424

@@ -481,27 +455,4 @@ private void innerUpdatePipelines(ClusterState previousState, ClusterState state
481455
ExceptionsHelper.rethrowAndSuppress(exceptions);
482456
}
483457

484-
private static class StatsHolder {
485-
486-
private final MeanMetric ingestMetric = new MeanMetric();
487-
private final CounterMetric ingestCurrent = new CounterMetric();
488-
private final CounterMetric ingestFailed = new CounterMetric();
489-
490-
void preIngest() {
491-
ingestCurrent.inc();
492-
}
493-
494-
void postIngest(long ingestTimeInMillis) {
495-
ingestCurrent.dec();
496-
ingestMetric.inc(ingestTimeInMillis);
497-
}
498-
499-
void ingestFailed() {
500-
ingestFailed.inc();
501-
}
502-
503-
IngestStats.Stats createStats() {
504-
return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count());
505-
}
506-
}
507458
}

server/src/main/java/org/elasticsearch/ingest/Pipeline.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import org.elasticsearch.ElasticsearchParseException;
2323
import org.elasticsearch.common.Nullable;
2424

25+
import java.time.Clock;
2526
import java.util.Arrays;
2627
import java.util.Collections;
2728
import java.util.List;
2829
import java.util.Map;
30+
2931
import org.elasticsearch.script.ScriptService;
3032

3133
/**
@@ -44,12 +46,21 @@ public final class Pipeline {
4446
@Nullable
4547
private final Integer version;
4648
private final CompoundProcessor compoundProcessor;
49+
private final IngestMetric metrics;
50+
private final Clock clock;
4751

4852
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
53+
this(id, description, version, compoundProcessor, Clock.systemUTC());
54+
}
55+
56+
//package private for testing
57+
Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) {
4958
this.id = id;
5059
this.description = description;
5160
this.compoundProcessor = compoundProcessor;
5261
this.version = version;
62+
this.metrics = new IngestMetric();
63+
this.clock = clock;
5364
}
5465

5566
public static Pipeline create(String id, Map<String, Object> config,
@@ -78,7 +89,17 @@ public static Pipeline create(String id, Map<String, Object> config,
7889
* Modifies the data of a document to be indexed based on the processor this pipeline holds
7990
*/
8091
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
81-
return compoundProcessor.execute(ingestDocument);
92+
long startTimeInMillis = clock.millis();
93+
try {
94+
metrics.preIngest();
95+
return compoundProcessor.execute(ingestDocument);
96+
} catch (Exception e) {
97+
metrics.ingestFailed();
98+
throw e;
99+
} finally {
100+
long ingestTimeInMillis = clock.millis() - startTimeInMillis;
101+
metrics.postIngest(ingestTimeInMillis);
102+
}
82103
}
83104

84105
/**
@@ -135,4 +156,11 @@ public List<Processor> getOnFailureProcessors() {
135156
public List<Processor> flattenAllProcessors() {
136157
return compoundProcessor.flattenProcessors();
137158
}
159+
160+
/**
161+
* The metrics associated with this pipeline.
162+
*/
163+
public IngestMetric getMetrics() {
164+
return metrics;
165+
}
138166
}

0 commit comments

Comments
 (0)