Skip to content

Commit 19c67d6

Browse files
committed
[ML] Add graceful retry for bulk index results failures
1 parent 0390ec3 commit 19c67d6

File tree

14 files changed

+472
-55
lines changed

14 files changed

+472
-55
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.integration;
7+
8+
import org.apache.logging.log4j.Logger;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.bulk.BulkItemResponse;
11+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
12+
import org.elasticsearch.action.bulk.BulkResponse;
13+
import org.elasticsearch.action.index.IndexRequest;
14+
import org.elasticsearch.action.support.WriteRequest;
15+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
16+
import org.elasticsearch.cluster.metadata.IndexMetaData;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.unit.TimeValue;
19+
import org.elasticsearch.xpack.core.action.util.PageParams;
20+
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
21+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
22+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
23+
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
24+
import org.elasticsearch.xpack.core.ml.job.config.Detector;
25+
import org.elasticsearch.xpack.core.ml.job.config.Job;
26+
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
27+
import org.elasticsearch.xpack.core.ml.job.results.Result;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
31+
import java.util.Collections;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
import java.util.function.Consumer;
35+
36+
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
37+
import static org.hamcrest.Matchers.greaterThan;
38+
39+
public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase {
40+
41+
private String index = "bulk-failure-retry";
42+
private long now = System.currentTimeMillis();
43+
private String jobId = "bulk-failure-retry-job";
44+
private String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job";
45+
46+
@Before
47+
public void putPastDataIntoIndex() {
48+
client().admin().indices().prepareCreate(index)
49+
.addMapping("type", "time", "type=date", "value", "type=long")
50+
.get();
51+
long oneDayAgo = now - 86400000;
52+
long twoDaysAgo = oneDayAgo - 86400000;
53+
writeData(logger, index, 128, twoDaysAgo, oneDayAgo);
54+
}
55+
56+
@After
57+
public void cleanUpTest() {
58+
client().admin()
59+
.cluster()
60+
.prepareUpdateSettings()
61+
.setTransientSettings(Settings.builder()
62+
.putNull("logger.org.elasticsearch.xpack.ml")
63+
.putNull("xpack.ml.persist_results_max_retries")
64+
.build()).get();
65+
cleanUp();
66+
}
67+
68+
private void ensureAnomaliesWrite() throws InterruptedException {
69+
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
70+
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
71+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
72+
blockingCall(
73+
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
74+
acknowledgedResponseHolder,
75+
exceptionHolder);
76+
if (exceptionHolder.get() != null) {
77+
logger.error("FAILED TO MARK ["+ resultsIndex + "] as read-write again", exceptionHolder.get());
78+
}
79+
}
80+
81+
private void setAnomaliesReadOnlyBlock() throws InterruptedException {
82+
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
83+
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
84+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
85+
blockingCall(
86+
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
87+
acknowledgedResponseHolder,
88+
exceptionHolder);
89+
if (exceptionHolder.get() != null) {
90+
logger.error("FAILED TO MARK ["+ resultsIndex + "] as read-write again", exceptionHolder.get());
91+
}
92+
}
93+
94+
public void testBulkFailureRetries() throws Exception {
95+
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
96+
job.setResultsIndexName(jobId);
97+
98+
DatafeedConfig.Builder datafeedConfigBuilder =
99+
createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
100+
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
101+
registerJob(job);
102+
putJob(job);
103+
openJob(job.getId());
104+
registerDatafeed(datafeedConfig);
105+
putDatafeed(datafeedConfig);
106+
startDatafeed(datafeedConfig.getId(), 0L, now - 86400000);
107+
waitUntilJobIsClosed(jobId);
108+
109+
// Get the job stats
110+
Bucket initialLatestBucket = getLatestFinalizedBucket(jobId);
111+
assertThat(initialLatestBucket.getEpoch(), greaterThan(0L));
112+
113+
client().admin()
114+
.cluster()
115+
.prepareUpdateSettings()
116+
.setTransientSettings(Settings.builder()
117+
.put("logger.org.elasticsearch.xpack.ml", "TRACE")
118+
.put("xpack.ml.persist_results_max_retries", "10000")
119+
.build()).get();
120+
121+
setAnomaliesReadOnlyBlock();
122+
123+
int moreDocs = 128;
124+
long oneDayAgo = now - 86400000;
125+
writeData(logger, index, moreDocs, oneDayAgo, now);
126+
127+
openJob(job.getId());
128+
startDatafeed(datafeedConfig.getId(), oneDayAgo, now);
129+
130+
// TODO Any better way?????
131+
Thread.sleep(1000);
132+
ensureAnomaliesWrite();
133+
waitUntilJobIsClosed(jobId);
134+
135+
Bucket newLatestBucket = getLatestFinalizedBucket(jobId);
136+
assertThat(newLatestBucket.getEpoch(), greaterThan(initialLatestBucket.getEpoch()));
137+
}
138+
139+
private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
140+
return createJob(id, bucketSpan, function, field, null);
141+
}
142+
143+
private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
144+
DataDescription.Builder dataDescription = new DataDescription.Builder();
145+
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
146+
dataDescription.setTimeField("time");
147+
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
148+
149+
Detector.Builder d = new Detector.Builder(function, field);
150+
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
151+
analysisConfig.setBucketSpan(bucketSpan);
152+
analysisConfig.setSummaryCountFieldName(summaryCountField);
153+
154+
Job.Builder builder = new Job.Builder();
155+
builder.setId(id);
156+
builder.setAnalysisConfig(analysisConfig);
157+
builder.setDataDescription(dataDescription);
158+
return builder;
159+
}
160+
161+
private void writeData(Logger logger, String index, long numDocs, long start, long end) {
162+
int maxDelta = (int) (end - start - 1);
163+
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
164+
for (int i = 0; i < numDocs; i++) {
165+
IndexRequest indexRequest = new IndexRequest(index);
166+
long timestamp = start + randomIntBetween(0, maxDelta);
167+
assert timestamp >= start && timestamp < end;
168+
indexRequest.source("time", timestamp, "value", i);
169+
bulkRequestBuilder.add(indexRequest);
170+
}
171+
BulkResponse bulkResponse = bulkRequestBuilder
172+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
173+
.get();
174+
if (bulkResponse.hasFailures()) {
175+
int failures = 0;
176+
for (BulkItemResponse itemResponse : bulkResponse) {
177+
if (itemResponse.isFailed()) {
178+
failures++;
179+
logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
180+
}
181+
}
182+
fail("Bulk response contained " + failures + " failures");
183+
}
184+
logger.info("Indexed [{}] documents", numDocs);
185+
}
186+
187+
private Bucket getLatestFinalizedBucket(String jobId) {
188+
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
189+
getBucketsRequest.setExcludeInterim(true);
190+
getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
191+
getBucketsRequest.setDescending(true);
192+
getBucketsRequest.setPageParams(new PageParams(0, 1));
193+
return getBuckets(getBucketsRequest).get(0);
194+
}
195+
196+
private <T> void blockingCall(Consumer<ActionListener<T>> function,
197+
AtomicReference<T> response,
198+
AtomicReference<Exception> error) throws InterruptedException {
199+
CountDownLatch latch = new CountDownLatch(1);
200+
ActionListener<T> listener = ActionListener.wrap(
201+
r -> {
202+
response.set(r);
203+
latch.countDown();
204+
},
205+
e -> {
206+
error.set(e);
207+
latch.countDown();
208+
}
209+
);
210+
211+
function.accept(listener);
212+
latch.await();
213+
}
214+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@
228228
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
229229
import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;
230230
import org.elasticsearch.xpack.ml.job.process.autodetect.NativeAutodetectProcessFactory;
231+
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
231232
import org.elasticsearch.xpack.ml.job.process.normalizer.MultiplyingNormalizerProcess;
232233
import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
233234
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
@@ -446,7 +447,8 @@ public List<Setting<?>> getSettings() {
446447
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION,
447448
InferenceProcessor.MAX_INFERENCE_PROCESSORS,
448449
ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
449-
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL
450+
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
451+
AutodetectResultProcessor.PERSIST_RESULTS_MAX_RETRIES
450452
);
451453
}
452454

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private Builder(String jobId) {
9797
* @param bucket The bucket to persist
9898
* @return this
9999
*/
100-
public Builder persistBucket(Bucket bucket) {
100+
public Builder persistBucket(Bucket bucket) throws BulkIndexException {
101101
// If the supplied bucket has records then create a copy with records
102102
// removed, because we never persist nested records in buckets
103103
Bucket bucketWithoutRecords = bucket;
@@ -114,7 +114,7 @@ public Builder persistBucket(Bucket bucket) {
114114
return this;
115115
}
116116

117-
private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) {
117+
private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) throws BulkIndexException {
118118
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
119119
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
120120
String id = bucketInfluencer.getId();
@@ -130,7 +130,7 @@ private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluen
130130
* @param timingStats timing stats to persist
131131
* @return this
132132
*/
133-
public Builder persistTimingStats(TimingStats timingStats) {
133+
public Builder persistTimingStats(TimingStats timingStats) throws BulkIndexException {
134134
indexResult(
135135
TimingStats.documentId(timingStats.getJobId()),
136136
timingStats,
@@ -145,7 +145,7 @@ public Builder persistTimingStats(TimingStats timingStats) {
145145
* @param records the records to persist
146146
* @return this
147147
*/
148-
public Builder persistRecords(List<AnomalyRecord> records) {
148+
public Builder persistRecords(List<AnomalyRecord> records) throws BulkIndexException {
149149
for (AnomalyRecord record : records) {
150150
logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, record.getId());
151151
indexResult(record.getId(), record, "record");
@@ -161,7 +161,7 @@ public Builder persistRecords(List<AnomalyRecord> records) {
161161
* @param influencers the influencers to persist
162162
* @return this
163163
*/
164-
public Builder persistInfluencers(List<Influencer> influencers) {
164+
public Builder persistInfluencers(List<Influencer> influencers) throws BulkIndexException {
165165
for (Influencer influencer : influencers) {
166166
logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, influencer.getId());
167167
indexResult(influencer.getId(), influencer, "influencer");
@@ -170,30 +170,30 @@ public Builder persistInfluencers(List<Influencer> influencers) {
170170
return this;
171171
}
172172

173-
public Builder persistModelPlot(ModelPlot modelPlot) {
173+
public Builder persistModelPlot(ModelPlot modelPlot) throws BulkIndexException {
174174
logger.trace("[{}] ES BULK ACTION: index model plot to index [{}] with ID [{}]", jobId, indexName, modelPlot.getId());
175175
indexResult(modelPlot.getId(), modelPlot, "model plot");
176176
return this;
177177
}
178178

179-
public Builder persistForecast(Forecast forecast) {
179+
public Builder persistForecast(Forecast forecast) throws BulkIndexException {
180180
logger.trace("[{}] ES BULK ACTION: index forecast to index [{}] with ID [{}]", jobId, indexName, forecast.getId());
181181
indexResult(forecast.getId(), forecast, Forecast.RESULT_TYPE_VALUE);
182182
return this;
183183
}
184184

185-
public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) {
185+
public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) throws BulkIndexException {
186186
logger.trace("[{}] ES BULK ACTION: index forecast request stats to index [{}] with ID [{}]", jobId, indexName,
187187
forecastRequestStats.getId());
188188
indexResult(forecastRequestStats.getId(), forecastRequestStats, Forecast.RESULT_TYPE_VALUE);
189189
return this;
190190
}
191191

192-
private void indexResult(String id, ToXContent resultDoc, String resultType) {
192+
private void indexResult(String id, ToXContent resultDoc, String resultType) throws BulkIndexException {
193193
indexResult(id, resultDoc, ToXContent.EMPTY_PARAMS, resultType);
194194
}
195195

196-
private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) {
196+
private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) throws BulkIndexException {
197197
try (XContentBuilder content = toXContentBuilder(resultDoc, params)) {
198198
bulkRequest.add(new IndexRequest(indexName).id(id).source(content));
199199
} catch (IOException e) {
@@ -208,7 +208,7 @@ private void indexResult(String id, ToXContent resultDoc, ToXContent.Params para
208208
/**
209209
* Execute the bulk action
210210
*/
211-
public void executeRequest() {
211+
public void executeRequest() throws BulkIndexException {
212212
if (bulkRequest.numberOfActions() == 0) {
213213
return;
214214
}
@@ -218,6 +218,7 @@ public void executeRequest() {
218218
BulkResponse addRecordsResponse = client.bulk(bulkRequest).actionGet();
219219
if (addRecordsResponse.hasFailures()) {
220220
logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
221+
throw new BulkIndexException(addRecordsResponse);
221222
}
222223
}
223224

@@ -411,4 +412,16 @@ private void logCall(String indexName) {
411412
}
412413
}
413414
}
415+
416+
public static class BulkIndexException extends Exception {
417+
418+
public BulkIndexException(String msg) {
419+
super(msg);
420+
}
421+
422+
public BulkIndexException(BulkResponse bulkResponse) {
423+
this(bulkResponse.buildFailureMessage());
424+
}
425+
426+
}
414427
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,23 @@ public TimingStats getCurrentTimingStats() {
3636
return new TimingStats(currentTimingStats);
3737
}
3838

39-
public void reportBucket(Bucket bucket) {
39+
public void reportBucket(Bucket bucket) throws JobResultsPersister.BulkIndexException {
4040
currentTimingStats.updateStats(bucket.getProcessingTimeMs());
4141
currentTimingStats.setLatestRecordTimestamp(bucket.getTimestamp().toInstant().plusSeconds(bucket.getBucketSpan()));
4242
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
4343
flush();
4444
}
4545
}
4646

47-
public void finishReporting() {
47+
public void finishReporting() throws JobResultsPersister.BulkIndexException {
4848
// Don't flush if current timing stats are identical to the persisted ones
4949
if (currentTimingStats.equals(persistedTimingStats)) {
5050
return;
5151
}
5252
flush();
5353
}
5454

55-
private void flush() {
55+
private void flush() throws JobResultsPersister.BulkIndexException {
5656
persistedTimingStats = new TimingStats(currentTimingStats);
5757
bulkResultsPersister.persistTimingStats(persistedTimingStats);
5858
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.xpack.core.ml.job.config.Job;
1213
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
@@ -235,7 +236,15 @@ public void finishReporting(ActionListener<Boolean> listener) {
235236
totalRecordStats.setLastDataTimeStamp(now);
236237
diagnostics.flush();
237238
retrieveDiagnosticsIntermediateResults();
238-
dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), listener);
239+
dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), ActionListener.wrap(
240+
listener::onResponse,
241+
e -> {
242+
// Recording data counts should not cause the job processing to fail.
243+
// Log the failure and move on.
244+
logger.warn(() -> new ParameterizedMessage("[{}] failed to record data counts", job.getId()), e);
245+
listener.onResponse(true);
246+
}
247+
));
239248
}
240249

241250
/**

0 commit comments

Comments
 (0)