Skip to content

Commit cc0711c

Browse files
committed
GODRIVER-3172 Read response in the background after an op timeout.
1 parent d41a7cc commit cc0711c

File tree

11 files changed

+243
-26
lines changed

11 files changed

+243
-26
lines changed

internal/csot/csot.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ type timeoutKey struct{}
2121
// TODO default behavior.
2222
func MakeTimeoutContext(ctx context.Context, to time.Duration) (context.Context, context.CancelFunc) {
2323
// Only use the passed in Duration as a timeout on the Context if it
24-
// is non-zero.
24+
// is non-zero and if the Context doesn't already have a timeout.
2525
cancelFunc := func() {}
26-
if to != 0 {
26+
if _, deadlineSet := ctx.Deadline(); to != 0 && !deadlineSet {
2727
ctx, cancelFunc = context.WithTimeout(ctx, to)
2828
}
29+
30+
// Add timeoutKey either way to indicate CSOT is enabled.
2931
return context.WithValue(ctx, timeoutKey{}, true), cancelFunc
3032
}
3133

mongo/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
280280
// If no deadline is set on the passed-in context, cs.client.timeout is set, and context is not already
281281
// a Timeout context, honor cs.client.timeout in new Timeout context for change stream operation execution
282282
// and potential retry.
283-
if _, deadlineSet := ctx.Deadline(); !deadlineSet && cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) {
283+
if cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) {
284284
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *cs.client.timeout)
285285
// Redefine ctx to be the new timeout-derived context.
286286
ctx = newCtx

mongo/collection.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,6 +1191,15 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i
11911191
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/find/.
11921192
func (coll *Collection) Find(ctx context.Context, filter interface{},
11931193
opts ...*options.FindOptions) (cur *Cursor, err error) {
1194+
return coll.find(ctx, nil, filter, opts...)
1195+
}
1196+
1197+
func (coll *Collection) find(
1198+
ctx context.Context,
1199+
timeout *time.Duration,
1200+
filter interface{},
1201+
opts ...*options.FindOptions,
1202+
) (cur *Cursor, err error) {
11941203

11951204
if ctx == nil {
11961205
ctx = context.Background()
@@ -1230,7 +1239,7 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
12301239
CommandMonitor(coll.client.monitor).ServerSelector(selector).
12311240
ClusterClock(coll.client.clock).Database(coll.db.name).Collection(coll.name).
12321241
Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).ServerAPI(coll.client.serverAPI).
1233-
Timeout(coll.client.timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger)
1242+
Timeout(timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger)
12341243

12351244
cursorOpts := coll.client.createBaseCursorOptions()
12361245

@@ -1408,7 +1417,7 @@ func (coll *Collection) FindOne(ctx context.Context, filter interface{},
14081417
// by the server.
14091418
findOpts = append(findOpts, options.Find().SetLimit(-1))
14101419

1411-
cursor, err := coll.Find(ctx, filter, findOpts...)
1420+
cursor, err := coll.find(ctx, coll.client.timeout, filter, findOpts...)
14121421
return &SingleResult{
14131422
ctx: ctx,
14141423
cur: cursor,

mongo/gridfs/bucket.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error {
260260
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
261261
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
262262
// be shared by both delete operations.
263-
if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
263+
if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
264264
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
265265
// Redefine ctx to be the new timeout-derived context.
266266
ctx = newCtx
@@ -387,7 +387,7 @@ func (b *Bucket) DropContext(ctx context.Context) error {
387387
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
388388
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
389389
// be shared by both drop operations.
390-
if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
390+
if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
391391
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
392392
// Redefine ctx to be the new timeout-derived context.
393393
ctx = newCtx

mongo/integration/client_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package integration
88

99
import (
1010
"context"
11+
"errors"
1112
"fmt"
1213
"net"
1314
"os"
@@ -19,6 +20,7 @@ import (
1920
"go.mongodb.org/mongo-driver/bson"
2021
"go.mongodb.org/mongo-driver/bson/bsoncodec"
2122
"go.mongodb.org/mongo-driver/bson/bsonrw"
23+
"go.mongodb.org/mongo-driver/bson/bsontype"
2224
"go.mongodb.org/mongo-driver/bson/primitive"
2325
"go.mongodb.org/mongo-driver/event"
2426
"go.mongodb.org/mongo-driver/internal/assert"
@@ -1006,3 +1008,137 @@ func TestClientStress(t *testing.T) {
10061008
}
10071009
})
10081010
}
1011+
1012+
func TestCSOT(t *testing.T) {
1013+
mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
1014+
1015+
csotOpts := mtest.NewOptions().ClientOptions(options.Client().SetTimeout(10 * time.Second))
1016+
mt.RunOpts("includes maxTimeMS if CSOT timeout is set", csotOpts, func(mt *mtest.T) {
1017+
mt.Run("with context.Background", func(mt *mtest.T) {
1018+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
1019+
require.NoError(mt, err, "InsertOne error")
1020+
1021+
maxTimeVal := mt.GetStartedEvent().Command.Lookup("maxTimeMS")
1022+
1023+
require.True(mt, len(maxTimeVal.Value) > 0, "expected maxTimeMS BSON value to be non-empty")
1024+
require.Equal(mt, maxTimeVal.Type, bsontype.Int64, "expected maxTimeMS value to be type Int64")
1025+
1026+
maxTimeMS := maxTimeVal.Int64()
1027+
assert.True(mt, maxTimeMS > 0, "expected maxTimeMS value to be greater than 0")
1028+
})
1029+
mt.Run("with context.WithTimeout", func(mt *mtest.T) {
1030+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
1031+
defer cancel()
1032+
1033+
_, err := mt.Coll.InsertOne(ctx, bson.D{})
1034+
require.NoError(mt, err, "InsertOne error")
1035+
1036+
maxTimeVal := mt.GetStartedEvent().Command.Lookup("maxTimeMS")
1037+
require.True(mt, len(maxTimeVal.Value) > 0, "expected maxTimeMS BSON value to be non-empty")
1038+
require.Equal(mt, maxTimeVal.Type, bsontype.Int64, "expected maxTimeMS value to be type Int64")
1039+
1040+
maxTimeMS := maxTimeVal.Int64()
1041+
assert.True(mt,
1042+
maxTimeMS > 60_000,
1043+
"expected maxTimeMS value to be greater than 60000, but got %v",
1044+
maxTimeMS)
1045+
})
1046+
})
1047+
1048+
mt.RunOpts("timeout errors wrap context.DeadlineExceeded", csotOpts, func(mt *mtest.T) {
1049+
// Test that a client-side timeout is a context.DeadlineExceeded
1050+
mt.Run("MaxTimeMSExceeded", func(mt *mtest.T) {
1051+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
1052+
require.NoError(mt, err, "InsertOne error")
1053+
1054+
mt.SetFailPoint(mtest.FailPoint{
1055+
ConfigureFailPoint: "failCommand",
1056+
Mode: mtest.FailPointMode{
1057+
Times: 1,
1058+
},
1059+
Data: mtest.FailPointData{
1060+
FailCommands: []string{"find"},
1061+
ErrorCode: 50, // MaxTimeMSExceeded
1062+
},
1063+
})
1064+
1065+
err = mt.Coll.FindOne(context.Background(), bson.D{}).Err()
1066+
1067+
assert.True(mt,
1068+
errors.Is(err, context.DeadlineExceeded),
1069+
"expected error %[1]T(%[1]q) to wrap context.DeadlineExceeded",
1070+
err)
1071+
assert.True(mt,
1072+
mongo.IsTimeout(err),
1073+
"expected error %[1]T(%[1]q) to be a timeout error",
1074+
err)
1075+
})
1076+
// Test that a server-side timeout is a context.DeadlineExceeded
1077+
mt.Run("ErrDeadlineWouldBeExceeded", func(mt *mtest.T) {
1078+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
1079+
require.NoError(mt, err, "InsertOne error")
1080+
1081+
mt.SetFailPoint(mtest.FailPoint{
1082+
ConfigureFailPoint: "failCommand",
1083+
Mode: mtest.FailPointMode{
1084+
Times: 1,
1085+
},
1086+
Data: mtest.FailPointData{
1087+
FailCommands: []string{"find"},
1088+
BlockConnection: true,
1089+
BlockTimeMS: 1000,
1090+
},
1091+
})
1092+
1093+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond)
1094+
defer cancel()
1095+
err = mt.Coll.FindOne(ctx, bson.D{}).Err()
1096+
1097+
assert.True(mt,
1098+
errors.Is(err, driver.ErrDeadlineWouldBeExceeded),
1099+
"expected error %[1]T(%[1]q) to wrap driver.ErrDeadlineWouldBeExceeded",
1100+
err)
1101+
assert.True(mt,
1102+
errors.Is(err, context.DeadlineExceeded),
1103+
"expected error %[1]T(%[1]q) to wrap context.DeadlineExceeded",
1104+
err)
1105+
assert.True(mt,
1106+
mongo.IsTimeout(err),
1107+
"expected error %[1]T(%[1]q) to be a timeout error",
1108+
err)
1109+
})
1110+
mt.Run("context.DeadlineExceeded", func(mt *mtest.T) {
1111+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
1112+
require.NoError(mt, err, "InsertOne error")
1113+
1114+
mt.SetFailPoint(mtest.FailPoint{
1115+
ConfigureFailPoint: "failCommand",
1116+
Mode: mtest.FailPointMode{
1117+
Times: 1,
1118+
},
1119+
Data: mtest.FailPointData{
1120+
FailCommands: []string{"find"},
1121+
BlockConnection: true,
1122+
BlockTimeMS: 1000,
1123+
},
1124+
})
1125+
1126+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1127+
defer cancel()
1128+
err = mt.Coll.FindOne(ctx, bson.D{}).Err()
1129+
1130+
assert.False(mt,
1131+
errors.Is(err, driver.ErrDeadlineWouldBeExceeded),
1132+
"expected error %[1]T(%[1]q) to not wrap driver.ErrDeadlineWouldBeExceeded",
1133+
err)
1134+
assert.True(mt,
1135+
errors.Is(err, context.DeadlineExceeded),
1136+
"expected error %[1]T(%[1]q) to wrap context.DeadlineExceeded",
1137+
err)
1138+
assert.True(mt,
1139+
mongo.IsTimeout(err),
1140+
"expected error %[1]T(%[1]q) to be a timeout error",
1141+
err)
1142+
})
1143+
})
1144+
}

x/mongo/driver/errors.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ func (e Error) NamespaceNotFound() bool {
377377

378378
// ExtractErrorFromServerResponse extracts an error from a server response bsoncore.Document
379379
// if there is one. Also used in testing for SDAM.
380-
func ExtractErrorFromServerResponse(doc bsoncore.Document) error {
380+
func ExtractErrorFromServerResponse(doc bsoncore.Document, isCSOT bool) error {
381381
var errmsg, codeName string
382382
var code int32
383383
var labels []string
@@ -514,14 +514,20 @@ func ExtractErrorFromServerResponse(doc bsoncore.Document) error {
514514
errmsg = "command failed"
515515
}
516516

517-
return Error{
517+
err := Error{
518518
Code: code,
519519
Message: errmsg,
520520
Name: codeName,
521521
Labels: labels,
522522
TopologyVersion: tv,
523523
Raw: doc,
524524
}
525+
526+
// TODO: Comment.
527+
if isCSOT && err.Code == 50 {
528+
err.Wrapped = context.DeadlineExceeded
529+
}
530+
return err
525531
}
526532

527533
if len(wcError.WriteErrors) > 0 || wcError.WriteConcernError != nil {

x/mongo/driver/operation.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -499,9 +499,9 @@ func (op Operation) Execute(ctx context.Context) error {
499499
return err
500500
}
501501

502-
// If no deadline is set on the passed-in context, op.Timeout is set, and context is not already
503-
// a Timeout context, honor op.Timeout in new Timeout context for operation execution.
504-
if _, deadlineSet := ctx.Deadline(); !deadlineSet && op.Timeout != nil && !csot.IsTimeoutContext(ctx) {
502+
// If op.Timeout is set, and context is not already a Timeout context, honor
503+
// op.Timeout in new Timeout context for operation execution.
504+
if op.Timeout != nil && !csot.IsTimeoutContext(ctx) {
505505
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *op.Timeout)
506506
// Redefine ctx to be the new timeout-derived context.
507507
ctx = newCtx
@@ -683,8 +683,7 @@ func (op Operation) Execute(ctx context.Context) error {
683683
first = false
684684
}
685685

686-
// Calculate maxTimeMS value to potentially be appended to the wire message.
687-
maxTimeMS, err := op.calculateMaxTimeMS(ctx, srvr.RTTMonitor().P90(), srvr.RTTMonitor().Stats())
686+
maxTimeMS, err := op.calculateMaxTimeMS(ctx, srvr.RTTMonitor().P90())
688687
if err != nil {
689688
return err
690689
}
@@ -1089,7 +1088,7 @@ func (op Operation) readWireMessage(ctx context.Context, conn Connection) (resul
10891088
}
10901089

10911090
// decode
1092-
res, err := op.decodeResult(opcode, rem)
1091+
res, err := op.decodeResult(opcode, rem, csot.IsTimeoutContext(ctx))
10931092
// Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating
10941093
// everything.
10951094
op.updateClusterTimes(res)
@@ -1562,7 +1561,7 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
15621561
// if the ctx is a Timeout context. If the context is not a Timeout context, it uses the
15631562
// operation's MaxTimeMS if set. If no MaxTimeMS is set on the operation, and context is
15641563
// not a Timeout context, calculateMaxTimeMS returns 0.
1565-
func (op Operation) calculateMaxTimeMS(ctx context.Context, rtt90 time.Duration, rttStats string) (uint64, error) {
1564+
func (op Operation) calculateMaxTimeMS(ctx context.Context, rtt90 time.Duration) (uint64, error) {
15661565
if csot.IsTimeoutContext(ctx) {
15671566
if deadline, ok := ctx.Deadline(); ok {
15681567
remainingTimeout := time.Until(deadline)
@@ -1573,11 +1572,13 @@ func (op Operation) calculateMaxTimeMS(ctx context.Context, rtt90 time.Duration,
15731572
maxTimeMS := int64((maxTime + (time.Millisecond - 1)) / time.Millisecond)
15741573
if maxTimeMS <= 0 {
15751574
return 0, fmt.Errorf(
1576-
"remaining time %v until context deadline is less than or equal to 90th percentile RTT: %w\n%v",
1575+
"maxTimeMS calculated by context deadline is negative "+
1576+
"(remaining time: %v, 90th percentile RTT %v): %w",
15771577
remainingTimeout,
1578-
ErrDeadlineWouldBeExceeded,
1579-
rttStats)
1578+
rtt90,
1579+
ErrDeadlineWouldBeExceeded)
15801580
}
1581+
15811582
return uint64(maxTimeMS), nil
15821583
}
15831584
} else if op.MaxTime != nil {
@@ -1827,7 +1828,7 @@ func (Operation) decodeOpReply(wm []byte) opReply {
18271828
return reply
18281829
}
18291830

1830-
func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore.Document, error) {
1831+
func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte, isCSOT bool) (bsoncore.Document, error) {
18311832
switch opcode {
18321833
case wiremessage.OpReply:
18331834
reply := op.decodeOpReply(wm)
@@ -1845,7 +1846,7 @@ func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore
18451846
return nil, NewCommandResponseError("malformed OP_REPLY: invalid document", err)
18461847
}
18471848

1848-
return rdr, ExtractErrorFromServerResponse(rdr)
1849+
return rdr, ExtractErrorFromServerResponse(rdr, isCSOT)
18491850
case wiremessage.OpMsg:
18501851
_, wm, ok := wiremessage.ReadMsgFlags(wm)
18511852
if !ok {
@@ -1882,7 +1883,7 @@ func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore
18821883
return nil, NewCommandResponseError("malformed OP_MSG: invalid document", err)
18831884
}
18841885

1885-
return res, ExtractErrorFromServerResponse(res)
1886+
return res, ExtractErrorFromServerResponse(res, isCSOT)
18861887
default:
18871888
return nil, fmt.Errorf("cannot decode result from %s", opcode)
18881889
}

x/mongo/driver/operation_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func TestOperation(t *testing.T) {
328328
t.Run(tc.name, func(t *testing.T) {
329329
t.Parallel()
330330

331-
got, err := tc.op.calculateMaxTimeMS(tc.ctx, tc.rtt90, "")
331+
got, err := tc.op.calculateMaxTimeMS(tc.ctx, tc.rtt90)
332332

333333
// Assert that the calculated maxTimeMS is less than or equal to the expected value. A few
334334
// milliseconds will have elapsed toward the context deadline, and (remainingTimeout

x/mongo/driver/topology/connection.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"sync/atomic"
1919
"time"
2020

21+
"go.mongodb.org/mongo-driver/internal/csot"
2122
"go.mongodb.org/mongo-driver/mongo/address"
2223
"go.mongodb.org/mongo-driver/mongo/description"
2324
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
@@ -77,6 +78,8 @@ type connection struct {
7778
// TODO(GODRIVER-2824): change driverConnectionID type to int64.
7879
driverConnectionID uint64
7980
generation uint64
81+
82+
awaitingResponse bool
8083
}
8184

8285
// newConnection handles the creation of a connection. It does not connect the connection.
@@ -414,8 +417,15 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) {
414417

415418
dst, errMsg, err := c.read(ctx)
416419
if err != nil {
417-
// We closeConnection the connection because we don't know if there are other bytes left to read.
418-
c.close()
420+
if csot.IsTimeoutContext(ctx) {
421+
// If CSOT is enabled, use the background-read behavior instead of
422+
// closing the connection.
423+
c.awaitingResponse = true
424+
} else {
425+
// We closeConnection the connection because we don't know if there
426+
// are other bytes left to read.
427+
c.close()
428+
}
419429
message := errMsg
420430
if errors.Is(err, io.EOF) {
421431
message = "socket was unexpectedly closed"

0 commit comments

Comments
 (0)