Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] Extensibility support for getNamedWriteables extension point #3925

Merged
merged 25 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9e31e52
extensibility support for getNamedWriteables extension point. Adding …
joshpalis Jul 15, 2022
bbfc81d
removing string response from parse request, this was added only for …
joshpalis Jul 15, 2022
3d3c113
fixing syntax errors
joshpalis Jul 15, 2022
1c6906f
modified logs for NamedWriteableRegistryRequest
joshpalis Jul 15, 2022
62fe9f3
addressing PR comments
joshpalis Jul 19, 2022
d64f378
addressing PR comments
joshpalis Jul 20, 2022
07df663
Merge branch 'feature/extensions' of https://github.com/opensearch-pr…
joshpalis Jul 21, 2022
1c99994
fixing gradle precommit issues, fixed logger ussage errors
joshpalis Jul 21, 2022
34c2a5e
Fixing all code comment format
joshpalis Jul 21, 2022
9800380
added unit tests, added more javadocs, fixed handleException message …
joshpalis Jul 22, 2022
72da0f5
Addressing PR comments, adding logs for eacch request sent to SDK, re…
joshpalis Jul 23, 2022
d373035
addressing PR comments, added additional unit tests for NamedWriteabl…
joshpalis Jul 25, 2022
d502edb
updating response handler test to check if ExtensionReader callback i…
joshpalis Jul 25, 2022
65fd479
Updated ExtensionReader to use a StreamInput object rather than a byt…
joshpalis Jul 26, 2022
479e094
fixing gradle precommit issues, spotless java check
joshpalis Jul 26, 2022
b193229
modified ExtensdionReader functional interface, abstracted context in…
joshpalis Jul 27, 2022
8580a84
Modified Parse Request to take in a class instead of the fully qualif…
joshpalis Jul 27, 2022
b9f4a80
Updated ExtensionsOrchestratorTests, moved extensionYmlLines and exte…
joshpalis Jul 29, 2022
467182e
linking additional issue to TODO
joshpalis Aug 1, 2022
bdc558c
using extensionsInitializedList instead of extensionsList to ensure t…
joshpalis Aug 1, 2022
d3d52a4
Merge branch 'feature/extensions' into namedwriteables
joshpalis Aug 2, 2022
861daa4
updated tests to pass, after changing extension orchestrator to use t…
joshpalis Aug 2, 2022
4bc44c2
Merge branch 'feature/extensions' into namedwriteables
joshpalis Aug 3, 2022
39ab25c
addressing PR comments, moving named writeable registry support from …
joshpalis Aug 3, 2022
cd222aa
cleaning up comments/ logs, modified extensionsInitialized to be asyn…
joshpalis Aug 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressing PR comments, adding logs for eacch request sent to SDK, re…
…moved IndicesModuleNameResponse, replaced with ExtensionBooleanResponse, refactored ExtensionOrchestrator, moved NamedWriteableRegistry response handler to separate file, moved parseNamedWriteable callback method to response handler file, updated unit tests to reflect these changes

Signed-off-by: Joshua Palis <jpalis@amazon.com>
  • Loading branch information
joshpalis committed Jul 23, 2022
commit 72da0f5da254b943420f69ba63f610fb0d3352fb
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
*
* @opensearch.internal
*/
public class BooleanResponse extends TransportResponse {
public class ExtensionBooleanResponse extends TransportResponse {

private final boolean status;

/**
* @param status Boolean indicating the status of the parse request sent to the SDK
*/
public BooleanResponse(boolean status) {
public ExtensionBooleanResponse(boolean status) {
this.status = status;
}

public BooleanResponse(StreamInput in) throws IOException {
public ExtensionBooleanResponse(StreamInput in) throws IOException {
super(in);
this.status = in.readBoolean();
}
Expand All @@ -42,14 +42,14 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return "BooleanResponse{" + "status=" + this.status + "}";
return "ExtensionBooleanResponse{" + "status=" + this.status + "}";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BooleanResponse that = (BooleanResponse) o;
ExtensionBooleanResponse that = (ExtensionBooleanResponse) o;
return Objects.equals(this.status, that.status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest;
import org.opensearch.common.io.stream.NamedWriteableRegistryResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
Expand All @@ -45,7 +43,6 @@
import org.opensearch.extensions.ExtensionsSettings.Extension;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndicesModuleNameResponse;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.index.shard.IndexEventListener;
Expand Down Expand Up @@ -258,6 +255,7 @@ public String executor() {
}
};
try {
logger.info("Sending extension request type: " + REQUEST_EXTENSION_ACTION_NAME);
transportService.connectToNode(extensionNode, true);
transportService.sendRequest(
extensionNode,
Expand Down Expand Up @@ -316,90 +314,13 @@ public void getNamedWriteables() {
*/
public Map<DiscoveryNode, Map<Class, Map<String, ExtensionReader>>> getNamedWriteables(DiscoveryNode extensionNode)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this method have to be public?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently I target this method within my unit tests since extensionsInitializedList cannot be populated unless extensionsInitialize() occurs. The method which calls this iterates through initialized discovery nodes and sends a request to the extension associated with the node, therefore, in order to test this, I would have to call this method directly, which is only possible if it is public

throws UnknownHostException {

// Initialize map of entries for this extension to return
final Map<DiscoveryNode, Map<Class, Map<String, ExtensionReader>>> extensionRegistry = new HashMap<>();

final TransportResponseHandler<NamedWriteableRegistryResponse> namedWriteableRegistryResponseHandler = new TransportResponseHandler<
NamedWriteableRegistryResponse>() {

@Override
public NamedWriteableRegistryResponse read(StreamInput in) throws IOException {
return new NamedWriteableRegistryResponse(in);
}

@Override
public void handleResponse(NamedWriteableRegistryResponse response) {

logger.info("response {}", response);
logger.info("EXTENSION [" + extensionNode.getName() + "] returned " + response.getRegistry().size() + " entries");

if (response.getRegistry().isEmpty() == false) {

// Extension has sent over entries to register, initialize inner category map
Map<Class, Map<String, ExtensionReader>> categoryMap = new HashMap<>();

// Reader map associated with this current category
Map<String, ExtensionReader> readers = null;
Class currentCategory = null;

// Extract response entries and process fully qualified class name into category class instance
for (Map.Entry<String, Class> entry : response.getRegistry().entrySet()) {

String name = entry.getKey();
Class categoryClass = entry.getValue();
if (currentCategory != categoryClass) {
// After first pass, readers and current category are set
if (currentCategory != null) {
categoryMap.put(currentCategory, readers);
}
readers = new HashMap<>();
currentCategory = categoryClass;
}

// Add name and callback method reference to inner reader map,
ExtensionReader newReader = (en, cc, context) -> parseNamedWriteable(en, cc, context);

// Validate that name has not yet been associated with a callback method
ExtensionReader oldReader = readers.put(name, newReader);
if (oldReader != null && oldReader.getClass() != null) {
throw new IllegalArgumentException(
"NamedWriteable ["
+ currentCategory.getName()
+ "]["
+ name
+ "]"
+ " is already registered for ["
+ oldReader.getClass().getName()
+ "],"
+ " cannot register ["
+ newReader.getClass().getName()
+ "]"
);
}
}

// Handle last category and reader entry
categoryMap.put(currentCategory, readers);

// Attach extension node to categoryMap
extensionRegistry.put(extensionNode, categoryMap);
}
}

@Override
public void handleException(TransportException exp) {
logger.error(new ParameterizedMessage("OpenSearchRequest failed"), exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};

logger.info("Sending extension request type: " + REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY);
NamedWriteableRegistryResponseHandler namedWriteableRegistryResponseHandler = new NamedWriteableRegistryResponseHandler(
extensionNode,
transportService,
REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE
);
try {
logger.info("Sending extension request type: " + REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY);
transportService.sendRequest(
extensionNode,
REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY,
Expand All @@ -410,7 +331,7 @@ public String executor() {
logger.error(e.toString());
}

return extensionRegistry;
return namedWriteableRegistryResponseHandler.getExtensionRegistry();
}

/**
Expand Down Expand Up @@ -462,30 +383,6 @@ public ExtensionReader getExtensionReader(DiscoveryNode extensionNode, Class cat
return reader;
}

/**
* Transports a byte array and associated category class to the given extension, identified by its discovery node
*
* @param extensionNode Discovery Node identifying the extension associated with the category class and name
* @param categoryClass Class that the Writeable object extends
* @param context Byte array generated from a {@link StreamInput} object to transport to the extension
* @throws UnknownHostException if connection to the extension node failed
*/
public void parseNamedWriteable(DiscoveryNode extensionNode, Class categoryClass, byte[] context) throws UnknownHostException {
logger.info("Sending extension request type: " + REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE);
NamedWriteableRegistryParseResponseHandler namedWriteableRegistryParseResponseHandler =
new NamedWriteableRegistryParseResponseHandler();
try {
transportService.sendRequest(
extensionNode,
REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE,
new NamedWriteableRegistryParseRequest(categoryClass.getName(), context),
namedWriteableRegistryParseResponseHandler
);
} catch (Exception e) {
logger.error(e.toString());
}
}

public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
for (DiscoveryNode extensionNode : extensionsList) {
onIndexModule(indexModule, extensionNode);
Expand All @@ -497,10 +394,10 @@ private void onIndexModule(IndexModule indexModule, DiscoveryNode extensionNode)
final CountDownLatch inProgressLatch = new CountDownLatch(1);
final CountDownLatch inProgressIndexNameLatch = new CountDownLatch(1);

final TransportResponseHandler<IndicesModuleNameResponse> indicesModuleNameResponseHandler = new TransportResponseHandler<
IndicesModuleNameResponse>() {
final TransportResponseHandler<ExtensionBooleanResponse> indicesModuleNameResponseHandler = new TransportResponseHandler<
ExtensionBooleanResponse>() {
@Override
public void handleResponse(IndicesModuleNameResponse response) {
public void handleResponse(ExtensionBooleanResponse response) {
logger.info("ACK Response" + response);
inProgressIndexNameLatch.countDown();
}
Expand All @@ -516,8 +413,8 @@ public String executor() {
}

@Override
public IndicesModuleNameResponse read(StreamInput in) throws IOException {
return new IndicesModuleNameResponse(in);
public ExtensionBooleanResponse read(StreamInput in) throws IOException {
return new ExtensionBooleanResponse(in);
}

};
Expand All @@ -544,7 +441,7 @@ public void beforeIndexRemoved(
String indexName = indexService.index().getName();
logger.info("Index Name" + indexName.toString());
try {
logger.info("Sending request of index name to extension");
logger.info("Sending extension request type: " + INDICES_EXTENSION_NAME_ACTION_NAME);
transportService.sendRequest(
extensionNode,
INDICES_EXTENSION_NAME_ACTION_NAME,
Expand Down Expand Up @@ -578,7 +475,7 @@ public String executor() {
};

try {
logger.info("Sending request to extension");
logger.info("Sending extension request type: " + INDICES_EXTENSION_POINT_ACTION_NAME);
transportService.sendRequest(
extensionNode,
INDICES_EXTENSION_POINT_ACTION_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
*
* @opensearch.internal
*/
public class NamedWriteableRegistryParseResponseHandler implements TransportResponseHandler<BooleanResponse> {
public class NamedWriteableRegistryParseResponseHandler implements TransportResponseHandler<ExtensionBooleanResponse> {
private static final Logger logger = LogManager.getLogger(NamedWriteableRegistryParseResponseHandler.class);

@Override
public BooleanResponse read(StreamInput in) throws IOException {
return new BooleanResponse(in);
public ExtensionBooleanResponse read(StreamInput in) throws IOException {
return new ExtensionBooleanResponse(in);
}

@Override
public void handleResponse(BooleanResponse response) {
public void handleResponse(ExtensionBooleanResponse response) {
logger.info("response {}", response.getStatus());
}

Expand Down
Loading