Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into feature/inflight-cancella…
Browse files Browse the repository at this point in the history
…tion-core
  • Loading branch information
ketanv3 authored Oct 14, 2022
2 parents 7f8ae89 + 5d7d83e commit 2421979
Show file tree
Hide file tree
Showing 49 changed files with 736 additions and 67 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Bumps `jempbox` from 1.8.16 to 1.8.17 ([#4550](https://github.com/opensearch-project/OpenSearch/pull/4550))
- Bumps `hadoop-hdfs` from 3.3.3 to 3.3.4 ([#4644](https://github.com/opensearch-project/OpenSearch/pull/4644))
- Bumps `jna` from 5.11.0 to 5.12.1 ([#4656](https://github.com/opensearch-project/OpenSearch/pull/4656))
- Update Jackson Databind to 2.13.4.2 (addressing CVE-2022-42003) ([#4779](https://github.com/opensearch-project/OpenSearch/pull/4779))
### Changed
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
Expand All @@ -69,6 +70,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- GET api for weighted shard routing([#4275](https://github.com/opensearch-project/OpenSearch/pull/4275/))
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
- Delete api for weighted shard routing([#4400](https://github.com/opensearch-project/OpenSearch/pull/4400/))
- Further simplification of the ZIP publication implementation ([#4360](https://github.com/opensearch-project/OpenSearch/pull/4360))
- Relax visibility of the HTTP_CHANNEL_KEY and HTTP_SERVER_CHANNEL_KEY to make it possible for the plugins to access associated Netty4HttpChannel / Netty4HttpServerChannel instance ([#4638](https://github.com/opensearch-project/OpenSearch/pull/4638))
- Load the deprecated master role in a dedicated method instead of in setAdditionalRoles() ([#4582](https://github.com/opensearch-project/OpenSearch/pull/4582))
Expand All @@ -78,6 +80,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Update to Apache Lucene 9.4.0 ([#4661](https://github.com/opensearch-project/OpenSearch/pull/4661))
- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590))
- Backport Apache Lucene version change for 2.4.0 ([#4677](https://github.com/opensearch-project/OpenSearch/pull/4677))
- Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725))
- Fix weighted routing metadata deserialization error on process restart ([#4691](https://github.com/opensearch-project/OpenSearch/pull/4691))
- Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732))
- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
Expand Down Expand Up @@ -130,6 +133,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix new race condition in DecommissionControllerTests ([4688](https://github.com/opensearch-project/OpenSearch/pull/4688))
- Fix SearchStats (de)serialization (caused by https://github.com/opensearch-project/OpenSearch/pull/4616) ([#4697](https://github.com/opensearch-project/OpenSearch/pull/4697))
- Fixing Gradle warnings associated with publishPluginZipPublicationToXxx tasks ([#4696](https://github.com/opensearch-project/OpenSearch/pull/4696))
- Fixed randomly failing test ([4774](https://github.com/opensearch-project/OpenSearch/pull/4774))
- Update version check after backport ([4786](https://github.com/opensearch-project/OpenSearch/pull/4786))
### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))

Expand Down
2 changes: 1 addition & 1 deletion buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ bundled_jdk = 17.0.4+8
spatial4j = 0.7
jts = 1.15.0
jackson = 2.13.4
jackson_databind = 2.13.4
jackson_databind = 2.13.4.2
snakeyaml = 1.32
icu4j = 70.1
supercsv = 2.4.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ public void testApiNamingConventions() throws Exception {
"remote_store.restore",
"cluster.put_weighted_routing",
"cluster.get_weighted_routing",
"cluster.delete_weighted_routing",
"cluster.put_decommission_awareness",
"cluster.get_decommission_awareness",
"cluster.delete_decommission_awareness", };
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
325c06bdfeb628cfb80ebaaf1a26cc1eb558a585

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
325c06bdfeb628cfb80ebaaf1a26cc1eb558a585

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
325c06bdfeb628cfb80ebaaf1a26cc1eb558a585

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
325c06bdfeb628cfb80ebaaf1a26cc1eb558a585

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
325c06bdfeb628cfb80ebaaf1a26cc1eb558a585

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
325c06bdfeb628cfb80ebaaf1a26cc1eb558a585

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"cluster.delete_weighted_routing": {
"documentation": {
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/delete",
"description": "Delete weighted shard routing weights"
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_cluster/routing/awareness/weights",
"methods": [
"DELETE"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.cluster.routing;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -284,4 +285,67 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}

public void testDeleteWeightedRouting_WeightsNotSet() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}

public void testDeleteWeightedRouting_WeightsAreSet() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

// put api call to set weights
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.junit.Assert;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase {
@Override
protected int numberOfReplicas() {
return 2;
}

public void testSearchWithWRRShardRouting() throws IOException {
Settings commonSettings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.build();

logger.info("--> starting 6 nodes on different zones");
List<String> nodes = internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

String A_0 = nodes.get(0);
String B_0 = nodes.get(1);
String B_1 = nodes.get(2);
String A_1 = nodes.get(3);
String C_0 = nodes.get(4);
String C_1 = nodes.get(5);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("6").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 10).put("index.number_of_replicas", 2))
);
ensureGreen();
logger.info("--> creating indices for test");
for (int i = 0; i < 100; i++) {
client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
}

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);

Set<String> hitNodes = new HashSet<>();
// making search requests
for (int i = 0; i < 50; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(A_0, A_1, B_0, B_1))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}
// search should not go to nodes in zone c
assertThat(hitNodes.size(), lessThanOrEqualTo(4));
DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();
List<String> nodeIdsFromZoneWithWeightZero = new ArrayList<>();
for (DiscoveryNode node : dataNodes) {
if (node.getAttributes().get("zone").equals("c")) {
nodeIdsFromZoneWithWeightZero.add(node.getId());
}
}
for (String nodeId : nodeIdsFromZoneWithWeightZero) {
assertFalse(hitNodes.contains(nodeId));
}

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (stat.getNode().getAttributes().get("zone").equals("c")) {
assertEquals(0, searchStats.getQueryCount());
assertEquals(0, searchStats.getFetchCount());

} else {
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
}
}

logger.info("--> deleted shard routing weights for weighted round robin");

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertEquals(deleteResponse.isAcknowledged(), true);

hitNodes = new HashSet<>();
// making search requests
for (int i = 0; i < 100; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(A_0, A_1, B_0, B_1))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}

// Check shard routing requests hit data nodes in zone c
for (String nodeId : nodeIdsFromZoneWithWeightZero) {
assertFalse(!hitNodes.contains(nodeId));
}
nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();

for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
}
}

}
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.TransportDeleteWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.TransportGetWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction;
Expand Down Expand Up @@ -302,6 +304,7 @@
import org.opensearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction;
import org.opensearch.rest.action.admin.cluster.RestCloneSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.opensearch.rest.action.admin.cluster.RestClusterDeleteWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction;
Expand Down Expand Up @@ -580,6 +583,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class);
actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
Expand Down Expand Up @@ -766,8 +770,10 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestCloseIndexAction());
registerHandler.accept(new RestOpenIndexAction());
registerHandler.accept(new RestAddIndexBlockAction());

registerHandler.accept(new RestClusterPutWeightedRoutingAction());
registerHandler.accept(new RestClusterGetWeightedRoutingAction());
registerHandler.accept(new RestClusterDeleteWeightedRoutingAction());

registerHandler.accept(new RestUpdateSettingsAction());
registerHandler.accept(new RestGetSettingsAction());
Expand Down
Loading

0 comments on commit 2421979

Please sign in to comment.