Skip to content

Commit

Permalink
Merge second batch of feature/extensions into main
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Bogan <rbogan@amazon.com>
  • Loading branch information
ryanbogan committed Dec 12, 2022
1 parent e41cbe5 commit 60ea641
Show file tree
Hide file tree
Showing 26 changed files with 2,264 additions and 235 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.io.stream;

import org.opensearch.transport.TransportRequest;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

/**
* Extensibility support for Named Writeable Registry: request to extensions to parse context
*
* @opensearch.internal
* */
public class NamedWriteableRegistryParseRequest extends TransportRequest {

private final Class categoryClass;
private byte[] context;

/**
* @param categoryClass Class category for this parse request
* @param context StreamInput object to convert into a byte array and transport to the extension
* @throws IllegalArgumentException if context bytes could not be read
*/
public NamedWriteableRegistryParseRequest(Class categoryClass, StreamInput context) {
try {
byte[] streamInputBytes = context.readAllBytes();
this.categoryClass = categoryClass;
this.context = Arrays.copyOf(streamInputBytes, streamInputBytes.length);
} catch (IOException e) {
throw new IllegalArgumentException("Invalid context", e);
}
}

/**
* @param in StreamInput from which class fields are read from
* @throws IllegalArgumentException if the fully qualified class name is invalid and the class object cannot be generated at runtime
*/
public NamedWriteableRegistryParseRequest(StreamInput in) throws IOException {
super(in);
try {
this.categoryClass = Class.forName(in.readString());
this.context = in.readByteArray();
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Category class definition not found", e);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(categoryClass.getName());
out.writeByteArray(context);
}

@Override
public String toString() {
return "NamedWriteableRegistryParseRequest{"
+ "categoryClass="
+ categoryClass.getName()
+ ", context="
+ context.toString()
+ " }";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NamedWriteableRegistryParseRequest that = (NamedWriteableRegistryParseRequest) o;
return Objects.equals(categoryClass, that.categoryClass) && Objects.equals(context, that.context);
}

@Override
public int hashCode() {
return Objects.hash(categoryClass, context);
}

/**
* Returns the class instance of the category class sent over by the SDK
*/
public Class getCategoryClass() {
return this.categoryClass;
}

/**
* Returns a copy of a byte array that a {@link Writeable.Reader} will be applied to. This byte array is generated from a {@link StreamInput} instance and transported to the SDK for deserialization.
*/
public byte[] getContext() {
return Arrays.copyOf(this.context, this.context.length);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.io.stream;

import org.opensearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Extensibility support for Named Writeable Registry: response from extensions for name writeable registry entries
*
* @opensearch.internal
*/
public class NamedWriteableRegistryResponse extends TransportResponse {

private final Map<String, Class> registry;

/**
* @param registry Map of writeable names and their associated category class
*/
public NamedWriteableRegistryResponse(Map<String, Class> registry) {
this.registry = new HashMap<>(registry);
}

/**
* @param in StreamInput from which map entries of writeable names and their associated category classes are read from
* @throws IllegalArgumentException if the fully qualified class name is invalid and the class object cannot be generated at runtime
*/
public NamedWriteableRegistryResponse(StreamInput in) throws IOException {
super(in);
// Stream output for registry map begins with a variable integer that tells us the number of entries being sent across the wire
Map<String, Class> registry = new HashMap<>();
int registryEntryCount = in.readVInt();
for (int i = 0; i < registryEntryCount; i++) {
try {
String name = in.readString();
Class categoryClass = Class.forName(in.readString());
registry.put(name, categoryClass);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Category class definition not found", e);
}
}

this.registry = registry;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
// Stream out registry size prior to streaming out registry entries
out.writeVInt(this.registry.size());
for (Map.Entry<String, Class> entry : registry.entrySet()) {
out.writeString(entry.getKey()); // Unique named writeable name
out.writeString(entry.getValue().getName()); // Fully qualified category class name
}
}

@Override
public String toString() {
return "NamedWritableRegistryResponse{" + "registry=" + registry.toString() + "}";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NamedWriteableRegistryResponse that = (NamedWriteableRegistryResponse) o;
return Objects.equals(registry, that.registry);
}

@Override
public int hashCode() {
return Objects.hash(registry);
}

/**
* Returns a map of writeable names and their associated category class
*/
public Map<String, Class> getRegistry() {
return Collections.unmodifiableMap(this.registry);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,58 @@
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* PluginRequest to intialize plugin
* InitializeExtensionRequest to intialize plugin
*
* @opensearch.internal
*/
public class PluginRequest extends TransportRequest {
public class InitializeExtensionRequest extends TransportRequest {
private final DiscoveryNode sourceNode;
/*
* TODO change DiscoveryNode to Extension information
*/
private final List<DiscoveryExtensionNode> extensions;
private final DiscoveryExtensionNode extension;

public PluginRequest(DiscoveryNode sourceNode, List<DiscoveryExtensionNode> extensions) {
public InitializeExtensionRequest(DiscoveryNode sourceNode, DiscoveryExtensionNode extension) {
this.sourceNode = sourceNode;
this.extensions = extensions;
this.extension = extension;
}

public PluginRequest(StreamInput in) throws IOException {
public InitializeExtensionRequest(StreamInput in) throws IOException {
super(in);
sourceNode = new DiscoveryNode(in);
extensions = in.readList(DiscoveryExtensionNode::new);
extension = new DiscoveryExtensionNode(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
out.writeList(extensions);
}

public List<DiscoveryExtensionNode> getExtensions() {
return extensions;
extension.writeTo(out);
}

public DiscoveryNode getSourceNode() {
return sourceNode;
}

public DiscoveryExtensionNode getExtension() {
return extension;
}

@Override
public String toString() {
return "PluginRequest{" + "sourceNode=" + sourceNode + ", extensions=" + extensions + '}';
return "InitializeExtensionsRequest{" + "sourceNode=" + sourceNode + ", extension=" + extension + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PluginRequest that = (PluginRequest) o;
return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(extensions, that.extensions);
InitializeExtensionRequest that = (InitializeExtensionRequest) o;
return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(extension, that.extension);
}

@Override
public int hashCode() {
return Objects.hash(sourceNode, extensions);
return Objects.hash(sourceNode, extension);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@
*
* @opensearch.internal
*/
public class PluginResponse extends TransportResponse {
public class InitializeExtensionResponse extends TransportResponse {
private String name;

public PluginResponse(String name) {
public InitializeExtensionResponse(String name) {
this.name = name;
}

public PluginResponse(StreamInput in) throws IOException {
public InitializeExtensionResponse(StreamInput in) throws IOException {
name = in.readString();
}

Expand All @@ -77,7 +77,7 @@ public String toString() {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PluginResponse that = (PluginResponse) o;
InitializeExtensionResponse that = (InitializeExtensionResponse) o;
return Objects.equals(name, that.name);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Objects;

/**
* Generic boolean response indicating the status of some previous request sent to the SDK
*
* @opensearch.internal
*/
public class ExtensionBooleanResponse extends TransportResponse {

private final boolean status;

/**
* @param status Boolean indicating the status of the parse request sent to the SDK
*/
public ExtensionBooleanResponse(boolean status) {
this.status = status;
}

public ExtensionBooleanResponse(StreamInput in) throws IOException {
super(in);
this.status = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(status);
}

@Override
public String toString() {
return "ExtensionBooleanResponse{" + "status=" + this.status + "}";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExtensionBooleanResponse that = (ExtensionBooleanResponse) o;
return Objects.equals(this.status, that.status);
}

@Override
public int hashCode() {
return Objects.hash(status);
}

/**
* Returns a boolean indicating the success of the request sent to the SDK
*/
public boolean getStatus() {
return this.status;
}

}
Loading

0 comments on commit 60ea641

Please sign in to comment.