2020import com .mongodb .MongoClientException ;
2121import com .mongodb .MongoClientSettings ;
2222import com .mongodb .MongoException ;
23+ import com .mongodb .ServerAddress ;
24+ import com .mongodb .assertions .Assertions ;
25+ import com .mongodb .event .CommandListener ;
26+ import com .mongodb .event .CommandSucceededEvent ;
2327import com .mongodb .event .ConnectionCheckOutFailedEvent ;
2428import com .mongodb .event .ConnectionCheckedOutEvent ;
2529import com .mongodb .event .ConnectionPoolClearedEvent ;
3438import org .junit .Before ;
3539import org .junit .Test ;
3640
41+ import java .util .concurrent .CompletableFuture ;
3742import java .util .concurrent .ExecutionException ;
3843import java .util .concurrent .ExecutorService ;
3944import java .util .concurrent .Executors ;
4045import java .util .concurrent .Future ;
4146import java .util .concurrent .TimeUnit ;
4247import java .util .concurrent .TimeoutException ;
48+ import java .util .concurrent .atomic .AtomicBoolean ;
49+ import java .util .stream .Collectors ;
50+ import java .util .stream .Stream ;
4351
4452import static com .mongodb .ClusterFixture .getServerStatus ;
4553import static com .mongodb .ClusterFixture .isDiscoverableReplicaSet ;
5563import static java .util .concurrent .TimeUnit .SECONDS ;
5664import static org .junit .Assert .assertEquals ;
5765import static org .junit .Assert .assertTrue ;
66+ import static org .junit .Assert .fail ;
5867import static org .junit .Assume .assumeFalse ;
5968import static org .junit .Assume .assumeTrue ;
6069
@@ -138,7 +147,6 @@ public static <R> void poolClearedExceptionMustBeRetryable(
138147 * As a result, the client has to wait for at least its heartbeat delay until it hears back from a server
139148 * (while it waits for a response, calling `ServerMonitor.connect` has no effect).
140149 * Thus, we want to use small heartbeat delay to reduce delays in the test. */
141- .minHeartbeatFrequency (50 , TimeUnit .MILLISECONDS )
142150 .heartbeatFrequency (50 , TimeUnit .MILLISECONDS ))
143151 .retryReads (true )
144152 .retryWrites (true )
@@ -158,7 +166,7 @@ public static <R> void poolClearedExceptionMustBeRetryable(
158166 .append ("blockTimeMS" , new BsonInt32 (1000 )));
159167 int timeoutSeconds = 5 ;
160168 try (MongoClient client = clientCreator .apply (clientSettings );
161- FailPoint ignored = FailPoint .enable (configureFailPoint , client )) {
169+ FailPoint ignored = FailPoint .enable (configureFailPoint , Fixture . getPrimary () )) {
162170 MongoCollection <Document > collection = client .getDatabase (getDefaultDatabaseName ())
163171 .getCollection ("poolClearedExceptionMustBeRetryable" );
164172 collection .drop ();
@@ -179,6 +187,77 @@ public static <R> void poolClearedExceptionMustBeRetryable(
179187 }
180188 }
181189
190+ /**
191+ * Prose test #3.
192+ */
193+ @ Test
194+ public void originalErrorMustBePropagatedIfNoWritesPerformed () throws InterruptedException {
195+ originalErrorMustBePropagatedIfNoWritesPerformed (MongoClients ::create );
196+ }
197+
198+ @ SuppressWarnings ("try" )
199+ public static void originalErrorMustBePropagatedIfNoWritesPerformed (
200+ final Function <MongoClientSettings , MongoClient > clientCreator ) throws InterruptedException {
201+ assumeTrue (serverVersionAtLeast (6 , 0 ) && isDiscoverableReplicaSet ());
202+ ServerAddress primaryServerAddress = Fixture .getPrimary ();
203+ CompletableFuture <FailPoint > futureFailPointFromListener = new CompletableFuture <>();
204+ CommandListener commandListener = new CommandListener () {
205+ private final AtomicBoolean configureFailPoint = new AtomicBoolean (true );
206+
207+ @ Override
208+ public void commandSucceeded (final CommandSucceededEvent event ) {
209+ if (event .getCommandName ().equals ("insert" )
210+ && event .getResponse ().getDocument ("writeConcernError" , new BsonDocument ())
211+ .getInt32 ("code" , new BsonInt32 (-1 )).intValue () == 91
212+ && configureFailPoint .compareAndSet (true , false )) {
213+ Assertions .assertTrue (futureFailPointFromListener .complete (FailPoint .enable (
214+ new BsonDocument ()
215+ .append ("configureFailPoint" , new BsonString ("failCommand" ))
216+ .append ("mode" , new BsonDocument ()
217+ .append ("times" , new BsonInt32 (1 )))
218+ .append ("data" , new BsonDocument ()
219+ .append ("failCommands" , new BsonArray (singletonList (new BsonString ("insert" ))))
220+ .append ("errorCode" , new BsonInt32 (10107 ))
221+ .append ("errorLabels" , new BsonArray (Stream .of ("RetryableWriteError" , "NoWritesPerformed" )
222+ .map (BsonString ::new ).collect (Collectors .toList ())))),
223+ primaryServerAddress
224+ )));
225+ }
226+ }
227+ };
228+ BsonDocument failPointDocument = new BsonDocument ()
229+ .append ("configureFailPoint" , new BsonString ("failCommand" ))
230+ .append ("mode" , new BsonDocument ()
231+ .append ("times" , new BsonInt32 (1 )))
232+ .append ("data" , new BsonDocument ()
233+ .append ("writeConcernError" , new BsonDocument ()
234+ .append ("code" , new BsonInt32 (91 ))
235+ .append ("errorLabels" , new BsonArray (Stream .of ("RetryableWriteError" )
236+ .map (BsonString ::new ).collect (Collectors .toList ()))))
237+ .append ("failCommands" , new BsonArray (singletonList (new BsonString ("insert" )))));
238+ try (MongoClient client = clientCreator .apply (getMongoClientSettingsBuilder ()
239+ .retryWrites (true )
240+ .addCommandListener (commandListener )
241+ .applyToServerSettings (builder ->
242+ // see `poolClearedExceptionMustBeRetryable` for the explanation
243+ builder .heartbeatFrequency (50 , TimeUnit .MILLISECONDS ))
244+ .build ());
245+ FailPoint ignored = FailPoint .enable (failPointDocument , primaryServerAddress )) {
246+ MongoCollection <Document > collection = client .getDatabase (getDefaultDatabaseName ())
247+ .getCollection ("originalErrorMustBePropagatedIfNoWritesPerformed" );
248+ collection .drop ();
249+ try {
250+ collection .insertOne (new Document ());
251+ } catch (MongoException e ) {
252+ assertEquals (e .getCode (), 91 );
253+ return ;
254+ }
255+ fail ("must not reach" );
256+ } finally {
257+ futureFailPointFromListener .thenAccept (FailPoint ::close );
258+ }
259+ }
260+
182261 private boolean canRunTests () {
183262 Document storageEngine = (Document ) getServerStatus ().get ("storageEngine" );
184263
0 commit comments