Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public final class MessagePublishContext implements PublishContext {
private Topic topic;
private long startTimeNs;
private CompletableFuture<Position> positionFuture;
private long sequenceId;

/**
* Executed from managed ledger thread when the message is persisted.
Expand All @@ -58,11 +59,12 @@ public void completed(Exception exception, long ledgerId, long entryId) {

// recycler
public static MessagePublishContext get(CompletableFuture<Position> positionFuture, String producerName,
Topic topic, long startTimeNs) {
Topic topic, long sequenceId, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.positionFuture = positionFuture;
callback.producerName = producerName;
callback.topic = topic;
callback.sequenceId = sequenceId;
callback.startTimeNs = startTimeNs;
return callback;
}
Expand All @@ -77,6 +79,12 @@ public String getProducerName() {
return producerName;
}

@Override
public long getSequenceId() {
return this.sequenceId;
}


private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>() {
protected MessagePublishContext newObject(Handle<MessagePublishContext> handle) {
return new MessagePublishContext(handle);
Expand All @@ -87,19 +95,20 @@ public void recycle() {
positionFuture = null;
topic = null;
startTimeNs = -1;
sequenceId = -1;
recyclerHandle.recycle(this);
}

/**
* publish mqtt message to pulsar topic, no batch.
*/
public static CompletableFuture<Position> publishMessages(String producerName, Message<byte[]> message,
Topic topic) {
long sequenceId, Topic topic) {
CompletableFuture<Position> future = new CompletableFuture<>();

ByteBuf headerAndPayload = messageToByteBuf(message);
topic.publishMessage(headerAndPayload,
MessagePublishContext.get(future, producerName, topic, System.nanoTime()));
MessagePublishContext.get(future, producerName, topic, sequenceId, System.nanoTime()));
headerAndPayload.release();
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import io.streamnative.pulsar.handlers.mqtt.common.utils.PulsarTopicUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.util.FutureUtil;

Expand All @@ -45,6 +47,8 @@ public abstract class AbstractQosPublishHandler implements QosPublishHandler {
protected final PulsarService pulsarService;
protected final RetainedMessageHandler retainedMessageHandler;
protected final MQTTServerConfiguration configuration;
private final ConcurrentHashMap<String, Long> sequenceIdMap = new ConcurrentHashMap<>();


protected AbstractQosPublishHandler(MQTTService mqttService) {
this.pulsarService = mqttService.getPulsarService();
Expand Down Expand Up @@ -104,9 +108,23 @@ protected CompletableFuture<Position> writeToPulsarTopic(Connection connection,
mqttTopicName = msg.variableHeader().topicName();
}
return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> {
long lastPublishedSequenceId = -1;
if (topic instanceof PersistentTopic) {
final long lastPublishedId = ((PersistentTopic) topic).getLastPublishedSequenceId(producerName);
lastPublishedSequenceId = sequenceIdMap.compute(producerName, (k, v) -> {
long id;
if (v == null) {
id = lastPublishedId + 1;
} else {
id = Math.max(v, lastPublishedId) + 1;
}
return id;
});
}
MessageImpl<byte[]> message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(),
msg.payload().nioBuffer());
CompletableFuture<Position> ret = MessagePublishContext.publishMessages(producerName, message, topic);
CompletableFuture<Position> ret = MessagePublishContext.publishMessages(producerName, message,
lastPublishedSequenceId, topic);
message.recycle();
return ret.thenApply(position -> {
if (checkSubscription && topic.getSubscriptions().isEmpty()) {
Expand Down
6 changes: 6 additions & 0 deletions tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-mqtt-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.streamnative</groupId>
<artifactId>testmocks</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,22 @@ public void testSendAndConsume() throws Exception {
received.ack();
connection.disconnect();
}

@Test
public void testDedup() throws Exception {
MQTT mqtt = createMQTTClient();
String topicName = "testDedup";
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = { new Topic(topicName, QoS.AT_MOST_ONCE) };
connection.subscribe(topics);
String message = "Hello MQTT";
for (int i = 1; i <= 10; i++) {
connection.publish(topicName, (message + i).getBytes(), QoS.AT_MOST_ONCE, false);
Message received = connection.receive();
Assert.assertEquals(received.getTopic(), topicName);
Assert.assertEquals(new String(received.getPayload()), message + i);
}
connection.disconnect();
}
}
Loading