Add TcpChannel to unify Transport implementations #27132


Merged
merged 31 commits into elastic:master
Nov 15, 2017

Conversation

Tim-Brooks
Contributor

Right now our different transport implementations must duplicate
functionality in order to stay compliant with the requirements of
TcpTransport. They must all implement common logic to open channels,
close channels, keep track of channels for eventual shutdown, etc.

Additionally, there is a weird and complicated relationship between
Transport and TransportService. We eventually want to start merging
some of the functionality between these classes.

This commit starts moving towards a world where TransportService retains
all the application logic and channel state. Transport implementations
in this world will only be tasked with returning a channel when one is
requested, calling transport service when a channel is accepted from
a server, and starting / stopping itself.

Specifically this commit changes how channels are opened and closed. All
Transport implementations now return a channel type that must comply with
the new TcpChannel interface. This interface has the methods necessary
for TcpTransport to completely manage the lifecycle of a channel. This
includes setting the channel up, waiting for connection, adding close
listeners, and eventually closing.
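For illustration only, here is a minimal sketch of the kind of contract this description implies. The interface and method names below are hypothetical, not the exact API merged in this PR:

```java
import java.net.InetSocketAddress;

/**
 * Hypothetical sketch (illustrative names, not the merged interface) of
 * the contract described above: TcpTransport drives the whole lifecycle,
 * so a channel implementation only exposes close handling and its address.
 */
interface SketchTcpChannel {
    /** Register a callback invoked once when the channel closes. */
    void addCloseListener(Runnable onClose);

    /** Initiate an asynchronous close of the channel. */
    void close();

    /** The local address this channel is bound to. */
    InetSocketAddress getLocalAddress();
}
```

With such a contract, each Transport implementation only supplies channels; the open/close/track bookkeeping lives in one place.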

Contributor

@s1monw left a comment


I did an initial pass. I like it. I left some comments.


import org.elasticsearch.action.support.PlainActionFuture;

public class PlainChannelFuture<Channel> extends PlainActionFuture<Channel> {
Contributor


can we have javadocs for this? and can it be final?


ListenableActionFuture<C> closeAsync();

ListenableActionFuture<C> getCloseFuture();
Contributor


what is this doing?

Contributor Author


At times we need to attach a close listener. And the future allows us to do that. But I presume that you want just a method to add close listeners instead.


public interface TcpChannel<C extends TcpChannel<C>> {

ListenableActionFuture<C> closeAsync();
Contributor


can we turn this around and accept a listener instead? I really don't like returning futures; it's a design flaw IMO

Contributor Author


I'm not sure I understand here. We need something to block on at shutdown. Are you suggesting this?

PlainActionFuture<C> future = PlainActionFuture.newFuture();
channel.addCloseListener(future);
channel.closeAsync();
future.actionGet();

Contributor Author

@Tim-Brooks Oct 27, 2017


Or some callback with a CountDownLatch?

CountDownLatch latch = new CountDownLatch(channels.size());
for (final C channel : channels) {
    channel.addCloseListener(ActionListener.wrap(latch::countDown));
}
latch.await();

Contributor


I think the simplest options would be this:

void close(ActionListener<Void> onClosed);
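To sketch how a callback-based close still supports a blocking wait at shutdown: the latch is layered on top of the callback, so futures stay out of the channel API. This is a hedged sketch under assumed names (`ClosableChannel` and `closeAndWait` are made up, and `Runnable` stands in for `ActionListener<Void>`):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical channel type with the callback-based close suggested above.
interface ClosableChannel {
    void close(Runnable onClosed);
}

class ShutdownHelper {
    // Block until every channel has reported closed. The callback style
    // keeps futures out of the channel API while still giving the
    // transport a blocking wait at shutdown.
    static void closeAndWait(List<? extends ClosableChannel> channels) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(channels.size());
        for (ClosableChannel channel : channels) {
            channel.close(latch::countDown);
        }
        latch.await();
    }
}
```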


import java.io.IOException;

public interface TcpChannel<C extends TcpChannel<C>> {
Contributor


I assume that down the road we will get rid of the generics here and add all the necessary method to it that TcpTransport needs, right?

@@ -426,7 +420,7 @@ public Version getVersion() {
     }

     public List<Channel> getChannels() {
-        return Arrays.asList(channels);
+        return new ArrayList<>(channels);
Contributor


Can we wrap this in the ctor in a Collections#unmodifiableList and just return the instance instead of a mutable copy

Contributor Author


Yep good idea. I should use the unmodifiableList in a few other places (in NodeChannels) too.
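The suggestion amounts to something like this (an illustrative holder class, not the actual NodeChannels code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ChannelHolder<Channel> {
    private final List<Channel> channels;

    ChannelHolder(List<Channel> channels) {
        // Wrap once in the constructor so the getter can hand out the
        // same immutable view instead of allocating a copy per call.
        this.channels = Collections.unmodifiableList(new ArrayList<>(channels));
    }

    public List<Channel> getChannels() {
        return channels;
    }
}
```

Callers that try to mutate the returned list get an UnsupportedOperationException, which makes accidental modification fail fast.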

List<PlainChannelFuture<Channel>> pendingChannels = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; ++i) {
    try {
        PlainChannelFuture<Channel> pending = initiateChannel(node, connectionProfile.getConnectTimeout());
Contributor


I don't like that this returns a future. I am almost certain it's simpler if we go with a callback here. I haven't seen a usage of a future where it actually helped or simplified anything.

Contributor Author


I guess same question as above. Are you imagining something where I pass in a CountDownLatch as a callback or a future as a callback? At some point we need to block while waiting for the connections to be completed (without changing our architecture in a pretty major way).

@@ -843,7 +886,9 @@ public static int resolvePublishPort(ProfileSettings profileSettings, List<InetS
     // not perfect, but PortsRange should take care of any port range validation, not a regex
     private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$");

-    /** parse a hostname+port range spec into its equivalent addresses */
+    /**
+     * parse a hostname+port range spec into its equivalent addresses
Contributor


why?

Contributor Author


Autoformat. I'll fix it in the next revision.

@Tim-Brooks
Contributor Author

This is ready for another review. I have addressed @s1monw's suggested changes.

@Tim-Brooks changed the title from Add TcpChannel<C> to unify Transport impls to Add TcpChannel to unify Transport impls Nov 13, 2017
@Tim-Brooks changed the title from Add TcpChannel to unify Transport impls to Add TcpChannel to unify Transport implementations Nov 13, 2017
@Tim-Brooks
Contributor Author

@s1monw I have made changes based on your feedback. I was able to remove some of the parameterization, and more should be removable in follow-up PRs.

I did disagree with this:

we have a race here. We might execute this listener twice. I think we should keep it simple and just use synchronized and make a full copy of the list every time we add a listener. I don't think this is perf critical. Let's make the state only mutable under a mutex otherwise it's too complicated. That way you can also just use ActionListener#onResponse and ActionListener#onFailure

Can you articulate the race? I do not think there is one. This pattern is pretty common and I use it a number of places. I attempt to enqueue something with a concurrent queue. After enqueuing I check a volatile variable to see if this context has been completed. If it has not been completed this listener will be dealt with by the completing thread. If the context has been completed this thread will attempt to remove the listener. If the listener is removed, we deal with it and the completing thread will never see it. If the completing thread removes it from the queue first, it will deal with the listener and we can move on. Only one thread can remove it from the queue.

Pretty much all of the complicated concurrency work is provided by the java std lib concurrent queue.

Additionally, unless you execute all of the listeners in a synchronized block the logic is still pretty complicated using synchronization. (As an aside, after this PR we are not really using the PlainListenableActionFuture anymore. We could probably remove that as I think you prefer to not use the future api?)

And you say that this is not a performance sensitive area of the code but this is called on the network thread. Using synchronization risks the network thread being parked while waiting for another thread to attach a listener. I would prefer to avoid that risk with using two java std lib utilities (atomic reference and concurrent queue) and a reasonably easy to understand concurrency pattern.
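The pattern described above can be sketched as follows. Names are hypothetical, and `Consumer` stands in for `ActionListener`; the point is that the concurrent queue's removal semantics guarantee each listener runs exactly once:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the lock-free listener pattern described above.
class CloseContext<T> {
    private final Queue<Consumer<T>> listeners = new ConcurrentLinkedQueue<>();
    private volatile boolean completed = false;
    private volatile T result;

    void addListener(Consumer<T> listener) {
        listeners.add(listener);
        if (completed) {
            // The context completed while (or before) we enqueued. Whoever
            // removes the listener from the queue runs it; remove() fails
            // if the completing thread already polled it, so it runs once.
            if (listeners.remove(listener)) {
                listener.accept(result);
            }
        }
    }

    void complete(T result) {
        this.result = result;
        completed = true;
        Consumer<T> listener;
        // Only one thread can poll a given element, so each listener
        // executes exactly once without any mutex.
        while ((listener = listeners.poll()) != null) {
            listener.accept(result);
        }
    }
}
```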

I did add a comment to the code explaining what is going on.

Thoughts?

@Tim-Brooks
Contributor Author

@s1monw Actually it looks like CompletableFuture provides the thing that I was looking for (multiple listeners without synchronization 😀) and is in the std lib. So I just deleted the ListenerExecutionContext and used that.
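As a sketch of why CompletableFuture fits here: it accepts any number of listeners without external synchronization, runs late-attached listeners immediately, and still provides a blocking handle for shutdown. The class below is illustrative, not code from this PR:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class CloseFutureSketch {
    // Returns how many listeners fired; illustrates that CompletableFuture
    // supports multiple listeners, attached before or after completion,
    // without a mutex.
    static int demo() {
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        AtomicInteger fired = new AtomicInteger();

        closeFuture.whenComplete((v, e) -> fired.incrementAndGet());
        closeFuture.whenComplete((v, e) -> fired.incrementAndGet());

        closeFuture.complete(null);   // the channel closed
        closeFuture.join();           // blocking wait at shutdown

        // A listener attached after completion runs immediately.
        closeFuture.whenComplete((v, e) -> fired.incrementAndGet());
        return fired.get();           // 3
    }
}
```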

Contributor

@s1monw left a comment


LGTM

} else {
    if (throwable instanceof Exception) {
        listener.onFailure((Exception) throwable);
    } else {
Contributor


I don't think we should hide the non-exception; it will be an Error, and in that case we need to rethrow it. @jasontedor WDYT?

 * @return a listener that listens for responses and invokes the runnable when received
 */
static <Response> ActionListener<Response> wrap(Runnable runnable) {
    return new ActionListener<Response>() {
Contributor


maybe just return wrap(r -> runnable.run(), e -> runnable.run());?

@s1monw
Contributor

s1monw commented Nov 14, 2017

Can you articulate the race? I do not think there is one. This pattern is pretty common and I use it a number of places. I attempt to enqueue something with a concurrent queue. After enqueuing I check a volatile variable to see if this context has been completed. If it has not been completed this listener will be dealt with by the completing thread. If the context has been completed this thread will attempt to remove the listener. If the listener is removed, we deal with it and the completing thread will never see it. If the completing thread removes it from the queue first, it will deal with the listener and we can move on. Only one thread can remove it from the queue.

You are right, this is correct. I didn't anticipate / overlooked that poll removes the element. Yet I like the solution you have now better :)

Member

@jasontedor left a comment


@s1monw did the heavy lifting in the review phase here but this LGTM too.

@Tim-Brooks merged commit ca11085 into elastic:master Nov 15, 2017
Tim-Brooks added a commit that referenced this pull request Nov 15, 2017
Tim-Brooks added a commit to Tim-Brooks/elasticsearch that referenced this pull request Nov 15, 2017
This commit is a follow up to the work completed in elastic#27132. Essentially
it transitions two more methods (sendMessage and getLocalAddress) from
Transport to TcpChannel. With this change, there is no longer a need for
TcpTransport to be aware of the specific type of channel a transport
returns. So that class is no longer parameterized by channel type.
Tim-Brooks added a commit to Tim-Brooks/elasticsearch that referenced this pull request Nov 15, 2017
This is a follow up to elastic#27132. As that PR greatly simplified the
connection logic inside a low level transport implementation, much of
the functionality provided by the NioClient class is no longer
necessary. This commit removes that class.
jasontedor added a commit that referenced this pull request Nov 16, 2017
* master:
  Stop skipping REST test after backport of #27056
  Fix default value of ignore_unavailable for snapshot REST API (#27056)
  Add composite aggregator (#26800)
  Fix `ShardSplittingQuery` to respect nested documents. (#27398)
  [Docs] Restore section about multi-level parent/child relation in parent-join (#27392)
  Add TcpChannel to unify Transport implementations (#27132)
  Add note on plugin distributions in plugins folder
  Remove implementations of `TransportChannel` (#27388)
  Update Google SDK to version 1.23 (#27381)
  Fix Gradle 4.3.1 compatibility for logging (#27382)
  [Test] Change Elasticsearch startup timeout to 120s in packaging tests
  Docs/windows installer (#27369)
Tim-Brooks added a commit that referenced this pull request Nov 16, 2017
This is a follow up to #27132. As that PR greatly simplified the
connection logic inside a low level transport implementation, much of
the functionality provided by the NioClient class is no longer
necessary. This commit removes that class.
Tim-Brooks added a commit that referenced this pull request Nov 16, 2017
This commit is a follow up to the work completed in #27132. Essentially
it transitions two more methods (sendMessage and getLocalAddress) from
Transport to TcpChannel. With this change, there is no longer a need for
TcpTransport to be aware of the specific type of channel a transport
returns. So that class is no longer parameterized by channel type.
@Tim-Brooks deleted the introduce_tcp_connection branch December 10, 2018 16:19
Labels
:Distributed Coordination/Network >non-issue v6.1.0 v7.0.0-beta1