[fix] DigestManager should not advance readerIndex #3919

Merged 1 commit on Apr 14, 2023

michaeljmarshall
Member

Descriptions of the changes in this PR:

#3783 introduced a change in the behavior of LedgerHandle#asyncAddEntry. This PR re-introduces the original behavior.

Motivation

Apache Pulsar just upgraded to BookKeeper client version 4.16.0. In doing so, we observed new errors related to unexpected reader positions in the passed-in byte buffer. Here is a thread discussing some of the observations: https://lists.apache.org/thread/n4dpmbo5t9bvq5t1fp8fn4m6c6d9d9so

Based on my analysis, it seems that the core issue is with the type of ByteBuf being passed to the BK client. Previously, the readerIndex for the ByteBuf that was passed did not change. Now, it seems to move forward. I am assuming this is because of specific conditions related to CompositeByteBuf, but I haven't yet been able to create a test to show the problem. Perhaps someone is more familiar with these data structures and can provide me with insight into how to test this change?

Changes

For small entries, update DigestManager#computeDigestAndPackageForSendingV2 to use a copy method that does not modify the readerIndex of the buffer passed in as a parameter.

Below are the Javadocs for the old and the new method.

    /**
     * Transfers the specified source buffer's data to this buffer starting at
     * the current {@code writerIndex} until the source buffer becomes
     * unreadable, and increases the {@code writerIndex} by the number of
     * the transferred bytes.  This method is basically same with
     * {@link #writeBytes(ByteBuf, int, int)}, except that this method
     * increases the {@code readerIndex} of the source buffer by the number of
     * the transferred bytes while {@link #writeBytes(ByteBuf, int, int)}
     * does not.
     * If {@code this.writableBytes} is less than {@code src.readableBytes},
     * {@link #ensureWritable(int)} will be called in an attempt to expand
     * capacity to accommodate.
     */
    public abstract ByteBuf writeBytes(ByteBuf src);

    /**
     * Transfers the specified source buffer's data to this buffer starting at
     * the current {@code writerIndex} and increases the {@code writerIndex}
     * by the number of the transferred bytes (= {@code length}).
     * If {@code this.writableBytes} is less than {@code length}, {@link #ensureWritable(int)}
     * will be called in an attempt to expand capacity to accommodate.
     *
     * @param srcIndex the first index of the source
     * @param length   the number of bytes to transfer
     *
     * @throws IndexOutOfBoundsException
     *         if the specified {@code srcIndex} is less than {@code 0}, or
     *         if {@code srcIndex + length} is greater than {@code src.capacity}
     */
    public abstract ByteBuf writeBytes(ByteBuf src, int srcIndex, int length);
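
The difference between the two overloads can be illustrated with a small model. The sketch below is a simplified stand-in, not Netty code: `IndexedBuf` and its fields are hypothetical names, and only the index bookkeeping described in the Javadocs above is modeled. With the real API, the non-consuming call would presumably look like `buf.writeBytes(src, src.readerIndex(), src.readableBytes())`.

```java
import java.util.Arrays;

/** Hypothetical, simplified stand-in for a ByteBuf: a byte array plus two indices. */
final class IndexedBuf {
    private byte[] data;
    int readerIndex;
    int writerIndex;

    IndexedBuf(int capacity) {
        data = new byte[capacity];
    }

    /** Creates a buffer whose readable region is exactly the given bytes. */
    static IndexedBuf of(byte... bytes) {
        IndexedBuf b = new IndexedBuf(bytes.length);
        System.arraycopy(bytes, 0, b.data, 0, bytes.length);
        b.writerIndex = bytes.length;
        return b;
    }

    int readableBytes() {
        return writerIndex - readerIndex;
    }

    /** Models writeBytes(ByteBuf src): the source is consumed, so its readerIndex advances. */
    IndexedBuf writeBytes(IndexedBuf src) {
        int length = src.readableBytes();
        writeBytes(src, src.readerIndex, length);
        src.readerIndex += length;
        return this;
    }

    /** Models writeBytes(ByteBuf src, int srcIndex, int length): the source indices are untouched. */
    IndexedBuf writeBytes(IndexedBuf src, int srcIndex, int length) {
        if (srcIndex < 0 || srcIndex + length > src.data.length) {
            throw new IndexOutOfBoundsException("srcIndex=" + srcIndex + ", length=" + length);
        }
        // Grow the destination if needed, mirroring the ensureWritable(int) behavior in the Javadoc.
        if (data.length - writerIndex < length) {
            data = Arrays.copyOf(data, writerIndex + length);
        }
        System.arraycopy(src.data, srcIndex, data, writerIndex, length);
        writerIndex += length;
        return this;
    }

    public static void main(String[] args) {
        IndexedBuf src = IndexedBuf.of((byte) 1, (byte) 2, (byte) 3);
        IndexedBuf dst = new IndexedBuf(8);

        dst.writeBytes(src, src.readerIndex, src.readableBytes());
        System.out.println(src.readerIndex); // 0: the caller's view of src is unchanged

        dst.writeBytes(src);
        System.out.println(src.readerIndex); // 3: src now looks fully consumed
    }
}
```

The index-based overload leaves the caller's buffer state alone, which matters when the same buffer is read again later.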

Contributor

@eolivelli eolivelli left a comment

Great catch!

@hangc0276 this is a bad regression we should cut a new release ASAP

Member

@horizonzy horizonzy left a comment

LGTM

Member

@wenbingshen wenbingshen left a comment

Nice Catch!

@hangc0276
Contributor

How this bug happens on the Pulsar side

Steps to reproduce

  1. Add brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor to conf/broker.conf to enable the broker entry metadata interceptor
  2. Set up a consumer on topic-A
  3. Set up a producer with batching disabled and produce a small message (10 bytes) to topic-A
  4. The broker will throw the following exception:
2023-04-14T17:06:09,622+0800 [broker-topic-workers-OrderedExecutor-0-0] ERROR org.apache.pulsar.common.protocol.Commands - [PersistentSubscription{topic=persistent://pulsar/test_v1/127.0.0.1:8080/healthcheck, name=healthCheck-d1544425-97ea-4874-8972-d30d2391ee8b}] [-1] Failed to parse message metadata
java.lang.IndexOutOfBoundsException: readerIndex(94) + length(2) exceeds writerIndex(94): UnpooledDuplicatedByteBuf(ridx: 94, widx: 94, cap: 94, unwrapped: CompositeByteBuf(ridx: 94, widx: 94, cap: 94, components=2))
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[io.netty-netty-buffer-4.1.89.Final.jar:4.1.89.Final]
	at io.netty.buffer.AbstractByteBuf.readShort(AbstractByteBuf.java:749) ~[io.netty-netty-buffer-4.1.89.Final.jar:4.1.89.Final]
	at org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist(Commands.java:1692) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:452) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1899) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1918) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:142) ~[org.apache.pulsar-pulsar-broker-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:100) ~[org.apache.pulsar-pulsar-broker-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:210) ~[org.apache

How this bug happens

  1. When an entry is produced to a Pulsar topic with AppendBrokerTimestampMetadataInterceptor enabled, the broker adds a metadata header to the entry, which results in a CompositeByteBuf
    https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L1685-L1687

  2. To isolate readerIndex and writerIndex changes between the Pulsar broker and the BookKeeper client, we generate a duplicated ByteBuf for the BookieClient to write to the BookKeeper cluster. Ideally, the readerIndex of the broker's ByteBuf won't change.
    https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L126-L144

  3. However, if the original ByteBuf is a CompositeByteBuf, we unwrap the duplicated ByteBuf to speed up the crc32c digest computation. If we used the wrapped duplicated ByteBuf to calculate the crc32c digest, it would copy the CompositeByteBuf data from direct memory to heap memory and put a heavy GC load on the JVM.

    final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
    ? data.unwrap() : data;

  4. After calculating the crc32c digest, the client writes the unwrapped CompositeByteBuf to a new ByteBuf if the entry is small. The buf.writeBytes(unwrapped) call advances the unwrapped CompositeByteBuf's readerIndex to the end of the ByteBuf.
    https://github.com/netty/netty/blob/24a0ac36ea91d1aee647d738f879ac873892d829/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java#L1086-L1099

  5. After this entry is written successfully, the broker adds it to the broker's cache because the topic has active consumers. At this moment, the original CompositeByteBuf's readerIndex has already been moved to the end of the ByteBuf by step 4
    https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L231-L238

  6. When the consumer fetches messages from the topic, it gets the entry directly from the broker's cache. Because the ByteBuf's readerIndex has been set to the end of the ByteBuf, parsing metadata from the entry throws an IndexOutOfBoundsException.
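
The chain of steps above can be sketched with a small model. This is a simplified illustration under stated assumptions, not the Pulsar, BookKeeper, or Netty code: `Buf`, `packageForSending`, and `cachedEntryReadable` are hypothetical names, every original buffer is treated as if it were a CompositeByteBuf, and the digest calculation is left out. The `consuming` flag switches between the behavior of `writeBytes(src)` and an index-based copy that leaves the source indices alone.

```java
import java.util.Arrays;

final class ReaderIndexRegression {

    /** Hypothetical, simplified buffer: shared bytes, per-view indices, optional parent. */
    static final class Buf {
        final byte[] data;
        final Buf parent; // non-null only for views created by duplicate()
        int readerIndex;
        int writerIndex;

        Buf(byte[] data, Buf parent, int readerIndex, int writerIndex) {
            this.data = data;
            this.parent = parent;
            this.readerIndex = readerIndex;
            this.writerIndex = writerIndex;
        }

        static Buf wrap(byte[] bytes) {
            return new Buf(bytes, null, 0, bytes.length);
        }

        /** Shares the bytes but gets its own copy of the indices (step 2). */
        Buf duplicate() {
            return new Buf(data, this, readerIndex, writerIndex);
        }

        /** Returns the buffer this view was created from, or null (step 3). */
        Buf unwrap() {
            return parent;
        }

        short readShort() {
            if (readerIndex + 2 > writerIndex) {
                throw new IndexOutOfBoundsException(
                        "readerIndex(" + readerIndex + ") + length(2) exceeds writerIndex(" + writerIndex + ")");
            }
            short value = (short) (((data[readerIndex] & 0xff) << 8) | (data[readerIndex + 1] & 0xff));
            readerIndex += 2;
            return value;
        }
    }

    /** Models the small-entry copy in step 4. */
    static byte[] packageForSending(Buf data, boolean consuming) {
        // Unwrapping bypasses the duplicate, so index changes now hit the broker's own buffer.
        Buf unwrapped = data.unwrap() != null ? data.unwrap() : data;
        byte[] out = Arrays.copyOfRange(unwrapped.data, unwrapped.readerIndex, unwrapped.writerIndex);
        if (consuming) {
            unwrapped.readerIndex = unwrapped.writerIndex; // side effect of writeBytes(src)
        }
        return out;
    }

    /** Steps 5 and 6: after the write, can the cached entry still be parsed? */
    static boolean cachedEntryReadable(boolean consuming) {
        Buf brokerEntry = Buf.wrap(new byte[] {0, 1, 2, 3});
        packageForSending(brokerEntry.duplicate(), consuming);
        try {
            brokerEntry.duplicate().readShort();
            return true;
        } catch (IndexOutOfBoundsException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("consuming copy, cached entry readable: " + cachedEntryReadable(true));
        System.out.println("index-based copy, cached entry readable: " + cachedEntryReadable(false));
    }
}
```

In this model the duplicate only protects the broker's indices as long as the client works on the duplicate itself. Once the client unwraps it, any consuming read or write is visible to the broker.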

@hangc0276
Contributor

> Great catch!
>
> @hangc0276 this is a bad regression we should cut a new release ASAP

@eolivelli Sure, I will soon cut 4.16.1, which will include only this PR.

@eolivelli eolivelli merged commit df44920 into apache:master Apr 14, 2023
@michaeljmarshall michaeljmarshall deleted the dont-move-reader-index branch April 14, 2023 14:13
hangc0276 pushed a commit that referenced this pull request Apr 14, 2023
Ghatage pushed a commit to sijie/bookkeeper that referenced this pull request Jul 12, 2024