Skip to content

Add TcpChannel to unify Transport implementations #27132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Nov 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d8cdf08
WIP
Tim-Brooks Oct 23, 2017
a5a30ca
WIP
Tim-Brooks Oct 23, 2017
e5d4c21
Fix mock transport tests
Tim-Brooks Oct 24, 2017
044f6bf
WIP
Tim-Brooks Oct 24, 2017
068bde0
DWIP
Tim-Brooks Oct 25, 2017
dbc2c85
WIP
Tim-Brooks Oct 25, 2017
701e3a8
Fix nio client tests
Tim-Brooks Oct 25, 2017
ac4cfc3
WIP
Tim-Brooks Oct 25, 2017
b63eb9b
Merge branch 'master' into introduce_tcp_connection
Tim-Brooks Oct 25, 2017
8f85ba5
WIP
Tim-Brooks Oct 25, 2017
c887e7d
Cleanup futures
Tim-Brooks Oct 26, 2017
4c5d0d6
Cleanup channel closing
Tim-Brooks Oct 26, 2017
57a506d
Work on handling accepting
Tim-Brooks Oct 26, 2017
8fa658d
Cleanup
Tim-Brooks Oct 26, 2017
1028126
Add license
Tim-Brooks Oct 26, 2017
172f9eb
Work on cleaning up close futures
Tim-Brooks Oct 27, 2017
cea0253
Cleanups
Tim-Brooks Oct 27, 2017
3565108
Add back method
Tim-Brooks Oct 30, 2017
9dc88dc
Merge remote-tracking branch 'upstream/master' into introduce_tcp_con…
Tim-Brooks Nov 2, 2017
10fb412
Remove futures
Tim-Brooks Nov 2, 2017
9e7903d
Merge branch 'master' into introduce_tcp_connection
Tim-Brooks Nov 8, 2017
8e32b7b
Drop parameterization
Tim-Brooks Nov 13, 2017
1f1fd43
Fix test
Tim-Brooks Nov 13, 2017
8d8df72
make changes based on review
Tim-Brooks Nov 13, 2017
c57d919
Cleanup code
Tim-Brooks Nov 13, 2017
22bef9c
Merge remote-tracking branch 'upstream/master' into introduce_tcp_con…
Tim-Brooks Nov 13, 2017
c664c07
use completablefuture
Tim-Brooks Nov 13, 2017
d58678c
Make getter public
Tim-Brooks Nov 14, 2017
3a79a16
Changes based on review
Tim-Brooks Nov 14, 2017
bb89a97
Don't wrap throwable
Tim-Brooks Nov 14, 2017
9fd9984
Merge remote-tracking branch 'upstream/master' into introduce_tcp_con…
Tim-Brooks Nov 15, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions core/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -69,6 +70,42 @@ public void onFailure(Exception e) {
};
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding runnable when the response (or failure) is received.
*
* @param runnable the runnable that will be called in event of success or failure
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the runnable when received
*/
static <Response> ActionListener<Response> wrap(Runnable runnable) {
return wrap(r -> runnable.run(), e -> runnable.run());
}

/**
* Converts a listener to a {@link BiConsumer} for compatibility with the {@link java.util.concurrent.CompletableFuture}
* api.
*
* @param listener that will be wrapped
* @param <Response> the type of the response
* @return a bi consumer that will complete the wrapped listener
*/
static <Response> BiConsumer<Response, Throwable> toBiConsumer(ActionListener<Response> listener) {
return (response, throwable) -> {
if (throwable == null) {
listener.onResponse(response);
} else {
if (throwable instanceof Exception) {
listener.onFailure((Exception) throwable);
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw new AssertionError("Should have been either Error or Exception", throwable);
}
}
};
}

/**
* Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception
* the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ private ConnectionTypeHandle(int offset, int length, Set<TransportRequestOptions
* Returns one of the channels out configured for this handle. The channel is selected in a round-robin
* fashion.
*/
<T> T getChannel(T[] channels) {
<T> T getChannel(List<T> channels) {
if (length == 0) {
throw new IllegalStateException("can't select channel size is 0 for types: " + types);
}
assert channels.length >= offset + length : "illegal size: " + channels.length + " expected >= " + (offset + length);
return channels[offset + Math.floorMod(counter.incrementAndGet(), length)];
assert channels.size() >= offset + length : "illegal size: " + channels.size() + " expected >= " + (offset + length);
return channels.get(offset + Math.floorMod(counter.incrementAndGet(), length));
}

/**
Expand All @@ -223,5 +223,4 @@ Set<TransportRequestOptions.Type> getTypes() {
return types;
}
}

}
169 changes: 169 additions & 0 deletions core/src/main/java/org/elasticsearch/transport/TcpChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


/**
* This is a tcp channel representing a single channel connection to another node. It is the base channel
* abstraction used by the {@link TcpTransport} and {@link TransportService}. All tcp transport
* implementations must return channels that adhere to the required method contracts.
*/
public interface TcpChannel extends Releasable {

/**
* Closes the channel. This might be an asynchronous process. There is notguarantee that the channel
* will be closed when this method returns. Use the {@link #addCloseListener(ActionListener)} method
* to implement logic that depends on knowing when the channel is closed.
*/
void close();

/**
* Adds a listener that will be executed when the channel is closed. If the channel is still open when
* this listener is added, the listener will be executed by the thread that eventually closes the
* channel. If the channel is already closed when the listener is added the listener will immediately be
* executed by the thread that is attempting to add the listener.
*
* @param listener to be executed
*/
void addCloseListener(ActionListener<TcpChannel> listener);


/**
* This sets the low level socket option {@link java.net.StandardSocketOptions} SO_LINGER on a channel.
*
* @param value to set for SO_LINGER
* @throws IOException that can be throw by the low level socket implementation
*/
void setSoLinger(int value) throws IOException;


/**
* Indicates whether a channel is currently open
*
* @return boolean indicating if channel is open
*/
boolean isOpen();

/**
* Closes the channel.
*
* @param channel to close
* @param blocking indicates if we should block on channel close
*/
static <C extends TcpChannel> void closeChannel(C channel, boolean blocking) {
closeChannels(Collections.singletonList(channel), blocking);
}

/**
* Closes the channels.
*
* @param channels to close
* @param blocking indicates if we should block on channel close
*/
static <C extends TcpChannel> void closeChannels(List<C> channels, boolean blocking) {
if (blocking) {
ArrayList<ActionFuture<TcpChannel>> futures = new ArrayList<>(channels.size());
for (final C channel : channels) {
if (channel.isOpen()) {
PlainActionFuture<TcpChannel> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(closeFuture);
channel.close();
futures.add(closeFuture);
}
}
blockOnFutures(futures);
} else {
Releasables.close(channels);
}
}

/**
* Awaits for all of the pending connections to complete. Will throw an exception if at least one of the
* connections fails.
*
* @param discoveryNode the node for the pending connections
* @param connectionFutures representing the pending connections
* @param connectTimeout to wait for a connection
* @param <C> the type of channel
* @throws ConnectTransportException if one of the connections fails
*/
static <C extends TcpChannel> void awaitConnected(DiscoveryNode discoveryNode, List<ActionFuture<C>> connectionFutures,
TimeValue connectTimeout) throws ConnectTransportException {
Exception connectionException = null;
boolean allConnected = true;

for (ActionFuture<C> connectionFuture : connectionFutures) {
try {
connectionFuture.get(connectTimeout.getMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
allConnected = false;
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
} catch (ExecutionException e) {
allConnected = false;
connectionException = (Exception) e.getCause();
break;
}
}

if (allConnected == false) {
if (connectionException == null) {
throw new ConnectTransportException(discoveryNode, "connect_timeout[" + connectTimeout + "]");
} else {
throw new ConnectTransportException(discoveryNode, "connect_exception", connectionException);
}
}
}

static void blockOnFutures(List<ActionFuture<TcpChannel>> futures) {
for (ActionFuture<TcpChannel> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
// Ignore as we are only interested in waiting for the close process to complete. Logging
// close exceptions happens elsewhere.
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
}
}
}
}
Loading