
Commit ec5ef6f

Fix #3404 bulk all improvements (#3405)
* Add more explicit support for what to do when dropped documents are encountered; BulkAll now defaults to halting, which is a saner default
* Fix audit reporting negative durations
* Add FailedOverAllNodes audit event
* BulkAll now handles bad responses from _bulk itself as well and optionally bails out early
* Fixed tests to expect FailedOverAllNodes
* ContinueAfterDroppedDocuments should not be tested for a nullable argument in the code standards test
1 parent df95930 commit ec5ef6f
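
For illustration, a minimal sketch of how the new options surface on the BulkAll descriptor. It assumes the usual Nest usings, a configured IElasticClient named client, a hypothetical Document POCO with an Id property, an illustrative index name, and the Wait helper on the returned observable:

var documents = FetchDocuments(); // hypothetical source of Document instances

var bulkAll = client.BulkAll(documents, b => b
    .Index("my-index")                // illustrative index name
    .Size(1000)                       // documents per _bulk request
    .ContinueAfterDroppedDocuments()  // opt out of the new halt-on-dropped default
    .DroppedDocumentCallback((item, document) =>
        Console.WriteLine($"Dropped {document.Id}: {item.Error?.Reason}"))
);

// Blocks until all pages are sent, or until the BulkAll halts with an exception.
bulkAll.Wait(TimeSpan.FromMinutes(15), response =>
    Console.WriteLine($"Indexed page {response.Page} with {response.Retries} retries"));

Without ContinueAfterDroppedDocuments(), any failed item that the retry predicate declines now halts the whole BulkAll instead of being skipped silently.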

14 files changed: +286 additions, −21 deletions

src/Elasticsearch.Net/Auditing/Audit.cs

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ public Audit(AuditEvent type, DateTime started)
         public override string ToString()
         {
             var took = Ended - Started;
+            if (took < TimeSpan.Zero) took = TimeSpan.Zero;
             return $"Node: {Node?.Uri}, Event: {Event.GetStringValue()} NodeAlive: {Node?.IsAlive}, Took: {took}";
         }
     }

src/Elasticsearch.Net/Auditing/AuditEvent.cs

Lines changed: 1 addition & 0 deletions
@@ -21,5 +21,6 @@ public enum AuditEvent
         BadRequest,
         NoNodesAttempted,
         CancellationRequested,
+        FailedOverAllNodes,
     }
 }

src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs

Lines changed: 9 additions & 1 deletion
@@ -526,13 +526,21 @@ public ElasticsearchClientException CreateClientException<TResponse>(
                 pipelineFailure = PipelineFailure.MaxRetriesReached;
                 this.Audit(MaxRetriesReached);
                 exceptionMessage = "Maximum number of retries reached";
+
+                var now = this._dateTimeProvider.Now();
+                // TODO make AliveNodes on IConnectionPool public in 7.0 (default interface C# 8 FTW)
+                var activeNodes = this._connectionPool.Nodes.Count(n => n.IsAlive || n.DeadUntil <= now);
+                if (this.Retried >= activeNodes)
+                {
+                    this.Audit(FailedOverAllNodes);
+                    exceptionMessage += ", failed over to all the known alive nodes before failing";
+                }
             }
 
             exceptionMessage += $". Call: {resource}";
             if (response != null && response.TryGetServerErrorReason(out var reason))
                 exceptionMessage += $". ServerError: {reason}";
 
-
             var clientException = new ElasticsearchClientException(pipelineFailure, exceptionMessage, innerException)
             {
                 Request = data,
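
The pipeline now emits a FailedOverAllNodes audit right after MaxRetriesReached whenever the retries covered every node still considered alive. A hedged sketch of checking for it from calling code (assuming the usual usings including System.Linq; the Document type, index contents, and handling are illustrative):

var searchResponse = client.Search<Document>(s => s.MatchAll());

if (!searchResponse.IsValid)
{
    // AuditTrail is available on IApiCallDetails; FailedOverAllNodes is the event added here.
    var failedOverAllNodes = searchResponse.ApiCall.AuditTrail
        .Any(a => a.Event == AuditEvent.FailedOverAllNodes);

    if (failedOverAllNodes)
        Console.WriteLine("All known alive nodes were attempted before the call failed.");
}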

src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs

Lines changed: 69 additions & 14 deletions
@@ -21,6 +21,7 @@ public class BulkAllObservable<T> : IDisposable, IObservable<IBulkAllResponse> w
         private readonly CancellationToken _compositeCancelToken;
         private readonly CancellationTokenSource _compositeCancelTokenSource;
         private readonly Func<IBulkResponseItem, T, bool> _retryPredicate;
+        private Action<IBulkResponseItem, T> _droppedDocumentCallBack;
 
         public BulkAllObservable(
             IElasticClient client,
@@ -34,6 +35,7 @@ public BulkAllObservable(
             this._backOffTime = (this._partionedBulkRequest?.BackOffTime?.ToTimeSpan() ?? CoordinatedRequestDefaults.BulkAllBackOffTimeDefault);
             this._bulkSize = this._partionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault;
             this._retryPredicate = this._partionedBulkRequest.RetryDocumentPredicate ?? RetryBulkActionPredicate;
+            this._droppedDocumentCallBack = this._partionedBulkRequest.DroppedDocumentCallback ?? DroppedDocumentCallbackDefault;
             this._maxDegreeOfParallelism = _partionedBulkRequest.MaxDegreeOfParallelism ?? CoordinatedRequestDefaults.BulkAllMaxDegreeOfParallelismDefault;
             this._compositeCancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
             this._compositeCancelToken = this._compositeCancelTokenSource.Token;
@@ -53,9 +55,6 @@ public IDisposable Subscribe(IObserver<IBulkAllResponse> observer)
             return this;
         }
 
-        private static ElasticsearchClientException Throw(string message, IApiCallDetails details) =>
-            new ElasticsearchClientException(PipelineFailure.BadResponse, message, details);
-
         private void BulkAll(IObserver<IBulkAllResponse> observer)
         {
             var documents = this._partionedBulkRequest.Documents;
@@ -116,29 +115,85 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
 
             this._compositeCancelToken.ThrowIfCancellationRequested();
 
-            var retryDocuments = response.Items.Zip(buffer, (i, d) => new {i, d})
-                .Where(x=> !response.IsValid && this._retryPredicate(x.i, x.d))
-                .Select(x => x.d)
+            if (!response.ApiCall.Success)
+                return await this.HandleBulkRequest(buffer, page, backOffRetries, response);
+
+            var documentsWithResponse = response.Items.Zip(buffer, Tuple.Create).ToList();
+
+            this.HandleDroppedDocuments(documentsWithResponse, response);
+
+            var retryDocuments = documentsWithResponse
+                .Where(x=> !response.IsValid && this._retryPredicate(x.Item1, x.Item2))
+                .Select(x => x.Item2)
                 .ToList();
 
             if (retryDocuments.Count > 0 && backOffRetries < this._backOffRetries)
+                return await this.RetryDocuments(page, ++backOffRetries, retryDocuments);
+            else if (retryDocuments.Count > 0)
+                throw this.ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times");
+
+            this._partionedBulkRequest.BackPressure?.Release();
+            return new BulkAllResponse { Retries = backOffRetries, Page = page };
+        }
+
+        private void HandleDroppedDocuments(List<Tuple<IBulkResponseItem, T>> documentsWithResponse, IBulkResponse response)
+        {
+            var droppedDocuments = documentsWithResponse
+                .Where(x => !response.IsValid && !this._retryPredicate(x.Item1, x.Item2))
+                .ToList();
+            if (droppedDocuments.Count > 0 && !this._partionedBulkRequest.ContinueAfterDroppedDocuments)
+                throw this.ThrowOnBadBulk(response, $"BulkAll halted after receiving failures that can not be retried from _bulk");
+            else if (droppedDocuments.Count > 0 && this._partionedBulkRequest.ContinueAfterDroppedDocuments)
             {
-                this._incrementRetries();
-                await Task.Delay(this._backOffTime, this._compositeCancelToken).ConfigureAwait(false);
-                return await this.BulkAsync(retryDocuments, page, ++backOffRetries).ConfigureAwait(false);
+                foreach (var dropped in droppedDocuments) this._droppedDocumentCallBack(dropped.Item1, dropped.Item2);
             }
-            if (retryDocuments.Count > 0)
+        }
+
+        private async Task<IBulkAllResponse> HandleBulkRequest(IList<T> buffer, long page, int backOffRetries, IBulkResponse response)
+        {
+            var clientException = response.ApiCall.OriginalException as ElasticsearchClientException;
+            //TODO expose this on IAPiCallDetails as RetryLater in 7.0?
+            var failureReason = clientException?.FailureReason.GetValueOrDefault(PipelineFailure.Unexpected);
+            switch (failureReason)
             {
-                this._incrementFailed();
-                this._partionedBulkRequest.BackPressure?.Release();
-                throw Throw($"Bulk indexing failed and after retrying {backOffRetries} times", response.ApiCall);
+                case PipelineFailure.MaxRetriesReached:
+                    //TODO move this to its own PipelineFailure classification in 7.0
+                    if (response.ApiCall.AuditTrail.Last().Event == AuditEvent.FailedOverAllNodes)
+                        throw this.ThrowOnBadBulk(response, $"BulkAll halted after attempted bulk failed over all the active nodes");
+                    return await this.RetryDocuments(page, ++backOffRetries, buffer);
+                case PipelineFailure.CouldNotStartSniffOnStartup:
+                case PipelineFailure.BadAuthentication:
+                case PipelineFailure.NoNodesAttempted:
+                case PipelineFailure.SniffFailure:
+                case PipelineFailure.Unexpected:
+                    throw this.ThrowOnBadBulk(response,
+                        $"BulkAll halted after {nameof(PipelineFailure)}{failureReason.GetStringValue()} from _bulk");
+                default:
+                    return await this.RetryDocuments(page, ++backOffRetries, buffer);
             }
+        }
+
+        private async Task<IBulkAllResponse> RetryDocuments(long page, int backOffRetries, IList<T> retryDocuments)
+        {
+            this._incrementRetries();
+            await Task.Delay(this._backOffTime, this._compositeCancelToken).ConfigureAwait(false);
+            return await this.BulkAsync(retryDocuments, page, backOffRetries).ConfigureAwait(false);
+        }
+
+        private Exception ThrowOnBadBulk(IElasticsearchResponse response, string message)
+        {
+            this._incrementFailed();
             this._partionedBulkRequest.BackPressure?.Release();
-            return new BulkAllResponse {Retries = backOffRetries, Page = page};
+            return Throw(message, response.ApiCall);
         }
+        private static ElasticsearchClientException Throw(string message, IApiCallDetails details) =>
+            new ElasticsearchClientException(PipelineFailure.BadResponse, message, details);
+
 
         private static bool RetryBulkActionPredicate(IBulkResponseItem bulkResponseItem, T d) => bulkResponseItem.Status == 429;
 
+        private static void DroppedDocumentCallbackDefault(IBulkResponseItem bulkResponseItem, T d) { }
+
         public bool IsDisposed { get; private set; }
 
         public void Dispose()
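
The observable now splits failed bulk items into retryable ones (those the RetryDocumentPredicate accepts, by default only HTTP 429) and dropped ones, which either halt the BulkAll or flow through the callback. A sketch of widening the retry window with a custom predicate; the 503 status, index name, and back-off values are illustrative assumptions:

var observable = client.BulkAll(documents, b => b
    .Index("my-index")
    .BackOffRetries(3)
    .BackOffTime("5s")
    // The default predicate retries only items with status 429; this one also retries 503.
    .RetryDocumentPredicate((item, doc) => item.Status == 429 || item.Status == 503)
);
// Every failed item the predicate declines counts as dropped. With the new default
// (ContinueAfterDroppedDocuments left false) the BulkAll halts by throwing rather than
// silently skipping those documents.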

src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs

Lines changed: 31 additions & 1 deletion
@@ -73,6 +73,20 @@ public interface IBulkAllRequest<T> where T : class
         /// A predicate which controls which documents should be retried, defaults to failed bulk items with status code 429
         /// </summary>
         Func<IBulkResponseItem, T, bool> RetryDocumentPredicate { get; set; }
+
+        /// <summary>
+        /// Halt the bulk all request if any of the documents returned is a failure that can not be retried.
+        /// When true, will feed dropped documents to <see cref="DroppedDocumentCallback"/>
+        /// </summary>
+        bool ContinueAfterDroppedDocuments { get; set; }
+
+        /// <summary>
+        /// If <see cref="ContinueAfterDroppedDocuments"/> is set to true dropped messages will be fed through
+        /// this callback. Use this if you don't expect many failures and want to feed these dropped messages in a dead letter queue
+        /// for instance.
+        /// </summary>
+        Action<IBulkResponseItem, T> DroppedDocumentCallback { get; set; }
+
     }
 
     public class BulkAllRequest<T> : IBulkAllRequest<T>
@@ -108,9 +122,12 @@ public class BulkAllRequest<T> : IBulkAllRequest<T>
         public Action<BulkDescriptor, IList<T>> BufferToBulk { get; set; }
         /// <inheritdoc />
         public ProducerConsumerBackPressure BackPressure { get; set; }
-
         /// <inheritdoc />
         public Func<IBulkResponseItem, T, bool> RetryDocumentPredicate { get; set; }
+        /// <inheritdoc />
+        public bool ContinueAfterDroppedDocuments { get; set; }
+        /// <inheritdoc />
+        public Action<IBulkResponseItem, T> DroppedDocumentCallback { get; set; }
 
         public BulkAllRequest(IEnumerable<T> documents)
         {
@@ -141,6 +158,8 @@ public class BulkAllDescriptor<T> : DescriptorBase<BulkAllDescriptor<T>, IBulkAl
         Action<BulkDescriptor, IList<T>> IBulkAllRequest<T>.BufferToBulk { get; set; }
         ProducerConsumerBackPressure IBulkAllRequest<T>.BackPressure { get; set; }
         Func<IBulkResponseItem, T, bool> IBulkAllRequest<T>.RetryDocumentPredicate { get; set; }
+        bool IBulkAllRequest<T>.ContinueAfterDroppedDocuments { get; set; }
+        Action<IBulkResponseItem, T> IBulkAllRequest<T>.DroppedDocumentCallback { get; set; }
 
         public BulkAllDescriptor(IEnumerable<T> documents)
         {
@@ -209,5 +228,16 @@ public BulkAllDescriptor<T> RetryDocumentPredicate(Func<IBulkResponseItem, T, bo
         public BulkAllDescriptor<T> BackPressure(int maxConcurrency, int? backPressureFactor = null) =>
             Assign(a => a.BackPressure = new ProducerConsumerBackPressure(backPressureFactor, maxConcurrency));
 
+        /// <inheritdoc cref="IBulkAllRequest{T}.ContinueAfterDroppedDocuments" />
+        public BulkAllDescriptor<T> ContinueAfterDroppedDocuments(bool proceed = true) => Assign(p => p.ContinueAfterDroppedDocuments = proceed);
+
+        /// <summary>
+        /// If <see cref="ContinueAfterDroppedDocuments"/> is set to false (not the default) dropped messages will be fed through
+        /// this callback. Use this if you don't expect many failures and want to feed these dropped messages in a dead letter queue
+        /// for instance.
+        /// </summary>
+        public BulkAllDescriptor<T> DroppedDocumentCallback(Action<IBulkResponseItem, T> callback) =>
+            Assign(p => p.DroppedDocumentCallback = callback);
+
     }
 }
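
The same settings exist on the request object for callers that do not use the fluent descriptor. A minimal sketch, assuming the request-object overload of BulkAll, the Index and Size properties of IBulkAllRequest<T>, and a hypothetical Document type; the dead letter queue is purely illustrative:

var deadLetters = new ConcurrentQueue<(IBulkResponseItem Item, Document Document)>();

var request = new BulkAllRequest<Document>(documents)
{
    Index = "my-index",                   // illustrative index name
    Size = 1000,
    ContinueAfterDroppedDocuments = true, // default is false: halt on non-retryable failures
    DroppedDocumentCallback = (item, document) => deadLetters.Enqueue((item, document))
};

var observable = client.BulkAll(request);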

src/Nest/Document/Multiple/BulkAll/BulkAllResponse.cs

Lines changed: 2 additions & 0 deletions
@@ -1,4 +1,5 @@
 using System;
+using System.Collections.Generic;
 using Newtonsoft.Json;
 
 namespace Nest
@@ -13,6 +14,7 @@ public interface IBulkAllResponse
 
         /// <summary>The number of back off retries were needed to store this document.</summary>
         int Retries { get; }
+
     }
 
     /// <inheritdoc />

src/Tests/Tests/ClientConcepts/ConnectionPooling/MaxRetries/RespectsMaxRetry.doc.cs

Lines changed: 2 additions & 1 deletion
@@ -157,7 +157,8 @@ public async Task RetriesAreLimitedByNodesInPool()
             new ClientCall {
                 { BadResponse, 9200 },
                 { BadResponse, 9201 },
-                { MaxRetriesReached }
+                { MaxRetriesReached },
+                { FailedOverAllNodes }
             }
         );
     }

src/Tests/Tests/ClientConcepts/ConnectionPooling/RoundRobin/SkipDeadNodes.doc.cs

Lines changed: 1 addition & 0 deletions
@@ -158,6 +158,7 @@ await audit.TraceCalls(
                 { BadResponse, 9202},
                 { BadResponse, 9203},
                 { MaxRetriesReached },
+                { FailedOverAllNodes },
                 { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
             },
             new ClientCall {

src/Tests/Tests/ClientConcepts/ConnectionPooling/Sticky/SkipDeadNodes.doc.cs

Lines changed: 1 addition & 0 deletions
@@ -136,6 +136,7 @@ await audit.TraceCalls(
                 { BadResponse, 9202},
                 { BadResponse, 9203},
                 { MaxRetriesReached },
+                { FailedOverAllNodes },
                 { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
             },
             /** After all our registered nodes are marked dead we want to sample a single dead node

src/Tests/Tests/ClientConcepts/ConnectionPooling/Sticky/StickySniffingConnectionPool.doc.cs

Lines changed: 1 addition & 0 deletions
@@ -167,6 +167,7 @@ await audit.TraceCalls(
                 { BadResponse, 9202},
                 { BadResponse, 9203},
                 { MaxRetriesReached },
+                { FailedOverAllNodes },
                 { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
             },
             /** After all our registered nodes are marked dead we want to sample a single dead node
