Skip to content

Commit

Permalink
Adding max refreshlisteners limit that a replica shard can hold and f…
Browse files Browse the repository at this point in the history
…orce refresh.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Mar 9, 2023
1 parent b33aca2 commit 4b47039
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,16 +542,13 @@ public void testWaitUntilRefresh() throws Exception {
.execute()
);
}
assertBusy(
() -> {
assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED)));
},
1,
TimeUnit.MINUTES
);

assertEquals(primaryShard.getLatestReplicationCheckpoint().getSeqNo(), replicaShard.getLatestReplicationCheckpoint().getSeqNo());

assertBusy(() -> {
assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED)));
assertEquals(
primaryShard.getLatestReplicationCheckpoint().getSeqNo(),
replicaShard.getLatestReplicationCheckpoint().getSeqNo()
);
}, 1, TimeUnit.MINUTES);
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,34 +201,42 @@ public boolean addOrNotify(Translog.Location location, Consumer<Boolean> listene
}

/**
* Add a listener for refreshes, calling it immediately if the max sequence number is already visible.
* Add a listener for refreshes, calling it immediately if the max sequence number is already visible. If this runs out of listener slots then it
* forces a refresh and calls the listener immediately as well.
*
* @param maxSeqNo the Sequence number to listen on segment replication enabled replica shards
* @param listener for the refresh.It waits until max sequence number is visible.
* @param listener for the refresh.It waits until max sequence number is visible. Called with true if registering the listener ran it out of slots and forced a refresh. Called with
* false otherwise.
* @return did we call the listener (true) or register the listener to call later (false)?
*/
public void addOrNotify(Long maxSeqNo, Consumer<Boolean> listener) {
public boolean addOrNotify(Long maxSeqNo, Consumer<Boolean> listener) {
requireNonNull(listener, "listener cannot be null");
requireNonNull(maxSeqNo, "location cannot be null");

if (lastMaxSeqNo != SequenceNumbers.NO_OPS_PERFORMED && lastMaxSeqNo >= maxSeqNo) {
listener.accept(false);
return;
return true;
}
if (refreshForcers == 0) {
synchronized (this) {
if (closed) {
throw new IllegalStateException("can't wait for refresh on a closed index");
}
List<Tuple<Long, Consumer<Boolean>>> listeners = seqNoRefreshListeners;
synchronized (this) {
if (closed) {
throw new IllegalStateException("can't wait for refresh on a closed index");
}
List<Tuple<Long, Consumer<Boolean>>> listeners = seqNoRefreshListeners;
final int maxRefreshes = getMaxRefreshListeners.getAsInt();
if (refreshForcers == 0 && maxRefreshes > 0 && (listeners == null || listeners.size() < maxRefreshes)) {
Consumer<Boolean> contextPreservingListener = getContextListener(listener);
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(new Tuple<>(maxSeqNo, contextPreservingListener));
seqNoRefreshListeners = listeners;
return false;
}
} else {
listener.accept(false);
}
// No free slot so force a refresh and call the listener in this thread
forceRefresh.run();
listener.accept(true);
return true;
}

private Consumer<Boolean> getContextListener(Consumer<Boolean> listener) {
Expand Down

0 comments on commit 4b47039

Please sign in to comment.