Skip to content

Commit

Permalink
[BUG] Reconstruct pit infos when deserialize GetAllPitNodesResponse (o…
Browse files Browse the repository at this point in the history
…pensearch-project#9410)

* [BUG] Reconstruct pit infos when deserialize GetAllPitNodesResponse

Signed-off-by: panguixin <panguixin@bytedance.com>

* add searializion test case

Signed-off-by: panguixin <panguixin@bytedance.com>

* run spotless

Signed-off-by: panguixin <panguixin@bytedance.com>

---------

Signed-off-by: panguixin <panguixin@bytedance.com>
  • Loading branch information
bugmakerrrrrr authored Aug 23, 2023
1 parent 9272aa2 commit 980bf3c
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public class GetAllPitNodesResponse extends BaseNodesResponse<GetAllPitNodeRespo

public GetAllPitNodesResponse(StreamInput in) throws IOException {
super(in);
Set<String> uniquePitIds = new HashSet<>();
pitInfos.addAll(
getNodes().stream()
.flatMap(p -> p.getPitInfos().stream().filter(t -> uniquePitIds.add(t.getPitId())))
.collect(Collectors.toList())
);
}

public GetAllPitNodesResponse(
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/action/search/ListPitInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg;

Expand Down Expand Up @@ -80,4 +81,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ListPitInfo that = (ListPitInfo) o;
return pitId.equals(that.pitId) && creationTime == that.creationTime && keepAlive == that.keepAlive;
}

@Override
public int hashCode() {
return Objects.hash(pitId, creationTime, keepAlive);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.Version;
import org.opensearch.action.FailedNodeException;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.TransportException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

public class GetAllPitNodesResponseTests extends OpenSearchTestCase {
protected void assertEqualInstances(GetAllPitNodesResponse expected, GetAllPitNodesResponse actual) {
assertNotSame(expected, actual);
Set<ListPitInfo> expectedPitInfos = new HashSet<>(expected.getPitInfos());
Set<ListPitInfo> actualPitInfos = new HashSet<>(actual.getPitInfos());
assertEquals(expectedPitInfos, actualPitInfos);

List<GetAllPitNodeResponse> expectedResponses = expected.getNodes();
List<GetAllPitNodeResponse> actualResponses = actual.getNodes();
assertEquals(expectedResponses.size(), actualResponses.size());
for (int i = 0; i < expectedResponses.size(); i++) {
assertEquals(expectedResponses.get(i).getNode(), actualResponses.get(i).getNode());
Set<ListPitInfo> expectedNodePitInfos = new HashSet<>(expectedResponses.get(i).getPitInfos());
Set<ListPitInfo> actualNodePitInfos = new HashSet<>(actualResponses.get(i).getPitInfos());
assertEquals(expectedNodePitInfos, actualNodePitInfos);
}

List<FailedNodeException> expectedFailures = expected.failures();
List<FailedNodeException> actualFailures = actual.failures();
assertEquals(expectedFailures.size(), actualFailures.size());
for (int i = 0; i < expectedFailures.size(); i++) {
assertEquals(expectedFailures.get(i).nodeId(), actualFailures.get(i).nodeId());
assertEquals(expectedFailures.get(i).getMessage(), actualFailures.get(i).getMessage());
assertEquals(expectedFailures.get(i).getCause().getClass(), actualFailures.get(i).getCause().getClass());
}
}

protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.emptyList());
}

public void testSerialization() throws IOException {
GetAllPitNodesResponse response = createTestItem();
GetAllPitNodesResponse deserialized = copyWriteable(response, getNamedWriteableRegistry(), GetAllPitNodesResponse::new);
assertEqualInstances(response, deserialized);
}

private GetAllPitNodesResponse createTestItem() {
int numNodes = randomIntBetween(1, 10);
int numPits = randomInt(10);
List<ListPitInfo> candidatePitInfos = new ArrayList<>(numPits);
for (int i = 0; i < numNodes; i++) {
candidatePitInfos.add(new ListPitInfo(randomAlphaOfLength(10), randomLong(), randomLong()));
}

List<GetAllPitNodeResponse> responses = new ArrayList<>();
List<FailedNodeException> failures = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
DiscoveryNode node = new DiscoveryNode(
randomAlphaOfLength(10),
buildNewFakeTransportAddress(),
emptyMap(),
emptySet(),
Version.CURRENT
);
if (randomBoolean()) {
List<ListPitInfo> nodePitInfos = new ArrayList<>();
for (int j = 0; j < randomInt(numPits); j++) {
nodePitInfos.add(randomFrom(candidatePitInfos));
}
responses.add(new GetAllPitNodeResponse(node, nodePitInfos));
} else {
failures.add(
new FailedNodeException(node.getId(), randomAlphaOfLength(10), new TransportException(randomAlphaOfLength(10)))
);
}
}
return new GetAllPitNodesResponse(new ClusterName("test"), responses, failures);
}
}

0 comments on commit 980bf3c

Please sign in to comment.