-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Add TcpChannel to unify Transport implementations #27132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add TcpChannel to unify Transport implementations #27132
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did an initial pass. I like it. left some comments
|
||
import org.elasticsearch.action.support.PlainActionFuture; | ||
|
||
public class PlainChannelFuture<Channel> extends PlainActionFuture<Channel> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we have javadocs for this? and can it be final
|
||
ListenableActionFuture<C> closeAsync(); | ||
|
||
ListenableActionFuture<C> getCloseFuture(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this doing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At times we need to attach a close listener. And the future allows us to do that. But I presume that you want just a method to add close listeners instead.
|
||
public interface TcpChannel<C extends TcpChannel<C>> { | ||
|
||
ListenableActionFuture<C> closeAsync(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we turn this around and accept a listener isntead? I really don't like returning futures it's a design flaw IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand here. We need something to block on at shutdown. Are you suggesting this?
PlainActionFuture<C> future = PlainActionFuture.newFuture();
channel.addCloseListener(future);
channel.closeAsync();
future.actionGet();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or some callback with a CountDownLatch
?
CountDownLatch latch = new CountDownLatch(channels.size());
for (final C channel : channels) {
channel.addCloseListener(ActionListener.wrap(latch::countDown));
}
latch.await();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the simplest options would be this:
void close(ActionListener<Void> onClosed);
|
||
import java.io.IOException; | ||
|
||
public interface TcpChannel<C extends TcpChannel<C>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume that down the road we will get rid of the generics here and add all the necessary method to it that TcpTransport needs, right?
@@ -426,7 +420,7 @@ public Version getVersion() { | |||
} | |||
|
|||
public List<Channel> getChannels() { | |||
return Arrays.asList(channels); | |||
return new ArrayList<>(channels); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we wrap this in the ctor in a Collections#unmodifiableList
and just return the instance instead of a mutable copy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep good idea. I should use the unmodifiableList in a few other places (in NodeChannels) too.
List<PlainChannelFuture<Channel>> pendingChannels = new ArrayList<>(numConnections); | ||
for (int i = 0; i < numConnections; ++i) { | ||
try { | ||
PlainChannelFuture<Channel> pending = initiateChannel(node, connectionProfile.getConnectTimeout()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like that this returns a future. I am almost certain it's simpler if we go with a callback here. I havent' seen a useage of future where it was actually helpful or simplifying anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess same question as above. Are you imaging something where I pass in a CountDownLatch as a callback or a future as a callback? At some point we need to block while waiting for the connections to be completed (without changing our architecture in a pretty major way).
@@ -843,7 +886,9 @@ public static int resolvePublishPort(ProfileSettings profileSettings, List<InetS | |||
// not perfect, but PortsRange should take care of any port range validation, not a regex | |||
private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$"); | |||
|
|||
/** parse a hostname+port range spec into its equivalent addresses */ | |||
/** | |||
* parse a hostname+port range spec into its equivalent addresses |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
autoformat. I'll fix in next revision.
This is ready for another review. I have address @s1monw suggested changes. |
@s1monw I have made changes based on your feedback. I was able to remove some of the parameterization. But more should be able to be removed in followup PRs. I did disagree with this:
Can you articulate the race? I do not think there is one. This pattern is pretty common and I use it a number of places. I attempt to enqueue something with a concurrent queue. After enqueuing I check a volatile variable to see if this context has been completed. If it has not been completed this listener will be dealt with by the completing thread. If the context has been completed this thread will attempt to remove the listener. If the listener is removed, we deal with it and the completing thread will never see it. If the completing thread removes it from the queue first, it will deal with the listener and we can move on. Only one thread can remove it from the queue. Pretty much all of the complicated concurrency work is provided by the java std lib concurrent queue. Additionally, unless you execute all of the listeners in a synchronized block the logic is still pretty complicated using synchronization. (As an aside, after this PR we are not really using the PlainListenableActionFuture anymore. We could probably remove that as I think you prefer to not use the future api?) And you say that this is not a performance sensitive area of the code but this is called on the network thread. Using synchronization risks the network thread being parked while waiting for another thread to attach a listener. I would prefer to avoid that risk with using two java std lib utilities (atomic reference and concurrent queue) and a reasonably easy to understand concurrency pattern. I did add a comment to the code explaining what is going on. Thoughts? |
@s1monw Actually it looks like |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
} else { | ||
if (throwable instanceof Exception) { | ||
listener.onFailure((Exception) throwable); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should hide the non-exception. this will be an error and in this case we need to rethrow it? @jasontedor WDYT?
* @return a listener that listens for responses and invokes the runnable when received | ||
*/ | ||
static <Response> ActionListener<Response> wrap(Runnable runnable) { | ||
return new ActionListener<Response>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just return wrap(runnable:run, e -> runnable.run());
?
you are right this is correct. I didn't anticipate / overlooked that poll removes. Yet, I like the solution you have now better :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@s1monw did the heavy lifting in the review phase here but this LGTM too.
Right now our different transport implementations must duplicate functionality in order to stay compliant with the requirements of TcpTransport. They must all implement common logic to open channels, close channels, keep track of channels for eventual shutdown, etc. Additionally, there is a weird and complicated relationship between Transport and TransportService. We eventually want to start merging some of the functionality between these classes. This commit starts moving towards a world where TransportService retains all the application logic and channel state. Transport implementations in this world will only be tasked with returning a channel when one is requested, calling transport service when a channel is accepted from a server, and starting / stopping itself. Specifically this commit changes how channels are opened and closed. All Transport implementations now return a channel type that must comply with the new TcpChannel interface. This interface has the methods necessary for TcpTransport to completely manage the lifecycle of a channel. This includes setting the channel up, waiting for connection, adding close listeners, and eventually closing.
This commit is a follow up to the work completed in elastic#27132. Essentially it transitions two more methods (sendMessage and getLocalAddress) from Transport to TcpChannel. With this change, there is no longer a need for TcpTransport to be aware of the specific type of channel a transport returns. So that class is no longer parameterized by channel type.
This is a follow up to elastic#27132. As that PR greatly simplified the connection logic inside a low level transport implementation, much of the functionality provided by the NioClient class is no longer necessary. This commit removes that class.
* master: Stop skipping REST test after backport of #27056 Fix default value of ignore_unavailable for snapshot REST API (#27056) Add composite aggregator (#26800) Fix `ShardSplittingQuery` to respect nested documents. (#27398) [Docs] Restore section about multi-level parent/child relation in parent-join (#27392) Add TcpChannel to unify Transport implementations (#27132) Add note on plugin distributions in plugins folder Remove implementations of `TransportChannel` (#27388) Update Google SDK to version 1.23 (#27381) Fix Gradle 4.3.1 compatibility for logging (#27382) [Test] Change Elasticsearch startup timeout to 120s in packaging tests Docs/windows installer (#27369)
This is a follow up to #27132. As that PR greatly simplified the connection logic inside a low level transport implementation, much of the functionality provided by the NioClient class is no longer necessary. This commit removes that class.
This is a follow up to #27132. As that PR greatly simplified the connection logic inside a low level transport implementation, much of the functionality provided by the NioClient class is no longer necessary. This commit removes that class.
This commit is a follow up to the work completed in #27132. Essentially it transitions two more methods (sendMessage and getLocalAddress) from Transport to TcpChannel. With this change, there is no longer a need for TcpTransport to be aware of the specific type of channel a transport returns. So that class is no longer parameterized by channel type.
This commit is a follow up to the work completed in #27132. Essentially it transitions two more methods (sendMessage and getLocalAddress) from Transport to TcpChannel. With this change, there is no longer a need for TcpTransport to be aware of the specific type of channel a transport returns. So that class is no longer parameterized by channel type.
Right now our different transport implementations must duplicate
functionality in order to stay compliant with the requirements of
TcpTransport. They must all implement common logic to open channels,
close channels, keep track of channels for eventual shutdown, etc.
Additionally, there is a weird and complicated relationship between
Transport and TransportService. We eventually want to start merging
some of the functionality between these classes.
This commit starts moving towards a world where TransportService retains
all the application logic and channel state. Transport implementations
in this world will only be tasked with returning a channel when one is
requested, calling transport service when a channel is accepted from
a server, and starting / stopping itself.
Specifically this commit changes how channels are opened and closed. All
Transport implementations now return a channel type that must comply with
the new TcpChannel interface. This interface has the methods necessary
for TcpTransport to completely manage the lifecycle of a channel. This
includes setting the channel up, waiting for connection, adding close
listeners, and eventually closing.