diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 5a53f6ec014a2..1e19d760c6d7b 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -19,7 +19,6 @@ package org.apache.pulsar.proxy.server; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; @@ -380,7 +379,7 @@ private synchronized void completeConnect() throws PulsarClientException { } private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) { - checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop"); + assert ctx.executor().inEventLoop(); if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) { this.directProxyHandler = directProxyHandler; state = State.ProxyConnectionToBroker; @@ -401,7 +400,7 @@ private void handleBrokerConnected(DirectProxyHandler directProxyHandler, Comman } private void connectToBroker(InetSocketAddress brokerAddress) { - checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop"); + assert ctx.executor().inEventLoop(); DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this); directProxyHandler.connect(proxyToBrokerUrl, brokerAddress, protocolVersionToAdvertise); } @@ -409,10 +408,13 @@ private void connectToBroker(InetSocketAddress brokerAddress) { public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) { try { final CommandConnected finalConnected = new CommandConnected().copyFrom(connected); - ctx.executor().execute(() -> handleBrokerConnected(directProxyHandler, finalConnected)); + handleBrokerConnected(directProxyHandler, finalConnected); } catch (RejectedExecutionException e) { LOG.error("Event loop was already closed. Closing broker connection.", e); directProxyHandler.close(); + } catch (AssertionError e) { + LOG.error("Failed assertion, closing direct proxy handler.", e); + directProxyHandler.close(); } }