Skip to content

Remove Unused Features Field on StreamOutput #44667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.CharArrays;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -71,7 +69,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -110,7 +107,6 @@ public abstract class StreamOutput extends OutputStream {
}

private Version version = Version.CURRENT;
private Set<String> features = Collections.emptySet();

/**
* The version of the node on the other side of this stream.
Expand All @@ -126,27 +122,6 @@ public void setVersion(Version version) {
this.version = version;
}

/**
* Test if the stream has the specified feature. Features are used when serializing {@link ClusterState.Custom} or
* {@link MetaData.Custom}; see also {@link ClusterState.FeatureAware}.
*
* @param feature the feature to test
* @return true if the stream has the specified feature
*/
public boolean hasFeature(final String feature) {
return this.features.contains(feature);
}

/**
* Set the features on the stream. See {@link StreamOutput#hasFeature(String)}.
*
* @param features the features on the stream
*/
public void setFeatures(final Set<String> features) {
assert this.features.isEmpty() : this.features;
this.features = Set.copyOf(features);
}

public long position() throws IOException {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Set;

final class OutboundHandler {

Expand Down Expand Up @@ -95,13 +94,12 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long
* Sends the response to the given channel. This method should be used to send {@link TransportResponse}
* objects back to the caller.
*
* @see #sendErrorResponse(Version, Set, TcpChannel, long, String, Exception) for sending error responses
* @see #sendErrorResponse(Version, TcpChannel, long, String, Exception) for sending error responses
*/
void sendResponse(final Version nodeVersion, final Set<String> features, final TcpChannel channel,
final long requestId, final String action, final TransportResponse response,
final boolean compress, final boolean isHandshake) throws IOException {
void sendResponse(final Version nodeVersion, final TcpChannel channel, final long requestId, final String action,
final TransportResponse response, final boolean compress, final boolean isHandshake) throws IOException {
Version version = Version.min(this.version, nodeVersion);
OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), features, response, version,
OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), response, version,
requestId, isHandshake, compress);
ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response));
sendMessage(channel, message, listener);
Expand All @@ -110,12 +108,12 @@ void sendResponse(final Version nodeVersion, final Set<String> features, final T
/**
* Sends back an error response to the caller via the given channel
*/
void sendErrorResponse(final Version nodeVersion, final Set<String> features, final TcpChannel channel, final long requestId,
final String action, final Exception error) throws IOException {
void sendErrorResponse(final Version nodeVersion, final TcpChannel channel, final long requestId, final String action,
final Exception error) throws IOException {
Version version = Version.min(this.version, nodeVersion);
TransportAddress address = new TransportAddress(channel.getLocalAddress());
RemoteTransportException tx = new RemoteTransportException(nodeName, address, action, error);
OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), features, tx, version, requestId,
OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), tx, version, requestId,
false, false);
ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, error));
sendMessage(channel, message, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.io.IOException;
import java.util.Set;

abstract class OutboundMessage extends NetworkMessage implements Writeable {
abstract class OutboundMessage extends NetworkMessage {

private final Writeable message;

Expand All @@ -49,15 +47,14 @@ BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {
try (CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) {
stream.setVersion(version);
threadContext.writeTo(stream);
writeTo(stream);
reference = writeMessage(stream);
}
bytesStream.seek(0);
TcpHeader.writeHeader(bytesStream, requestId, status, version, reference.length() - TcpHeader.HEADER_SIZE);
return reference;
}

private BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
protected BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
final BytesReference zeroCopyBuffer;
if (message instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) message;
Expand Down Expand Up @@ -96,9 +93,10 @@ static class Request extends OutboundMessage {
}

@Override
public void writeTo(StreamOutput out) throws IOException {
protected BytesReference writeMessage(CompressibleBytesOutputStream out) throws IOException {
out.writeStringArray(features);
out.writeString(action);
return super.writeMessage(out);
}

private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {
Expand All @@ -117,17 +115,8 @@ private static byte setStatus(boolean compress, boolean isHandshake, Writeable m

static class Response extends OutboundMessage {

private final Set<String> features;

Response(ThreadContext threadContext, Set<String> features, Writeable message, Version version, long requestId,
boolean isHandshake, boolean compress) {
Response(ThreadContext threadContext, Writeable message, Version version, long requestId, boolean isHandshake, boolean compress) {
super(threadContext, version, setStatus(compress, isHandshake, message), requestId, message);
this.features = features;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.setFeatures(features);
Copy link
Contributor Author

@original-brownbear original-brownbear Jul 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was (and now would have been even more) a weird writeTo that doesn't actually write anything in case of a response. Adjusted the abstractions a little here and just made the request overwrite the message write code to prefix messages with the features array.

}

private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P
(node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
TransportRequestOptions.EMPTY, v, false, true),
(v, features1, channel, response, requestId) -> outboundHandler.sendResponse(v, features1, channel, requestId,
(v, features1, channel, response, requestId) -> outboundHandler.sendResponse(v, channel, requestId,
TransportHandshaker.HANDSHAKE_ACTION_NAME, response, false, true));
InboundMessage.Reader reader = new InboundMessage.Reader(version, namedWriteableRegistry, threadPool.getThreadContext());
this.keepAlive = new TransportKeepAlive(threadPool, this.outboundHandler::sendBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public String getProfileName() {
@Override
public void sendResponse(TransportResponse response) throws IOException {
try {
outboundHandler.sendResponse(version, features, channel, requestId, action, response, compressResponse, false);
outboundHandler.sendResponse(version, channel, requestId, action, response, compressResponse, false);
} finally {
release(false);
}
Expand All @@ -70,7 +70,7 @@ public void sendResponse(TransportResponse response) throws IOException {
@Override
public void sendResponse(Exception exception) throws IOException {
try {
outboundHandler.sendErrorResponse(version, features, channel, requestId, action, exception);
outboundHandler.sendErrorResponse(version, channel, requestId, action, exception);
} finally {
release(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Optional;

Expand Down Expand Up @@ -111,19 +110,14 @@ public void testVersion() {
final BytesStreamOutput out = new BytesStreamOutput();
final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
out.setVersion(afterVersion);
if (custom.getRequiredFeature().isPresent()) {
out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
}
custom.getRequiredFeature();
assertTrue(FeatureAware.shouldSerialize(out, custom));
}
{
final BytesStreamOutput out = new BytesStreamOutput();
final Version beforeVersion =
randomVersionBetween(random(), VersionUtils.getFirstVersion(), VersionUtils.getPreviousVersion(version));
out.setVersion(beforeVersion);
if (custom.getRequiredFeature().isPresent() && randomBoolean()) {
out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
}
assertFalse(FeatureAware.shouldSerialize(out, custom));
}
}
Expand All @@ -138,7 +132,6 @@ public void testFeature() {
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(afterVersion);
assertTrue(custom.getRequiredFeature().isPresent());
out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
assertTrue(FeatureAware.shouldSerialize(out, custom));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY;
import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT;
Expand Down Expand Up @@ -269,11 +267,6 @@ public void testMinVersionSerialization() throws IOException {
final BytesStreamOutput out = new BytesStreamOutput();

out.setVersion(streamVersion);
Set<String> features = new HashSet<>();
if (randomBoolean()) {
features.add("test");
}
out.setFeatures(features);
tasks.build().writeTo(out);

final StreamInput input = out.bytes().streamInput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testReadResponse() throws IOException {
boolean compress = randomBoolean();
threadContext.putHeader("header", "header_value");
Version version = randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion());
OutboundMessage.Response request = new OutboundMessage.Response(threadContext, features, message, version, requestId, isHandshake,
OutboundMessage.Response request = new OutboundMessage.Response(threadContext, message, version, requestId, isHandshake,
compress);
BytesReference reference;
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testReadErrorResponse() throws IOException {
boolean compress = randomBoolean();
threadContext.putHeader("header", "header_value");
Version version = randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion());
OutboundMessage.Response request = new OutboundMessage.Response(threadContext, features, exception, version, requestId,
OutboundMessage.Response request = new OutboundMessage.Response(threadContext, exception, version, requestId,
isHandshake, compress);
BytesReference reference;
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
Expand Down Expand Up @@ -185,7 +185,7 @@ public void testEnsureVersionCompatibility() throws IOException {

public void testThrowOnNotCompressed() throws Exception {
OutboundMessage.Response request = new OutboundMessage.Response(
threadContext, Collections.emptySet(), new Message(randomAlphaOfLength(10)), Version.CURRENT, randomLong(), false, false);
threadContext, new Message(randomAlphaOfLength(10)), Version.CURRENT, randomLong(), false, false);
BytesReference reference;
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
reference = request.serialize(streamOutput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void onResponseSent(long requestId, String action, TransportResponse resp
responseRef.set(response);
}
});
handler.sendResponse(version, Collections.emptySet(), channel, requestId, action, response, compress, isHandshake);
handler.sendResponse(version, channel, requestId, action, response, compress, isHandshake);

BytesReference reference = channel.getMessageCaptor().get();
ActionListener<Void> sendListener = channel.getListenerCaptor().get();
Expand Down Expand Up @@ -256,7 +256,7 @@ public void onResponseSent(long requestId, String action, Exception error) {
responseRef.set(error);
}
});
handler.sendErrorResponse(version, Collections.emptySet(), channel, requestId, action, error);
handler.sendErrorResponse(version, channel, requestId, action, error);

BytesReference reference = channel.getMessageCaptor().get();
ActionListener<Void> sendListener = channel.getListenerCaptor().get();
Expand Down