Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand Down Expand Up @@ -125,4 +127,20 @@ private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf hea
/**
 * Intentionally does nothing in this class.
 */
public void resetCloseFuture() {
// noop
}

/** Sticky key used when a message carries neither an ordering key nor a non-blank partition key. */
public static final String NONE_KEY = "NONE_KEY";

/**
 * Reads the sticky key of a message without consuming the buffer.
 *
 * <p>The reader index of {@code metadataAndPayload} is marked before the metadata is parsed and reset
 * right after, so the buffer can still be read from the same position by the caller.
 *
 * @param metadataAndPayload buffer positioned at the message metadata
 * @return the ordering key bytes when the metadata has an ordering key; otherwise the UTF-8 bytes of a
 *         non-blank partition key; otherwise the UTF-8 bytes of {@link #NONE_KEY}
 */
protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
    metadataAndPayload.markReaderIndex();
    PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
    metadataAndPayload.resetReaderIndex();
    try {
        String key = metadata.getPartitionKey();
        if (log.isDebugEnabled()) {
            log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
        }
        // The ordering key takes precedence over the partition key.
        if (metadata.hasOrderingKey()) {
            return metadata.getOrderingKey().toByteArray();
        }
        // Use an explicit charset so the hashed bytes do not depend on the JVM's default encoding.
        if (StringUtils.isNotBlank(key)) {
            return key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
        return NONE_KEY.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    } finally {
        // Recycle on every path; previously the metadata was only recycled when no key was present.
        // The returned byte[] is computed before this block runs, so it is safe to recycle here.
        metadata.recycle();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,22 +217,6 @@ private int getFirstConsumerIndexOfPriority(int targetPriority) {
return -1;
}

/** Sticky key used when a message carries neither an ordering key nor a non-blank partition key. */
public static final String NONE_KEY = "NONE_KEY";

/**
 * Reads the sticky key of a message without consuming the buffer.
 *
 * <p>The reader index of {@code metadataAndPayload} is marked before the metadata is parsed and reset
 * right after, so the buffer can still be read from the same position by the caller.
 *
 * @param metadataAndPayload buffer positioned at the message metadata
 * @return the ordering key bytes when the metadata has an ordering key; otherwise the UTF-8 bytes of a
 *         non-blank partition key; otherwise the UTF-8 bytes of {@link #NONE_KEY}
 */
protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
    metadataAndPayload.markReaderIndex();
    PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
    metadataAndPayload.resetReaderIndex();
    try {
        String key = metadata.getPartitionKey();
        if (log.isDebugEnabled()) {
            log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
        }
        // The ordering key takes precedence over the partition key.
        if (metadata.hasOrderingKey()) {
            return metadata.getOrderingKey().toByteArray();
        }
        // Use an explicit charset so the hashed bytes do not depend on the JVM's default encoding.
        if (StringUtils.isNotBlank(key)) {
            return key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
        return NONE_KEY.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    } finally {
        // Recycle on every path; previously the metadata was only recycled when no key was present.
        // The returned byte[] is computed before this block runs, so it is safe to recycle here.
        metadata.recycle();
    }
}

private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
AtomicReferenceFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class, Consumer.class, "activeConsumer");
private volatile Consumer activeConsumer = null;
protected final CopyOnWriteArrayList<Consumer> consumers;
protected StickyKeyConsumerSelector stickyKeyConsumerSelector;
protected boolean isKeyHashRangeFiltered = false;
protected CompletableFuture<Void> closeFuture = null;
protected final int partitionIndex;

Expand Down Expand Up @@ -155,6 +157,17 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

consumers.add(consumer);

if (subscriptionType == SubType.Exclusive
&& consumer.getKeySharedMeta() != null
&& consumer.getKeySharedMeta().getHashRangesList() != null
&& consumer.getKeySharedMeta().getHashRangesList().size() > 0) {
stickyKeyConsumerSelector = new HashRangeExclusiveStickyKeyConsumerSelector();
stickyKeyConsumerSelector.addConsumer(consumer);
isKeyHashRangeFiltered = true;
} else {
isKeyHashRangeFiltered = false;
}

if (!pickAndScheduleActiveConsumer()) {
// the active consumer is not changed
Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -207,6 +209,19 @@ public synchronized void internalReadEntriesComplete(final List<Entry> entries,
readFailureBackoff.reduceToHalf();

Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);

if (isKeyHashRangeFiltered) {
Iterator<Entry> iterator = entries.iterator();
while (iterator.hasNext()) {
Entry entry = iterator.next();
int keyHash = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
Consumer consumer = stickyKeyConsumerSelector.select(keyHash);
if (consumer == null || currentConsumer != consumer) {
iterator.remove();
}
}
}

if (currentConsumer == null || readConsumer != currentConsumer) {
// Active consumer has changed since the read request has been issued. We need to rewind the cursor and
// re-issue the read request for the new consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,43 @@
import com.google.common.collect.Sets;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ReaderTest extends MockedPulsarServiceBaseTest {

private static final String subscription = "reader-sub";
Expand Down Expand Up @@ -286,4 +296,88 @@ public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException
Assert.assertEquals(admin.topics().getInternalStats(topic).cursors.size(), 0);

}

/**
 * Verifies key-hash-range readers: invalid ranges (overlapping, inverted, out of bounds) are rejected
 * with {@link IllegalArgumentException}, and a reader restricted to the lower half of the hash space
 * receives exactly the messages whose key hash falls into that half.
 */
@Test
public void testKeyHashRangeReader() throws IOException {
    final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
    final String topic = "persistent://my-property/my-ns/testKeyHashRangeReader";

    // Overlapping ranges must be rejected.
    try {
        pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .keyHashRange(Range.of(0, 10000), Range.of(8000, 12000))
                .create();
        fail("should failed with unexpected key hash range");
    } catch (IllegalArgumentException e) {
        log.error("Create key hash range failed", e);
    }

    // A range whose start is greater than its end must be rejected.
    try {
        pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .keyHashRange(Range.of(30000, 20000))
                .create();
        fail("should failed with unexpected key hash range");
    } catch (IllegalArgumentException e) {
        log.error("Create key hash range failed", e);
    }

    // A range outside of the hash space must be rejected.
    try {
        pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .keyHashRange(Range.of(80000, 90000))
                .create();
        fail("should failed with unexpected key hash range");
    } catch (IllegalArgumentException e) {
        log.error("Create key hash range failed", e);
    }

    final int maxSlot = StickyKeyConsumerSelector.DEFAULT_RANGE_SIZE / 2;

    @Cleanup
    Reader<String> reader = pulsarClient.newReader(Schema.STRING)
            .topic(topic)
            .startMessageId(MessageId.earliest)
            .keyHashRange(Range.of(0, maxSlot))
            .create();

    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(false)
            .create();

    int expectedMessages = 0;
    for (String key : keys) {
        int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
                % StickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
        if (slot <= maxSlot) {
            expectedMessages++;
        }
        // The value equals the key so the slot can be recomputed from the received payload below.
        producer.newMessage()
                .key(key)
                .value(key)
                .send();
        log.info("Publish message to slot {}", slot);
    }

    List<String> receivedMessages = new ArrayList<>();

    Message<String> msg;
    do {
        msg = reader.readNext(1, TimeUnit.SECONDS);
        if (msg != null) {
            receivedMessages.add(msg.getValue());
        }
    } while (msg != null);

    assertTrue(expectedMessages > 0);
    assertEquals(receivedMessages.size(), expectedMessages);
    for (String receivedMessage : receivedMessages) {
        log.info("Receive message {}", receivedMessage);
        // Check the hash slot of the received key. Comparing the numeric message value (0-9) with the
        // range bound, as before, was always true and verified nothing.
        int receivedSlot = Murmur3_32Hash.getInstance().makeHash(receivedMessage.getBytes())
                % StickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
        assertTrue(receivedSlot <= maxSlot);
    }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public static class KeySharedPolicySticky extends KeySharedPolicy {
this.ranges = new ArrayList<>();
}

/**
 * Appends every range in the given list to this policy's ranges.
 *
 * @param ranges the ranges to add; NOTE(review): a null list is not guarded here — confirm callers
 *               never pass null
 * @return this policy instance, so calls can be chained
 */
public KeySharedPolicySticky ranges(List<Range> ranges) {
this.ranges.addAll(ranges);
return this;
}

public KeySharedPolicySticky ranges(Range... ranges) {
this.ranges.addAll(Arrays.asList(ranges));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,16 @@ public interface ReaderBuilder<T> extends Cloneable {
* @return the reader builder instance
*/
ReaderBuilder<T> readCompacted(boolean readCompacted);

/**
 * Set the key hash ranges of the reader. The broker will only dispatch messages whose message-key hash
 * falls within one of the specified key hash ranges. Multiple key hash ranges can be specified on a reader.
 *
 * <p>The total hash range size is 65536, so the end of a range must be less than or equal to 65535.
 *
 * @param ranges
 *            key hash ranges for a reader
 * @return the reader builder instance
 */
ReaderBuilder<T> keyHashRange(Range... ranges);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,28 @@
*/
package org.apache.pulsar.client.impl;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import lombok.AccessLevel;
import lombok.Getter;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
import static org.apache.pulsar.client.api.KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;

@Getter(AccessLevel.PUBLIC)
public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
Expand Down Expand Up @@ -159,4 +163,25 @@ public ReaderBuilder<T> readCompacted(boolean readCompacted) {
conf.setReadCompacted(readCompacted);
return this;
}

/**
 * Restricts the reader to messages whose key hash falls into one of the given ranges.
 *
 * <p>Every range must lie within {@code [0, 65535]} and no two ranges may intersect.
 *
 * @param ranges one or more key hash ranges; must not be null, empty, or contain null elements
 * @return this builder
 * @throws IllegalArgumentException if {@code ranges} is null or empty, contains a null element,
 *         contains a range outside of {@code [0, 65535]}, or contains two intersecting ranges
 */
@Override
public ReaderBuilder<T> keyHashRange(Range... ranges) {
    Preconditions.checkArgument(ranges != null && ranges.length > 0,
            "Cannot specify a null or an empty key hash ranges for a reader");
    for (int i = 0; i < ranges.length; i++) {
        Range range1 = ranges[i];
        // Fail with a descriptive error instead of a bare NullPointerException below.
        Preconditions.checkArgument(range1 != null, "Key hash range at index %s must not be null", i);
        // Valid slots are 0 .. DEFAULT_HASH_RANGE_SIZE - 1, so an end equal to the size is already
        // out of bounds (the previous '>' check let 65536 through).
        if (range1.getStart() < 0 || range1.getEnd() >= DEFAULT_HASH_RANGE_SIZE) {
            throw new IllegalArgumentException("Ranges must be [0, 65535] but provided range is " + range1);
        }
        // Each unordered pair only needs to be compared once, so start after the current index.
        for (int j = i + 1; j < ranges.length; j++) {
            Range range2 = ranges[j];
            if (range2 != null && range1.intersect(range2) != null) {
                throw new IllegalArgumentException("Key hash ranges with overlap between " + range1
                        + " and " + range2);
            }
        }
    }
    conf.setKeyHashRanges(Arrays.asList(ranges));
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}

if (readerConfiguration.getKeyHashRanges() != null) {
consumerConfiguration.setKeySharedPolicy(
KeySharedPolicy
.stickyHashRange()
.ranges(readerConfiguration.getKeyHashRanges())
);
}

final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration,
listenerExecutor, partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.List;

import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.ReaderListener;

import lombok.Data;
Expand Down Expand Up @@ -54,6 +54,8 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
private boolean readCompacted = false;
private boolean resetIncludeHead = false;

private List<Range> keyHashRanges;

@SuppressWarnings("unchecked")
public ReaderConfigurationData<T> clone() {
try {
Expand Down
16 changes: 16 additions & 0 deletions site2/docs/client-libraries-java.md
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,22 @@ ConsumerCryptoFailureAction|`cryptoFailureAction`|Consumer should take action wh
boolean|`readCompacted`|If enabling `readCompacted`, a consumer reads messages from a compacted topic rather than a full message backlog of a topic.<br/><br/> A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.<br/><br/>`readCompacted` can only be enabled on subscriptions to persistent topics, which have a single active consumer (for example, failure or exclusive subscriptions). <br/><br/>Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a `PulsarClientException`.|false
boolean|`resetIncludeHead`|If set to true, the first message to be returned is the one specified by `messageId`.<br/><br/>If set to false, the first message to be returned is the one next to the message specified by `messageId`.|false

### Sticky key range reader

With a sticky key range reader, the broker only dispatches messages whose message-key hash falls within one of the specified key hash ranges. Multiple key hash ranges can be specified on a reader.

The following is an example to create a sticky key range reader.

```java
pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
.create();
```

Total hash range size is 65536, so the max end of the range should be less than or equal to 65535.

## Schema

In Pulsar, all message data consists of byte arrays "under the hood." [Message schemas](schema-get-started.md) enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a [producer](#producers) without specifying a schema, then the producer can only produce messages of type `byte[]`. The following is an example.
Expand Down