Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Use dedicated executor for requests in BinaryProtoLookupService #23378

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

nodece
Copy link
Member

@nodece nodece commented Sep 30, 2024

Motivation

I'm testing Pulsar 3.0.6. We have a Pulsar interceptor, which records the data(web request/binary command) to the topic by the producer, but I got this error:

[ZKMetadataStore-12-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.100.102:60796] Created subscription on topic persistent://as/system/__change_events / reader-e710cb132a
[pulsar-io-6-2] WARN org.apache.pulsar.client.impl.ClientCnx - [/192.168.100.102:15002] Got exception java.lang.UnsupportedOperationException
	at org.apache.pulsar.common.protocol.PulsarDecoder.handlePartitionMetadataRequest(PulsarDecoder.java:508)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:134)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)

This indicates the broker sends a PartitionMetadataRequest command to the client, this is a bizarre behavior.

How to reproduce this issue?

  1. Create a producer asynchronously in the org.apache.pulsar.broker.intercept.BrokerInterceptor#onPulsarCommand.
  2. The producer will change the type of pulsar response command, which will be replaced with PARTITION_METADATA.
image

Due to Pulsar's use of ThreadLocal for command creation, the command instances are being unexpectedly modified.

Modifications

  • Add lookup executor for requests in BinaryProtoLookupService.
  • Add shared lookup executor for the broker client.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 30, 2024
@nodece nodece self-assigned this Sep 30, 2024
@lhotari
Copy link
Member

lhotari commented Oct 1, 2024

Due to Pulsar's use of ThreadLocal for command creation, the command instances are being unexpectedly modified.

Thanks for bringing this up. This seems to be a severe bug and it would justify to report it separately. It's easy to miss this PR that this is addressing a critical issue.

@lhotari
Copy link
Member

lhotari commented Oct 1, 2024

Create a producer asynchronously in the org.apache.pulsar.broker.intercept.BrokerInterceptor#onPulsarCommand.

Is this the only case where this can be reproduced?

@lhotari
Copy link
Member

lhotari commented Oct 1, 2024

  1. Create a producer asynchronously in the org.apache.pulsar.broker.intercept.BrokerInterceptor#onPulsarCommand.

@nodece I think that the correct solution would be to call new BaseCommand().copyFrom(command) in the interceptor to make a copy of the command. That's what is needed whenever the thread locals are referenced later.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct solution for keeping a reference in the onPulsarCommand broker interceptor implementation would be to use new BaseCommand().copyFrom(command). That's why I don't think that the solution in the PR directly related to addressing the problem.

@nodece
Copy link
Member Author

nodece commented Oct 2, 2024

Create a producer asynchronously in the org.apache.pulsar.broker.intercept.BrokerInterceptor#onPulsarCommand.

Is this the only case where this can be reproduced?

At present, it looks like this.

@nodece I think that the correct solution would be to call new BaseCommand().copyFrom(command) in the interceptor to make a copy of the command. That's what is needed whenever the thread locals are referenced later.

My interceptor doesn't change the command. Please notice the 1 and 2 in my picture, the broker and client in the same k thread.

  1. org.apache.pulsar.broker.service.PulsarCommandSenderImpl#sendSuccessResponse:
    public void sendSuccessResponse(long requestId) {
        // step-1: caller in the zk thread.
        BaseCommand command = Commands.newSuccessCommand(requestId);
        // step-2: call the interceptor method.
        safeIntercept(command, cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        writeAndFlush(outBuf);
    }

Use the ThreadLocal to create the command in the zk thread.

  1. My intercept method:
    public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
        // caller in the zk thread.
        Producer<byte[]> producer = client.newProducer().topic("my-topic").create();
        producer.send(command.getType().name().getBytes(StandardCharsets.UTF_8));
    }

The produce creation needs to do the lookup and then get partition metadata, which requests the broker by Commands.newLookup and Commands.newPartitionMetadataRequest commands.

We are using the ThreadLocal to create the lookup or partition metadata requests, so the sendSuccessResponse command is broken in the step-2.

The current solution version uses a dedicated executor to create commands and make requests in the client.

Pulsar uses implicit executor in many places, so often we are not sure who the executor is, as a result, this accident occurred, clarifying the executor is not a bad thing.

@lhotari
Copy link
Member

lhotari commented Oct 8, 2024

Pulsar uses implicit executor in many places, so often we are not sure who the executor is, as a result, this accident occurred, clarifying the executor is not a bad thing.

It's very hard to be sure whether it had performance impacts. When an executor is defined, it will add extra overhead of queuing to the other executor and the extra thread switching overhead. It depends a lot on the situation whether this causes performance regressions or not.

@nodece
Copy link
Member Author

nodece commented Oct 9, 2024

Do you have any suggestions or recommendations for resolving this issue?

@lhotari
Copy link
Member

lhotari commented Oct 9, 2024

Do you have any suggestions or recommendations for resolving this issue?

@nodece The changes in this PR LGTM. There's no risk of additional overhead of using the separate executor since the number of ops is relatively low for the operations that it applies to. I'll approve this PR. Thanks for the contribution!

lhotari
lhotari previously approved these changes Oct 9, 2024
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lhotari lhotari dismissed their stale review October 9, 2024 14:53

A lot of tests fail. This is causing problems.

@nodece nodece force-pushed the use-dedicated-executor-lookup branch from f2fdbbf to ae23e52 Compare October 10, 2024 04:02
@nodece nodece requested a review from lhotari October 10, 2024 04:03
@nodece nodece force-pushed the use-dedicated-executor-lookup branch from ae23e52 to df0949b Compare October 10, 2024 04:06
@nodece nodece force-pushed the use-dedicated-executor-lookup branch from df0949b to 4f31a8e Compare October 10, 2024 06:38
@codecov-commenter
Copy link

codecov-commenter commented Oct 10, 2024

Codecov Report

Attention: Patch coverage is 80.00000% with 8 lines in your changes missing coverage. Please review.

Project coverage is 74.28%. Comparing base (bbc6224) to head (179f950).
Report is 667 commits behind head on master.

Files with missing lines Patch % Lines
...e/pulsar/client/impl/BinaryProtoLookupService.java 80.95% 3 Missing and 1 partial ⚠️
...rg/apache/pulsar/client/impl/PulsarClientImpl.java 81.25% 3 Missing ⚠️
...n/java/org/apache/pulsar/broker/PulsarService.java 66.66% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #23378      +/-   ##
============================================
+ Coverage     73.57%   74.28%   +0.71%     
- Complexity    32624    34454    +1830     
============================================
  Files          1877     1953      +76     
  Lines        139502   147200    +7698     
  Branches      15299    16208     +909     
============================================
+ Hits         102638   109350    +6712     
- Misses        28908    29412     +504     
- Partials       7956     8438     +482     
Flag Coverage Δ
inttests 27.32% <62.50%> (+2.73%) ⬆️
systests 24.35% <55.00%> (+0.03%) ⬆️
unittests 73.66% <80.00%> (+0.81%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...n/java/org/apache/pulsar/broker/PulsarService.java 85.20% <66.66%> (+2.83%) ⬆️
...rg/apache/pulsar/client/impl/PulsarClientImpl.java 75.46% <81.25%> (+1.15%) ⬆️
...e/pulsar/client/impl/BinaryProtoLookupService.java 81.16% <80.95%> (-1.38%) ⬇️

... and 634 files with indirect coverage changes

@nodece nodece force-pushed the use-dedicated-executor-lookup branch from 4f31a8e to 94e9d3d Compare October 10, 2024 15:08
@nodece nodece marked this pull request as draft October 10, 2024 15:10
@nodece nodece marked this pull request as ready for review October 10, 2024 15:10
@nodece nodece force-pushed the use-dedicated-executor-lookup branch 2 times, most recently from bc06602 to 7afec1c Compare October 10, 2024 16:10
@nodece nodece requested a review from lhotari October 11, 2024 04:24
@nodece
Copy link
Member Author

nodece commented Oct 11, 2024

@lhotari Your comment has been taken care of.

@nodece
Copy link
Member Author

nodece commented Oct 14, 2024

Any updates?

@@ -428,7 +461,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,

@Override
public void close() throws Exception {
// no-op
if (!createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be createdLookupPinnedExecutor instead of !createdLookupPinnedExecutor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@lhotari
Copy link
Member

lhotari commented Oct 14, 2024

Any updates?

@nodece I added a few review comments.

@nodece
Copy link
Member Author

nodece commented Oct 14, 2024

Ping @lhotari

@nodece nodece requested a review from lhotari October 14, 2024 11:14
@@ -218,11 +231,14 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-lookup");
Copy link
Member

@lhotari lhotari Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that numIoThreads should be used for this purpose. It's already used for the scheduledExecutorProvider, but that's not a great default at all. This could cause regressions due to memory consumption increase.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to use a single thread?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's better than numIoThreads.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari Done.

…pService

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@nodece nodece force-pushed the use-dedicated-executor-lookup branch from 2d02348 to e52bd09 Compare October 14, 2024 13:35
@nodece nodece requested a review from lhotari October 14, 2024 13:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants