diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 31023fc85d5b8..4d8477257a26a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -30,11 +30,13 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -45,8 +47,12 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -86,9 +92,10 @@ public class IngestService implements ClusterStateApplier { public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, - List ingestPlugins) { + List ingestPlugins, IndicesService indicesService) { this.clusterService = clusterService; this.scriptService = scriptService; + final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(); this.processorFactories = processorFactories( ingestPlugins, new Processor.Parameters( @@ -96,7 +103,32 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool, threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC - ), this + ), this, indexExpression -> { + ClusterState state = clusterService.state(); + Index[] resolvedIndices = resolver.concreteIndices(state, IndicesOptions.STRICT_EXPAND_OPEN, indexExpression); + if (resolvedIndices.length != 1) { + throw new IllegalStateException("expression [" + indexExpression + "] can only point to a single concrete index"); + } + Index index = resolvedIndices[0]; + + // check if indexExpression matches with an alias that has a filter + // There is no guarantee that alias filters are applied, so fail if this is the case. + Set indicesAndAliases = resolver.resolveExpressions(state, indexExpression); + String[] aliasesWithFilter = resolver.filteringAliases(state, index.getName(), indicesAndAliases); + if (aliasesWithFilter != null && aliasesWithFilter.length > 0) { + throw new IllegalStateException("expression [" + indexExpression + "] points an alias with a filter"); + } + + IndexService indexService = indicesService.indexServiceSafe(index); + int numShards = indexService.getMetaData().getNumberOfShards(); + if (numShards != 1) { + throw new IllegalStateException("index [" + index.getName() + "] must have 1 shard, but has " + numShards + + " shards"); + } + + IndexShard indexShard = indexService.getShard(0); + return indexShard.acquireSearcher("ingest"); + } ) ); this.threadPool = threadPool; diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index c064ddb35a129..5d01101e8be20 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -22,11 +22,13 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.Scheduler; import java.util.Map; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.LongSupplier; /** @@ -110,9 +112,16 @@ class Parameters { */ public final BiFunction scheduler; + /** + * Provides access to an engine searcher of a locally allocated index specified for the provided index. + * + * The locally allocated index must be have a single primary shard. + */ + public final Function localShardSearcher; + public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, LongSupplier relativeTimeSupplier, BiFunction scheduler, - IngestService ingestService) { + IngestService ingestService, Function localShardSearcher) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; @@ -120,6 +129,7 @@ public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry this.relativeTimeSupplier = relativeTimeSupplier; this.scheduler = scheduler; this.ingestService = ingestService; + this.localShardSearcher = localShardSearcher; } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 42f80dbd87c47..c5b440c400fd3 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -348,8 +348,6 @@ protected Node( final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool); clusterService.addStateApplier(scriptModule.getScriptService()); resourcesToClose.add(clusterService); - final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, - scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, clusterService.getClusterSettings(), client); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client, @@ -419,6 +417,10 @@ protected Node( threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders, indexStoreFactories); + final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, + scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), + pluginsService.filterPlugins(IngestPlugin.class), indicesService); + final AliasValidator aliasValidator = new AliasValidator(); final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService( diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java new file mode 100644 index 0000000000000..d15f9d681500f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java @@ -0,0 +1,269 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.ingest; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.LeafReader; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.IndexSearcherWrapper; +import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return Collections.singleton(TestPlugin.class); + } + + public void testLocalShardSearcher() throws Exception { + client().index(new IndexRequest("reference-index").id("1").source("{}", XContentType.JSON)).actionGet(); + client().admin().indices().refresh(new RefreshRequest("reference-index")).actionGet(); + + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet(); + client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet(); + + Map result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap(); + assertThat(result.size(), equalTo(1)); + assertThat(result.get("id"), equalTo("1")); + } + + public void testMultipleIndicesAreResolved() throws Exception { + createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1") + .addAlias(new Alias("reference-index"))); + createIndex("reference-index2", client().admin().indices().prepareCreate("reference-index2") + .addAlias(new Alias("reference-index"))); + + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline"); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet()); + assertThat(e.getRootCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getRootCause().getMessage(), equalTo("expression [reference-index] can only point to a single concrete index")); + } + + public void testMoreThanOnePrimaryShard() throws Exception { + createIndex("reference-index", Settings.builder().put("index.number_of_shards", 2).build()); + + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline"); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet()); + assertThat(e.getRootCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getRootCause().getMessage(), equalTo("index [reference-index] must have 1 shard, but has 2 shards")); + } + + public void testFailWithFilteredAlias() throws Exception { + createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1") + .addAlias(new Alias("reference-index").filter(QueryBuilders.matchAllQuery()))); + + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline"); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet()); + assertThat(e.getRootCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getRootCause().getMessage(), equalTo("expression [reference-index] points an alias with a filter")); + } + + public void testWithAlias() throws Exception { + createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1") + .addAlias(new Alias("reference-index"))); + + client().index(new IndexRequest("reference-index1").id("1").source("{}", XContentType.JSON)).actionGet(); + client().admin().indices().refresh(new RefreshRequest("reference-index1")).actionGet(); + + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet(); + client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet(); + + Map result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap(); + assertThat(result.size(), equalTo(1)); + assertThat(result.get("id"), equalTo("1")); + } + + public void testSearchWrapperIsApplied() throws Exception { + client().index(new IndexRequest("reference-index").id("1").source("{}", XContentType.JSON)).actionGet(); + client().admin().indices().refresh(new RefreshRequest("reference-index")).actionGet(); + + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet(); + client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet(); + + Map result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap(); + assertThat(result.size(), equalTo(1)); + assertThat(result.get("id"), equalTo("1")); + } + + private static BytesReference createPipelineSource() throws IOException { + return BytesReference.bytes(jsonBuilder().startObject() + .startArray("processors") + .startObject() + .startObject(TestProcessor.NAME) + .endObject() + .endObject() + .endArray() + .endObject()); + } + + public static class TestPlugin extends Plugin implements IngestPlugin { + + @Override + public Map getProcessors(Processor.Parameters parameters) { + return Collections.singletonMap(TestProcessor.NAME, new TestProcessor.Factory(parameters.localShardSearcher)); + } + + @Override + public void onIndexModule(IndexModule indexModule) { + indexModule.setSearcherWrapper(indexService -> new IndexSearcherWrapper() { + + @Override + protected DirectoryReader wrap(DirectoryReader reader) throws IOException { + return new TestDirectyReader(reader); + } + }); + } + } + + static class TestProcessor extends AbstractProcessor { + + static final String NAME = "test_processor"; + + private final Function localShardSearcher; + + TestProcessor(String tag, Function localShardSearcher) { + super(tag); + this.localShardSearcher = localShardSearcher; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + String indexExpression = "reference-index"; + try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression)) { + // Ensure that search wrapper has been invoked by checking the directory instance type: + if ((engineSearcher.getDirectoryReader() instanceof TestDirectyReader) == false) { + // asserting or throwing a AssertionError makes this test hang: + // so just throw a runtime exception here: + throw new RuntimeException("unexpected directory instance type"); + } + Document document = engineSearcher.searcher().doc(0); + ingestDocument.setFieldValue("id", Uid.decodeId(document.getBinaryValue("_id").bytes)); + } + return ingestDocument; + } + + @Override + public String getType() { + return NAME; + } + + static class Factory implements Processor.Factory { + + private final Function localShardSearcher; + + Factory(Function localShardSearcher) { + this.localShardSearcher = localShardSearcher; + } + + @Override + public Processor create(Map processorFactories, + String tag, Map config) throws Exception { + return new TestProcessor(tag, localShardSearcher); + } + } + + } + + static class TestDirectyReader extends FilterDirectoryReader { + + TestDirectyReader(DirectoryReader in) throws IOException { + super(in, new TestSubReaderWrapper()); + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new TestDirectyReader(in); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + + static class TestSubReaderWrapper extends SubReaderWrapper { + @Override + public LeafReader wrap(LeafReader reader) { + return new TestLeafReader(reader); + } + } + + static class TestLeafReader extends FilterLeafReader { + + TestLeafReader(LeafReader in) { + super(in); + } + + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index e5aea1f5d5ce1..8754acc091c99 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -98,7 +98,7 @@ public Map getProcessors(Processor.Parameters paramet public void testIngestPlugin() { ThreadPool tp = mock(ThreadPool.class); IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null, - null, Collections.singletonList(DUMMY_PLUGIN)); + null, Collections.singletonList(DUMMY_PLUGIN), null); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); @@ -108,7 +108,7 @@ public void testIngestPluginDuplicate() { ThreadPool tp = mock(ThreadPool.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new IngestService(mock(ClusterService.class), tp, null, null, - null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))); + null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), null)); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } @@ -117,7 +117,7 @@ public void testExecuteIndexPipelineDoesNotExist() { final ExecutorService executorService = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(executorService); IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, - null, Collections.singletonList(DUMMY_PLUGIN)); + null, Collections.singletonList(DUMMY_PLUGIN), null); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final SetOnce failure = new SetOnce<>(); @@ -1001,7 +1001,7 @@ private static IngestService createWithProcessors(Map public Map getProcessors(final Processor.Parameters parameters) { return processors; } - })); + }), null); } private class IngestDocumentMatcher extends ArgumentMatcher {