Skip to content

Commit

Permalink
[improve][proxy] Remove unnecessary executor callback; use assert (ap…
Browse files Browse the repository at this point in the history
…ache#19670)

### Motivation

The `DirectProxyHandler` and the `ProxyConnection` are run in the same event loop to prevent context switching. As such, we do not need to schedule an event onto an event loop that is in fact the same event loop. Further, scheduling on that event loop could have resulted in uncaught failures because the method was run without any error handling.

Additionally, we can use `assert` to verify that we are in the event loop. Netty makes extensive use of this paradigm, as described in this PR apache#19653. The primary benefit here is that we skip some unnecessary volatile reads when running the code in production.

### Modifications

* Replace `checkState` with `assert` in `ProxyConnection` class
* Remove unnecessary event execution in callback when starting up the `DirectProxyHandler`.

### Verifying this change

This change is covered by the assertions that are added.

### Does this pull request potentially affect one of the following parts:

This is a minor improvement that should not break anything.

### Documentation

- [x] `doc-not-needed`

### Matching PR in forked repository

PR in forked repository: michaeljmarshall#33
  • Loading branch information
michaeljmarshall authored Mar 20, 2023
1 parent c4abe7b commit 164add6
Showing 1 changed file with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.proxy.server;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
Expand Down Expand Up @@ -380,7 +379,7 @@ private synchronized void completeConnect() throws PulsarClientException {
}

private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
assert ctx.executor().inEventLoop();
if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
this.directProxyHandler = directProxyHandler;
state = State.ProxyConnectionToBroker;
Expand All @@ -401,18 +400,21 @@ private void handleBrokerConnected(DirectProxyHandler directProxyHandler, Comman
}

private void connectToBroker(InetSocketAddress brokerAddress) {
checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
assert ctx.executor().inEventLoop();
DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
directProxyHandler.connect(proxyToBrokerUrl, brokerAddress, protocolVersionToAdvertise);
}

public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
try {
final CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
ctx.executor().execute(() -> handleBrokerConnected(directProxyHandler, finalConnected));
handleBrokerConnected(directProxyHandler, finalConnected);
} catch (RejectedExecutionException e) {
LOG.error("Event loop was already closed. Closing broker connection.", e);
directProxyHandler.close();
} catch (AssertionError e) {
LOG.error("Failed assertion, closing direct proxy handler.", e);
directProxyHandler.close();
}
}

Expand Down

0 comments on commit 164add6

Please sign in to comment.