
Commit

Add limit on number of processors in Ingest pipelines
Rai committed Aug 28, 2024
1 parent 23cba28 commit 290dbff
Showing 7 changed files with 74 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- Add limit on number of processors in Ingest pipelines ([#15460](https://github.com/opensearch-project/OpenSearch/issues/15460)).

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

SimulateExecutionService.java
@@ -33,8 +33,10 @@
package org.opensearch.action.ingest;

import org.opensearch.action.ActionRunnable;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.ingest.CompoundProcessor;
import org.opensearch.ingest.IngestPipelineValidator;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Pipeline;
import org.opensearch.threadpool.ThreadPool;
@@ -56,9 +58,11 @@ class SimulateExecutionService {
private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;

private final ThreadPool threadPool;
private final ClusterService clusterService;

SimulateExecutionService(ThreadPool threadPool) {
SimulateExecutionService(ThreadPool threadPool, ClusterService clusterService) {
this.threadPool = threadPool;
this.clusterService = clusterService;
}

void executeDocument(
@@ -91,6 +95,9 @@ void executeDocument(
}

public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {

IngestPipelineValidator.validateIngestPipeline(request.getPipeline(), clusterService);

threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
final AtomicInteger counter = new AtomicInteger();
final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>(

SimulatePipelineTransportAction.java
@@ -34,6 +34,7 @@

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
@@ -60,7 +61,7 @@ public SimulatePipelineTransportAction(
ThreadPool threadPool,
TransportService transportService,
ActionFilters actionFilters,
IngestService ingestService
IngestService ingestService, ClusterService clusterService
) {
super(
SimulatePipelineAction.NAME,
@@ -69,7 +70,7 @@ public SimulatePipelineTransportAction(
(Writeable.Reader<SimulatePipelineRequest>) SimulatePipelineRequest::new
);
this.ingestService = ingestService;
this.executionService = new SimulateExecutionService(threadPool);
this.executionService = new SimulateExecutionService(threadPool, clusterService);
}

@Override

ClusterSettings.java
@@ -132,6 +132,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.ingest.IngestPipelineValidator;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.monitor.jvm.JvmGcMonitorService;
@@ -405,6 +406,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterService.USER_DEFINED_METADATA,
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,

IngestPipelineValidator.java (new file)
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest;

import java.util.List;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;

/**
* This class contains methods to validate the ingest pipeline.
*/
public class IngestPipelineValidator {

/**
* Defines the limit for the number of processors which can run on a given document during ingestion.
*
*/
public static final Setting<Integer> MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting(
"cluster.ingest.max_number_processors",
Integer.MAX_VALUE,
1,
Integer.MAX_VALUE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Validates that the number of compound processors in the pipeline does not exceed the configured limit.
* @param pipeline
* @param clusterService
*/
public static void validateIngestPipeline(Pipeline pipeline, ClusterService clusterService) {

List<Processor> processors = pipeline.getCompoundProcessor().getProcessors();
int maxNumberOfIngestProcessorsAllowed = clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS);

if (processors.size() > maxNumberOfIngestProcessorsAllowed) {
throw new IllegalStateException(
"Cannot use more than the maximum processors allowed. Number of processors configured is ["
+ processors.size() + "] which exceeds the maximum allowed configuration of [" + maxNumberOfIngestProcessorsAllowed +
"] processors.");
}
}
}
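
The new cluster.ingest.max_number_processors setting is registered as a dynamic, node-scoped setting with a default of Integer.MAX_VALUE, so the limit is read through the cluster settings each time a pipeline is validated. Below is a minimal sketch, not part of this commit, of how the limit could be read from node settings and what exceeding it means; the example class name and the limit value of 2 are illustrative assumptions.

import org.opensearch.common.settings.Settings;
import org.opensearch.ingest.IngestPipelineValidator;

// Hypothetical illustration only; not part of this commit.
public class IngestProcessorLimitExample {
    public static void main(String[] args) {
        // Assume an operator has lowered the limit from its default (Integer.MAX_VALUE) to 2.
        Settings settings = Settings.builder().put("cluster.ingest.max_number_processors", 2).build();

        // Read the limit through the setting introduced by this commit.
        int limit = IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS.get(settings);
        System.out.println("Configured processor limit: " + limit); // prints 2

        // With this limit in effect, IngestPipelineValidator.validateIngestPipeline(pipeline, clusterService)
        // throws an IllegalStateException for any pipeline whose top-level processor count exceeds the limit.
    }
}
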
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
@@ -494,6 +494,9 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq

Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2();
Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);

IngestPipelineValidator.validateIngestPipeline(pipeline, clusterService);

List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
@@ -1099,6 +1102,7 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
processorFactories,
scriptService
);
IngestPipelineValidator.validateIngestPipeline(newPipeline, clusterService);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));

if (previous == null) {

SimulateExecutionServiceTests.java
@@ -32,6 +32,7 @@

package org.opensearch.action.ingest;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.VersionType;
import org.opensearch.ingest.AbstractProcessor;
@@ -59,6 +60,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import static org.mockito.Mockito.mock;
import static org.opensearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@@ -75,11 +77,13 @@ public class SimulateExecutionServiceTests extends OpenSearchTestCase {
private TestThreadPool threadPool;
private SimulateExecutionService executionService;
private IngestDocument ingestDocument;
private ClusterService clusterService;

@Before
public void setup() {
threadPool = new TestThreadPool(SimulateExecutionServiceTests.class.getSimpleName());
executionService = new SimulateExecutionService(threadPool);
clusterService = mock(ClusterService.class);
executionService = new SimulateExecutionService(threadPool, clusterService);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
}
