1
1
using System ;
2
2
using System . Collections . Generic ;
3
+ using System . Diagnostics ;
3
4
using System . Linq ;
4
5
using System . Threading ;
5
6
using System . Threading . Tasks ;
@@ -23,17 +24,22 @@ public class Producer : IMetadataQueries
23
24
24
25
private readonly CancellationTokenSource _stopToken = new CancellationTokenSource ( ) ;
25
26
private readonly int _maximumAsyncRequests ;
26
- private readonly int _maximumMessageBuffer ;
27
27
private readonly AsyncCollection < TopicMessage > _asyncCollection ;
28
28
private readonly SemaphoreSlim _semaphoreMaximumAsync ;
29
- private readonly SemaphoreSlim _boundedCapacitySemaphore ;
30
29
private readonly IMetadataQueries _metadataQueries ;
31
30
private readonly Task _postTask ;
32
31
32
+ private int _inFlightMessageCount = 0 ;
33
+
33
34
/// <summary>
34
35
/// Get the number of messages sitting in the buffer waiting to be sent.
35
36
/// </summary>
36
- public int BufferCount { get { return _maximumMessageBuffer - _boundedCapacitySemaphore . CurrentCount ; } }
37
+ public int BufferCount { get { return _asyncCollection . Count ; } }
38
+
39
+ /// <summary>
40
+ /// Get the number of messages staged for Async upload.
41
+ /// </summary>
42
+ public int InFlightMessageCount { get { return _inFlightMessageCount ; } }
37
43
38
44
/// <summary>
39
45
/// Get the number of active async threads sending messages.
@@ -78,11 +84,9 @@ public Producer(IBrokerRouter brokerRouter, int maximumAsyncRequests = MaximumAs
78
84
{
79
85
BrokerRouter = brokerRouter ;
80
86
_maximumAsyncRequests = maximumAsyncRequests ;
81
- _maximumMessageBuffer = maximumMessageBuffer ;
82
87
_metadataQueries = new MetadataQueries ( BrokerRouter ) ;
83
88
_asyncCollection = new AsyncCollection < TopicMessage > ( ) ;
84
89
_semaphoreMaximumAsync = new SemaphoreSlim ( maximumAsyncRequests , maximumAsyncRequests ) ;
85
- _boundedCapacitySemaphore = new SemaphoreSlim ( maximumMessageBuffer , maximumMessageBuffer ) ;
86
90
87
91
BatchSize = DefaultBatchSize ;
88
92
BatchDelayTime = TimeSpan . FromMilliseconds ( DefaultBatchDelayMS ) ;
@@ -104,10 +108,11 @@ public Producer(IBrokerRouter brokerRouter, int maximumAsyncRequests = MaximumAs
104
108
/// <param name="timeout">Interal kafka timeout to wait for the requested level of ack to occur before returning. Defaults to 1000ms.</param>
105
109
/// <param name="codec">The codec to apply to the message collection. Defaults to none.</param>
106
110
/// <returns>List of ProduceResponses from each partition sent to or empty list if acks = 0.</returns>
107
- public async Task < List < ProduceResponse > > SendMessageAsync ( string topic , IEnumerable < Message > messages , Int16 acks = 1 ,
111
+ public Task < List < ProduceResponse > > SendMessageAsync ( string topic , IEnumerable < Message > messages , Int16 acks = 1 ,
108
112
TimeSpan ? timeout = null , MessageCodec codec = MessageCodec . CodecNone )
109
113
{
110
- if ( _stopToken . IsCancellationRequested ) throw new ObjectDisposedException ( "Cannot send new documents as producer is disposing." ) ;
114
+ if ( _stopToken . IsCancellationRequested )
115
+ throw new ObjectDisposedException ( "Cannot send new documents as producer is disposing." ) ;
111
116
if ( timeout == null ) timeout = TimeSpan . FromMilliseconds ( DefaultAckTimeoutMS ) ;
112
117
113
118
var batch = messages . Select ( message => new TopicMessage
@@ -119,20 +124,17 @@ public async Task<List<ProduceResponse>> SendMessageAsync(string topic, IEnumera
119
124
Message = message
120
125
} ) . ToList ( ) ;
121
126
122
- foreach ( var item in batch )
123
- {
124
- item . Tcs . Task . ContinueWith ( t => _boundedCapacitySemaphore . Release ( ) , TaskContinuationOptions . ExecuteSynchronously ) ;
125
- _boundedCapacitySemaphore . Wait ( _stopToken . Token ) ;
126
- _asyncCollection . Add ( item ) ;
127
- }
127
+ _asyncCollection . AddRange ( batch ) ;
128
128
129
- var results = new List < ProduceResponse > ( ) ;
130
- foreach ( var topicMessage in batch )
131
- {
132
- results . Add ( await topicMessage . Tcs . Task . ConfigureAwait ( false ) ) ;
133
- }
129
+ return Task . WhenAll ( batch . Select ( x => x . Tcs . Task ) )
130
+ . ContinueWith ( t =>
131
+ {
132
+ t . ThrowOnFault ( ) ;
134
133
135
- return results . Distinct ( ) . ToList ( ) ;
134
+ return batch . Select ( topicMessage => topicMessage . Tcs . Task . Result )
135
+ . Distinct ( )
136
+ . ToList ( ) ;
137
+ } ) ;
136
138
}
137
139
138
140
/// <summary>
@@ -181,6 +183,8 @@ private async Task BatchSendAsync()
181
183
{
182
184
try
183
185
{
186
+ await _asyncCollection . OnHasDataAvailable ( _stopToken . Token ) . ConfigureAwait ( false ) ;
187
+
184
188
batch = await _asyncCollection . TakeAsync ( BatchSize , BatchDelayTime , _stopToken . Token ) . ConfigureAwait ( false ) ;
185
189
}
186
190
catch ( OperationCanceledException ex )
@@ -229,16 +233,18 @@ private async Task BatchSendAsync()
229
233
}
230
234
}
231
235
232
- private async Task ProduceAndSendBatchAsync ( List < TopicMessage > batchs , CancellationToken cancellationToken )
236
+ private async Task ProduceAndSendBatchAsync ( List < TopicMessage > messages , CancellationToken cancellationToken )
233
237
{
238
+ Interlocked . Add ( ref _inFlightMessageCount , messages . Count ) ;
239
+
234
240
//we must send a different produce request for each ack level and timeout combination.
235
- foreach ( var ackLevelBatch in batchs . GroupBy ( batch => new { batch . Acks , batch . Timeout } ) )
241
+ foreach ( var ackLevelBatch in messages . GroupBy ( batch => new { batch . Acks , batch . Timeout } ) )
236
242
{
237
243
var messageByRouter = ackLevelBatch . Select ( batch => new
238
- {
239
- TopicMessage = batch ,
240
- Route = BrokerRouter . SelectBrokerRoute ( batch . Topic , batch . Message . Key ) ,
241
- } )
244
+ {
245
+ TopicMessage = batch ,
246
+ Route = BrokerRouter . SelectBrokerRoute ( batch . Topic , batch . Message . Key ) ,
247
+ } )
242
248
. GroupBy ( x => new { x . Route , x . TopicMessage . Topic , x . TopicMessage . Codec } ) ;
243
249
244
250
var sendTasks = new List < BrokerRouteSendBatch > ( ) ;
@@ -269,8 +275,7 @@ private async Task ProduceAndSendBatchAsync(List<TopicMessage> batchs, Cancellat
269
275
} ;
270
276
271
277
//ensure the async is released as soon as each task is completed
272
- brokerSendTask . Task . ContinueWith ( t => { _semaphoreMaximumAsync . Release ( ) ; } , cancellationToken ,
273
- TaskContinuationOptions . ExecuteSynchronously , TaskScheduler . Default ) ;
278
+ brokerSendTask . Task . ContinueWith ( t => { _semaphoreMaximumAsync . Release ( ) ; } , cancellationToken ) ;
274
279
275
280
sendTasks . Add ( brokerSendTask ) ;
276
281
}
@@ -293,11 +298,17 @@ private async Task ProduceAndSendBatchAsync(List<TopicMessage> batchs, Cancellat
293
298
{
294
299
foreach ( var topicMessageBatch in ackLevelBatch )
295
300
{
296
- topicMessageBatch . Tcs . TrySetException ( new KafkaApplicationException ( "An exception occured while executing a send operation against {0}. Exception:{1}" ,
297
- failedTask . Route , failedTask . Task . Exception ) ) ;
301
+ topicMessageBatch . Tcs . TrySetException (
302
+ new KafkaApplicationException (
303
+ "An exception occured while executing a send operation against {0}. Exception:{1}" ,
304
+ failedTask . Route , failedTask . Task . Exception ) ) ;
298
305
}
299
306
}
300
307
}
308
+ finally
309
+ {
310
+ Interlocked . Add ( ref _inFlightMessageCount , messages . Count * - 1 ) ;
311
+ }
301
312
}
302
313
}
303
314
@@ -312,7 +323,7 @@ public void Dispose()
312
323
using ( _metadataQueries )
313
324
{
314
325
}
315
- }
326
+ }
316
327
#endregion
317
328
}
318
329
0 commit comments