Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added extension Points, initial REST implementation and registering Transport Actions for extensions #5518

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211))
- Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366))
- Added experimental extensions to main ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347))
- Expanded experimental extensions ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518))
ryanbogan marked this conversation as resolved.
Show resolved Hide resolved

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.io.stream;

import org.opensearch.transport.TransportRequest;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

/**
* Extensibility support for Named Writeable Registry: request to extensions to parse context
*
* @opensearch.internal
* */
public class NamedWriteableRegistryParseRequest extends TransportRequest {

private final Class categoryClass;
private byte[] context;

/**
* @param categoryClass Class category for this parse request
* @param context StreamInput object to convert into a byte array and transport to the extension
* @throws IllegalArgumentException if context bytes could not be read
*/
public NamedWriteableRegistryParseRequest(Class categoryClass, StreamInput context) {
try {
byte[] streamInputBytes = context.readAllBytes();
this.categoryClass = categoryClass;
this.context = Arrays.copyOf(streamInputBytes, streamInputBytes.length);
} catch (IOException e) {
throw new IllegalArgumentException("Invalid context", e);
}
}

/**
* @param in StreamInput from which class fields are read from
* @throws IllegalArgumentException if the fully qualified class name is invalid and the class object cannot be generated at runtime
*/
public NamedWriteableRegistryParseRequest(StreamInput in) throws IOException {
super(in);
try {
this.categoryClass = Class.forName(in.readString());
ryanbogan marked this conversation as resolved.
Show resolved Hide resolved
this.context = in.readByteArray();
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Category class definition not found", e);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(categoryClass.getName());
out.writeByteArray(context);
}

@Override
public String toString() {
return "NamedWriteableRegistryParseRequest{"
+ "categoryClass="
+ categoryClass.getName()
+ ", context="
+ context.toString()
+ " }";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NamedWriteableRegistryParseRequest that = (NamedWriteableRegistryParseRequest) o;
return Objects.equals(categoryClass, that.categoryClass) && Objects.equals(context, that.context);
}

@Override
public int hashCode() {
return Objects.hash(categoryClass, context);
}

/**
* Returns the class instance of the category class sent over by the SDK
*/
public Class getCategoryClass() {
return this.categoryClass;
}

/**
* Returns a copy of a byte array that a {@link Writeable.Reader} will be applied to. This byte array is generated from a {@link StreamInput} instance and transported to the SDK for deserialization.
*/
public byte[] getContext() {
return Arrays.copyOf(this.context, this.context.length);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.io.stream;

import org.opensearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Extensibility support for Named Writeable Registry: response from extensions for name writeable registry entries
*
* @opensearch.internal
*/
public class NamedWriteableRegistryResponse extends TransportResponse {

private final Map<String, Class> registry;

/**
* @param registry Map of writeable names and their associated category class
*/
public NamedWriteableRegistryResponse(Map<String, Class> registry) {
this.registry = new HashMap<>(registry);
}

/**
* @param in StreamInput from which map entries of writeable names and their associated category classes are read from
* @throws IllegalArgumentException if the fully qualified class name is invalid and the class object cannot be generated at runtime
*/
public NamedWriteableRegistryResponse(StreamInput in) throws IOException {
super(in);
// Stream output for registry map begins with a variable integer that tells us the number of entries being sent across the wire
Map<String, Class> registry = new HashMap<>();
int registryEntryCount = in.readVInt();
for (int i = 0; i < registryEntryCount; i++) {
try {
String name = in.readString();
Class categoryClass = Class.forName(in.readString());
registry.put(name, categoryClass);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Category class definition not found", e);
}
}

this.registry = registry;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
// Stream out registry size prior to streaming out registry entries
out.writeVInt(this.registry.size());
for (Map.Entry<String, Class> entry : registry.entrySet()) {
out.writeString(entry.getKey()); // Unique named writeable name
out.writeString(entry.getValue().getName()); // Fully qualified category class name
}
}

@Override
public String toString() {
return "NamedWritableRegistryResponse{" + "registry=" + registry.toString() + "}";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NamedWriteableRegistryResponse that = (NamedWriteableRegistryResponse) o;
return Objects.equals(registry, that.registry);
}

@Override
public int hashCode() {
return Objects.hash(registry);
}

/**
* Returns a map of writeable names and their associated category class
*/
public Map<String, Class> getRegistry() {
return Collections.unmodifiableMap(this.registry);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,58 @@
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* PluginRequest to intialize plugin
* InitializeExtensionRequest to intialize plugin
*
* @opensearch.internal
*/
public class PluginRequest extends TransportRequest {
public class InitializeExtensionRequest extends TransportRequest {
private final DiscoveryNode sourceNode;
/*
* TODO change DiscoveryNode to Extension information
*/
private final List<DiscoveryExtensionNode> extensions;
private final DiscoveryExtensionNode extension;

public PluginRequest(DiscoveryNode sourceNode, List<DiscoveryExtensionNode> extensions) {
public InitializeExtensionRequest(DiscoveryNode sourceNode, DiscoveryExtensionNode extension) {
this.sourceNode = sourceNode;
this.extensions = extensions;
this.extension = extension;
}

public PluginRequest(StreamInput in) throws IOException {
public InitializeExtensionRequest(StreamInput in) throws IOException {
super(in);
sourceNode = new DiscoveryNode(in);
extensions = in.readList(DiscoveryExtensionNode::new);
extension = new DiscoveryExtensionNode(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
out.writeList(extensions);
}

public List<DiscoveryExtensionNode> getExtensions() {
return extensions;
extension.writeTo(out);
}

public DiscoveryNode getSourceNode() {
return sourceNode;
}

public DiscoveryExtensionNode getExtension() {
return extension;
}

@Override
public String toString() {
return "PluginRequest{" + "sourceNode=" + sourceNode + ", extensions=" + extensions + '}';
return "InitializeExtensionsRequest{" + "sourceNode=" + sourceNode + ", extension=" + extension + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PluginRequest that = (PluginRequest) o;
return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(extensions, that.extensions);
InitializeExtensionRequest that = (InitializeExtensionRequest) o;
return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(extension, that.extension);
}

@Override
public int hashCode() {
return Objects.hash(sourceNode, extensions);
return Objects.hash(sourceNode, extension);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@
*
* @opensearch.internal
*/
public class PluginResponse extends TransportResponse {
public class InitializeExtensionResponse extends TransportResponse {
private String name;

public PluginResponse(String name) {
public InitializeExtensionResponse(String name) {
this.name = name;
}

public PluginResponse(StreamInput in) throws IOException {
public InitializeExtensionResponse(StreamInput in) throws IOException {
name = in.readString();
}

Expand All @@ -77,7 +77,7 @@ public String toString() {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PluginResponse that = (PluginResponse) o;
InitializeExtensionResponse that = (InitializeExtensionResponse) o;
return Objects.equals(name, that.name);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Objects;

/**
* Generic boolean response indicating the status of some previous request sent to the SDK
*
* @opensearch.internal
*/
public class ExtensionBooleanResponse extends TransportResponse {
Copy link
Collaborator

@reta reta Dec 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is yet another variation if .opensearch.index.AcknowledgedResponse (I see it is removed), org.opensearch.action.support.master.AcknowledgedResponse ? May be we could (re)use the same pattern? Because ExtensionBooleanResponse is not saying anything about what its usage is. At least with acknowledged it reads as "request was accepted".


private final boolean status;

/**
* @param status Boolean indicating the status of the parse request sent to the SDK
*/
public ExtensionBooleanResponse(boolean status) {
this.status = status;
}

public ExtensionBooleanResponse(StreamInput in) throws IOException {
super(in);
this.status = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(status);
}

@Override
public String toString() {
return "ExtensionBooleanResponse{" + "status=" + this.status + "}";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExtensionBooleanResponse that = (ExtensionBooleanResponse) o;
return Objects.equals(this.status, that.status);
}

@Override
public int hashCode() {
return Objects.hash(status);
}

/**
* Returns a boolean indicating the success of the request sent to the SDK
*/
public boolean getStatus() {
return this.status;
}

}
Loading