Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Passing IndicesService to ingest processor factory with processor params (#10307) #11281

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672))
- Add back half_float BKD based sort query optimization ([#11024](https://github.com/opensearch-project/OpenSearch/pull/11024))
- Request level coordinator slow logs ([#11246](https://github.com/opensearch-project/OpenSearch/pull/11246))
- Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -128,7 +129,8 @@ public IngestService(
ScriptService scriptService,
AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins,
Client client
Client client,
IndicesService indicesService
) {
this.clusterService = clusterService;
this.scriptService = scriptService;
Expand All @@ -143,7 +145,8 @@ public IngestService(
(delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC),
this,
client,
threadPool.generic()::execute
threadPool.generic()::execute,
indicesService
)
);
this.threadPool = threadPool;
Expand Down
7 changes: 6 additions & 1 deletion server/src/main/java/org/opensearch/ingest/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.env.Environment;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.indices.IndicesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.Scheduler;

Expand Down Expand Up @@ -156,6 +157,8 @@ class Parameters {
*/
public final Client client;

public final IndicesService indicesService;

public Parameters(
Environment env,
ScriptService scriptService,
Expand All @@ -165,7 +168,8 @@ public Parameters(
BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService,
Client client,
Consumer<Runnable> genericExecutor
Consumer<Runnable> genericExecutor,
IndicesService indicesService
) {
this.env = env;
this.scriptService = scriptService;
Expand All @@ -176,6 +180,7 @@ public Parameters(
this.ingestService = ingestService;
this.client = client;
this.genericExecutor = genericExecutor;
this.indicesService = indicesService;
}

}
Expand Down
21 changes: 12 additions & 9 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -628,15 +628,6 @@ protected Node(
metricsRegistry = metricsRegistryFactory.getMetricsRegistry();
resourcesToClose.add(tracer::close);
resourcesToClose.add(metricsRegistry::close);
final IngestService ingestService = new IngestService(
clusterService,
threadPool,
this.environment,
scriptService,
analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class),
client
);

final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();
Expand Down Expand Up @@ -822,6 +813,18 @@ protected Node(
remoteStoreStatsTrackerFactory,
recoverySettings
);

final IngestService ingestService = new IngestService(
clusterService,
threadPool,
this.environment,
scriptService,
analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class),
client,
indicesService
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.script.MockScriptEngine;
import org.opensearch.script.Script;
Expand Down Expand Up @@ -149,7 +150,8 @@ public void testIngestPlugin() {
null,
null,
Collections.singletonList(DUMMY_PLUGIN),
client
client,
mock(IndicesService.class)
);
Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
assertTrue(factories.containsKey("foo"));
Expand All @@ -167,7 +169,8 @@ public void testIngestPluginDuplicate() {
null,
null,
Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN),
client
client,
mock(IndicesService.class)
)
);
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
Expand All @@ -182,7 +185,8 @@ public void testExecuteIndexPipelineDoesNotExist() {
null,
null,
Collections.singletonList(DUMMY_PLUGIN),
client
client,
mock(IndicesService.class)
);
final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
.source(emptyMap())
Expand Down Expand Up @@ -1485,7 +1489,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
null,
null,
Arrays.asList(testPlugin),
client
client,
mock(IndicesService.class)
);
ingestService.addIngestClusterStateListener(ingestClusterStateListener);

Expand Down Expand Up @@ -1702,7 +1707,7 @@ private static IngestService createWithProcessors(Map<String, Processor.Factory>
public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
return processors;
}
}), client);
}), client, mock(IndicesService.class));
}

private CompoundProcessor mockCompoundProcessor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2210,7 +2210,8 @@ public void onFailure(final Exception e) {
scriptService,
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
Collections.emptyList(),
client
client,
indicesService
),
transportShardBulkAction,
client,
Expand Down
Loading