Skip to content

Commit 468cf11

Browse files
bucket aggregation circuit breaker optimization
1 parent de17140 commit 468cf11

File tree

11 files changed

+124
-54
lines changed

11 files changed

+124
-54
lines changed

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ public void apply(Settings value, Settings current, Settings previous) {
375375
SearchService.KEEPALIVE_INTERVAL_SETTING,
376376
SearchService.MAX_KEEPALIVE_SETTING,
377377
MultiBucketConsumerService.MAX_BUCKET_SETTING,
378+
MultiBucketConsumerService.CHECK_BUCKETS_STEP_SIZE_SETTING,
378379
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
379380
SearchService.MAX_OPEN_SCROLL_CONTEXT,
380381
Node.WRITE_PORTS_FILE_SETTING,

server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,10 @@ long currentMemoryUsage() {
302302
}
303303
}
304304

305+
public long getParentLimit() {
306+
return this.parentSettings.getLimit();
307+
}
308+
305309
/**
306310
* Checks whether the parent breaker has been tripped
307311
*/

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ protected Node(
514514

515515
final SearchService searchService = newSearchService(clusterService, indicesService,
516516
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
517-
responseCollectorService);
517+
responseCollectorService, circuitBreakerService);
518518

519519
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
520520
.filterPlugins(PersistentTaskPlugin.class).stream()
@@ -991,9 +991,10 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) {
991991
*/
992992
protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
993993
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
994-
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
994+
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
995+
CircuitBreakerService circuitBreakerService) {
995996
return new SearchService(clusterService, indicesService, threadPool,
996-
scriptService, bigArrays, fetchPhase, responseCollectorService);
997+
scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService);
997998
}
998999

9991000
/**

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.index.shard.IndexShard;
5959
import org.elasticsearch.index.shard.SearchOperationListener;
6060
import org.elasticsearch.indices.IndicesService;
61+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
6162
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
6263
import org.elasticsearch.node.ResponseCollectorService;
6364
import org.elasticsearch.script.FieldScript;
@@ -194,7 +195,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
194195

195196
public SearchService(ClusterService clusterService, IndicesService indicesService,
196197
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
197-
ResponseCollectorService responseCollectorService) {
198+
ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService) {
198199
Settings settings = clusterService.getSettings();
199200
this.threadPool = threadPool;
200201
this.clusterService = clusterService;
@@ -204,7 +205,7 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic
204205
this.bigArrays = bigArrays;
205206
this.queryPhase = new QueryPhase();
206207
this.fetchPhase = fetchPhase;
207-
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings);
208+
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreakerService);
208209

209210
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
210211
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));

server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.elasticsearch.common.settings.Setting;
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.common.xcontent.XContentBuilder;
27+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
28+
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
2729
import org.elasticsearch.rest.RestStatus;
2830
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
2931

@@ -38,14 +40,27 @@
3840
*/
3941
public class MultiBucketConsumerService {
4042
public static final int DEFAULT_MAX_BUCKETS = 10000;
43+
public static final int DEFAULT_CHECK_BUCKETS_STEP_SIZE = 10000;
4144
public static final Setting<Integer> MAX_BUCKET_SETTING =
4245
Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
4346

44-
private volatile int maxBucket;
47+
public static final Setting<Integer> CHECK_BUCKETS_STEP_SIZE_SETTING =
48+
Setting.intSetting("search.check_buckets_step_size", DEFAULT_CHECK_BUCKETS_STEP_SIZE,
49+
-1, Setting.Property.NodeScope, Setting.Property.Dynamic);
50+
51+
private final CircuitBreakerService circuitBreakerService;
4552

46-
public MultiBucketConsumerService(ClusterService clusterService, Settings settings) {
47-
this.maxBucket = MAX_BUCKET_SETTING.get(settings);
48-
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket);
53+
private volatile int maxBucket;
54+
private volatile int checkBucketsStepSize;
55+
56+
public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreakerService circuitBreakerService) {
57+
this.circuitBreakerService = circuitBreakerService;
58+
this.maxBucket = MAX_BUCKET_SETTING.get(settings);
59+
this.checkBucketsStepSize = CHECK_BUCKETS_STEP_SIZE_SETTING.get(settings);
60+
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket);
61+
clusterService.getClusterSettings().addSettingsUpdateConsumer(CHECK_BUCKETS_STEP_SIZE_SETTING, value -> {
62+
checkBucketsStepSize = value;
63+
});
4964
}
5065

5166
private void setMaxBucket(int maxBucket) {
@@ -94,11 +109,16 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws
94109
*/
95110
public static class MultiBucketConsumer implements IntConsumer {
96111
private final int limit;
112+
private final int checkBucketsStepSizeLimit;
113+
private final CircuitBreakerService circuitBreakerService;
114+
97115
// aggregations execute in a single thread so no atomic here
98116
private int count;
99117

100-
public MultiBucketConsumer(int limit) {
118+
public MultiBucketConsumer(int limit, int checkBucketsStepSizeLimit, CircuitBreakerService circuitBreakerService) {
101119
this.limit = limit;
120+
this.checkBucketsStepSizeLimit = checkBucketsStepSizeLimit;
121+
this.circuitBreakerService = circuitBreakerService;
102122
}
103123

104124
@Override
@@ -109,6 +129,11 @@ public void accept(int value) {
109129
+ "] but was [" + count + "]. This limit can be set by changing the [" +
110130
MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
111131
}
132+
133+
if (value > 0 && this.circuitBreakerService instanceof HierarchyCircuitBreakerService
134+
&& checkBucketsStepSizeLimit > 0 && count % checkBucketsStepSizeLimit == 0) {
135+
((HierarchyCircuitBreakerService) this.circuitBreakerService).checkParentLimit(0, "check_allocation_buckets");
136+
}
112137
}
113138

114139
public void reset() {
@@ -125,6 +150,6 @@ public int getLimit() {
125150
}
126151

127152
public MultiBucketConsumer create() {
128-
return new MultiBucketConsumer(maxBucket);
153+
return new MultiBucketConsumer(maxBucket, checkBucketsStepSize, this.circuitBreakerService);
129154
}
130155
}

server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.unit.ByteSizeUnit;
2929
import org.elasticsearch.common.unit.ByteSizeValue;
30+
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
3031
import org.elasticsearch.test.ESTestCase;
3132

3233
import java.util.concurrent.atomic.AtomicBoolean;
@@ -293,6 +294,31 @@ public void testTrippedCircuitBreakerDurability() {
293294
}
294295
}
295296

297+
public void testAllocationBucketsBreaker() throws Exception {
298+
Settings clusterSettings = Settings.builder()
299+
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "20mb")
300+
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "true")
301+
.build();
302+
303+
try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
304+
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
305+
306+
long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit();
307+
assertEquals(new ByteSizeValue(20, ByteSizeUnit.MB).getBytes(), parentLimitBytes);
308+
309+
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer =
310+
new MultiBucketConsumerService.MultiBucketConsumer(10000, 10000, service);
311+
312+
long currentMemory = ((HierarchyCircuitBreakerService) service).currentMemoryUsage();
313+
if (currentMemory > parentLimitBytes) {
314+
CircuitBreakingException exception =
315+
expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(10000));
316+
assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [check_allocation_buckets] would be"));
317+
assertThat(exception.getMessage(), containsString("which is larger than the limit of [20971520/20mb]"));
318+
}
319+
}
320+
}
321+
296322
private long mb(long size) {
297323
return new ByteSizeValue(size, ByteSizeUnit.MB).getBytes();
298324
}

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,7 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi
10711071
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
10721072
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
10731073
final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
1074-
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService);
1074+
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService());
10751075
actions.put(SearchAction.INSTANCE,
10761076
new TransportSearchAction(threadPool, transportService, searchService,
10771077
searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService,

test/framework/src/main/java/org/elasticsearch/node/MockNode.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@
5959
/**
6060
* A node for testing which allows:
6161
* <ul>
62-
* <li>Overriding Version.CURRENT</li>
63-
* <li>Adding test plugins that exist on the classpath</li>
62+
* <li>Overriding Version.CURRENT</li>
63+
* <li>Adding test plugins that exist on the classpath</li>
6464
* </ul>
6565
*/
6666
public class MockNode extends Node {
@@ -72,27 +72,27 @@ public MockNode(final Settings settings, final Collection<Class<? extends Plugin
7272
}
7373

7474
public MockNode(
75-
final Settings settings,
76-
final Collection<Class<? extends Plugin>> classpathPlugins,
77-
final boolean forbidPrivateIndexSettings) {
75+
final Settings settings,
76+
final Collection<Class<? extends Plugin>> classpathPlugins,
77+
final boolean forbidPrivateIndexSettings) {
7878
this(settings, classpathPlugins, null, forbidPrivateIndexSettings);
7979
}
8080

8181
public MockNode(
82-
final Settings settings,
83-
final Collection<Class<? extends Plugin>> classpathPlugins,
84-
final Path configPath,
85-
final boolean forbidPrivateIndexSettings) {
82+
final Settings settings,
83+
final Collection<Class<? extends Plugin>> classpathPlugins,
84+
final Path configPath,
85+
final boolean forbidPrivateIndexSettings) {
8686
this(
87-
InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"),
88-
classpathPlugins,
89-
forbidPrivateIndexSettings);
87+
InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), configPath, () -> "mock_ node"),
88+
classpathPlugins,
89+
forbidPrivateIndexSettings);
9090
}
9191

9292
private MockNode(
93-
final Environment environment,
94-
final Collection<Class<? extends Plugin>> classpathPlugins,
95-
final boolean forbidPrivateIndexSettings) {
93+
final Environment environment,
94+
final Collection<Class<? extends Plugin>> classpathPlugins,
95+
final boolean forbidPrivateIndexSettings) {
9696
super(environment, classpathPlugins, forbidPrivateIndexSettings);
9797
this.classpathPlugins = classpathPlugins;
9898
}
@@ -124,12 +124,14 @@ PageCacheRecycler createPageCacheRecycler(Settings settings) {
124124
@Override
125125
protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
126126
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
127-
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
127+
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
128+
CircuitBreakerService circuitBreakerService) {
128129
if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) {
129130
return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase,
130-
responseCollectorService);
131+
responseCollectorService, circuitBreakerService);
131132
}
132-
return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase);
133+
return new MockSearchService(clusterService, indicesService, threadPool, scriptService,
134+
bigArrays, fetchPhase, circuitBreakerService);
133135
}
134136

135137
@Override

test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.service.ClusterService;
2323
import org.elasticsearch.common.util.BigArrays;
2424
import org.elasticsearch.indices.IndicesService;
25+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
2526
import org.elasticsearch.node.MockNode;
2627
import org.elasticsearch.plugins.Plugin;
2728
import org.elasticsearch.script.ScriptService;
@@ -67,9 +68,9 @@ static void removeActiveContext(SearchContext context) {
6768
}
6869

6970
public MockSearchService(ClusterService clusterService,
70-
IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
71-
BigArrays bigArrays, FetchPhase fetchPhase) {
72-
super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null);
71+
IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
72+
BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) {
73+
super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null, circuitBreakerService);
7374
}
7475

7576
@Override

0 commit comments

Comments
 (0)