Skip to content

Commit 4bbd150

Browse files
authored
[ML] fixes testWatchdog test verifying matcher is interrupted on timeout (#62391)
Constructing the timout checker FIRST and THEN registering the watcher allows the test to have a race condition. The timeout value could be reached BEFORE the matcher is added. To prevent the matcher never being interrupted, a new timedOut value is added to the watcher thread entry. Then when a new matcher is registered, if the thread was previously timedout, we interrupt the matcher immediately. closes #48861
1 parent 1835337 commit 4bbd150

File tree

2 files changed

+19
-10
lines changed

2 files changed

+19
-10
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ public synchronized void close() {
8484
* @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time.
8585
*/
8686
public void check(String where) {
87-
8887
if (timeoutExceeded) {
8988
throw new ElasticsearchTimeoutException("Aborting " + operation + " during [" + where +
9089
"] as it has taken longer than the timeout of [" + timeout + "]");
@@ -101,7 +100,6 @@ public void check(String where) {
101100
* @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time.
102101
*/
103102
public Map<String, Object> grokCaptures(Grok grok, String text, String where) {
104-
105103
try {
106104
return grok.captures(text);
107105
} finally {
@@ -137,12 +135,15 @@ void add(Thread thread, TimeValue timeout) {
137135
}
138136

139137
@Override
140-
public void register(Matcher matcher) {
138+
public synchronized void register(Matcher matcher) {
141139
WatchDogEntry value = registry.get(Thread.currentThread());
142140
if (value != null) {
143141
boolean wasFalse = value.registered.compareAndSet(false, true);
144142
assert wasFalse;
145143
value.matchers.add(matcher);
144+
if (value.isTimedOut()) {
145+
matcher.interrupt();
146+
}
146147
}
147148
}
148149

@@ -167,8 +168,9 @@ void remove(Thread thread) {
167168
assert previousValue != null;
168169
}
169170

170-
void interruptLongRunningThreadIfRegistered(Thread thread) {
171+
synchronized void interruptLongRunningThreadIfRegistered(Thread thread) {
171172
WatchDogEntry value = registry.get(thread);
173+
value.timedOut();
172174
if (value.registered.get()) {
173175
for (Matcher matcher : value.matchers) {
174176
matcher.interrupt();
@@ -181,12 +183,21 @@ static class WatchDogEntry {
181183
final TimeValue timeout;
182184
final AtomicBoolean registered;
183185
final Collection<Matcher> matchers;
186+
boolean timedOut;
184187

185188
WatchDogEntry(TimeValue timeout) {
186189
this.timeout = timeout;
187190
this.registered = new AtomicBoolean(false);
188191
this.matchers = new CopyOnWriteArrayList<>();
189192
}
193+
194+
private void timedOut() {
195+
timedOut = true;
196+
}
197+
198+
private boolean isTimedOut() {
199+
return timedOut;
200+
}
190201
}
191202
}
192203
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,21 +59,19 @@ public void testCheckTimeoutExceeded() throws Exception {
5959
}
6060
}
6161

62-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48861")
6362
public void testWatchdog() throws Exception {
64-
TimeValue timeout = TimeValue.timeValueMillis(500);
63+
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(10, 500));
6564
try (TimeoutChecker timeoutChecker = new TimeoutChecker("watchdog test", timeout, scheduler)) {
66-
TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog;
67-
65+
final TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog;
6866
Matcher matcher = mock(Matcher.class);
69-
TimeoutChecker.watchdog.register(matcher);
67+
watchdog.register(matcher);
7068
assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(1));
7169
try {
7270
assertBusy(() -> {
7371
verify(matcher).interrupt();
7472
});
7573
} finally {
76-
TimeoutChecker.watchdog.unregister(matcher);
74+
watchdog.unregister(matcher);
7775
assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(0));
7876
}
7977
}

0 commit comments

Comments
 (0)