|
25 | 25 | import com.mongodb.internal.binding.ReadBinding;
|
26 | 26 | import com.mongodb.internal.client.model.AggregationLevel;
|
27 | 27 | import com.mongodb.internal.connection.QueryResult;
|
28 |
| -import com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; |
29 |
| -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; |
30 |
| -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; |
31 | 28 | import com.mongodb.internal.session.SessionContext;
|
32 | 29 | import com.mongodb.lang.Nullable;
|
33 | 30 | import org.bson.BsonArray;
|
|
43 | 40 | import java.util.List;
|
44 | 41 | import java.util.concurrent.TimeUnit;
|
45 | 42 |
|
| 43 | +import static com.mongodb.assertions.Assertions.assertNotNull; |
46 | 44 | import static com.mongodb.assertions.Assertions.isTrueArgument;
|
47 | 45 | import static com.mongodb.assertions.Assertions.notNull;
|
48 | 46 | import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
|
49 |
| -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead; |
50 |
| -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync; |
| 47 | +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; |
| 48 | +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; |
| 49 | +import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; |
51 | 50 | import static com.mongodb.internal.operation.OperationHelper.LOGGER;
|
52 | 51 | import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
|
53 | 52 | import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
|
| 53 | +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; |
| 54 | +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; |
54 | 55 |
|
55 | 56 | class AggregateOperationImpl<T> implements AsyncReadOperation<AsyncBatchCursor<T>>, ReadOperation<BatchCursor<T>> {
|
56 | 57 | private static final String RESULT = "result";
|
@@ -196,8 +197,9 @@ public BatchCursor<T> execute(final ReadBinding binding) {
|
196 | 197 | @Override
|
197 | 198 | public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback<AsyncBatchCursor<T>> callback) {
|
198 | 199 | SingleResultCallback<AsyncBatchCursor<T>> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
|
199 |
| - executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()), |
200 |
| - CommandResultDocumentCodec.create(decoder, FIELD_NAMES_WITH_RESULT), asyncTransformer(), retryReads, errHandlingCallback); |
| 200 | + executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()), |
| 201 | + CommandResultDocumentCodec.create(this.decoder, FIELD_NAMES_WITH_RESULT), asyncTransformer(), retryReads, |
| 202 | + errHandlingCallback); |
201 | 203 | }
|
202 | 204 |
|
203 | 205 | private CommandCreator getCommandCreator(final SessionContext sessionContext) {
|
@@ -238,10 +240,11 @@ BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVe
|
238 | 240 | }
|
239 | 241 |
|
240 | 242 | private QueryResult<T> createQueryResult(final BsonDocument result, final ConnectionDescription description) {
|
| 243 | + assertNotNull(result); |
241 | 244 | return cursorDocumentToQueryResult(result.getDocument(CURSOR), description.getServerAddress());
|
242 | 245 | }
|
243 | 246 |
|
244 |
| - private CommandReadTransformer<BsonDocument, AggregateResponseBatchCursor<T>> transformer() { |
| 247 | + private CommandReadTransformer<BsonDocument, QueryBatchCursor<T>> transformer() { |
245 | 248 | return (result, source, connection) -> {
|
246 | 249 | QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
|
247 | 250 | return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment,
|
|
0 commit comments