Skip to content

Commit

Permalink
[Remove] type from TaskResults index and IndexMetadata.getMappings (#…
Browse files Browse the repository at this point in the history
…2469)

Removes types from the TaskResults internal index along with the getMappings
method from IndexMetadata. This is needed to further remove types from
CreateIndexRequest.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize authored Mar 15, 2022
1 parent 02d000c commit 7df40ee
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,11 @@ public void testTaskStoringSuccessfulResult() throws Exception {
GetTaskResponse getResponse = expectFinishedTask(taskId);
assertEquals(result, getResponse.getTask().getResponseAsMap());
assertNull(getResponse.getTask().getError());

// run it again to check that the tasks index has been successfully created and can be re-used
client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request).get();
events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);
assertEquals(2, events.size());
}

public void testTaskStoringFailureResult() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -123,9 +122,8 @@ public void testMappingMetadataParsed() throws Exception {
.getState()
.metadata()
.index("test")
.getMappings()
.get(MapperService.SINGLE_MAPPING_NAME);
assertThat(mappingMd.routing().required(), equalTo(true));
.mapping();
assertThat(mappingMd.routingRequired(), equalTo(true));

logger.info("--> restarting nodes...");
internalCluster().fullRestart();
Expand All @@ -134,17 +132,8 @@ public void testMappingMetadataParsed() throws Exception {
ensureYellow();

logger.info("--> verify meta _routing required exists");
mappingMd = client().admin()
.cluster()
.prepareState()
.execute()
.actionGet()
.getState()
.metadata()
.index("test")
.getMappings()
.get(MapperService.SINGLE_MAPPING_NAME);
assertThat(mappingMd.routing().required(), equalTo(true));
mappingMd = client().admin().cluster().prepareState().execute().actionGet().getState().metadata().index("test").mapping();
assertThat(mappingMd.routingRequired(), equalTo(true));
}

public void testSimpleOpenClose() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,7 @@ public void testMetaWrittenWhenIndexIsClosedAndMetaUpdated() throws Exception {

// make sure it was also written on red node although index is closed
ImmutableOpenMap<String, IndexMetadata> indicesMetadata = getIndicesMetadataOnNode(dataNode);
assertNotNull(
((Map<String, ?>) (indicesMetadata.get(index).getMappings().get("_doc").getSourceAsMap().get("properties"))).get(
"integer_field"
)
);
assertNotNull(((Map<String, ?>) (indicesMetadata.get(index).mapping().getSourceAsMap().get("properties"))).get("integer_field"));
assertThat(indicesMetadata.get(index).getState(), equalTo(IndexMetadata.State.CLOSE));

/* Try the same and see if this also works if node was just restarted.
Expand Down Expand Up @@ -190,9 +186,7 @@ public void testMetaWrittenWhenIndexIsClosedAndMetaUpdated() throws Exception {

// make sure it was also written on red node although index is closed
indicesMetadata = getIndicesMetadataOnNode(dataNode);
assertNotNull(
((Map<String, ?>) (indicesMetadata.get(index).getMappings().get("_doc").getSourceAsMap().get("properties"))).get("float_field")
);
assertNotNull(((Map<String, ?>) (indicesMetadata.get(index).mapping().getSourceAsMap().get("properties"))).get("float_field"));
assertThat(indicesMetadata.get(index).getState(), equalTo(IndexMetadata.State.CLOSE));

// finally check that meta data is also written of index opened again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ public VersionType versionType() {
public void process(Version indexCreatedVersion, @Nullable MappingMetadata mappingMd, String concreteIndex) {
if (mappingMd != null) {
// might as well check for routing here
if (mappingMd.routing().required() && routing == null) {
if (mappingMd.routingRequired() && routing == null) {
throw new RoutingMissingException(concreteIndex, id);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,17 +660,6 @@ public ImmutableOpenMap<String, AliasMetadata> getAliases() {
return this.aliases;
}

/**
* Return an object that maps each type to the associated mappings.
* The return value is never {@code null} but may be empty if the index
* has no mappings.
* @deprecated Use {@link #mapping()} instead now that indices have a single type
*/
@Deprecated
public ImmutableOpenMap<String, MappingMetadata> getMappings() {
return mappings;
}

/**
* Return the concrete mapping for this index or {@code null} if this index has no mappings at all.
*/
Expand Down Expand Up @@ -1175,7 +1164,10 @@ public Builder putMapping(String source) throws IOException {
}

public Builder putMapping(MappingMetadata mappingMd) {
mappings.put(mappingMd.type(), mappingMd);
mappings.clear();
if (mappingMd != null) {
mappings.put(mappingMd.type(), mappingMd);
}
return this;
}

Expand Down Expand Up @@ -1464,23 +1456,25 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build

if (context != Metadata.XContentContext.API) {
builder.startArray(KEY_MAPPINGS);
for (ObjectObjectCursor<String, MappingMetadata> cursor : indexMetadata.getMappings()) {
MappingMetadata mmd = indexMetadata.mapping();
if (mmd != null) {
if (binary) {
builder.value(cursor.value.source().compressed());
builder.value(mmd.source().compressed());
} else {
builder.map(XContentHelper.convertToMap(cursor.value.source().uncompressed(), true).v2());
builder.map(XContentHelper.convertToMap(mmd.source().uncompressed(), true).v2());
}
}
builder.endArray();
} else {
builder.startObject(KEY_MAPPINGS);
for (ObjectObjectCursor<String, MappingMetadata> cursor : indexMetadata.getMappings()) {
Map<String, Object> mapping = XContentHelper.convertToMap(cursor.value.source().uncompressed(), false).v2();
if (mapping.size() == 1 && mapping.containsKey(cursor.key)) {
MappingMetadata mmd = indexMetadata.mapping();
if (mmd != null) {
Map<String, Object> mapping = XContentHelper.convertToMap(mmd.source().uncompressed(), false).v2();
if (mapping.size() == 1 && mapping.containsKey(mmd.type())) {
// the type name is the root value, reduce it
mapping = (Map<String, Object>) mapping.get(cursor.key);
mapping = (Map<String, Object>) mapping.get(mmd.type());
}
builder.field(cursor.key);
builder.field(mmd.type());
builder.map(mapping);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;

Expand All @@ -59,46 +60,16 @@
public class MappingMetadata extends AbstractDiffable<MappingMetadata> {
public static final MappingMetadata EMPTY_MAPPINGS = new MappingMetadata(MapperService.SINGLE_MAPPING_NAME, Collections.emptyMap());

public static class Routing {

public static final Routing EMPTY = new Routing(false);

private final boolean required;

public Routing(boolean required) {
this.required = required;
}

public boolean required() {
return required;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Routing routing = (Routing) o;

return required == routing.required;
}

@Override
public int hashCode() {
return getClass().hashCode() + (required ? 1 : 0);
}
}

private final String type;

private final CompressedXContent source;

private final Routing routing;
private final boolean routingRequired;

public MappingMetadata(DocumentMapper docMapper) {
this.type = docMapper.type();
this.source = docMapper.mappingSource();
this.routing = new Routing(docMapper.routingFieldMapper().required());
this.routingRequired = docMapper.routingFieldMapper().required();
}

@SuppressWarnings("unchecked")
Expand All @@ -109,7 +80,7 @@ public MappingMetadata(CompressedXContent mapping) {
throw new IllegalStateException("Can't derive type from mapping, no root type: " + mapping.string());
}
this.type = mappingMap.keySet().iterator().next();
this.routing = initRouting((Map<String, Object>) mappingMap.get(this.type));
this.routingRequired = isRoutingRequired((Map<String, Object>) mappingMap.get(this.type));
}

@SuppressWarnings("unchecked")
Expand All @@ -125,13 +96,13 @@ public MappingMetadata(String type, Map<String, Object> mapping) {
if (mapping.size() == 1 && mapping.containsKey(type)) {
withoutType = (Map<String, Object>) mapping.get(type);
}
this.routing = initRouting(withoutType);
this.routingRequired = isRoutingRequired(withoutType);
}

@SuppressWarnings("unchecked")
private Routing initRouting(Map<String, Object> withoutType) {
private boolean isRoutingRequired(Map<String, Object> withoutType) {
boolean required = false;
if (withoutType.containsKey("_routing")) {
boolean required = false;
Map<String, Object> routingNode = (Map<String, Object>) withoutType.get("_routing");
for (Map.Entry<String, Object> entry : routingNode.entrySet()) {
String fieldName = entry.getKey();
Expand All @@ -147,10 +118,8 @@ private Routing initRouting(Map<String, Object> withoutType) {
}
}
}
return new Routing(required);
} else {
return Routing.EMPTY;
}
return required;
}

public String type() {
Expand Down Expand Up @@ -180,16 +149,16 @@ public Map<String, Object> getSourceAsMap() throws OpenSearchParseException {
return sourceAsMap();
}

public Routing routing() {
return this.routing;
public boolean routingRequired() {
return this.routingRequired;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(type());
source().writeTo(out);
// routing
out.writeBoolean(routing().required());
out.writeBoolean(routingRequired);
if (out.getVersion().before(LegacyESVersion.V_7_0_0)) {
out.writeBoolean(false); // hasParentField
}
Expand All @@ -202,7 +171,7 @@ public boolean equals(Object o) {

MappingMetadata that = (MappingMetadata) o;

if (!routing.equals(that.routing)) return false;
if (!Objects.equals(this.routingRequired, that.routingRequired)) return false;
if (!source.equals(that.source)) return false;
if (!type.equals(that.type)) return false;

Expand All @@ -211,17 +180,14 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
int result = type.hashCode();
result = 31 * result + source.hashCode();
result = 31 * result + routing.hashCode();
return result;
return Objects.hash(type, source, routingRequired);
}

public MappingMetadata(StreamInput in) throws IOException {
type = in.readString();
source = CompressedXContent.readCompressedString(in);
// routing
routing = new Routing(in.readBoolean());
routingRequired = in.readBoolean();
if (in.getVersion().before(LegacyESVersion.V_7_0_0)) {
in.readBoolean(); // hasParentField
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ public boolean routingRequired(String concreteIndex) {
if (indexMetadata != null) {
MappingMetadata mappingMetadata = indexMetadata.mapping();
if (mappingMetadata != null) {
return mappingMetadata.routing().required();
return mappingMetadata.routingRequired();
}
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

package org.opensearch.index.mapper;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.DelegatingAnalyzerWrapper;
Expand Down Expand Up @@ -416,8 +415,8 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge
private synchronized Map<String, DocumentMapper> internalMerge(IndexMetadata indexMetadata, MergeReason reason) {
assert reason != MergeReason.MAPPING_UPDATE_PREFLIGHT;
Map<String, CompressedXContent> map = new LinkedHashMap<>();
for (ObjectCursor<MappingMetadata> cursor : indexMetadata.getMappings().values()) {
MappingMetadata mappingMetadata = cursor.value;
MappingMetadata mappingMetadata = indexMetadata.mapping();
if (mappingMetadata != null) {
map.put(mappingMetadata.type(), mappingMetadata.source());
}
return internalMerge(map, reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

package org.opensearch.index.shard;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
Expand Down Expand Up @@ -132,8 +131,8 @@ void recoverFromLocalShards(
throw new IllegalArgumentException("can't add shards from more than one index");
}
IndexMetadata sourceMetadata = shards.get(0).getIndexMetadata();
for (ObjectObjectCursor<String, MappingMetadata> mapping : sourceMetadata.getMappings()) {
mappingUpdateConsumer.accept(mapping.value);
if (sourceMetadata.mapping() != null) {
mappingUpdateConsumer.accept(sourceMetadata.mapping());
}
indexShard.mapperService().merge(sourceMetadata, MapperService.MergeReason.MAPPING_RECOVERY);
// now that the mapping is merged we can validate the index sort configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,11 @@ public class TaskResultsService {

public static final String TASK_INDEX = ".tasks";

public static final String TASK_TYPE = "task";

public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json";

public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";

public static final int TASK_RESULT_MAPPING_VERSION = 3;
public static final int TASK_RESULT_MAPPING_VERSION = 3; // must match version in task-index-mapping.json

/**
* The backoff policy to use when saving a task result fails. The total wait
Expand Down Expand Up @@ -115,7 +113,7 @@ public void storeResult(TaskResult taskResult, ActionListener<Void> listener) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.settings(taskResultIndexSettings());
createIndexRequest.index(TASK_INDEX);
createIndexRequest.mapping(TASK_TYPE, taskResultIndexMapping(), XContentType.JSON);
createIndexRequest.mapping(taskResultIndexMapping());
createIndexRequest.cause("auto(task api)");

client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
Expand Down Expand Up @@ -155,7 +153,7 @@ public void onFailure(Exception e) {
}

private int getTaskResultMappingVersion(IndexMetadata metadata) {
MappingMetadata mappingMetadata = metadata.getMappings().get(TASK_TYPE);
MappingMetadata mappingMetadata = metadata.mapping();
if (mappingMetadata == null) {
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"task" : {
"_doc" : {
"_meta": {
"version": 3
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testMappingClusterStateUpdateDoesntChangeExistingIndices() throws Ex
// the task really was a mapping update
assertThat(
indexService.mapperService().documentMapper().mappingSource(),
not(equalTo(result.resultingState.metadata().index("test").getMappings().get(MapperService.SINGLE_MAPPING_NAME).source()))
not(equalTo(result.resultingState.metadata().index("test").mapping().source()))
);
// since we never committed the cluster state update, the in-memory state is unchanged
assertThat(indexService.mapperService().documentMapper().mappingSource(), equalTo(currentMapping));
Expand Down

0 comments on commit 7df40ee

Please sign in to comment.