From 4f7331dadbabdc820367b7eac4534894d7dfaa6a Mon Sep 17 00:00:00 2001 From: Matt Dale <9760375+matthewdale@users.noreply.github.com> Date: Tue, 11 Jun 2024 18:22:04 -0700 Subject: [PATCH] GODRIVER-3217 Use the manually-specified maxTimeMS on Find and Aggregate if it would be ommitted by CSOT (#1644) --- mongo/integration/csot_test.go | 258 ++++++++++++++++++++++++--------- x/mongo/driver/operation.go | 16 +- 2 files changed, 204 insertions(+), 70 deletions(-) diff --git a/mongo/integration/csot_test.go b/mongo/integration/csot_test.go index 3eb0328616..fb1cc340a2 100644 --- a/mongo/integration/csot_test.go +++ b/mongo/integration/csot_test.go @@ -26,13 +26,12 @@ import ( // Test automatic "maxTimeMS" appending and connection closing behavior when // CSOT is disabled and enabled. -func TestCSOT(t *testing.T) { +func TestCSOT_maxTimeMS(t *testing.T) { mt := mtest.New(t, mtest.NewOptions().CreateClient(false)) testCases := []struct { desc string commandName string - setup func(coll *mongo.Collection) error operation func(ctx context.Context, coll *mongo.Collection) error topologies []mtest.TopologyKind @@ -54,10 +53,6 @@ func TestCSOT(t *testing.T) { { desc: "FindOne", commandName: "find", - setup: func(coll *mongo.Collection) error { - _, err := coll.InsertOne(context.Background(), bson.D{}) - return err - }, operation: func(ctx context.Context, coll *mongo.Collection) error { return coll.FindOne(ctx, bson.D{}).Err() }, @@ -68,10 +63,6 @@ func TestCSOT(t *testing.T) { { desc: "Find", commandName: "find", - setup: func(coll *mongo.Collection) error { - _, err := coll.InsertOne(context.Background(), bson.D{}) - return err - }, operation: func(ctx context.Context, coll *mongo.Collection) error { _, err := coll.Find(ctx, bson.D{}) return err @@ -83,10 +74,6 @@ func TestCSOT(t *testing.T) { { desc: "FindOneAndDelete", commandName: "findAndModify", - setup: func(coll *mongo.Collection) error { - _, err := coll.InsertOne(context.Background(), bson.D{}) - return err - }, operation: func(ctx context.Context, coll *mongo.Collection) error { return coll.FindOneAndDelete(ctx, bson.D{}).Err() }, @@ -97,10 +84,6 @@ func TestCSOT(t *testing.T) { { desc: "FindOneAndUpdate", commandName: "findAndModify", - setup: func(coll *mongo.Collection) error { - _, err := coll.InsertOne(context.Background(), bson.D{}) - return err - }, operation: func(ctx context.Context, coll *mongo.Collection) error { return coll.FindOneAndUpdate(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}}).Err() }, @@ -111,10 +94,6 @@ func TestCSOT(t *testing.T) { { desc: "FindOneAndReplace", commandName: "findAndModify", - setup: func(coll *mongo.Collection) error { - _, err := coll.InsertOne(context.Background(), bson.D{}) - return err - }, operation: func(ctx context.Context, coll *mongo.Collection) error { return coll.FindOneAndReplace(ctx, bson.D{}, bson.D{}).Err() }, @@ -243,10 +222,6 @@ func TestCSOT(t *testing.T) { { desc: "Cursor getMore", commandName: "getMore", - setup: func(coll *mongo.Collection) error { - _, err := coll.InsertMany(context.Background(), []interface{}{bson.D{}, bson.D{}}) - return err - }, operation: func(ctx context.Context, coll *mongo.Collection) error { cursor, err := coll.Find(ctx, bson.D{}, options.Find().SetBatchSize(1)) if err != nil { @@ -261,6 +236,14 @@ func TestCSOT(t *testing.T) { }, } + // insertTwoDocuments inserts two documents in the test collection. + insertTwoDocuments := func(mt *mtest.T) { + mt.Helper() + + _, err := mt.Coll.InsertMany(context.Background(), []interface{}{bson.D{}, bson.D{}}) + require.NoError(mt, err, "InsertMany error") + } + // getStartedEvent returns the first command started event that matches the // specified command name. getStartedEvent := func(mt *mtest.T, command string) *event.CommandStartedEvent { @@ -281,12 +264,13 @@ func TestCSOT(t *testing.T) { return nil } - // assertMaxTimeMSIsSet asserts that "maxTimeMS" is set to a positive value - // on the given command document. - assertMaxTimeMSIsSet := func(mt *mtest.T, command bson.Raw) { + // getMaxTimeMS asserts that "maxTimeMS" is set on the command document for + // the given command name and returns the value. + getMaxTimeMS := func(mt *mtest.T, command string) int64 { mt.Helper() - maxTimeVal := command.Lookup("maxTimeMS") + evt := getStartedEvent(mt, command) + maxTimeVal := evt.Command.Lookup("maxTimeMS") require.Greater(mt, len(maxTimeVal.Value), @@ -300,14 +284,18 @@ func TestCSOT(t *testing.T) { maxTimeVal.Int64(), int64(0), "expected maxTimeMS value to be greater than 0") + + return maxTimeVal.Int64() } // assertMaxTimeMSIsSet asserts that "maxTimeMS" is not set on the given // command document. - assertMaxTimeMSNotSet := func(mt *mtest.T, command bson.Raw) { + assertMaxTimeMSNotSet := func(mt *mtest.T, command string) { mt.Helper() - _, err := command.LookupErr("maxTimeMS") + evt := getStartedEvent(mt, command) + + _, err := evt.Command.LookupErr("maxTimeMS") assert.ErrorIs(mt, err, bsoncore.ErrElementNotFound, @@ -318,41 +306,34 @@ func TestCSOT(t *testing.T) { mt.RunOpts(tc.desc, mtest.NewOptions().Topologies(tc.topologies...), func(mt *mtest.T) { mt.Run("maxTimeMS", func(mt *mtest.T) { mt.Run("timeoutMS not set", func(mt *mtest.T) { - if tc.setup != nil { - err := tc.setup(mt.Coll) - require.NoError(mt, err) - } + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) err := tc.operation(context.Background(), mt.Coll) require.NoError(mt, err) - - evt := getStartedEvent(mt, tc.commandName) - assertMaxTimeMSNotSet(mt, evt.Command) + assertMaxTimeMSNotSet(mt, tc.commandName) }) csotOpts := mtest.NewOptions().ClientOptions(options.Client().SetTimeout(10 * time.Second)) mt.RunOpts("timeoutMS and context.Background", csotOpts, func(mt *mtest.T) { - if tc.setup != nil { - err := tc.setup(mt.Coll) - require.NoError(mt, err) - } + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) err := tc.operation(context.Background(), mt.Coll) require.NoError(mt, err) - evt := getStartedEvent(mt, tc.commandName) - if tc.sendsMaxTimeMSWithTimeoutMS { - assertMaxTimeMSIsSet(mt, evt.Command) - } else { - assertMaxTimeMSNotSet(mt, evt.Command) + if !tc.sendsMaxTimeMSWithTimeoutMS { + assertMaxTimeMSNotSet(mt, tc.commandName) + return } + + maxTimeMS := getMaxTimeMS(mt, tc.commandName) + assert.Greater(mt, maxTimeMS, int64(0), "expected maxTimeMS to be greater than 0") }) mt.RunOpts("timeoutMS and Context with deadline", csotOpts, func(mt *mtest.T) { - if tc.setup != nil { - err := tc.setup(mt.Coll) - require.NoError(mt, err) - } + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -360,12 +341,13 @@ func TestCSOT(t *testing.T) { err := tc.operation(ctx, mt.Coll) require.NoError(mt, err) - evt := getStartedEvent(mt, tc.commandName) - if tc.sendsMaxTimeMSWithContextDeadline { - assertMaxTimeMSIsSet(mt, evt.Command) - } else { - assertMaxTimeMSNotSet(mt, evt.Command) + if !tc.sendsMaxTimeMSWithContextDeadline { + assertMaxTimeMSNotSet(mt, tc.commandName) + return } + + maxTimeMS := getMaxTimeMS(mt, tc.commandName) + assert.Greater(mt, maxTimeMS, int64(0), "expected maxTimeMS to be greater than 0") }) }) @@ -375,10 +357,8 @@ func TestCSOT(t *testing.T) { Topologies(mtest.Single, mtest.ReplicaSet). MinServerVersion("4.2") mt.RunOpts("prevents connection closure with timeoutMS", opts, func(mt *mtest.T) { - if tc.setup != nil { - err := tc.setup(mt.Coll) - require.NoError(mt, err) - } + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) mt.SetFailPoint(mtest.FailPoint{ ConfigureFailPoint: "failCommand", @@ -403,7 +383,7 @@ func TestCSOT(t *testing.T) { cancel() if !mongo.IsTimeout(err) { - t.Logf("CSOT-disabled operation %d returned a non-timeout error: %v", i, err) + t.Errorf("CSOT-disabled operation %d returned a non-timeout error: %v", i, err) } } @@ -428,7 +408,7 @@ func TestCSOT(t *testing.T) { cancel() if !mongo.IsTimeout(err) { - t.Logf("CSOT-enabled operation %d returned a non-timeout error: %v", i, err) + t.Errorf("CSOT-enabled operation %d returned a non-timeout error: %v", i, err) } } @@ -441,8 +421,10 @@ func TestCSOT(t *testing.T) { }) } - csotOpts := mtest.NewOptions().ClientOptions(options.Client().SetTimeout(10 * time.Second)) - mt.RunOpts("maxTimeMS is omitted for values greater than 2147483647ms", csotOpts, func(mt *mtest.T) { + mt.Run("maxTimeMS is omitted for values greater than 2147483647ms", func(mt *mtest.T) { + // Set a client-level timeoutMS value. + mt.ResetClient(options.Client().SetTimeout(10 * time.Second)) + ctx, cancel := context.WithTimeout(context.Background(), (2147483647+1000)*time.Millisecond) defer cancel() _, err := mt.Coll.InsertOne(ctx, bson.D{}) @@ -455,6 +437,152 @@ func TestCSOT(t *testing.T) { bsoncore.ErrElementNotFound, "expected maxTimeMS BSON value to be missing, but is present") }) + + // Deprecated MaxTime option tests. + mt.Run("Find uses MaxTime option when no other timeouts are set", func(mt *mtest.T) { + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) + + // Set a 5-second MaxTime value. + opts := options.Find().SetMaxTime(5 * time.Second) + + cursor, err := mt.Coll.Find(context.Background(), bson.D{}, opts) + require.NoError(mt, err, "Find error") + err = cursor.Close(context.Background()) + require.NoError(mt, err, "Cursor.Close error") + + // Assert that maxTimeMS is set and that it's equal to the MaxTime + // value. + maxTimeMS := getMaxTimeMS(mt, "find") + assert.Equal(mt, + int64(5_000), + maxTimeMS, + "expected maxTimeMS to be equal to the MaxTime value") + }) + mt.Run("Find ignores MaxTime option when timeoutMS is set", func(mt *mtest.T) { + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) + + // Set a 10-second client-level timeoutMS value . + mt.ResetClient(options.Client().SetTimeout(10 * time.Second)) + + // Set a 5-second MaxTime value. + opts := options.Find().SetMaxTime(5 * time.Second) + + cursor, err := mt.Coll.Find(context.Background(), bson.D{}, opts) + require.NoError(mt, err, "Find error") + err = cursor.Close(context.Background()) + require.NoError(mt, err, "Cursor.Close error") + + // Assert that maxTimeMS is set and that it's greater than the + // MaxTime value. + maxTimeMS := getMaxTimeMS(mt, "find") + assert.Greater(mt, + maxTimeMS, + int64(5_000), + "expected maxTimeMS to be greater than the MaxTime value") + }) + // TODO(GODRIVER-2944): Remove this test once the "timeoutMode" option is + // supported. + mt.Run("Find uses MaxTime option when timeoutMS and Context with deadline are set", func(mt *mtest.T) { + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) + + // Set a 10-second client-level timeoutMS value . + mt.ResetClient(options.Client().SetTimeout(10 * time.Second)) + + // Set a 10-second operation-level Context timeout. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Set a 5-second MaxTime value. + opts := options.Find().SetMaxTime(5 * time.Second) + + cursor, err := mt.Coll.Find(ctx, bson.D{}, opts) + require.NoError(mt, err, "Find error") + err = cursor.Close(context.Background()) + require.NoError(mt, err, "Cursor.Close error") + + // Assert that maxTimeMS is set and that it's equal to the MaxTime + // value. + maxTimeMS := getMaxTimeMS(mt, "find") + assert.Equal(mt, + int64(5_000), + maxTimeMS, + "expected maxTimeMS to be equal to the MaxTime value") + }) + mt.Run("Aggregate uses MaxTime option when no other timeouts are set", func(mt *mtest.T) { + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) + + // Set a 5-second MaxTime value. + opts := options.Aggregate().SetMaxTime(5 * time.Second) + + cursor, err := mt.Coll.Aggregate(context.Background(), bson.D{}, opts) + require.NoError(mt, err, "Aggregate error") + err = cursor.Close(context.Background()) + require.NoError(mt, err, "Cursor.Close error") + + // Assert that maxTimeMS is set and that it's equal to the MaxTime + // value. + maxTimeMS := getMaxTimeMS(mt, "aggregate") + assert.Equal(mt, + int64(5_000), + maxTimeMS, + "expected maxTimeMS to be equal to the MaxTime value") + }) + mt.Run("Aggregate ignores MaxTime option when timeoutMS is set", func(mt *mtest.T) { + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) + + // Set a 10-second client-level timeoutMS value . + mt.ResetClient(options.Client().SetTimeout(10 * time.Second)) + + // Set a 5-second MaxTime value. + opts := options.Aggregate().SetMaxTime(5 * time.Second) + + cursor, err := mt.Coll.Aggregate(context.Background(), bson.D{}, opts) + require.NoError(mt, err, "Aggregate error") + err = cursor.Close(context.Background()) + require.NoError(mt, err, "Cursor.Close error") + + // Assert that maxTimeMS is set and that it's greater than the + // MaxTime value. + maxTimeMS := getMaxTimeMS(mt, "aggregate") + assert.Greater(mt, + maxTimeMS, + int64(5_000), + "expected maxTimeMS to be greater than the MaxTime value") + }) + // TODO(GODRIVER-2944): Remove this test once the "timeoutMode" option is + // supported. + mt.Run("Aggregate uses MaxTime option when timeoutMS and Context with deadline are set", func(mt *mtest.T) { + // Insert some documents so the collection isn't empty. + insertTwoDocuments(mt) + + // Set a 10-second client-level timeoutMS value . + mt.ResetClient(options.Client().SetTimeout(10 * time.Second)) + + // Set a 10-second operation-level Context timeout. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Set a 5-second MaxTime value. + opts := options.Aggregate().SetMaxTime(5 * time.Second) + + cursor, err := mt.Coll.Aggregate(ctx, bson.D{}, opts) + require.NoError(mt, err, "Aggregate error") + err = cursor.Close(context.Background()) + require.NoError(mt, err, "Cursor.Close error") + + // Assert that maxTimeMS is set and that it's equal to the MaxTime + // value. + maxTimeMS := getMaxTimeMS(mt, "aggregate") + assert.Equal(mt, + int64(5_000), + maxTimeMS, + "expected maxTimeMS to be equal to the MaxTime value") + }) } func TestCSOT_errors(t *testing.T) { diff --git a/x/mongo/driver/operation.go b/x/mongo/driver/operation.go index 568622d616..db5367bed5 100644 --- a/x/mongo/driver/operation.go +++ b/x/mongo/driver/operation.go @@ -1574,11 +1574,17 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer) // operation's MaxTimeMS if set. If no MaxTimeMS is set on the operation, and context is // not a Timeout context, calculateMaxTimeMS returns 0. func (op Operation) calculateMaxTimeMS(ctx context.Context, mon RTTMonitor) (uint64, error) { - if csot.IsTimeoutContext(ctx) { - if op.OmitCSOTMaxTimeMS { - return 0, nil - } - + // If CSOT is enabled and we're not omitting the CSOT-calculated maxTimeMS + // value, then calculate maxTimeMS. + // + // This allows commands that do not currently send CSOT-calculated maxTimeMS + // (e.g. Find and Aggregate) to still use a manually-provided maxTimeMS + // value. + // + // TODO(GODRIVER-2944): Remove or refactor this logic when we add the + // "timeoutMode" option, which will allow users to opt-in to the + // CSOT-calculated maxTimeMS values if that's the behavior they want. + if csot.IsTimeoutContext(ctx) && !op.OmitCSOTMaxTimeMS { if deadline, ok := ctx.Deadline(); ok { remainingTimeout := time.Until(deadline) rtt90 := mon.P90()