Skip to content

Commit 94d7cbe

Browse files
authored
Correct context for batched reroute notifications (#83019) (#83165)
Today the `BatchedRerouteService` completes its listeners in the thread context of the `reroute()` call that actually triggered the reroute, which will be the correct context only if no batching took place. With this commit we make sure to complete each listener in the context in which it was passed to the corresponding `reroute()` call.
1 parent 902ce4d commit 94d7cbe

File tree

3 files changed

+27
-7
lines changed

3 files changed

+27
-7
lines changed

docs/changelog/83019.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 83019
2+
summary: Correct context for batched reroute notifications
3+
area: Allocation
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
1414
import org.elasticsearch.ElasticsearchException;
1515
import org.elasticsearch.action.ActionListener;
16+
import org.elasticsearch.action.support.ContextPreservingActionListener;
1617
import org.elasticsearch.cluster.ClusterState;
1718
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1819
import org.elasticsearch.cluster.NotMasterException;
@@ -55,6 +56,10 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterSt
5556
*/
5657
@Override
5758
public final void reroute(String reason, Priority priority, ActionListener<ClusterState> listener) {
59+
final ActionListener<ClusterState> wrappedListener = ContextPreservingActionListener.wrapPreservingContext(
60+
listener,
61+
clusterService.getClusterApplierService().threadPool().getThreadContext()
62+
);
5863
final List<ActionListener<ClusterState>> currentListeners;
5964
synchronized (mutex) {
6065
if (pendingRerouteListeners != null) {
@@ -65,7 +70,7 @@ public final void reroute(String reason, Priority priority, ActionListener<Clust
6570
reason,
6671
priority
6772
);
68-
pendingRerouteListeners.add(listener);
73+
pendingRerouteListeners.add(wrappedListener);
6974
return;
7075
} else {
7176
logger.trace(
@@ -75,7 +80,7 @@ public final void reroute(String reason, Priority priority, ActionListener<Clust
7580
reason
7681
);
7782
currentListeners = new ArrayList<>(1 + pendingRerouteListeners.size());
78-
currentListeners.add(listener);
83+
currentListeners.add(wrappedListener);
7984
currentListeners.addAll(pendingRerouteListeners);
8085
pendingRerouteListeners.clear();
8186
pendingRerouteListeners = currentListeners;
@@ -84,7 +89,7 @@ public final void reroute(String reason, Priority priority, ActionListener<Clust
8489
} else {
8590
logger.trace("no pending reroute, scheduling reroute [{}] at priority [{}]", reason, priority);
8691
currentListeners = new ArrayList<>(1);
87-
currentListeners.add(listener);
92+
currentListeners.add(wrappedListener);
8893
pendingRerouteListeners = currentListeners;
8994
pendingTaskPriority = priority;
9095
}

server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.service.ClusterService;
1717
import org.elasticsearch.common.Priority;
1818
import org.elasticsearch.common.Randomness;
19+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1920
import org.elasticsearch.test.ClusterServiceUtils;
2021
import org.elasticsearch.test.ESTestCase;
2122
import org.elasticsearch.threadpool.TestThreadPool;
@@ -34,6 +35,7 @@
3435
import java.util.concurrent.atomic.AtomicLong;
3536
import java.util.function.Function;
3637

38+
import static org.hamcrest.Matchers.equalTo;
3739
import static org.hamcrest.Matchers.lessThan;
3840

3941
public class BatchedRerouteServiceTests extends ESTestCase {
@@ -99,16 +101,24 @@ public void onFailure(String source, Exception e) {
99101
return s;
100102
});
101103

104+
final ThreadContext threadContext = threadPool.getThreadContext();
105+
final String contextHeader = "test-context-header";
106+
102107
final int iterations = scaledRandomIntBetween(1, 100);
103108
final CountDownLatch tasksSubmittedCountDown = new CountDownLatch(iterations);
104109
final CountDownLatch tasksCompletedCountDown = new CountDownLatch(iterations);
105110
final List<Runnable> actions = new ArrayList<>(iterations);
106111
final Function<Priority, Runnable> rerouteFromPriority = priority -> () -> {
107112
final AtomicBoolean alreadyRun = new AtomicBoolean();
108-
batchedRerouteService.reroute("reroute at " + priority, priority, ActionListener.wrap(() -> {
109-
assertTrue(alreadyRun.compareAndSet(false, true));
110-
tasksCompletedCountDown.countDown();
111-
}));
113+
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
114+
final String contextValue = randomAlphaOfLength(10);
115+
threadContext.putHeader(contextHeader, contextValue);
116+
batchedRerouteService.reroute("reroute at " + priority, priority, ActionListener.wrap(() -> {
117+
assertTrue(alreadyRun.compareAndSet(false, true));
118+
assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue));
119+
tasksCompletedCountDown.countDown();
120+
}));
121+
}
112122
tasksSubmittedCountDown.countDown();
113123
};
114124
actions.add(rerouteFromPriority.apply(Priority.URGENT)); // ensure at least one URGENT priority reroute

0 commit comments

Comments
 (0)