Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ driver-test-data.tar.gz
perf
**mongocryptd.pid
*.test
**.md
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually keep a design.md file around and it's annoying to have to manually ignore it.

27 changes: 16 additions & 11 deletions x/mongo/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ type Expirable interface {
Alive() bool
}

// StreamerConnection represents a Connection that supports streaming wire protocol messages using the moreToCome and
// exhaustAllowed flags.
//
// The SetStreaming and CurrentlyStreaming functions correspond to the moreToCome flag on server responses. If a
// response has moreToCome set, SetStreaming(true) will be called and CurrentlyStreaming() should return true.
//
// CanStream corresponds to the exhaustAllowed flag. The operations layer will set exhaustAllowed on outgoing wire
// messages to inform the server that the driver supports streaming.
type StreamerConnection interface {
Connection
SetStreaming(bool)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted to give these functions names that define the semantics of exhaustAllowed/moreToCome. If this is confusing, I'm open to giving them more literal names like SetMoreToCome, IsMoreToCome, and SupportsExhaustAllowed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm leaning more towards the more literal names, but I don't feel that strongly about it.

CurrentlyStreaming() bool
SupportsStreaming() bool
}

// Compressor is an interface used to compress wire messages. If a Connection supports compression
// it should implement this interface as well. The CompressWireMessage method will be called during
// the execution of an operation if the wire message is allowed to be compressed.
Expand Down Expand Up @@ -127,20 +142,10 @@ func (ssd SingleConnectionDeployment) SupportsRetryWrites() bool { return false
func (ssd SingleConnectionDeployment) Kind() description.TopologyKind { return description.Single }

// Connection implements the Server interface. It always returns the embedded connection.
//
// This method returns a Connection with a no-op Close method. This ensures that a
// SingleConnectionDeployment can be used across multiple operation executions.
func (ssd SingleConnectionDeployment) Connection(context.Context) (Connection, error) {
return nopCloserConnection{ssd.C}, nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this wrapping is necessary. There are two instances where a connection should be used for an operation and then should not be closed or returned to a pool: handshakes and heartbeats. Both of these uses already wrap the connection in topology.initConnection, which serves the same purpose as nopCloserConnection, so the extra wrapping adds nothing.

Having this wrapping also hurts us because if initConnection implements Streamer, wrapping it in nopCloserConnection "gets rid of" the Streamer implementation.

return ssd.C, nil
}

// nopCloserConnection is an adapter used in a SingleConnectionDeployment. It passes through all
// functionality expcect for closing, which is a no-op. This is done so the connection can be used
// across multiple operations.
type nopCloserConnection struct{ Connection }

func (ncc nopCloserConnection) Close() error { return nil }

// TODO(GODRIVER-617): We can likely use 1 type for both the Type and the RetryMode by using
// 2 bits for the mode and 1 bit for the type. Although in the practical sense, we might not want to
// do that since the type of retryability is tied to the operation itself and isn't going change,
Expand Down
28 changes: 24 additions & 4 deletions x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
if len(scratch) > 0 {
scratch = scratch[:0]
}
wm, startedInfo, err := op.createWireMessage(ctx, scratch, desc)
wm, startedInfo, err := op.createWireMessage(ctx, scratch, desc, conn)
if err != nil {
return err
}
Expand Down Expand Up @@ -575,6 +575,12 @@ func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) (
return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err}
}

return op.readWireMessage(ctx, conn, wm)
}

func (op Operation) readWireMessage(ctx context.Context, conn Connection, wm []byte) ([]byte, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly the same logic that was originally in roundTrip but split off into a separate function so ExecuteExhaust can call it as well. The only difference is that it calls SetStreaming if a streaming connection is given and the response has moreToCome.

var err error

wm, err = conn.ReadWireMessage(ctx, wm[:0])
if err != nil {
labels := []string{NetworkError}
Expand All @@ -590,6 +596,12 @@ func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) (
return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err}
}

// If we're using a streamable connection, we set its streaming state based on the moreToCome flag in the server
// response.
if streamer, ok := conn.(StreamerConnection); ok {
streamer.SetStreaming(wiremessage.IsMsgMoreToCome(wm))
}

// decompress wiremessage
wm, err = op.decompressWireMessage(wm)
if err != nil {
Expand Down Expand Up @@ -675,12 +687,12 @@ func (Operation) decompressWireMessage(wm []byte) ([]byte, error) {
}

func (op Operation) createWireMessage(ctx context.Context, dst []byte,
desc description.SelectedServer) ([]byte, startedInformation, error) {
desc description.SelectedServer, conn Connection) ([]byte, startedInformation, error) {

if desc.WireVersion == nil || desc.WireVersion.Max < wiremessage.OpmsgWireVersion {
return op.createQueryWireMessage(dst, desc)
}
return op.createMsgWireMessage(ctx, dst, desc)
return op.createMsgWireMessage(ctx, dst, desc, conn)
}

func (op Operation) addBatchArray(dst []byte) []byte {
Expand Down Expand Up @@ -758,7 +770,9 @@ func (op Operation) createQueryWireMessage(dst []byte, desc description.Selected
return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil
}

func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc description.SelectedServer) ([]byte, startedInformation, error) {
func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc description.SelectedServer,
conn Connection) ([]byte, startedInformation, error) {

var info startedInformation
var flags wiremessage.MsgFlag
var wmindex int32
Expand All @@ -767,6 +781,12 @@ func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc d
if op.WriteConcern != nil && !writeconcern.AckWrite(op.WriteConcern) && (op.Batches == nil || len(op.Batches.Documents) == 0) {
flags = wiremessage.MoreToCome
}
// Set the ExhaustAllowed flag if the connection supports streaming. This will tell the server that it can
// respond with the MoreToCome flag and then stream responses over this connection.
if streamer, ok := conn.(StreamerConnection); ok && streamer.SupportsStreaming() {
flags |= wiremessage.ExhaustAllowed
}

info.requestID = wiremessage.NextRequestID()
wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpMsg)
dst = wiremessage.AppendMsgFlags(dst, flags)
Expand Down
35 changes: 35 additions & 0 deletions x/mongo/driver/operation_exhaust.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package driver

import (
"context"
"errors"

"go.mongodb.org/mongo-driver/x/mongo/driver/description"
)

// ExecuteExhaust reads a response from the provided StreamerConnection. This will error if the connection's
// CurrentlyStreaming function returns false.
func (op Operation) ExecuteExhaust(ctx context.Context, conn StreamerConnection, scratch []byte) error {
if !conn.CurrentlyStreaming() {
return errors.New("exhaust read must be done with a connection that is currently streaming")
}

scratch = scratch[:0]
res, err := op.readWireMessage(ctx, conn, scratch)
if err != nil {
return err
}
if op.ProcessResponseFn != nil {
if err = op.ProcessResponseFn(res, nil, description.Server{}); err != nil {
return err
}
}

return nil
}
107 changes: 100 additions & 7 deletions x/mongo/driver/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/google/go-cmp/cmp"
"go.mongodb.org/mongo-driver/bson/bsontype"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
Expand Down Expand Up @@ -518,6 +519,93 @@ func TestOperation(t *testing.T) {
})
}
})
t.Run("ExecuteExhaust", func(t *testing.T) {
t.Run("errors if connection is not streaming", func(t *testing.T) {
conn := &mockConnection{
rStreaming: false,
}
err := Operation{}.ExecuteExhaust(context.TODO(), conn, nil)
assert.NotNil(t, err, "expected error, got nil")
})
})
t.Run("exhaustAllowed and moreToCome", func(t *testing.T) {
// Test the interaction between exhaustAllowed and moreToCome on requests/responses when using the Execute
// and ExecuteExhaust methods.

// Create a server response wire message that has moreToCome=false.
serverResponseDoc := bsoncore.BuildDocumentFromElements(nil,
bsoncore.AppendInt32Element(nil, "ok", 1),
)
nonStreamingResponse := createExhaustServerResponse(t, serverResponseDoc, false)

// Create a connection that reports that it cannot stream messages.
conn := &mockConnection{
rDesc: description.Server{
WireVersion: &description.VersionRange{
Max: 6,
},
},
rReadWM: nonStreamingResponse,
rCanStream: false,
}
op := Operation{
CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) {
return bsoncore.AppendInt32Element(dst, "isMaster", 1), nil
},
Database: "admin",
Deployment: SingleConnectionDeployment{conn},
}
err := op.Execute(context.TODO(), nil)
assert.Nil(t, err, "Execute error: %v", err)

// The wire message sent to the server should not have exhaustAllowed=true. After execution, the connection
// should not be in a streaming state.
assertExhaustAllowedSet(t, conn.pWriteWM, false)
assert.False(t, conn.CurrentlyStreaming(), "expected CurrentlyStreaming to be false")

// Modify the connection to report that it can stream and create a new server response with moreToCome=true.
streamingResponse := createExhaustServerResponse(t, serverResponseDoc, true)
conn.rReadWM = streamingResponse
conn.rCanStream = true
err = op.Execute(context.TODO(), nil)
assert.Nil(t, err, "Execute error: %v", err)
assertExhaustAllowedSet(t, conn.pWriteWM, true)
assert.True(t, conn.CurrentlyStreaming(), "expected CurrentlyStreaming to be true")

// Reset the server response and go through ExecuteExhaust to mimic streaming the next response. After
// execution, the connection should still be in a streaming state.
conn.rReadWM = streamingResponse
err = op.ExecuteExhaust(context.TODO(), conn, nil)
assert.Nil(t, err, "ExecuteExhaust error: %v", err)
assert.True(t, conn.CurrentlyStreaming(), "expected CurrentlyStreaming to be true")
})
}

func createExhaustServerResponse(t *testing.T, response bsoncore.Document, moreToCome bool) []byte {
idx, wm := wiremessage.AppendHeaderStart(nil, 0, wiremessage.CurrentRequestID()+1, wiremessage.OpMsg)
var flags wiremessage.MsgFlag
if moreToCome {
flags = wiremessage.MoreToCome
}
wm = wiremessage.AppendMsgFlags(wm, flags)
wm = wiremessage.AppendMsgSectionType(wm, wiremessage.SingleDocument)
wm = bsoncore.AppendDocument(wm, response)
return bsoncore.UpdateLength(wm, idx, int32(len(wm)))
}

func assertExhaustAllowedSet(t *testing.T, wm []byte, expected bool) {
t.Helper()
_, _, _, _, wm, ok := wiremessage.ReadHeader(wm)
if !ok {
t.Fatal("could not read wm header")
}
flags, wm, ok := wiremessage.ReadMsgFlags(wm)
if !ok {
t.Fatal("could not read wm flags")
}

actual := flags&wiremessage.ExhaustAllowed > 0
assert.Equal(t, expected, actual, "expected exhaustAllowed set %v, got %v", expected, actual)
}

type mockDeployment struct {
Expand Down Expand Up @@ -554,19 +642,24 @@ type mockConnection struct {
pReadDst []byte

// returns
rWriteErr error
rReadWM []byte
rReadErr error
rDesc description.Server
rCloseErr error
rID string
rAddr address.Address
rWriteErr error
rReadWM []byte
rReadErr error
rDesc description.Server
rCloseErr error
rID string
rAddr address.Address
rCanStream bool
rStreaming bool
}

func (m *mockConnection) Description() description.Server { return m.rDesc }
func (m *mockConnection) Close() error { return m.rCloseErr }
func (m *mockConnection) ID() string { return m.rID }
func (m *mockConnection) Address() address.Address { return m.rAddr }
func (m *mockConnection) SupportsStreaming() bool { return m.rCanStream }
func (m *mockConnection) CurrentlyStreaming() bool { return m.rStreaming }
func (m *mockConnection) SetStreaming(streaming bool) { m.rStreaming = streaming }

func (m *mockConnection) WriteWireMessage(_ context.Context, wm []byte) error {
m.pWriteWM = wm
Expand Down
12 changes: 12 additions & 0 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type connection struct {
config *connectionConfig
cancelConnectContext context.CancelFunc
connectContextMade chan struct{}
canStream bool
currentlyStreaming bool

// pool related fields
pool *pool
Expand Down Expand Up @@ -339,6 +341,7 @@ func (c *connection) bumpIdleDeadline() {
type initConnection struct{ *connection }

var _ driver.Connection = initConnection{}
var _ driver.StreamerConnection = initConnection{}

func (c initConnection) Description() description.Server {
if c.connection == nil {
Expand All @@ -361,6 +364,15 @@ func (c initConnection) WriteWireMessage(ctx context.Context, wm []byte) error {
func (c initConnection) ReadWireMessage(ctx context.Context, dst []byte) ([]byte, error) {
return c.readWireMessage(ctx, dst)
}
func (c initConnection) SetStreaming(streaming bool) {
c.currentlyStreaming = streaming
}
func (c initConnection) CurrentlyStreaming() bool {
return c.currentlyStreaming
}
func (c initConnection) SupportsStreaming() bool {
return c.canStream
}

// Connection implements the driver.Connection interface to allow reading and writing wire
// messages and the driver.Expirable interface to allow expiring.
Expand Down