[fix] Avoid resource leakage of AckGroupingTracker #185

Merged
BewareMyPower merged 3 commits into apache:main from erobot:fix-ack-tracker-leakage
Feb 7, 2023

Conversation

@erobot
Contributor

@erobot erobot commented Feb 2, 2023

Motivation

Avoid resource leakage of AckGroupingTracker.

The leak wastes CPU. In our case, after a large number (300,000+) of consumer creation failures, the Pulsar client I/O threads used 99% CPU even when the client was idle.

Two problems cause the leak:

  • In the current code, if consumer creation fails, ConsumerImpl::ackGroupingTrackerPtr_ is never closed.
  • AckGroupingTrackerEnabled has a race condition between close and reschedule, so it may keep rescheduling its timer after it has been closed.

Modifications

  • ConsumerImpl: close ackGroupingTrackerPtr_ on shutdown.
  • AckGroupingTrackerEnabled: add a state, and check the state before rescheduling.
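
The second modification can be illustrated with a minimal, self-contained sketch. It is not the code in this PR: FakeTimerQueue and SketchTracker are hypothetical names, and the fake queue stands in for the boost::asio timer so that the behavior is deterministic. It only shows the idea of checking a closed flag before re-arming the timer.

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for an io_service timer: callbacks are queued and
// fired one at a time by fireNext(), so the example is deterministic.
class FakeTimerQueue {
   public:
    void post(std::function<void()> callback) { pending_.push_back(std::move(callback)); }

    // Fires the oldest pending callback; returns false if nothing was pending.
    bool fireNext() {
        if (pending_.empty()) return false;
        // Move the callback out before running it, because it may post again.
        auto callback = std::move(pending_.front());
        pending_.pop_front();
        callback();
        return true;
    }

    std::size_t pendingCount() const { return pending_.size(); }

   private:
    std::deque<std::function<void()>> pending_;
};

// Hypothetical tracker, loosely modeled on the description above.
class SketchTracker {
   public:
    explicit SketchTracker(FakeTimerQueue& timers) : timers_(timers) {}

    void start() { scheduleTimer(); }

    void close() { isClosed_ = true; }

    int flushCount() const { return flushCount_; }

   private:
    void scheduleTimer() {
        if (isClosed_) return;  // never arm the timer after close()
        timers_.post([this] {
            // A timer armed before close() may still fire; it must not flush or re-arm.
            if (isClosed_) return;
            ++flushCount_;  // stands in for flush()
            scheduleTimer();
        });
    }

    FakeTimerQueue& timers_;
    std::atomic<bool> isClosed_{false};
    int flushCount_ = 0;
};
```

In this sketch, the callback that was already queued when close() is called still fires once, but it returns early and leaves the queue empty, so no further work is scheduled.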

Verifying this change

  • Make sure that the change passes the CI checks.

This change can be verified by adding a log line and running a test program. The leaked resources are held in boost::asio::io_service, so it does not seem easy to write a unit test that verifies the fix.

The test program tries to trigger the AckGroupingTracker leak. If there is a leak, it prints many 'reschedule AckGroupingTrackerEnabled' test log lines.

Add a test log:

void AckGroupingTrackerEnabled::scheduleTimer() {
    // ......
    this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void {
        if (!ec) {
            this->flush();
            this->scheduleTimer();
            LOG_INFO("reschedule AckGroupingTrackerEnabled");  // add a log when reschedule
        }
    });
}

Test program:

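// Assumes the program is built inside the pulsar-client-cpp source tree, where the
// internal header lib/LogUtils.h provides the LOG_INFO / LOG_ERROR macros.
#include <pulsar/Client.h>

#include <cassert>
#include <chrono>
#include <thread>

#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()
using namespace pulsar;

int main() {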
    Client client("pulsar://localhost:6650");

    Consumer consumer;
    ConsumerConfiguration config;
    config.setConsumerType(ConsumerType::ConsumerExclusive);
    config.setAckGroupingTimeMs(1);

    // create exclusive consumer
    Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
    if (result != ResultOk) {
        LOG_ERROR("Failed to subscribe: " << result);
        return -1;
    }

    // create other consumer, will fail
    for (int i = 0; i < 1000; ++i) {
        Result result =
            client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
        assert(result != ResultOk);
        (void)result;
    }

    consumer.close();

    LOG_INFO("sleep, should not have reschedule logs below");
    std::this_thread::sleep_for(std::chrono::seconds(60));

    client.close();
    return 0;
}

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    Bug fix only.

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@erobot erobot force-pushed the fix-ack-tracker-leakage branch from 6c498bf to 1daf3fe Compare February 2, 2023 17:06
@BewareMyPower BewareMyPower added the bug Something isn't working label Feb 3, 2023
@BewareMyPower BewareMyPower added this to the 3.2.0 milestone Feb 3, 2023
@BewareMyPower
Contributor

I observed a similar bug a long time ago: apache/pulsar#8914. This PR might also fix that problem. The scheduleTimer method should not be called without a state check.

Ready,
Closed,
};
std::atomic<State> state_;
Contributor

The NotStarted and Closed states are never used. Would it be better to just use an atomic_bool here?

Contributor Author

Using a state definition similar to the existing style was my first thought. An atomic_bool isClosed_ can be used instead and may be more explicit. I can change to the isClosed style later.

Contributor Author

Updated to isClosed style.
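
For readers following the thread, here is a rough sketch of what an isClosed-style flag can look like in general. It is an illustration under assumptions, not the merged code: ClosableSketch is a hypothetical class, and using exchange to make close() idempotent is one possible choice.

```cpp
#include <atomic>

// Hypothetical class; only the handling of the flag is of interest here.
class ClosableSketch {
   public:
    // Returns true only for the first caller, so any cleanup guarded by the
    // return value runs once even if close() is called from several threads.
    bool close() { return !isClosed_.exchange(true); }

    bool isClosed() const { return isClosed_.load(); }

   private:
    std::atomic<bool> isClosed_{false};
};
```

Compared with a multi-value state enum, a single boolean leaves only the one transition that is actually checked (open to closed).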

@BewareMyPower
Contributor

There might be something wrong with the vcpkg installation. I will investigate the reason soon.
