From f40711ed12785afe672b2faade99baa93526e300 Mon Sep 17 00:00:00 2001 From: "Daniel (dB.) Doubrovkine" Date: Mon, 2 Oct 2023 17:51:47 -0400 Subject: [PATCH] Fix: register mulitple extensions. (#10256) * Fix: register mulitple extensions. Signed-off-by: dblock * Updated CHANGELOG. Signed-off-by: dblock * Added tests. Signed-off-by: dblock --------- Signed-off-by: dblock (cherry picked from commit 1d66af346f7863bbca44a3102a1af2fed5fd3804) Signed-off-by: dblock --- CHANGELOG.md | 1 + .../extensions/ExtensionsManager.java | 21 ++- .../rest/RestActionsRequestHandler.java | 3 + .../rest/RestInitializeExtensionAction.java | 3 +- .../rest/RestSendToExtensionAction.java | 2 +- .../extensions/ExtensionsManagerTests.java | 146 +++++++++++++++++- .../RestInitializeExtensionActionTests.java | 19 +-- .../rest/RestSendToExtensionActionTests.java | 6 +- 8 files changed, 176 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 71ad6465bef7b..4da4d287a19d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix broken backward compatibility from 2.7 for IndexSorted field indices ([#10045](https://github.com/opensearch-project/OpenSearch/pull/10045)) - Fix concurrent search NPE when track_total_hits, terminate_after and size=0 are used ([#10082](https://github.com/opensearch-project/OpenSearch/pull/10082)) - Fix remove ingest processor handing ignore_missing parameter not correctly ([10089](https://github.com/opensearch-project/OpenSearch/pull/10089)) +- Fix registration and initialization of multiple extensions ([10256](https://github.com/opensearch-project/OpenSearch/pull/10256)) ### Security diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 8073fb9cb7f56..3e71fb16c10ae 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -289,7 +289,7 @@ private void registerRequestHandler(DynamicActionRegistry dynamicActionRegistry) * Loads a single extension * @param extension The extension to be loaded */ - public void loadExtension(Extension extension) throws IOException { + public DiscoveryExtensionNode loadExtension(Extension extension) throws IOException { validateExtension(extension); DiscoveryExtensionNode discoveryExtensionNode = new DiscoveryExtensionNode( extension.getName(), @@ -303,6 +303,12 @@ public void loadExtension(Extension extension) throws IOException { extensionIdMap.put(extension.getUniqueId(), discoveryExtensionNode); extensionSettingsMap.put(extension.getUniqueId(), extension); logger.info("Loaded extension with uniqueId " + extension.getUniqueId() + ": " + extension); + return discoveryExtensionNode; + } + + public void initializeExtension(Extension extension) throws IOException { + DiscoveryExtensionNode node = loadExtension(extension); + initializeExtensionNode(node); } private void validateField(String fieldName, String value) throws IOException { @@ -329,11 +335,11 @@ private void validateExtension(Extension extension) throws IOException { */ public void initialize() { for (DiscoveryExtensionNode extension : extensionIdMap.values()) { - initializeExtension(extension); + initializeExtensionNode(extension); } } - private void initializeExtension(DiscoveryExtensionNode extension) { + public void initializeExtensionNode(DiscoveryExtensionNode extensionNode) { final CompletableFuture inProgressFuture = new CompletableFuture<>(); final TransportResponseHandler initializeExtensionResponseHandler = new TransportResponseHandler< @@ -373,7 +379,8 @@ public String executor() { transportService.getThreadPool().generic().execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - extensionIdMap.remove(extension.getId()); + logger.warn("Error registering extension: " + extensionNode.getId(), e); + extensionIdMap.remove(extensionNode.getId()); if (e.getCause() instanceof ConnectTransportException) { logger.info("No response from extension to request.", e); throw (ConnectTransportException) e.getCause(); @@ -388,11 +395,11 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { - transportService.connectToExtensionNode(extension); + transportService.connectToExtensionNode(extensionNode); transportService.sendRequest( - extension, + extensionNode, REQUEST_EXTENSION_ACTION_NAME, - new InitializeExtensionRequest(transportService.getLocalNode(), extension), + new InitializeExtensionRequest(transportService.getLocalNode(), extensionNode), initializeExtensionResponseHandler ); } diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestActionsRequestHandler.java b/server/src/main/java/org/opensearch/extensions/rest/RestActionsRequestHandler.java index b6d628ae9253f..18adc37156ca1 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestActionsRequestHandler.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestActionsRequestHandler.java @@ -58,6 +58,9 @@ public TransportResponse handleRegisterRestActionsRequest( DynamicActionRegistry dynamicActionRegistry ) throws Exception { DiscoveryExtensionNode discoveryExtensionNode = extensionIdMap.get(restActionsRequest.getUniqueId()); + if (discoveryExtensionNode == null) { + throw new IllegalStateException("Missing extension node for " + restActionsRequest.getUniqueId()); + } RestHandler handler = new RestSendToExtensionAction( restActionsRequest, discoveryExtensionNode, diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java index 4b622b841a040..fc7c21a6eccd6 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java @@ -159,8 +159,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client extAdditionalSettings ); try { - extensionsManager.loadExtension(extension); - extensionsManager.initialize(); + extensionsManager.initializeExtension(extension); } catch (CompletionException e) { Throwable cause = e.getCause(); if (cause instanceof TimeoutException) { diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java index d33ddd3848fe4..cb84d36380c3c 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java @@ -151,7 +151,7 @@ public RestSendToExtensionAction( @Override public String getName() { - return SEND_TO_EXTENSION_ACTION; + return this.discoveryExtensionNode.getId() + ":" + SEND_TO_EXTENSION_ACTION; } @Override diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 75327d71d54ca..0eceec31b06c0 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -36,6 +36,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.discovery.InitializeExtensionRequest; import org.opensearch.env.Environment; import org.opensearch.env.EnvironmentSettingsResponse; import org.opensearch.extensions.ExtensionsSettings.Extension; @@ -77,6 +78,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -408,19 +410,93 @@ public void testInitialize() throws Exception { ) ); - // Test needs to be changed to mock the connection between the local node and an extension. Assert statment is commented out for - // now. + // Test needs to be changed to mock the connection between the local node and an extension. // Link to issue: https://github.com/opensearch-project/OpenSearch/issues/4045 // mockLogAppender.assertAllExpectationsMatched(); } } + public void testInitializeExtension() throws Exception { + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); + + TransportService mockTransportService = spy( + new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> null, + null, + Collections.emptySet(), + NoopTracer.INSTANCE + ) + ); + + doNothing().when(mockTransportService).connectToExtensionNode(any(DiscoveryExtensionNode.class)); + + doNothing().when(mockTransportService) + .sendRequest(any(DiscoveryExtensionNode.class), anyString(), any(InitializeExtensionRequest.class), any()); + + extensionsManager.initializeServicesAndRestHandler( + actionModule, + settingsModule, + mockTransportService, + clusterService, + settings, + client + ); + + Extension firstExtension = new Extension( + "firstExtension", + "uniqueid1", + "127.0.0.0", + "9301", + "0.0.7", + "2.0.0", + "2.0.0", + List.of(), + null + ); + + extensionsManager.initializeExtension(firstExtension); + + Extension secondExtension = new Extension( + "secondExtension", + "uniqueid2", + "127.0.0.0", + "9301", + "0.0.7", + "2.0.0", + "2.0.0", + List.of(), + null + ); + + extensionsManager.initializeExtension(secondExtension); + + ThreadPool.terminate(threadPool, 3, TimeUnit.SECONDS); + + verify(mockTransportService, times(2)).connectToExtensionNode(any(DiscoveryExtensionNode.class)); + + verify(mockTransportService, times(2)).sendRequest( + any(DiscoveryExtensionNode.class), + anyString(), + any(InitializeExtensionRequest.class), + any() + ); + } + public void testHandleRegisterRestActionsRequest() throws Exception { ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; + + extensionsManager.loadExtension( + new Extension("firstExtension", uniqueIdStr, "127.0.0.0", "9300", "0.0.7", "2.8.0", "2.8.0", List.of(), null) + ); + List actionsList = List.of("GET /foo foo", "PUT /bar bar", "POST /baz baz"); List deprecatedActionsList = List.of("GET /deprecated/foo foo_deprecated", "It's deprecated!"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList, deprecatedActionsList); @@ -430,6 +506,58 @@ public void testHandleRegisterRestActionsRequest() throws Exception { assertTrue(((AcknowledgedResponse) response).getStatus()); } + public void testHandleRegisterRestActionsRequestRequiresDiscoveryNode() throws Exception { + + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); + initialize(extensionsManager); + + RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest("uniqueId1", List.of(), List.of()); + + expectThrows( + IllegalStateException.class, + () -> extensionsManager.getRestActionsRequestHandler() + .handleRegisterRestActionsRequest(registerActionsRequest, actionModule.getDynamicActionRegistry()) + ); + } + + public void testHandleRegisterRestActionsRequestMultiple() throws Exception { + + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); + initialize(extensionsManager); + + List actionsList = List.of("GET /foo foo", "PUT /bar bar", "POST /baz baz"); + List deprecatedActionsList = List.of("GET /deprecated/foo foo_deprecated", "It's deprecated!"); + for (int i = 0; i < 2; i++) { + String uniqueIdStr = "uniqueid-%d" + i; + + Set> additionalSettings = extAwarePlugin.getExtensionSettings().stream().collect(Collectors.toSet()); + ExtensionScopedSettings extensionScopedSettings = new ExtensionScopedSettings(additionalSettings); + Extension firstExtension = new Extension( + "Extension %s" + i, + uniqueIdStr, + "127.0.0.0", + "9300", + "0.0.7", + "2.8.0", + "2.8.0", + List.of(), + extensionScopedSettings + ); + + extensionsManager.loadExtension(firstExtension); + + RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest( + uniqueIdStr, + actionsList, + deprecatedActionsList + ); + TransportResponse response = extensionsManager.getRestActionsRequestHandler() + .handleRegisterRestActionsRequest(registerActionsRequest, actionModule.getDynamicActionRegistry()); + assertEquals(AcknowledgedResponse.class, response.getClass()); + assertTrue(((AcknowledgedResponse) response).getStatus()); + } + } + public void testHandleRegisterSettingsRequest() throws Exception { ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); @@ -451,6 +579,9 @@ public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Excep initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; + extensionsManager.loadExtension( + new Extension("firstExtension", uniqueIdStr, "127.0.0.0", "9300", "0.0.7", "2.8.0", "2.8.0", List.of(), null) + ); List actionsList = List.of("FOO /foo", "PUT /bar", "POST /baz"); List deprecatedActionsList = List.of("GET /deprecated/foo", "It's deprecated!"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList, deprecatedActionsList); @@ -466,6 +597,9 @@ public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedMethod() th initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; + extensionsManager.loadExtension( + new Extension("firstExtension", uniqueIdStr, "127.0.0.0", "9300", "0.0.7", "2.8.0", "2.8.0", List.of(), null) + ); List actionsList = List.of("GET /foo", "PUT /bar", "POST /baz"); List deprecatedActionsList = List.of("FOO /deprecated/foo", "It's deprecated!"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList, deprecatedActionsList); @@ -480,6 +614,9 @@ public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exceptio ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; + extensionsManager.loadExtension( + new Extension("firstExtension", uniqueIdStr, "127.0.0.0", "9300", "0.0.7", "2.8.0", "2.8.0", List.of(), null) + ); List actionsList = List.of("GET", "PUT /bar", "POST /baz"); List deprecatedActionsList = List.of("GET /deprecated/foo", "It's deprecated!"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList, deprecatedActionsList); @@ -494,6 +631,9 @@ public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedUri() throw ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; + extensionsManager.loadExtension( + new Extension("firstExtension", uniqueIdStr, "127.0.0.0", "9300", "0.0.7", "2.8.0", "2.8.0", List.of(), null) + ); List actionsList = List.of("GET /foo", "PUT /bar", "POST /baz"); List deprecatedActionsList = List.of("GET", "It's deprecated!"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList, deprecatedActionsList); @@ -786,7 +926,7 @@ public void testIncompatibleExtensionRegistration() throws IOException { "127.0.0.0", "9300", "0.0.7", - "3.0.0", + "2.8.0", "3.99.0", List.of(), null diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java index 87767978147cd..47239f30d10e5 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java @@ -19,8 +19,9 @@ import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.extensions.DiscoveryExtensionNode; import org.opensearch.extensions.ExtensionsManager; -import org.opensearch.extensions.ExtensionsSettings; +import org.opensearch.extensions.ExtensionsSettings.Extension; import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; @@ -156,8 +157,8 @@ public void testRestInitializeExtensionActionResponseWithAdditionalSettings() th // optionally, you can stub out some methods: when(spy.getAdditionalSettings()).thenCallRealMethod(); - Mockito.doCallRealMethod().when(spy).loadExtension(any(ExtensionsSettings.Extension.class)); - Mockito.doNothing().when(spy).initialize(); + Mockito.doCallRealMethod().when(spy).loadExtension(any(Extension.class)); + Mockito.doNothing().when(spy).initializeExtensionNode(any(DiscoveryExtensionNode.class)); RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(spy); final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"ad-extension\",\"hostAddress\":\"127.0.0.1\"," + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" @@ -173,10 +174,10 @@ public void testRestInitializeExtensionActionResponseWithAdditionalSettings() th FakeRestChannel channel = new FakeRestChannel(request, false, 0); restInitializeExtensionAction.handleRequest(request, channel, null); - assertEquals(channel.capturedResponse().status(), RestStatus.ACCEPTED); + assertEquals(RestStatus.ACCEPTED, channel.capturedResponse().status()); assertTrue(channel.capturedResponse().content().utf8ToString().contains("A request to initialize an extension has been sent.")); - Optional extension = spy.lookupExtensionSettingsById("ad-extension"); + Optional extension = spy.lookupExtensionSettingsById("ad-extension"); assertTrue(extension.isPresent()); assertEquals(true, extension.get().getAdditionalSettings().get(boolSetting)); assertEquals("customSetting", extension.get().getAdditionalSettings().get(stringSetting)); @@ -203,8 +204,8 @@ public void testRestInitializeExtensionActionResponseWithAdditionalSettingsUsing // optionally, you can stub out some methods: when(spy.getAdditionalSettings()).thenCallRealMethod(); - Mockito.doCallRealMethod().when(spy).loadExtension(any(ExtensionsSettings.Extension.class)); - Mockito.doNothing().when(spy).initialize(); + Mockito.doCallRealMethod().when(spy).loadExtension(any(Extension.class)); + Mockito.doNothing().when(spy).initializeExtensionNode(any(DiscoveryExtensionNode.class)); RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(spy); final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"ad-extension\",\"hostAddress\":\"127.0.0.1\"," + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" @@ -220,10 +221,10 @@ public void testRestInitializeExtensionActionResponseWithAdditionalSettingsUsing FakeRestChannel channel = new FakeRestChannel(request, false, 0); restInitializeExtensionAction.handleRequest(request, channel, null); - assertEquals(channel.capturedResponse().status(), RestStatus.ACCEPTED); + assertEquals(RestStatus.ACCEPTED, channel.capturedResponse().status()); assertTrue(channel.capturedResponse().content().utf8ToString().contains("A request to initialize an extension has been sent.")); - Optional extension = spy.lookupExtensionSettingsById("ad-extension"); + Optional extension = spy.lookupExtensionSettingsById("ad-extension"); assertTrue(extension.isPresent()); assertEquals(false, extension.get().getAdditionalSettings().get(boolSetting)); assertEquals("default", extension.get().getAdditionalSettings().get(stringSetting)); diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java index ea6c44d8c36ba..e1a8620f35f05 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java @@ -147,7 +147,7 @@ public void testRestSendToExtensionAction() throws Exception { dynamicActionRegistry ); - assertEquals("send_to_extension_action", restSendToExtensionAction.getName()); + assertEquals("uniqueid1:send_to_extension_action", restSendToExtensionAction.getName()); List expected = new ArrayList<>(); String uriPrefix = "/_extensions/_uniqueid1"; expected.add(new Route(Method.GET, uriPrefix + "/foo")); @@ -179,7 +179,7 @@ public void testRestSendToExtensionActionWithNamedRoute() throws Exception { dynamicActionRegistry ); - assertEquals("send_to_extension_action", restSendToExtensionAction.getName()); + assertEquals("uniqueid1:send_to_extension_action", restSendToExtensionAction.getName()); List expected = new ArrayList<>(); String uriPrefix = "/_extensions/_uniqueid1"; NamedRoute nr1 = new NamedRoute.Builder().method(Method.GET).path(uriPrefix + "/foo").uniqueName("foo").build(); @@ -224,7 +224,7 @@ public void testRestSendToExtensionActionWithNamedRouteAndLegacyActionName() thr dynamicActionRegistry ); - assertEquals("send_to_extension_action", restSendToExtensionAction.getName()); + assertEquals("uniqueid1:send_to_extension_action", restSendToExtensionAction.getName()); List expected = new ArrayList<>(); String uriPrefix = "/_extensions/_uniqueid1"; NamedRoute nr1 = new NamedRoute.Builder().method(Method.GET)