30
30
import com .mongodb .bulk .WriteConcernError ;
31
31
import com .mongodb .client .cursor .TimeoutMode ;
32
32
import com .mongodb .client .model .bulk .ClientBulkWriteOptions ;
33
- import com .mongodb .client .model .bulk .ClientNamespacedReplaceOneModel ;
34
- import com .mongodb .client .model .bulk .ClientNamespacedUpdateOneModel ;
35
- import com .mongodb .client .model .bulk .ClientNamespacedWriteModel ;
36
33
import com .mongodb .client .model .bulk .ClientBulkWriteResult ;
37
34
import com .mongodb .client .model .bulk .ClientDeleteResult ;
38
35
import com .mongodb .client .model .bulk .ClientInsertOneResult ;
36
+ import com .mongodb .client .model .bulk .ClientNamespacedReplaceOneModel ;
37
+ import com .mongodb .client .model .bulk .ClientNamespacedUpdateOneModel ;
38
+ import com .mongodb .client .model .bulk .ClientNamespacedWriteModel ;
39
39
import com .mongodb .client .model .bulk .ClientUpdateResult ;
40
40
import com .mongodb .connection .ConnectionDescription ;
41
41
import com .mongodb .internal .TimeoutContext ;
45
45
import com .mongodb .internal .client .model .bulk .AbstractClientDeleteModel ;
46
46
import com .mongodb .internal .client .model .bulk .AbstractClientNamespacedWriteModel ;
47
47
import com .mongodb .internal .client .model .bulk .AbstractClientUpdateModel ;
48
+ import com .mongodb .internal .client .model .bulk .AcknowledgedSummaryClientBulkWriteResult ;
49
+ import com .mongodb .internal .client .model .bulk .AcknowledgedVerboseClientBulkWriteResult ;
48
50
import com .mongodb .internal .client .model .bulk .ClientWriteModel ;
49
51
import com .mongodb .internal .client .model .bulk .ConcreteClientBulkWriteOptions ;
50
52
import com .mongodb .internal .client .model .bulk .ConcreteClientDeleteManyModel ;
51
53
import com .mongodb .internal .client .model .bulk .ConcreteClientDeleteOneModel ;
52
54
import com .mongodb .internal .client .model .bulk .ConcreteClientDeleteOptions ;
55
+ import com .mongodb .internal .client .model .bulk .ConcreteClientDeleteResult ;
53
56
import com .mongodb .internal .client .model .bulk .ConcreteClientInsertOneModel ;
57
+ import com .mongodb .internal .client .model .bulk .ConcreteClientInsertOneResult ;
54
58
import com .mongodb .internal .client .model .bulk .ConcreteClientNamespacedDeleteManyModel ;
55
59
import com .mongodb .internal .client .model .bulk .ConcreteClientNamespacedDeleteOneModel ;
56
60
import com .mongodb .internal .client .model .bulk .ConcreteClientNamespacedInsertOneModel ;
62
66
import com .mongodb .internal .client .model .bulk .ConcreteClientUpdateManyModel ;
63
67
import com .mongodb .internal .client .model .bulk .ConcreteClientUpdateOneModel ;
64
68
import com .mongodb .internal .client .model .bulk .ConcreteClientUpdateOptions ;
65
- import com .mongodb .internal .client .model .bulk .AcknowledgedSummaryClientBulkWriteResult ;
66
- import com .mongodb .internal .client .model .bulk .AcknowledgedVerboseClientBulkWriteResult ;
67
- import com .mongodb .internal .client .model .bulk .ConcreteClientDeleteResult ;
68
- import com .mongodb .internal .client .model .bulk .ConcreteClientInsertOneResult ;
69
69
import com .mongodb .internal .client .model .bulk .ConcreteClientUpdateResult ;
70
70
import com .mongodb .internal .client .model .bulk .UnacknowledgedClientBulkWriteResult ;
71
71
import com .mongodb .internal .connection .Connection ;
82
82
import com .mongodb .internal .validator .UpdateFieldNameValidator ;
83
83
import com .mongodb .lang .Nullable ;
84
84
import org .bson .BsonArray ;
85
+ import org .bson .BsonBoolean ;
85
86
import org .bson .BsonDocument ;
86
- import org .bson .BsonDocumentWrapper ;
87
+ import org .bson .BsonInt32 ;
87
88
import org .bson .BsonObjectId ;
88
89
import org .bson .BsonValue ;
89
90
import org .bson .BsonWriter ;
111
112
import static com .mongodb .internal .operation .BulkWriteBatch .logWriteModelDoesNotSupportRetries ;
112
113
import static com .mongodb .internal .operation .ClientBulkWriteOperation .ClientBulkWriteCommand .OpsAndNsInfo .WritersProviderAndLimitsChecker .WriteResult .FAIL_LIMIT_EXCEEDED ;
113
114
import static com .mongodb .internal .operation .ClientBulkWriteOperation .ClientBulkWriteCommand .OpsAndNsInfo .WritersProviderAndLimitsChecker .WriteResult .OK_LIMIT_NOT_REACHED ;
115
+ import static com .mongodb .internal .operation .CommandOperationHelper .commandWriteConcern ;
114
116
import static com .mongodb .internal .operation .CommandOperationHelper .initialRetryState ;
115
117
import static com .mongodb .internal .operation .CommandOperationHelper .shouldAttemptToRetryWriteAndAddRetryableLabel ;
116
118
import static com .mongodb .internal .operation .CommandOperationHelper .transformWriteException ;
117
- import static com .mongodb .internal .operation .CommandOperationHelper .commandWriteConcern ;
118
119
import static com .mongodb .internal .operation .CommandOperationHelper .validateAndGetEffectiveWriteConcern ;
119
120
import static com .mongodb .internal .operation .OperationHelper .isRetryableWrite ;
120
121
import static com .mongodb .internal .operation .SyncOperationHelper .cursorDocumentToBatchCursor ;
@@ -256,7 +257,7 @@ private Integer executeBatch(
256
257
257
258
/**
258
259
* @throws MongoWriteConcernWithResponseException This internal exception must be handled to avoid it being observed by an application.
259
- * It {@linkplain MongoWriteConcernWithResponseException#getResponse() bears} the OK response to the {@code lazilyEncodedCommand },
260
+ * It {@linkplain MongoWriteConcernWithResponseException#getResponse() bears} the OK response to the {@code bulkWriteCommand },
260
261
* which must be
261
262
* {@linkplain ResultAccumulator#onBulkWriteCommandOkResponseWithWriteConcernError(int, MongoWriteConcernWithResponseException, BatchEncoder.EncodedBatchInfo) accumulated}
262
263
* iff this exception is the failed result of retries.
@@ -271,7 +272,7 @@ private ExhaustiveClientBulkWriteCommandOkResponse executeBulkWriteCommandAndExh
271
272
final OperationContext operationContext ) throws MongoWriteConcernWithResponseException {
272
273
BsonDocument bulkWriteCommandOkResponse = connection .command (
273
274
"admin" ,
274
- bulkWriteCommand .getLazilyEncodedCommandDocument (),
275
+ bulkWriteCommand .getCommandDocument (),
275
276
NoOpFieldNameValidator .INSTANCE ,
276
277
null ,
277
278
CommandResultDocumentCodec .create (codecRegistry .get (BsonDocument .class ), CommandBatchCursorHelper .FIRST_BATCH ),
@@ -337,39 +338,19 @@ private ClientBulkWriteCommand createBulkWriteCommand(
337
338
final List <? extends ClientNamespacedWriteModel > unexecutedModels ,
338
339
final BatchEncoder batchEncoder ,
339
340
final Runnable retriesEnabler ) {
340
- BsonDocumentWrapper <?> lazilyEncodedCommandDocument = new BsonDocumentWrapper <>(
341
- BULK_WRITE_COMMAND_NAME ,
342
- new Encoder <String >() {
343
- @ Override
344
- public void encode (final BsonWriter writer , final String commandName , final EncoderContext encoderContext ) {
345
- writer .writeStartDocument ();
346
- writer .writeInt32 (commandName , 1 );
347
- writer .writeBoolean ("errorsOnly" , !options .isVerboseResults ());
348
- writer .writeBoolean ("ordered" , options .isOrdered ());
349
- options .isBypassDocumentValidation ().ifPresent (value -> writer .writeBoolean ("bypassDocumentValidation" , value ));
350
- options .getComment ().ifPresent (value -> {
351
- writer .writeName ("comment" );
352
- encodeUsingRegistry (writer , value );
353
- });
354
- options .getLet ().ifPresent (value -> {
355
- writer .writeName ("let" );
356
- encodeUsingRegistry (writer , value );
357
- });
358
- commandWriteConcern (effectiveWriteConcern , sessionContext ).ifPresent (value -> {
359
- writer .writeName ("writeConcern" );
360
- encodeUsingRegistry (writer , value .asDocument ());
361
- });
362
- writer .writeEndDocument ();
363
- }
364
-
365
- @ Override
366
- public Class <String > getEncoderClass () {
367
- throw fail ();
368
- }
369
- }
370
- );
341
+ BsonDocument commandDocument = new BsonDocument (BULK_WRITE_COMMAND_NAME , new BsonInt32 (1 ))
342
+ .append ("errorsOnly" , BsonBoolean .valueOf (!options .isVerboseResults ()))
343
+ .append ("ordered" , BsonBoolean .valueOf (options .isOrdered ()));
344
+ options .isBypassDocumentValidation ().ifPresent (value ->
345
+ commandDocument .append ("bypassDocumentValidation" , BsonBoolean .valueOf (value )));
346
+ options .getComment ().ifPresent (value ->
347
+ commandDocument .append ("comment" , value ));
348
+ options .getLet ().ifPresent (let ->
349
+ commandDocument .append ("let" , let .toBsonDocument (BsonDocument .class , codecRegistry )));
350
+ commandWriteConcern (effectiveWriteConcern , sessionContext ).ifPresent (value ->
351
+ commandDocument .append ("writeConcern" , value .asDocument ()));
371
352
return new ClientBulkWriteCommand (
372
- lazilyEncodedCommandDocument ,
353
+ commandDocument ,
373
354
new ClientBulkWriteCommand .OpsAndNsInfo (
374
355
effectiveRetryWrites , unexecutedModels , batchEncoder , options ,
375
356
() -> {
@@ -666,18 +647,18 @@ void onBulkWriteCommandErrorWithoutResponse(final MongoException exception) {
666
647
}
667
648
668
649
public static final class ClientBulkWriteCommand {
669
- private final BsonDocumentWrapper <?> lazilyEncodedCommandDocument ;
650
+ private final BsonDocument commandDocument ;
670
651
private final OpsAndNsInfo opsAndNsInfo ;
671
652
672
653
ClientBulkWriteCommand (
673
- final BsonDocumentWrapper <?> lazilyEncodedCommandDocument ,
654
+ final BsonDocument commandDocument ,
674
655
final OpsAndNsInfo opsAndNsInfo ) {
675
- this .lazilyEncodedCommandDocument = lazilyEncodedCommandDocument ;
656
+ this .commandDocument = commandDocument ;
676
657
this .opsAndNsInfo = opsAndNsInfo ;
677
658
}
678
659
679
- BsonDocumentWrapper <?> getLazilyEncodedCommandDocument () {
680
- return lazilyEncodedCommandDocument ;
660
+ BsonDocument getCommandDocument () {
661
+ return commandDocument ;
681
662
}
682
663
683
664
OpsAndNsInfo getOpsAndNsInfo () {
0 commit comments