Skip to content

Comments

[fix] Fix deadlock when closing the partitioned producer#187

Merged
shibd merged 1 commit intoapache:mainfrom
RobertIndie:fix-deadlock
Feb 9, 2023
Merged

[fix] Fix deadlock when closing the partitioned producer#187
shibd merged 1 commit intoapache:mainfrom
RobertIndie:fix-deadlock

Conversation

@RobertIndie
Copy link
Member

@RobertIndie RobertIndie commented Feb 3, 2023

Fixes #186

Motivation

This PR fixes the deadlock issue mentioned in #186

The case is that when we create a Partitioned Producer with 2 partitions.
And then we expand the topic to 3 partitions. The PP(Partitioned Producer) will create a new internal producer(Let's called it P3)

But if we close the PP before P3 starts completed. The P3.closeAsync will be called. And it will failed the creation for itself here:

producerCreatedPromise_.setFailed(ResultAlreadyClosed);

The PP then knows the P3 has failed to create and then close PP.closeAsync again:

The internal producers will be closed again can cause the deadlock here:

Lock lock(mutex_);

Here is the sequence diagram for the issue:
image

And here is the stack trace in #186

    frame #6: 0x000000010c5d7672 pulsar-tests`pulsar::ProducerImpl::closeAsync(this=0x00007fb19e012c20, originalCallback=<unavailable>)>) at ProducerImpl.cc:725:10
    frame #7: 0x000000010c5768a1 pulsar-tests`pulsar::PartitionedProducerImpl::closeAsync(this=0x00007fb19ef04098, originalCallback=<unavailable>)>) at PartitionedProducerImpl.cc:287:23
    frame #8: 0x000000010c57518f pulsar-tests`pulsar::PartitionedProducerImpl::handleSinglePartitionProducerCreated(this=0x00007fb19ef04098, result=ResultAlreadyClosed, producerWeakPtr=<unavailable>, partitionIndex=2) at PartitionedProducerImpl.cc:166:13
    frame #9: 0x000000010c582c9c pulsar-tests`decltype(__f=0x0000600002699868, __a0=std::__1::shared_ptr<pulsar::PartitionedProducerImpl>::element_type @ 0x00007fb19ef04098 strong=8 weak=4, __args=0x00007ff7b4127fa4, __args=nullptr, __args=0x0000600002699888).*fp(static_cast<pulsar::Result>(fp1), static_cast<std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(fp1), static_cast<unsigned int&>(fp1))) std::__1::__invoke<void (pulsar::PartitionedProducerImpl::*&)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>&, pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> const&, unsigned int&, void>(void (pulsar::PartitionedProducerImpl::*&)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>&, pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&, unsigned int&) at type_traits:3859:1
    frame #10: 0x000000010c582bb4 pulsar-tests`std::__1::__bind_return<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned int>, std::__1::tuple<pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&>, __is_valid_bind_return<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned int>, std::__1::tuple<pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&>>::value>::type std::__1::__apply_functor<void (__f=0x0000600002699868, __bound_args=size=4, (null)=__tuple_indices<0, 1, 2, 3> @ 0x00007ff7b4127dd8, __args=size=2)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned int>, 0ul, 1ul, 2ul, 3ul, std::__1::tuple<pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&>>(void (pulsar::PartitionedProducerImpl::*&)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned int>&, std::__1::__tuple_indices<0ul, 1ul, 2ul, 3ul>, std::__1::tuple<pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&>&&) at bind.h:257:12
    frame #11: 0x000000010c582b0b pulsar-tests`std::__1::__bind_return<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned int>, std::__1::tuple<pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&>, __is_valid_bind_return<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::tuple<std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1>, std::__1::placeholders::__ph<2>, unsigned int>, std::__1::tuple<pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&>>::value>::type std::__1::__bind<void (this=0x0000600002699868, __args=0x00007ff7b4127fa4, __args=nullptr)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>::operator()<pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&) at bind.h:292:20
    frame #12: 0x000000010c582a95 pulsar-tests`decltype(__f=0x0000600002699868, __args=0x00007ff7b4127fa4, __args=nullptr)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>&>(fp)(static_cast<pulsar::Result>(fp0), static_cast<std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(fp0))) std::__1::__invoke<std::__1::__bind<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>&, pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(std::__1::__bind<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>&, pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&) at type_traits:3918:1
    frame #13: 0x000000010c582a47 pulsar-tests`void std::__1::__invoke_void_return_wrapper<void, true>::__call<std::__1::__bind<void (__args=0x0000600002699868, __args=0x00007ff7b4127fa4, __args=nullptr)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>&, pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> const&>(std::__1::__bind<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>&, pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&) at invoke.h:61:9
    frame #14: 0x000000010c5829f7 pulsar-tests`std::__1::__function::__alloc_func<std::__1::__bind<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>, std::__1::allocator<std::__1::__bind<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>>, void (pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> const&)>::operator(this=0x0000600002699868, __arg=0x00007ff7b4127fa4, __arg=nullptr)(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&) at function.h:178:16
    frame #15: 0x000000010c5815d6 pulsar-tests`std::__1::__function::__func<std::__1::__bind<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>, std::__1::allocator<std::__1::__bind<void (pulsar::PartitionedProducerImpl::*)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int), std::__1::shared_ptr<pulsar::PartitionedProducerImpl>, std::__1::placeholders::__ph<1> const&, std::__1::placeholders::__ph<2> const&, unsigned int&>>, void (pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> const&)>::operator(this=0x0000600002699860, __arg=0x00007ff7b4127fa4, __arg=nullptr)(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&) at function.h:352:12
    frame #16: 0x000000010c5edf2f pulsar-tests`std::__1::__function::__value_func<void (pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> const&)>::operator(this=0x0000600003d999d0, __args=0x00007ff7b4127fa4, __args=nullptr)(pulsar::Result&&, std::__1::weak_ptr<pulsar::ProducerImplBase> const&) const at function.h:505:16
    frame #17: 0x000000010c5edbf1 pulsar-tests`std::__1::function<void (pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> const&)>::operator(this= Function = pulsar::PartitionedProducerImpl::handleSinglePartitionProducerCreated(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>, unsigned int) , __arg=ResultAlreadyClosed, __arg=nullptr)(pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase> const&) const at function.h:1182:12
    frame #18: 0x000000010c5d26b0 pulsar-tests`pulsar::Promise<pulsar::Result, std::__1::weak_ptr<pulsar::ProducerImplBase>>::setFailed(this=0x00007fb19e013988, result=ResultAlreadyClosed) const at Future.h:156:13
    frame #19: 0x000000010c5daece pulsar-tests`pulsar::ProducerImpl::shutdown(this=0x00007fb19e012c20) at ProducerImpl.cc:945:29
    frame #20: 0x000000010c5d7ed3 pulsar-tests`pulsar::ProducerImpl::closeAsync(this=0x00007ff7b4128670, result=ResultOk)>)::$_6::operator()(pulsar::Result) const at ProducerImpl.cc:716:13

We should not call PartitionedProudcer.closeAsync in handleSinglePartitionProducerCreated when the partitioned producer is already in the closing state.

Modifications

  • Skip handling single partition created when the partitioned producer is already in the closing state.

Verifying this change

  • Make sure that the change passes the CI checks.

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@RobertIndie RobertIndie marked this pull request as draft February 3, 2023 10:22
@RobertIndie RobertIndie self-assigned this Feb 3, 2023
@RobertIndie RobertIndie added this to the 3.2.0 milestone Feb 9, 2023
@RobertIndie RobertIndie marked this pull request as ready for review February 9, 2023 03:12
@RobertIndie RobertIndie requested review from BewareMyPower and shibd and removed request for BewareMyPower February 9, 2023 03:27
@BewareMyPower
Copy link
Contributor

This fix LGTM. But I still think the lock for ProducerImpl::handleCreateProducer introduced in #131 is dangerous. We should avoid acquiring the mutex in any callback, which could be called in the caller's thread and might lead to a deadlock.

@BewareMyPower
Copy link
Contributor

Oh, this deadlock is not related to the lock in handleCreateProducer, I just found handleCreateProducer is also stuck from the thread stacks.

@shibd
Copy link
Member

shibd commented Feb 9, 2023

Although this modification avoids deadlocks, I would like to discuss some implementation details.

  1. When P3 started failed, why need close all producers? Wouldn't it be better for other functioning producers to send messages? (Suppose P3 is temporarily unavailable due to load balancing)

  2. Other question: When partitions are changed, we can't guarantee that the same key is sent to the same parition-topic, right? (RoundRobinMode)

topicMetadata.getNumPartitions() has changed, The result is different.

// if message has a key, hash the key and return the partition
if (msg.hasPartitionKey()) {
return hash->makeHash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
}

@BewareMyPower
Copy link
Contributor

Wouldn't it be better for other functioning producers to send messages?

I agree with you. The bahavior of the Java client is to remove the producers for all new added partitions. I think we can fix it in another PR.

Other question: When partitions are changed, we can't guarantee that the same key is sent to the same parition-topic, right? (RoundRobinMode)

Yes. It's an expected behavior of the partitions change. Messages with the same key are sent to the same partition only if the partitions does not change.

@shibd shibd merged commit f69d0ce into apache:main Feb 9, 2023
@RobertIndie
Copy link
Member Author

We should avoid acquiring the mutex in any callback, which could be called in the caller's thread and might lead to a deadlock.

Agree. Actually, the first solution I think for this issue is to release the lock in the callback. But it will raise another problem because it's not the root cause. So I changed the solution. But I still think that avoid locking the lock in the use callback is a better practice.

  1. When P3 started failed, why need close all producers? Wouldn't it be better for other functioning producers to send messages? (Suppose P3 is temporarily unavailable due to load balancing)

Yes. We can fix it later. I think we should not let the newly added producer affect the existing Partitioned Producer. In other way, the auto update partitions operation should not affect the existing producer. The java client skips this error.

  1. Other question: When partitions are changed, we can't guarantee that the same key is sent to the same parition-topic, right? (RoundRobinMode)

This is an expected behavior. I think the RoundRobinMode is mostly used as load-balancing under the topic scope. If users want to ensure the order and this requirement, they should use Key-shared subscription.

@BewareMyPower
Copy link
Contributor

Actually, the first solution I think for this issue is to release the lock in the callback.

@RobertIndie I just wrote another solution based on this idea in my local env. But this PR is more simple and solves the deadlock directly so I prefer this PR. We can do more enhancements in future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Deadlock when closing the PartitionedProduce after update partitions

3 participants