Skip to content

Commit

Permalink
Clear templates before Adding; Use NamedWriteableAwareStreamInput for…
Browse files Browse the repository at this point in the history
… RemoteCustomMetadata

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Jun 24, 2024
1 parent 47feca7 commit 23039b7
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,11 @@ public Builder templates(Map<String, IndexTemplateMetadata> templates) {
return this;
}

public Builder removeAllTemplates() {
this.templates.clear();
return this;
}

public Builder templates(TemplatesMetadata templatesMetadata) {
this.templates.putAll(templatesMetadata.getTemplates());
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,7 @@ private ClusterState readClusterStateInParallel(
metadataBuilder.transientSettings((Settings) remoteReadResult.getObj());
break;
case TEMPLATES_METADATA:
metadataBuilder.removeAllTemplates();
metadataBuilder.templates((TemplatesMetadata) remoteReadResult.getObj());
break;
case HASHES_OF_CONSISTENT_SETTINGS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.compress.Compressor;
Expand Down Expand Up @@ -122,6 +123,8 @@ public UploadedMetadata getUploadedMetadata() {

public static Custom readFrom(StreamInput streamInput, NamedWriteableRegistry namedWriteableRegistry, String customType)
throws IOException {
return namedWriteableRegistry.getReader(Custom.class, customType).read(streamInput);
try (StreamInput in = new NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry)) {
return namedWriteableRegistry.getReader(Custom.class, customType).read(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,48 @@ public void testIsSegmentReplicationDisabled() {
assertFalse(metadata.isSegmentReplicationEnabled(indexName));
}

public void testTemplatesMetadata() {
TemplatesMetadata templatesMetadata1 = TemplatesMetadata.builder()
.put(
IndexTemplateMetadata.builder("template_1")
.patterns(Arrays.asList("bar-*", "foo-*"))
.settings(Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build())
.build()
)
.build();
Metadata metadata1 = Metadata.builder().templates(templatesMetadata1).build();
assertThat(metadata1.templates(), is(templatesMetadata1.getTemplates()));

TemplatesMetadata templatesMetadata2 = TemplatesMetadata.builder()
.put(
IndexTemplateMetadata.builder("template_2")
.patterns(Arrays.asList("bar-*", "foo-*"))
.settings(Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build())
.build()
)
.build();

Metadata metadata2 = Metadata.builder(metadata1).templates(templatesMetadata2).build();

Map<String, IndexTemplateMetadata> allTemplates = new HashMap<>(templatesMetadata1.getTemplates());
allTemplates.putAll(templatesMetadata2.getTemplates());

assertThat(metadata2.templates(), is(allTemplates));

TemplatesMetadata templatesMetadata3 = TemplatesMetadata.builder()
.put(
IndexTemplateMetadata.builder("template_3")
.patterns(Arrays.asList("bar-*", "foo-*"))
.settings(Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build())
.build()
)
.build();

Metadata metadata3 = Metadata.builder(metadata2).removeAllTemplates().templates(templatesMetadata3).build();

assertThat(metadata3.templates(), is(templatesMetadata3.getTemplates()));
}

public static Metadata randomMetadata() {
Metadata.Builder md = Metadata.builder()
.put(buildIndexMetadata("index", "alias", randomBoolean() ? null : randomBoolean()).build(), randomBoolean())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.gateway.remote.model;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.metadata.IndexGraveyard;
import org.opensearch.cluster.metadata.Metadata.Custom;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -16,13 +18,20 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry.Entry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.NoneCompressor;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.persistent.PersistentTaskParams;
import org.opensearch.persistent.PersistentTasksCustomMetadata;
import org.opensearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand All @@ -33,6 +42,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Objects;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER;
Expand Down Expand Up @@ -216,24 +226,88 @@ public void testGetUploadedMetadata() throws IOException {

public void testSerDe() throws IOException {
Custom customMetadata = getCustomMetadata();
verifySerDe(customMetadata, IndexGraveyard.TYPE);
}

public void testSerDeForPersistentTasks() throws IOException {
Custom customMetadata = getPersistentTasksMetadata();
verifySerDe(customMetadata, PersistentTasksCustomMetadata.TYPE);
}

private void verifySerDe(Custom objectToUpload, String objectType) throws IOException {
RemoteCustomMetadata remoteObjectForUpload = new RemoteCustomMetadata(
customMetadata,
IndexGraveyard.TYPE,
objectToUpload,
objectType,
METADATA_VERSION,
clusterUUID,
compressor,
namedWriteableRegistry
customWritableRegistry()
);
try (InputStream inputStream = remoteObjectForUpload.serialize()) {
remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath());
assertThat(inputStream.available(), greaterThan(0));
Custom readCustomMetadata = remoteObjectForUpload.deserialize(inputStream);
assertThat(readCustomMetadata, is(customMetadata));
assertThat(readCustomMetadata, is(objectToUpload));
}
}

private NamedWriteableRegistry customWritableRegistry() {
List<Entry> entries = ClusterModule.getNamedWriteables();
entries.add(new Entry(PersistentTaskParams.class, TestPersistentTaskParams.PARAM_NAME, TestPersistentTaskParams::new));
return new NamedWriteableRegistry(entries);
}

public static Custom getCustomMetadata() {
return IndexGraveyard.builder().addTombstone(new Index("test-index", "3q2423")).build();
}

private static Custom getPersistentTasksMetadata() {
return PersistentTasksCustomMetadata.builder()
.addTask("_task_1", "testTaskName", new TestPersistentTaskParams("task param data"), new Assignment(null, "_reason"))
.build();
}

public static class TestPersistentTaskParams implements PersistentTaskParams {

private static final String PARAM_NAME = "testTaskName";

private final String data;

public TestPersistentTaskParams(String data) {
this.data = data;
}

public TestPersistentTaskParams(StreamInput in) throws IOException {
this(in.readString());
}

@Override
public String getWriteableName() {
return PARAM_NAME;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_2_13_0;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(data);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("data_field", data);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestPersistentTaskParams that = (TestPersistentTaskParams) o;
return Objects.equals(data, that.data);
}
}

}

0 comments on commit 23039b7

Please sign in to comment.