Skip to content
This repository was archived by the owner on Apr 1, 2024. It is now read-only.

Commit 95a53f3

Browse files
[fix][client] fix Reader.hasMessageAvailable might return true after seeking to latest (apache#22201)
1 parent e84516f commit 95a53f3

File tree

3 files changed

+96
-37
lines changed

3 files changed

+96
-37
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.testng.Assert;
6969
import org.testng.annotations.AfterMethod;
7070
import org.testng.annotations.BeforeMethod;
71+
import org.testng.annotations.DataProvider;
7172
import org.testng.annotations.Test;
7273

7374
@Slf4j
@@ -813,4 +814,30 @@ public void testReaderReconnectedFromNextEntry() throws Exception {
813814
producer.close();
814815
admin.topics().delete(topic, false);
815816
}
817+
818+
@DataProvider
819+
public static Object[][] initializeLastMessageIdInBroker() {
820+
return new Object[][] { { true }, { false } };
821+
}
822+
823+
@Test(dataProvider = "initializeLastMessageIdInBroker")
824+
public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBroker) throws Exception {
825+
final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek";
826+
@Cleanup Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
827+
.startMessageId(MessageId.earliest).create();
828+
829+
@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
830+
producer.send("msg");
831+
832+
if (initializeLastMessageIdInBroker) {
833+
assertTrue(reader.hasMessageAvailable());
834+
} // else: lastMessageIdInBroker is earliest
835+
836+
reader.seek(MessageId.latest);
837+
// lastMessageIdInBroker is the last message ID, while startMessageId is still earliest
838+
assertFalse(reader.hasMessageAvailable());
839+
840+
producer.send("msg");
841+
assertTrue(reader.hasMessageAvailable());
842+
}
816843
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 68 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
167167
private volatile MessageIdAdv startMessageId;
168168

169169
private volatile MessageIdAdv seekMessageId;
170-
private final AtomicBoolean duringSeek;
170+
@VisibleForTesting
171+
final AtomicReference<SeekStatus> seekStatus;
172+
private volatile CompletableFuture<Void> seekFuture;
171173

172174
private final MessageIdAdv initialStartMessageId;
173175

@@ -304,7 +306,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
304306
stats = ConsumerStatsDisabled.INSTANCE;
305307
}
306308

307-
duringSeek = new AtomicBoolean(false);
309+
seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED);
308310

309311
// Create msgCrypto if not created already
310312
if (conf.getCryptoKeyReader() != null) {
@@ -781,15 +783,15 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
781783
closeConsumerTasks();
782784
deregisterFromClientCnx();
783785
client.cleanupConsumer(this);
784-
clearReceiverQueue();
786+
clearReceiverQueue(false);
785787
return CompletableFuture.completedFuture(null);
786788
}
787789

788790
log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
789791
topic, subscription, cnx.ctx().channel(), consumerId);
790792

791793
long requestId = client.newRequestId();
792-
if (duringSeek.get()) {
794+
if (seekStatus.get() != SeekStatus.NOT_STARTED) {
793795
acknowledgmentsGroupingTracker.flushAndClean();
794796
}
795797

@@ -800,7 +802,8 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
800802
int currentSize;
801803
synchronized (this) {
802804
currentSize = incomingMessages.size();
803-
startMessageId = clearReceiverQueue();
805+
setClientCnx(cnx);
806+
clearReceiverQueue(true);
804807
if (possibleSendToDeadLetterTopicMessages != null) {
805808
possibleSendToDeadLetterTopicMessages.clear();
806809
}
@@ -838,7 +841,6 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
838841
// synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them
839842
final CompletableFuture<Void> future = new CompletableFuture<>();
840843
synchronized (this) {
841-
setClientCnx(cnx);
842844
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
843845
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
844846
conf.isReplicateSubscriptionState(),
@@ -943,15 +945,24 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize
943945
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
944946
* not seen by the application.
945947
*/
946-
private MessageIdAdv clearReceiverQueue() {
948+
private void clearReceiverQueue(boolean updateStartMessageId) {
947949
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
948950
incomingMessages.drainTo(currentMessageQueue);
949951
resetIncomingMessageSize();
950952

951-
if (duringSeek.compareAndSet(true, false)) {
952-
return seekMessageId;
953+
CompletableFuture<Void> seekFuture = this.seekFuture;
954+
MessageIdAdv seekMessageId = this.seekMessageId;
955+
956+
if (seekStatus.get() != SeekStatus.NOT_STARTED) {
957+
if (updateStartMessageId) {
958+
startMessageId = seekMessageId;
959+
}
960+
if (seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) {
961+
internalPinnedExecutor.execute(() -> seekFuture.complete(null));
962+
}
963+
return;
953964
} else if (subscriptionMode == SubscriptionMode.Durable) {
954-
return startMessageId;
965+
return;
955966
}
956967

957968
if (!currentMessageQueue.isEmpty()) {
@@ -968,15 +979,14 @@ private MessageIdAdv clearReceiverQueue() {
968979
}
969980
// release messages if they are pooled messages
970981
currentMessageQueue.forEach(Message::release);
971-
return previousMessage;
972-
} else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
982+
if (updateStartMessageId) {
983+
startMessageId = previousMessage;
984+
}
985+
} else if (updateStartMessageId && !lastDequeuedMessageId.equals(MessageId.earliest)) {
973986
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
974987
// in the past
975-
return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
976-
} else {
977-
// No message was received or dequeued by this consumer. Next message would still be the startMessageId
978-
return startMessageId;
979-
}
988+
startMessageId = new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
989+
} // else: No message was received or dequeued by this consumer. Next message would still be the startMessageId
980990
}
981991

982992
/**
@@ -2249,25 +2259,23 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
22492259
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
22502260
.create();
22512261

2252-
CompletableFuture<Void> seekFuture = new CompletableFuture<>();
2253-
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
2262+
if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED, SeekStatus.IN_PROGRESS)) {
2263+
final String message = String.format(
2264+
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
2265+
topic, subscription, seekBy);
2266+
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
2267+
topic, subscription, seekBy);
2268+
return FutureUtil.failedFuture(new IllegalStateException(message));
2269+
}
2270+
seekFuture = new CompletableFuture<>();
2271+
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs);
22542272
return seekFuture;
22552273
}
22562274

22572275
private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
2258-
final Backoff backoff, final AtomicLong remainingTime,
2259-
CompletableFuture<Void> seekFuture) {
2276+
final Backoff backoff, final AtomicLong remainingTime) {
22602277
ClientCnx cnx = cnx();
22612278
if (isConnected() && cnx != null) {
2262-
if (!duringSeek.compareAndSet(false, true)) {
2263-
final String message = String.format(
2264-
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
2265-
topic, subscription, seekBy);
2266-
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
2267-
topic, subscription, seekBy);
2268-
seekFuture.completeExceptionally(new IllegalStateException(message));
2269-
return;
2270-
}
22712279
MessageIdAdv originSeekMessageId = seekMessageId;
22722280
seekMessageId = (MessageIdAdv) seekId;
22732281
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
@@ -2279,14 +2287,25 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
22792287
lastDequeuedMessageId = MessageId.earliest;
22802288

22812289
clearIncomingMessages();
2282-
seekFuture.complete(null);
2290+
CompletableFuture<Void> future = null;
2291+
synchronized (this) {
2292+
if (!hasParentConsumer && cnx() == null) {
2293+
// It's during reconnection, complete the seek future after connection is established
2294+
seekStatus.set(SeekStatus.COMPLETED);
2295+
} else {
2296+
future = seekFuture;
2297+
startMessageId = seekMessageId;
2298+
seekStatus.set(SeekStatus.NOT_STARTED);
2299+
}
2300+
}
2301+
if (future != null) {
2302+
future.complete(null);
2303+
}
22832304
}).exceptionally(e -> {
2284-
// re-set duringSeek and seekMessageId if seek failed
22852305
seekMessageId = originSeekMessageId;
2286-
duringSeek.set(false);
22872306
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
22882307

2289-
seekFuture.completeExceptionally(
2308+
failSeek(
22902309
PulsarClientException.wrap(e.getCause(),
22912310
String.format("Failed to seek the subscription %s of the topic %s to %s",
22922311
subscription, topicName.toString(), seekBy)));
@@ -2295,7 +2314,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
22952314
} else {
22962315
long nextDelay = Math.min(backoff.next(), remainingTime.get());
22972316
if (nextDelay <= 0) {
2298-
seekFuture.completeExceptionally(
2317+
failSeek(
22992318
new PulsarClientException.TimeoutException(
23002319
String.format("The subscription %s of the topic %s could not seek "
23012320
+ "withing configured timeout", subscription, topicName.toString())));
@@ -2306,11 +2325,18 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
23062325
log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms",
23072326
topic, getHandlerName(), nextDelay);
23082327
remainingTime.addAndGet(-nextDelay);
2309-
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture);
2328+
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime);
23102329
}, nextDelay, TimeUnit.MILLISECONDS);
23112330
}
23122331
}
23132332

2333+
private void failSeek(Throwable throwable) {
2334+
CompletableFuture<Void> seekFuture = this.seekFuture;
2335+
if (seekStatus.compareAndSet(SeekStatus.IN_PROGRESS, SeekStatus.NOT_STARTED)) {
2336+
seekFuture.completeExceptionally(throwable);
2337+
}
2338+
}
2339+
23142340
@Override
23152341
public CompletableFuture<Void> seekAsync(long timestamp) {
23162342
String seekBy = String.format("the timestamp %d", timestamp);
@@ -2968,4 +2994,10 @@ boolean isAckReceiptEnabled() {
29682994

29692995
private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
29702996

2997+
@VisibleForTesting
2998+
enum SeekStatus {
2999+
NOT_STARTED,
3000+
IN_PROGRESS,
3001+
COMPLETED
3002+
}
29713003
}

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,14 +283,14 @@ public void testSeekAsyncInternal() {
283283

284284
consumer.setClientCnx(cnx);
285285
consumer.setState(HandlerState.State.Ready);
286+
consumer.seekStatus.set(ConsumerImpl.SeekStatus.NOT_STARTED);
286287

287288
// when
288289
CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
289290
CompletableFuture<Void> secondResult = consumer.seekAsync(1L);
290291

291292
clientReq.complete(null);
292293

293-
// then
294294
assertTrue(firstResult.isDone());
295295
assertTrue(secondResult.isCompletedExceptionally());
296296
verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());

0 commit comments

Comments
 (0)