Skip to content

Commit

Permalink
GODRIVER-2339 Use interface for RTTMonitor and add Stats method for e…
Browse files Browse the repository at this point in the history
…rrors. (#1009)

Extends the IsTimeout() helper to recognize the MaxTimeMSExpired error. Modifies the Server interface to contain a new RTTMonitor interface. Adds Stats() method to rttMonitor. Returns RTT monitor stats with ErrDeadlineWouldBeExceeded errors.
  • Loading branch information
benjirewis authored Aug 1, 2022
1 parent a006621 commit 0e8481a
Show file tree
Hide file tree
Showing 15 changed files with 430 additions and 97 deletions.
181 changes: 181 additions & 0 deletions data/client-side-operations-timeout/error-transformations.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
{
"description": "MaxTimeMSExpired server errors are transformed into a custom timeout error",
"schemaVersion": "1.9",
"runOnRequirements": [
{
"minServerVersion": "4.0",
"topologies": [
"replicaset"
]
},
{
"minServerVersion": "4.2",
"topologies": [
"replicaset",
"sharded"
]
}
],
"createEntities": [
{
"client": {
"id": "failPointClient",
"useMultipleMongoses": false
}
},
{
"client": {
"id": "client",
"uriOptions": {
"timeoutMS": 50
},
"useMultipleMongoses": false,
"observeEvents": [
"commandStartedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "test"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "coll"
}
}
],
"initialData": [
{
"collectionName": "coll",
"databaseName": "test",
"documents": []
}
],
"tests": [
{
"description": "basic MaxTimeMSExpired error is transformed",
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "failPointClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"insert"
],
"errorCode": 50
}
}
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 1
}
},
"expectError": {
"isTimeoutError": true
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"commandName": "insert",
"databaseName": "test",
"command": {
"insert": "coll",
"maxTimeMS": {
"$$type": [
"int",
"long"
]
}
}
}
}
]
}
]
},
{
"description": "write concern error MaxTimeMSExpired is transformed",
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "failPointClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"insert"
],
"writeConcernError": {
"code": 50,
"errmsg": "maxTimeMS expired"
}
}
}
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 1
}
},
"expectError": {
"isTimeoutError": true
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"commandName": "insert",
"databaseName": "test",
"command": {
"insert": "coll",
"maxTimeMS": {
"$$type": [
"int",
"long"
]
}
}
}
}
]
}
]
}
]
}
96 changes: 96 additions & 0 deletions data/client-side-operations-timeout/error-transformations.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
description: "MaxTimeMSExpired server errors are transformed into a custom timeout error"

schemaVersion: "1.9"

# failCommand is available on 4.0 for replica sets and 4.2 for sharded clusters.
runOnRequirements:
- minServerVersion: "4.0"
topologies: ["replicaset"]
- minServerVersion: "4.2"
topologies: ["replicaset", "sharded"]

createEntities:
- client:
id: &failPointClient failPointClient
useMultipleMongoses: false
- client:
id: &client client
uriOptions:
timeoutMS: 50
useMultipleMongoses: false
observeEvents:
- commandStartedEvent
- database:
id: &database database
client: *client
databaseName: &databaseName test
- collection:
id: &collection collection
database: *database
collectionName: &collectionName coll

initialData:
- collectionName: *collectionName
databaseName: *databaseName
documents: []

tests:
# A server response like {ok: 0, code: 50, ...} is transformed.
- description: "basic MaxTimeMSExpired error is transformed"
operations:
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 1 }
data:
failCommands: ["insert"]
errorCode: 50
- name: insertOne
object: *collection
arguments:
document: { _id: 1 }
expectError:
isTimeoutError: true
expectEvents:
- client: *client
events:
- commandStartedEvent:
commandName: insert
databaseName: *databaseName
command:
insert: *collectionName
maxTimeMS: { $$type: ["int", "long"] }

# A server response like {ok: 1, writeConcernError: {code: 50, ...}} is transformed.
- description: "write concern error MaxTimeMSExpired is transformed"
operations:
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 1 }
data:
failCommands: ["insert"]
writeConcernError:
code: 50
errmsg: "maxTimeMS expired"
- name: insertOne
object: *collection
arguments:
document: { _id: 1 }
expectError:
isTimeoutError: true
expectEvents:
- client: *client
events:
- commandStartedEvent:
commandName: insert
databaseName: *databaseName
command:
insert: *collectionName
maxTimeMS: { $$type: ["int", "long"] }
24 changes: 24 additions & 0 deletions internal/csot_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,27 @@ func MakeTimeoutContext(ctx context.Context, to time.Duration) (context.Context,
func IsTimeoutContext(ctx context.Context) bool {
return ctx.Value(timeoutKey{}) != nil
}

// ZeroRTTMonitor implements the RTTMonitor interface and is used internally for testing. It returns 0 for all
// RTT calculations and an empty string for RTT statistics.
type ZeroRTTMonitor struct{}

// EWMA implements the RTT monitor interface.
func (zrm *ZeroRTTMonitor) EWMA() time.Duration {
return 0
}

// Min implements the RTT monitor interface.
func (zrm *ZeroRTTMonitor) Min() time.Duration {
return 0
}

// P90 implements the RTT monitor interface.
func (zrm *ZeroRTTMonitor) P90() time.Duration {
return 0
}

// Stats implements the RTT monitor interface.
func (zrm *ZeroRTTMonitor) Stats() string {
return ""
}
9 changes: 2 additions & 7 deletions mongo/change_stream_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package mongo

import (
"context"
"time"

"go.mongodb.org/mongo-driver/mongo/description"
"go.mongodb.org/mongo-driver/x/mongo/driver"
Expand Down Expand Up @@ -36,12 +35,8 @@ func (c *changeStreamDeployment) Connection(context.Context) (driver.Connection,
return c.conn, nil
}

func (c *changeStreamDeployment) MinRTT() time.Duration {
return c.server.MinRTT()
}

func (c *changeStreamDeployment) RTT90() time.Duration {
return c.server.RTT90()
func (c *changeStreamDeployment) RTTMonitor() driver.RTTMonitor {
return c.server.RTTMonitor()
}

func (c *changeStreamDeployment) ProcessError(err error, conn driver.Connection) driver.ProcessErrorResult {
Expand Down
12 changes: 12 additions & 0 deletions mongo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ func IsTimeout(err error) bool {
if err == driver.ErrDeadlineWouldBeExceeded {
return true
}
if ce, ok := err.(CommandError); ok && ce.IsMaxTimeMSExpiredError() {
return true
}
if we, ok := err.(WriteException); ok && we.WriteConcernError != nil &&
we.WriteConcernError.IsMaxTimeMSExpiredError() {
return true
}
if ne, ok := err.(net.Error); ok {
return ne.Timeout()
}
Expand Down Expand Up @@ -365,6 +372,11 @@ func (wce WriteConcernError) Error() string {
return wce.Message
}

// IsMaxTimeMSExpiredError returns true if the error is a MaxTimeMSExpired error.
func (wce WriteConcernError) IsMaxTimeMSExpiredError() bool {
return wce.Code == 50
}

// WriteException is the error type returned by the InsertOne, DeleteOne, DeleteMany, UpdateOne, UpdateMany, and
// ReplaceOne operations.
type WriteException struct {
Expand Down
8 changes: 4 additions & 4 deletions mongo/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func TestClient(t *testing.T) {
for _, desc := range topo.Description().Servers {
server, err := topo.FindServer(desc)
assert.Nil(mt, err, "FindServer error: %v", err)
if server.MinRTT() <= 250*time.Millisecond {
if server.RTTMonitor().Min() <= 250*time.Millisecond {
done = false
}
}
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestClient(t *testing.T) {
for _, desc := range topo.Description().Servers {
server, err := topo.FindServer(desc)
assert.Nil(mt, err, "FindServer error: %v", err)
if server.MinRTT() <= 250*time.Millisecond {
if server.RTTMonitor().Min() <= 250*time.Millisecond {
done = false
}
}
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestClient(t *testing.T) {
for _, desc := range topo.Description().Servers {
server, err := topo.FindServer(desc)
assert.Nil(mt, err, "FindServer error: %v", err)
if server.RTT90() <= 300*time.Millisecond {
if server.RTTMonitor().P90() <= 300*time.Millisecond {
done = false
}
}
Expand Down Expand Up @@ -691,7 +691,7 @@ func TestClient(t *testing.T) {
for _, desc := range topo.Description().Servers {
server, err := topo.FindServer(desc)
assert.Nil(mt, err, "FindServer error: %v", err)
if server.RTT90() <= 275*time.Millisecond {
if server.RTTMonitor().P90() <= 275*time.Millisecond {
done = false
}
}
Expand Down
Loading

0 comments on commit 0e8481a

Please sign in to comment.