@@ -35,6 +35,7 @@ private sealed class QueryingEnumerable<T> : IEnumerable<T>, IAsyncEnumerable<T>
35
35
private readonly string _partitionKey ;
36
36
private readonly IDiagnosticsLogger < DbLoggerCategory . Query > _queryLogger ;
37
37
private readonly bool _standAloneStateManager ;
38
+ private readonly bool _concurrencyDetectionEnabled ;
38
39
39
40
public QueryingEnumerable (
40
41
CosmosQueryContext cosmosQueryContext ,
@@ -44,7 +45,8 @@ public QueryingEnumerable(
44
45
Func < CosmosQueryContext , JObject , T > shaper ,
45
46
Type contextType ,
46
47
string partitionKeyFromExtension ,
47
- bool standAloneStateManager )
48
+ bool standAloneStateManager ,
49
+ bool concurrencyDetectionEnabled )
48
50
{
49
51
_cosmosQueryContext = cosmosQueryContext ;
50
52
_sqlExpressionFactory = sqlExpressionFactory ;
@@ -54,6 +56,7 @@ public QueryingEnumerable(
54
56
_contextType = contextType ;
55
57
_queryLogger = cosmosQueryContext . QueryLogger ;
56
58
_standAloneStateManager = standAloneStateManager ;
59
+ _concurrencyDetectionEnabled = concurrencyDetectionEnabled ;
57
60
58
61
var partitionKey = selectExpression . GetPartitionKey ( cosmosQueryContext . ParameterValues ) ;
59
62
if ( partitionKey != null && partitionKeyFromExtension != null && partitionKeyFromExtension != partitionKey )
@@ -113,6 +116,7 @@ private sealed class Enumerator : IEnumerator<T>
113
116
private readonly string _partitionKey ;
114
117
private readonly IDiagnosticsLogger < DbLoggerCategory . Query > _queryLogger ;
115
118
private readonly bool _standAloneStateManager ;
119
+ private readonly IConcurrencyDetector _concurrencyDetector ;
116
120
117
121
private IEnumerator < JObject > _enumerator ;
118
122
@@ -126,6 +130,10 @@ public Enumerator(QueryingEnumerable<T> queryingEnumerable)
126
130
_partitionKey = queryingEnumerable . _partitionKey ;
127
131
_queryLogger = queryingEnumerable . _queryLogger ;
128
132
_standAloneStateManager = queryingEnumerable . _standAloneStateManager ;
133
+
134
+ _concurrencyDetector = queryingEnumerable . _concurrencyDetectionEnabled
135
+ ? _cosmosQueryContext . ConcurrencyDetector
136
+ : null ;
129
137
}
130
138
131
139
public T Current { get ; private set ; }
@@ -135,41 +143,44 @@ object IEnumerator.Current
135
143
136
144
public bool MoveNext ( )
137
145
{
146
+ _concurrencyDetector ? . EnterCriticalSection ( ) ;
147
+
138
148
try
139
149
{
140
- using ( _cosmosQueryContext . ConcurrencyDetector . EnterCriticalSection ( ) )
150
+ if ( _enumerator == null )
141
151
{
142
- if ( _enumerator == null )
143
- {
144
- var sqlQuery = _queryingEnumerable . GenerateQuery ( ) ;
152
+ var sqlQuery = _queryingEnumerable . GenerateQuery ( ) ;
145
153
146
- EntityFrameworkEventSource . Log . QueryExecuting ( ) ;
154
+ EntityFrameworkEventSource . Log . QueryExecuting ( ) ;
147
155
148
- _enumerator = _cosmosQueryContext . CosmosClient
149
- . ExecuteSqlQuery (
150
- _selectExpression . Container ,
151
- _partitionKey ,
152
- sqlQuery )
153
- . GetEnumerator ( ) ;
154
- _cosmosQueryContext . InitializeStateManager ( _standAloneStateManager ) ;
155
- }
156
+ _enumerator = _cosmosQueryContext . CosmosClient
157
+ . ExecuteSqlQuery (
158
+ _selectExpression . Container ,
159
+ _partitionKey ,
160
+ sqlQuery )
161
+ . GetEnumerator ( ) ;
162
+ _cosmosQueryContext . InitializeStateManager ( _standAloneStateManager ) ;
163
+ }
156
164
157
- var hasNext = _enumerator . MoveNext ( ) ;
165
+ var hasNext = _enumerator . MoveNext ( ) ;
158
166
159
- Current
160
- = hasNext
161
- ? _shaper ( _cosmosQueryContext , _enumerator . Current )
162
- : default ;
167
+ Current
168
+ = hasNext
169
+ ? _shaper ( _cosmosQueryContext , _enumerator . Current )
170
+ : default ;
163
171
164
- return hasNext ;
165
- }
172
+ return hasNext ;
166
173
}
167
174
catch ( Exception exception )
168
175
{
169
176
_queryLogger . QueryIterationFailed ( _contextType , exception ) ;
170
177
171
178
throw ;
172
179
}
180
+ finally
181
+ {
182
+ _concurrencyDetector ? . ExitCriticalSection ( ) ;
183
+ }
173
184
}
174
185
175
186
public void Dispose ( )
@@ -193,6 +204,7 @@ private sealed class AsyncEnumerator : IAsyncEnumerator<T>
193
204
private readonly IDiagnosticsLogger < DbLoggerCategory . Query > _queryLogger ;
194
205
private readonly bool _standAloneStateManager ;
195
206
private readonly CancellationToken _cancellationToken ;
207
+ private readonly IConcurrencyDetector _concurrencyDetector ;
196
208
197
209
private IAsyncEnumerator < JObject > _enumerator ;
198
210
@@ -207,47 +219,54 @@ public AsyncEnumerator(QueryingEnumerable<T> queryingEnumerable, CancellationTok
207
219
_queryLogger = queryingEnumerable . _queryLogger ;
208
220
_standAloneStateManager = queryingEnumerable . _standAloneStateManager ;
209
221
_cancellationToken = cancellationToken ;
222
+
223
+ _concurrencyDetector = queryingEnumerable . _concurrencyDetectionEnabled
224
+ ? _cosmosQueryContext . ConcurrencyDetector
225
+ : null ;
210
226
}
211
227
212
228
public T Current { get ; private set ; }
213
229
214
230
public async ValueTask < bool > MoveNextAsync ( )
215
231
{
232
+ _concurrencyDetector ? . EnterCriticalSection ( ) ;
233
+
216
234
try
217
235
{
218
- using ( _cosmosQueryContext . ConcurrencyDetector . EnterCriticalSection ( ) )
236
+ if ( _enumerator == null )
219
237
{
220
- if ( _enumerator == null )
221
- {
222
- var sqlQuery = _queryingEnumerable . GenerateQuery ( ) ;
238
+ var sqlQuery = _queryingEnumerable . GenerateQuery ( ) ;
223
239
224
- EntityFrameworkEventSource . Log . QueryExecuting ( ) ;
240
+ EntityFrameworkEventSource . Log . QueryExecuting ( ) ;
225
241
226
- _enumerator = _cosmosQueryContext . CosmosClient
227
- . ExecuteSqlQueryAsync (
228
- _selectExpression . Container ,
229
- _partitionKey ,
230
- sqlQuery )
231
- . GetAsyncEnumerator ( _cancellationToken ) ;
232
- _cosmosQueryContext . InitializeStateManager ( _standAloneStateManager ) ;
233
- }
242
+ _enumerator = _cosmosQueryContext . CosmosClient
243
+ . ExecuteSqlQueryAsync (
244
+ _selectExpression . Container ,
245
+ _partitionKey ,
246
+ sqlQuery )
247
+ . GetAsyncEnumerator ( _cancellationToken ) ;
248
+ _cosmosQueryContext . InitializeStateManager ( _standAloneStateManager ) ;
249
+ }
234
250
235
- var hasNext = await _enumerator . MoveNextAsync ( ) . ConfigureAwait ( false ) ;
251
+ var hasNext = await _enumerator . MoveNextAsync ( ) . ConfigureAwait ( false ) ;
236
252
237
- Current
238
- = hasNext
239
- ? _shaper ( _cosmosQueryContext , _enumerator . Current )
240
- : default ;
253
+ Current
254
+ = hasNext
255
+ ? _shaper ( _cosmosQueryContext , _enumerator . Current )
256
+ : default ;
241
257
242
- return hasNext ;
243
- }
258
+ return hasNext ;
244
259
}
245
260
catch ( Exception exception )
246
261
{
247
262
_queryLogger . QueryIterationFailed ( _contextType , exception ) ;
248
263
249
264
throw ;
250
265
}
266
+ finally
267
+ {
268
+ _concurrencyDetector ? . ExitCriticalSection ( ) ;
269
+ }
251
270
}
252
271
253
272
public ValueTask DisposeAsync ( )
0 commit comments