Skip to content

Commit 4fd9b1d

Browse files
authored
Lower contention on requests with many aggs (backport of #66895) (#66941)
This lowers the contention on the `REQUEST` circuit breaker when building many aggregations on many threads by preallocating a chunk of breaker up front. This cuts down on the number of times we enter the busy loop in `ChildMemoryCircuitBreaker.limit`. Now we hit it one time when building aggregations. We still hit the busy loop if we collect many buckets. We let the `AggregationBuilder` pick size of the "chunk" that we preallocate but it doesn't have much to go on - not even the field types. But it is available in a convenient spot and the estimates don't have to be particularly accurate. The benchmarks on my 12 core desktop are interesting: ``` Benchmark (breaker) Mode Cnt Score Error Units sum noop avgt 10 1.672 ± 0.042 us/op sum real avgt 10 4.100 ± 0.027 us/op sum preallocate avgt 10 4.230 ± 0.034 us/op termsSixtySums noop avgt 10 92.658 ± 0.939 us/op termsSixtySums real avgt 10 278.764 ± 39.751 us/op termsSixtySums preallocate avgt 10 120.896 ± 16.097 us/op termsSum noop avgt 10 4.573 ± 0.095 us/op termsSum real avgt 10 9.932 ± 0.211 us/op termsSum preallocate avgt 10 7.695 ± 0.313 us/op ``` They show pretty clearly that not using the circuit breaker at all is faster. But we can't do that because we don't want to bring the node down on bad aggs. When there are many aggs (termsSixtySums) the preallocation claws back much of the performance. It even helps marginally when there are two aggs (termsSum). For a single agg (sum) we see a 130 nanosecond hit. Fine. But these values are all pretty small. At best we're seeing a 160 microsecond savings. Not so on a 160 vCPU machine: ``` Benchmark (breaker) Mode Cnt Score Error Units sum noop avgt 10 44.956 ± 8.851 us/op sum real avgt 10 118.008 ± 19.505 us/op sum preallocate avgt 10 241.234 ± 305.998 us/op termsSixtySums noop avgt 10 1339.802 ± 51.410 us/op termsSixtySums real avgt 10 12077.671 ± 12110.993 us/op termsSixtySums preallocate avgt 10 3804.515 ± 1458.702 us/op termsSum noop avgt 10 59.478 ± 2.261 us/op termsSum real avgt 10 293.756 ± 253.854 us/op termsSum preallocate avgt 10 197.963 ± 41.578 us/op ``` All of these numbers are larger because we're running all the CPUs flat out and we're seeing more contention everywhere. Even the "noop" breaker sees some contention, but I think it is mostly around memory allocation. Anyway, with many many (termsSixtySums) aggs we're looking at 8 milliseconds of savings by preallocating. Just by dodging the busy loop as much as possible. The error in the measurements there are substantial. Here are the runs: ``` real: Iteration 1: 8679.417 ±(99.9%) 273.220 us/op Iteration 2: 5849.538 ±(99.9%) 179.258 us/op Iteration 3: 5953.935 ±(99.9%) 152.829 us/op Iteration 4: 5763.465 ±(99.9%) 150.759 us/op Iteration 5: 14157.592 ±(99.9%) 395.224 us/op Iteration 1: 24857.020 ±(99.9%) 1133.847 us/op Iteration 2: 24730.903 ±(99.9%) 1107.718 us/op Iteration 3: 18894.383 ±(99.9%) 738.706 us/op Iteration 4: 5493.965 ±(99.9%) 120.529 us/op Iteration 5: 6396.493 ±(99.9%) 143.630 us/op preallocate: Iteration 1: 5512.590 ±(99.9%) 110.222 us/op Iteration 2: 3087.771 ±(99.9%) 120.084 us/op Iteration 3: 3544.282 ±(99.9%) 110.373 us/op Iteration 4: 3477.228 ±(99.9%) 107.270 us/op Iteration 5: 4351.820 ±(99.9%) 82.946 us/op Iteration 1: 3185.250 ±(99.9%) 154.102 us/op Iteration 2: 3058.000 ±(99.9%) 143.758 us/op Iteration 3: 3199.920 ±(99.9%) 61.589 us/op Iteration 4: 3163.735 ±(99.9%) 71.291 us/op Iteration 5: 5464.556 ±(99.9%) 59.034 us/op ``` That variability from 5.5ms to 25ms is terrible. It makes me not particularly trust the 8ms savings from the report. But still, the preallocating method has much less variability between runs and almost all the runs are faster than all of the non-preallocated runs. Maybe the savings is more like 2 or 3 milliseconds, but still. Or maybe we should think of hte savings as worst vs worst? If so its 19 milliseconds. Anyway, its hard to measure how much this helps. But, certainly some. Closes #58647
1 parent 081600d commit 4fd9b1d

File tree

16 files changed

+905
-61
lines changed

16 files changed

+905
-61
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.benchmark.search.aggregations;
21+
22+
import org.apache.lucene.analysis.Analyzer;
23+
import org.apache.lucene.search.IndexSearcher;
24+
import org.apache.lucene.search.MatchAllDocsQuery;
25+
import org.apache.lucene.search.Query;
26+
import org.elasticsearch.Version;
27+
import org.elasticsearch.common.breaker.CircuitBreaker;
28+
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
29+
import org.elasticsearch.common.lease.Releasable;
30+
import org.elasticsearch.common.lease.Releasables;
31+
import org.elasticsearch.common.settings.ClusterSettings;
32+
import org.elasticsearch.common.settings.Settings;
33+
import org.elasticsearch.common.util.BigArrays;
34+
import org.elasticsearch.common.util.PageCacheRecycler;
35+
import org.elasticsearch.index.Index;
36+
import org.elasticsearch.index.IndexSettings;
37+
import org.elasticsearch.index.analysis.NamedAnalyzer;
38+
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
39+
import org.elasticsearch.index.fielddata.IndexFieldData;
40+
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
41+
import org.elasticsearch.index.mapper.MappedFieldType;
42+
import org.elasticsearch.index.mapper.NumberFieldMapper;
43+
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
44+
import org.elasticsearch.index.mapper.ObjectMapper;
45+
import org.elasticsearch.index.query.QueryBuilder;
46+
import org.elasticsearch.index.query.support.NestedScope;
47+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
48+
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
49+
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
50+
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
51+
import org.elasticsearch.script.Script;
52+
import org.elasticsearch.script.ScriptContext;
53+
import org.elasticsearch.search.SearchModule;
54+
import org.elasticsearch.search.aggregations.Aggregator;
55+
import org.elasticsearch.search.aggregations.AggregatorFactories;
56+
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
57+
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
58+
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
59+
import org.elasticsearch.search.aggregations.support.AggregationContext;
60+
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
61+
import org.elasticsearch.search.internal.SubSearchContext;
62+
import org.elasticsearch.search.lookup.SearchLookup;
63+
import org.elasticsearch.search.sort.BucketedSort;
64+
import org.elasticsearch.search.sort.BucketedSort.ExtraData;
65+
import org.elasticsearch.search.sort.SortAndFormats;
66+
import org.elasticsearch.search.sort.SortBuilder;
67+
import org.openjdk.jmh.annotations.Benchmark;
68+
import org.openjdk.jmh.annotations.BenchmarkMode;
69+
import org.openjdk.jmh.annotations.Fork;
70+
import org.openjdk.jmh.annotations.Measurement;
71+
import org.openjdk.jmh.annotations.Mode;
72+
import org.openjdk.jmh.annotations.OutputTimeUnit;
73+
import org.openjdk.jmh.annotations.Param;
74+
import org.openjdk.jmh.annotations.Scope;
75+
import org.openjdk.jmh.annotations.Setup;
76+
import org.openjdk.jmh.annotations.State;
77+
import org.openjdk.jmh.annotations.Threads;
78+
import org.openjdk.jmh.annotations.Warmup;
79+
80+
import java.io.IOException;
81+
import java.util.ArrayList;
82+
import java.util.List;
83+
import java.util.Optional;
84+
import java.util.concurrent.TimeUnit;
85+
import java.util.function.Function;
86+
87+
/**
88+
* Benchmarks the overhead of constructing {@link Aggregator}s in many
89+
* parallel threads. Machines with different numbers of cores will see
90+
* wildly different results running this from running this with more
91+
* cores seeing more benefits from preallocation.
92+
*/
93+
@Fork(2)
94+
@Warmup(iterations = 10)
95+
@Measurement(iterations = 5)
96+
@BenchmarkMode(Mode.AverageTime)
97+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
98+
@State(Scope.Benchmark)
99+
@Threads(Threads.MAX)
100+
public class AggConstructionContentionBenchmark {
101+
private final SearchModule searchModule = new SearchModule(Settings.EMPTY, false, org.elasticsearch.common.collect.List.of());
102+
private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
103+
private final PageCacheRecycler recycler = new PageCacheRecycler(Settings.EMPTY);
104+
private final Index index = new Index("test", "uuid");
105+
private final IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(
106+
Settings.EMPTY,
107+
new IndexFieldDataCache.Listener() {
108+
}
109+
);
110+
111+
private CircuitBreakerService breakerService;
112+
private BigArrays bigArrays;
113+
private boolean preallocateBreaker;
114+
115+
@Param({ "noop", "real", "preallocate" })
116+
private String breaker;
117+
118+
@Setup
119+
public void setup() {
120+
switch (breaker) {
121+
case "real":
122+
breakerService = new HierarchyCircuitBreakerService(
123+
Settings.EMPTY,
124+
org.elasticsearch.common.collect.List.of(),
125+
clusterSettings
126+
);
127+
break;
128+
case "preallocate":
129+
preallocateBreaker = true;
130+
breakerService = new HierarchyCircuitBreakerService(
131+
Settings.EMPTY,
132+
org.elasticsearch.common.collect.List.of(),
133+
clusterSettings
134+
);
135+
break;
136+
case "noop":
137+
breakerService = new NoneCircuitBreakerService();
138+
break;
139+
default:
140+
throw new UnsupportedOperationException();
141+
}
142+
bigArrays = new BigArrays(recycler, breakerService, "request");
143+
}
144+
145+
@Benchmark
146+
public void sum() throws IOException {
147+
buildFactories(new AggregatorFactories.Builder().addAggregator(new SumAggregationBuilder("s").field("int_1")));
148+
}
149+
150+
@Benchmark
151+
public void termsSum() throws IOException {
152+
buildFactories(
153+
new AggregatorFactories.Builder().addAggregator(
154+
new TermsAggregationBuilder("t").field("int_1").subAggregation(new SumAggregationBuilder("s").field("int_2"))
155+
)
156+
);
157+
}
158+
159+
@Benchmark
160+
public void termsSixtySums() throws IOException {
161+
TermsAggregationBuilder b = new TermsAggregationBuilder("t").field("int_1");
162+
for (int i = 0; i < 60; i++) {
163+
b.subAggregation(new SumAggregationBuilder("s" + i).field("int_" + i));
164+
}
165+
buildFactories(new AggregatorFactories.Builder().addAggregator(b));
166+
}
167+
168+
private void buildFactories(AggregatorFactories.Builder factories) throws IOException {
169+
try (DummyAggregationContext context = new DummyAggregationContext(factories.bytesToPreallocate())) {
170+
factories.build(context, null).createTopLevelAggregators();
171+
}
172+
}
173+
174+
private class DummyAggregationContext extends AggregationContext {
175+
private final Query query = new MatchAllDocsQuery();
176+
private final List<Releasable> releaseMe = new ArrayList<>();
177+
178+
private final CircuitBreaker breaker;
179+
private final PreallocatedCircuitBreakerService preallocated;
180+
private final MultiBucketConsumer multiBucketConsumer;
181+
182+
DummyAggregationContext(long bytesToPreallocate) {
183+
CircuitBreakerService breakerService;
184+
if (preallocateBreaker) {
185+
breakerService = preallocated = new PreallocatedCircuitBreakerService(
186+
AggConstructionContentionBenchmark.this.breakerService,
187+
CircuitBreaker.REQUEST,
188+
bytesToPreallocate,
189+
"aggregations"
190+
);
191+
} else {
192+
breakerService = AggConstructionContentionBenchmark.this.breakerService;
193+
preallocated = null;
194+
}
195+
breaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
196+
multiBucketConsumer = new MultiBucketConsumer(Integer.MAX_VALUE, breaker);
197+
}
198+
199+
@Override
200+
public Query query() {
201+
return query;
202+
}
203+
204+
@Override
205+
public Aggregator profileIfEnabled(Aggregator agg) throws IOException {
206+
return agg;
207+
}
208+
209+
@Override
210+
public boolean profiling() {
211+
return false;
212+
}
213+
214+
@Override
215+
public long nowInMillis() {
216+
return 0;
217+
}
218+
219+
@Override
220+
protected IndexFieldData<?> buildFieldData(MappedFieldType ft) {
221+
IndexFieldDataCache indexFieldDataCache = indicesFieldDataCache.buildIndexFieldDataCache(new IndexFieldDataCache.Listener() {
222+
}, index, ft.name());
223+
return ft.fielddataBuilder("test", this::lookup).build(indexFieldDataCache, breakerService);
224+
}
225+
226+
@Override
227+
public MappedFieldType getFieldType(String path) {
228+
if (path.startsWith("int")) {
229+
return new NumberFieldMapper.NumberFieldType(path, NumberType.INTEGER);
230+
}
231+
throw new UnsupportedOperationException();
232+
}
233+
234+
@Override
235+
public boolean isFieldMapped(String field) {
236+
return field.startsWith("int");
237+
}
238+
239+
@Override
240+
public <FactoryType> FactoryType compile(Script script, ScriptContext<FactoryType> context) {
241+
throw new UnsupportedOperationException();
242+
}
243+
244+
@Override
245+
public SearchLookup lookup() {
246+
throw new UnsupportedOperationException();
247+
}
248+
249+
@Override
250+
public ValuesSourceRegistry getValuesSourceRegistry() {
251+
return searchModule.getValuesSourceRegistry();
252+
}
253+
254+
@Override
255+
public BigArrays bigArrays() {
256+
return bigArrays;
257+
}
258+
259+
@Override
260+
public IndexSearcher searcher() {
261+
return null;
262+
}
263+
264+
@Override
265+
public Query buildQuery(QueryBuilder builder) throws IOException {
266+
throw new UnsupportedOperationException();
267+
}
268+
269+
@Override
270+
public IndexSettings getIndexSettings() {
271+
throw new UnsupportedOperationException();
272+
}
273+
274+
@Override
275+
public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sortBuilders) throws IOException {
276+
throw new UnsupportedOperationException();
277+
}
278+
279+
@Override
280+
public ObjectMapper getObjectMapper(String path) {
281+
throw new UnsupportedOperationException();
282+
}
283+
284+
@Override
285+
public NestedScope nestedScope() {
286+
throw new UnsupportedOperationException();
287+
}
288+
289+
@Override
290+
public SubSearchContext subSearchContext() {
291+
throw new UnsupportedOperationException();
292+
}
293+
294+
@Override
295+
public void addReleasable(Aggregator aggregator) {
296+
releaseMe.add(aggregator);
297+
}
298+
299+
@Override
300+
public MultiBucketConsumer multiBucketConsumer() {
301+
return multiBucketConsumer;
302+
}
303+
304+
@Override
305+
public BitsetFilterCache bitsetFilterCache() {
306+
throw new UnsupportedOperationException();
307+
}
308+
309+
@Override
310+
public BucketedSort buildBucketedSort(SortBuilder<?> sort, int size, ExtraData values) throws IOException {
311+
throw new UnsupportedOperationException();
312+
}
313+
314+
@Override
315+
public int shardRandomSeed() {
316+
return 0;
317+
}
318+
319+
@Override
320+
public long getRelativeTimeInMillis() {
321+
return 0;
322+
}
323+
324+
@Override
325+
public boolean isCancelled() {
326+
return false;
327+
}
328+
329+
@Override
330+
public CircuitBreaker breaker() {
331+
return breaker;
332+
}
333+
334+
@Override
335+
public Analyzer getIndexAnalyzer(Function<String, NamedAnalyzer> unindexedFieldAnalyzer) {
336+
throw new UnsupportedOperationException();
337+
}
338+
339+
@Override
340+
public boolean isCacheable() {
341+
throw new UnsupportedOperationException();
342+
}
343+
344+
@Override
345+
public Version indexVersionCreated() {
346+
return Version.CURRENT;
347+
}
348+
349+
@Override
350+
public void close() {
351+
List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);
352+
releaseMe.add(preallocated);
353+
Releasables.close(releaseMe);
354+
}
355+
}
356+
}

server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,11 @@ public BucketCardinality bucketCardinality() {
536536
public String getType() {
537537
return "test";
538538
}
539+
540+
@Override
541+
public long bytesToPreallocate() {
542+
return 0;
543+
}
539544
}
540545

541546
/**
@@ -570,13 +575,13 @@ public Aggregator subAggregator(String name) {
570575
@Override
571576
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
572577
return new InternalAggregation[] {
573-
new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap())
578+
buildEmptyAggregation()
574579
};
575580
}
576581

577582
@Override
578583
public InternalAggregation buildEmptyAggregation() {
579-
return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap());
584+
return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, null);
580585
}
581586

582587
@Override

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
624624
.addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get());
625625
logger.info("--> got an expected exception", e);
626626
assertThat(e.getCause(), notNullValue());
627-
assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [<agg [foo_terms]>]"));
627+
assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [preallocate[aggregations]]"));
628628

629629
client().admin().cluster().prepareUpdateSettings()
630630
.setTransientSettings(Settings.builder()

0 commit comments

Comments
 (0)