Skip to content

Commit

Permalink
Added extension Points, initial REST implementation and registering T…
Browse files Browse the repository at this point in the history
…ransport Actions for extensions (#5518)

* Merge second batch of feature/extensions into main

Signed-off-by: Ryan Bogan <rbogan@amazon.com>

* Update CHANGELOG

Signed-off-by: Ryan Bogan <rbogan@amazon.com>

* Change logger level from info to bug for one call

Signed-off-by: Ryan Bogan <rbogan@amazon.com>

* Removed extension NamedWriteableRegistry implementation and added TODO comments

Signed-off-by: Ryan Bogan <rbogan@amazon.com>

* Rename ExtensionBooleanResponse to AcknowledgedResponse

Signed-off-by: Ryan Bogan <rbogan@amazon.com>

Signed-off-by: Ryan Bogan <rbogan@amazon.com>
Co-authored-by: Daniel (dB.) Doubrovkine <dblock@amazon.com>
  • Loading branch information
ryanbogan and dblock authored Dec 16, 2022
1 parent b782299 commit 0f520f6
Show file tree
Hide file tree
Showing 23 changed files with 1,599 additions and 238 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211))
- Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366))
- Added experimental extensions to main ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347))
- Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495))
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,58 @@
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* PluginRequest to intialize plugin
* InitializeExtensionRequest to intialize plugin
*
* @opensearch.internal
*/
public class PluginRequest extends TransportRequest {
public class InitializeExtensionRequest extends TransportRequest {
private final DiscoveryNode sourceNode;
/*
* TODO change DiscoveryNode to Extension information
*/
private final List<DiscoveryExtensionNode> extensions;
private final DiscoveryExtensionNode extension;

public PluginRequest(DiscoveryNode sourceNode, List<DiscoveryExtensionNode> extensions) {
public InitializeExtensionRequest(DiscoveryNode sourceNode, DiscoveryExtensionNode extension) {
this.sourceNode = sourceNode;
this.extensions = extensions;
this.extension = extension;
}

public PluginRequest(StreamInput in) throws IOException {
public InitializeExtensionRequest(StreamInput in) throws IOException {
super(in);
sourceNode = new DiscoveryNode(in);
extensions = in.readList(DiscoveryExtensionNode::new);
extension = new DiscoveryExtensionNode(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
out.writeList(extensions);
}

public List<DiscoveryExtensionNode> getExtensions() {
return extensions;
extension.writeTo(out);
}

public DiscoveryNode getSourceNode() {
return sourceNode;
}

public DiscoveryExtensionNode getExtension() {
return extension;
}

@Override
public String toString() {
return "PluginRequest{" + "sourceNode=" + sourceNode + ", extensions=" + extensions + '}';
return "InitializeExtensionsRequest{" + "sourceNode=" + sourceNode + ", extension=" + extension + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PluginRequest that = (PluginRequest) o;
return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(extensions, that.extensions);
InitializeExtensionRequest that = (InitializeExtensionRequest) o;
return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(extension, that.extension);
}

@Override
public int hashCode() {
return Objects.hash(sourceNode, extensions);
return Objects.hash(sourceNode, extension);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@
*
* @opensearch.internal
*/
public class PluginResponse extends TransportResponse {
public class InitializeExtensionResponse extends TransportResponse {
private String name;

public PluginResponse(String name) {
public InitializeExtensionResponse(String name) {
this.name = name;
}

public PluginResponse(StreamInput in) throws IOException {
public InitializeExtensionResponse(StreamInput in) throws IOException {
name = in.readString();
}

Expand All @@ -77,7 +77,7 @@ public String toString() {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PluginResponse that = (PluginResponse) o;
InitializeExtensionResponse that = (InitializeExtensionResponse) o;
return Objects.equals(name, that.name);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Objects;

/**
* Generic boolean response indicating the status of some previous request sent to the SDK
*
* @opensearch.internal
*/
public class AcknowledgedResponse extends TransportResponse {

private final boolean status;

/**
* @param status Boolean indicating the status of the parse request sent to the SDK
*/
public AcknowledgedResponse(boolean status) {
this.status = status;
}

public AcknowledgedResponse(StreamInput in) throws IOException {
super(in);
this.status = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(status);
}

@Override
public String toString() {
return "AcknowledgedResponse{" + "status=" + this.status + "}";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AcknowledgedResponse that = (AcknowledgedResponse) o;
return Objects.equals(this.status, that.status);
}

@Override
public int hashCode() {
return Objects.hash(status);
}

/**
* Returns a boolean indicating the success of the request sent to the SDK
*/
public boolean getStatus() {
return this.status;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import java.net.UnknownHostException;
import org.opensearch.cluster.node.DiscoveryNode;

/**
* Reference to a method that transports a parse request to an extension. By convention, this method takes
* a category class used to identify the reader defined within the JVM that the extension is running on.
* Additionally, this method takes in the extension's corresponding DiscoveryNode and a byte array (context) that the
* extension's reader will be applied to.
*
* By convention the extensions' reader is a constructor that takes StreamInput as an argument for most classes and a static method for things like enums.
* Classes will implement this via a constructor (or a static method in the case of enumerations), it's something that should
* look like:
* <pre><code>
* public MyClass(final StreamInput in) throws IOException {
* * this.someValue = in.readVInt();
* this.someMap = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
* }
* </code></pre>
*
* @opensearch.internal
*/
@FunctionalInterface
public interface ExtensionReader {

/**
* Transports category class, and StreamInput (context), to the extension identified by the Discovery Node
*
* @param extensionNode Discovery Node identifying the Extension
* @param categoryClass Super class that the reader extends
* @param context Some context to transport
* @throws UnknownHostException if the extension node host IP address could not be determined
*/
void parse(DiscoveryNode extensionNode, Class categoryClass, Object context) throws UnknownHostException;

}
Loading

0 comments on commit 0f520f6

Please sign in to comment.