Skip to content

Commit cc307c8

Browse files
committed
Fix serialization bug for aggs
I created this bug today in elastic#53793. When a `DelayableWriteable` that references an existing object serializes itself it wasn't taking the version of the node on the other side of the wire into account. This fixes that.
1 parent b9bfba2 commit cc307c8

File tree

2 files changed

+29
-5
lines changed

2 files changed

+29
-5
lines changed

server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919

2020
package org.elasticsearch.common.io.stream;
2121

22-
import org.elasticsearch.Version;
23-
import org.elasticsearch.common.bytes.BytesReference;
24-
2522
import java.io.IOException;
2623
import java.util.function.Supplier;
2724

25+
import org.elasticsearch.Version;
26+
import org.elasticsearch.common.bytes.BytesReference;
27+
2828
/**
2929
* A holder for {@link Writeable}s that can delays reading the underlying
3030
* {@linkplain Writeable} when it is read from a remote node.
@@ -60,6 +60,7 @@ private static class Referencing<T extends Writeable> extends DelayableWriteable
6060
@Override
6161
public void writeTo(StreamOutput out) throws IOException {
6262
try (BytesStreamOutput buffer = new BytesStreamOutput()) {
63+
buffer.setVersion(out.getVersion());
6364
reference.writeTo(buffer);
6465
out.writeBytesReference(buffer.bytes());
6566
}

server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
public class DelayableWriteableTests extends ESTestCase {
3232
// NOTE: we don't use AbstractWireSerializingTestCase because we don't implement equals and hashCode.
33-
public static class Example implements NamedWriteable {
33+
private static class Example implements NamedWriteable {
3434
private final String s;
3535

3636
public Example(String s) {
@@ -66,7 +66,7 @@ public int hashCode() {
6666
}
6767
}
6868

69-
public static class NamedHolder implements Writeable {
69+
private static class NamedHolder implements Writeable {
7070
private final Example e;
7171

7272
public NamedHolder(Example e) {
@@ -97,6 +97,23 @@ public int hashCode() {
9797
}
9898
}
9999

100+
private static class SneakOtherSideVersionOnWire implements Writeable {
101+
private final Version version;
102+
103+
public SneakOtherSideVersionOnWire() {
104+
version = Version.CURRENT;
105+
}
106+
107+
public SneakOtherSideVersionOnWire(StreamInput in) throws IOException {
108+
version = Version.readVersion(in);
109+
}
110+
111+
@Override
112+
public void writeTo(StreamOutput out) throws IOException {
113+
Version.writeVersion(out.getVersion(), out);
114+
}
115+
}
116+
100117
public void testRoundTripFromReferencing() throws IOException {
101118
Example e = new Example(randomAlphaOfLength(5));
102119
DelayableWriteable<Example> original = DelayableWriteable.referencing(e);
@@ -139,6 +156,12 @@ public void testRoundTripFromDelayedFromOldVersionWithNamedWriteable() throws IO
139156
roundTripTestCase(original, NamedHolder::new);
140157
}
141158

159+
public void testSerializesWithRemoteVersion() throws IOException {
160+
Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
161+
DelayableWriteable<SneakOtherSideVersionOnWire> original = DelayableWriteable.referencing(new SneakOtherSideVersionOnWire());
162+
assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).get().version, equalTo(remoteVersion));
163+
}
164+
142165
private <T extends Writeable> void roundTripTestCase(DelayableWriteable<T> original, Writeable.Reader<T> reader) throws IOException {
143166
DelayableWriteable<T> roundTripped = roundTrip(original, reader, Version.CURRENT);
144167
assertTrue(roundTripped.isDelayed());

0 commit comments

Comments
 (0)