Skip to content

Commit

Permalink
GODRIVER-1489 Add ability to stream wire messages (mongodb#405)
Browse files Browse the repository at this point in the history
  • Loading branch information
Divjot Arora committed Jun 2, 2020
1 parent 57a47fe commit ae39d96
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ driver-test-data.tar.gz
perf
**mongocryptd.pid
*.test
**.md
27 changes: 16 additions & 11 deletions x/mongo/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ type Expirable interface {
Alive() bool
}

// StreamerConnection represents a Connection that supports streaming wire protocol messages using the moreToCome and
// exhaustAllowed flags.
//
// The SetStreaming and CurrentlyStreaming functions correspond to the moreToCome flag on server responses. If a
// response has moreToCome set, SetStreaming(true) will be called and CurrentlyStreaming() should return true.
//
// CanStream corresponds to the exhaustAllowed flag. The operations layer will set exhaustAllowed on outgoing wire
// messages to inform the server that the driver supports streaming.
type StreamerConnection interface {
Connection
SetStreaming(bool)
CurrentlyStreaming() bool
SupportsStreaming() bool
}

// Compressor is an interface used to compress wire messages. If a Connection supports compression
// it should implement this interface as well. The CompressWireMessage method will be called during
// the execution of an operation if the wire message is allowed to be compressed.
Expand Down Expand Up @@ -128,20 +143,10 @@ func (ssd SingleConnectionDeployment) SupportsRetryWrites() bool { return false
func (ssd SingleConnectionDeployment) Kind() description.TopologyKind { return description.Single }

// Connection implements the Server interface. It always returns the embedded connection.
//
// This method returns a Connection with a no-op Close method. This ensures that a
// SingleConnectionDeployment can be used across multiple operation executions.
func (ssd SingleConnectionDeployment) Connection(context.Context) (Connection, error) {
return nopCloserConnection{ssd.C}, nil
return ssd.C, nil
}

// nopCloserConnection is an adapter used in a SingleConnectionDeployment. It passes through all
// functionality expcect for closing, which is a no-op. This is done so the connection can be used
// across multiple operations.
type nopCloserConnection struct{ Connection }

func (ncc nopCloserConnection) Close() error { return nil }

// TODO(GODRIVER-617): We can likely use 1 type for both the Type and the RetryMode by using
// 2 bits for the mode and 1 bit for the type. Although in the practical sense, we might not want to
// do that since the type of retryability is tied to the operation itself and isn't going change,
Expand Down
28 changes: 24 additions & 4 deletions x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
if len(scratch) > 0 {
scratch = scratch[:0]
}
wm, startedInfo, err := op.createWireMessage(ctx, scratch, desc)
wm, startedInfo, err := op.createWireMessage(ctx, scratch, desc, conn)
if err != nil {
return err
}
Expand Down Expand Up @@ -575,6 +575,12 @@ func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) (
return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err}
}

return op.readWireMessage(ctx, conn, wm)
}

func (op Operation) readWireMessage(ctx context.Context, conn Connection, wm []byte) ([]byte, error) {
var err error

wm, err = conn.ReadWireMessage(ctx, wm[:0])
if err != nil {
labels := []string{NetworkError}
Expand All @@ -590,6 +596,12 @@ func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) (
return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err}
}

// If we're using a streamable connection, we set its streaming state based on the moreToCome flag in the server
// response.
if streamer, ok := conn.(StreamerConnection); ok {
streamer.SetStreaming(wiremessage.IsMsgMoreToCome(wm))
}

// decompress wiremessage
wm, err = op.decompressWireMessage(wm)
if err != nil {
Expand Down Expand Up @@ -675,12 +687,12 @@ func (Operation) decompressWireMessage(wm []byte) ([]byte, error) {
}

func (op Operation) createWireMessage(ctx context.Context, dst []byte,
desc description.SelectedServer) ([]byte, startedInformation, error) {
desc description.SelectedServer, conn Connection) ([]byte, startedInformation, error) {

if desc.WireVersion == nil || desc.WireVersion.Max < wiremessage.OpmsgWireVersion {
return op.createQueryWireMessage(dst, desc)
}
return op.createMsgWireMessage(ctx, dst, desc)
return op.createMsgWireMessage(ctx, dst, desc, conn)
}

func (op Operation) addBatchArray(dst []byte) []byte {
Expand Down Expand Up @@ -758,7 +770,9 @@ func (op Operation) createQueryWireMessage(dst []byte, desc description.Selected
return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil
}

func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc description.SelectedServer) ([]byte, startedInformation, error) {
func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc description.SelectedServer,
conn Connection) ([]byte, startedInformation, error) {

var info startedInformation
var flags wiremessage.MsgFlag
var wmindex int32
Expand All @@ -767,6 +781,12 @@ func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc d
if op.WriteConcern != nil && !writeconcern.AckWrite(op.WriteConcern) && (op.Batches == nil || len(op.Batches.Documents) == 0) {
flags = wiremessage.MoreToCome
}
// Set the ExhaustAllowed flag if the connection supports streaming. This will tell the server that it can
// respond with the MoreToCome flag and then stream responses over this connection.
if streamer, ok := conn.(StreamerConnection); ok && streamer.SupportsStreaming() {
flags |= wiremessage.ExhaustAllowed
}

info.requestID = wiremessage.NextRequestID()
wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpMsg)
dst = wiremessage.AppendMsgFlags(dst, flags)
Expand Down
35 changes: 35 additions & 0 deletions x/mongo/driver/operation_exhaust.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package driver

import (
"context"
"errors"

"go.mongodb.org/mongo-driver/x/mongo/driver/description"
)

// ExecuteExhaust reads a response from the provided StreamerConnection. This will error if the connection's
// CurrentlyStreaming function returns false.
func (op Operation) ExecuteExhaust(ctx context.Context, conn StreamerConnection, scratch []byte) error {
if !conn.CurrentlyStreaming() {
return errors.New("exhaust read must be done with a connection that is currently streaming")
}

scratch = scratch[:0]
res, err := op.readWireMessage(ctx, conn, scratch)
if err != nil {
return err
}
if op.ProcessResponseFn != nil {
if err = op.ProcessResponseFn(res, nil, description.Server{}); err != nil {
return err
}
}

return nil
}
107 changes: 100 additions & 7 deletions x/mongo/driver/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/google/go-cmp/cmp"
"go.mongodb.org/mongo-driver/bson/bsontype"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
Expand Down Expand Up @@ -518,6 +519,93 @@ func TestOperation(t *testing.T) {
})
}
})
t.Run("ExecuteExhaust", func(t *testing.T) {
t.Run("errors if connection is not streaming", func(t *testing.T) {
conn := &mockConnection{
rStreaming: false,
}
err := Operation{}.ExecuteExhaust(context.TODO(), conn, nil)
assert.NotNil(t, err, "expected error, got nil")
})
})
t.Run("exhaustAllowed and moreToCome", func(t *testing.T) {
// Test the interaction between exhaustAllowed and moreToCome on requests/responses when using the Execute
// and ExecuteExhaust methods.

// Create a server response wire message that has moreToCome=false.
serverResponseDoc := bsoncore.BuildDocumentFromElements(nil,
bsoncore.AppendInt32Element(nil, "ok", 1),
)
nonStreamingResponse := createExhaustServerResponse(t, serverResponseDoc, false)

// Create a connection that reports that it cannot stream messages.
conn := &mockConnection{
rDesc: description.Server{
WireVersion: &description.VersionRange{
Max: 6,
},
},
rReadWM: nonStreamingResponse,
rCanStream: false,
}
op := Operation{
CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) {
return bsoncore.AppendInt32Element(dst, "isMaster", 1), nil
},
Database: "admin",
Deployment: SingleConnectionDeployment{conn},
}
err := op.Execute(context.TODO(), nil)
assert.Nil(t, err, "Execute error: %v", err)

// The wire message sent to the server should not have exhaustAllowed=true. After execution, the connection
// should not be in a streaming state.
assertExhaustAllowedSet(t, conn.pWriteWM, false)
assert.False(t, conn.CurrentlyStreaming(), "expected CurrentlyStreaming to be false")

// Modify the connection to report that it can stream and create a new server response with moreToCome=true.
streamingResponse := createExhaustServerResponse(t, serverResponseDoc, true)
conn.rReadWM = streamingResponse
conn.rCanStream = true
err = op.Execute(context.TODO(), nil)
assert.Nil(t, err, "Execute error: %v", err)
assertExhaustAllowedSet(t, conn.pWriteWM, true)
assert.True(t, conn.CurrentlyStreaming(), "expected CurrentlyStreaming to be true")

// Reset the server response and go through ExecuteExhaust to mimic streaming the next response. After
// execution, the connection should still be in a streaming state.
conn.rReadWM = streamingResponse
err = op.ExecuteExhaust(context.TODO(), conn, nil)
assert.Nil(t, err, "ExecuteExhaust error: %v", err)
assert.True(t, conn.CurrentlyStreaming(), "expected CurrentlyStreaming to be true")
})
}

func createExhaustServerResponse(t *testing.T, response bsoncore.Document, moreToCome bool) []byte {
idx, wm := wiremessage.AppendHeaderStart(nil, 0, wiremessage.CurrentRequestID()+1, wiremessage.OpMsg)
var flags wiremessage.MsgFlag
if moreToCome {
flags = wiremessage.MoreToCome
}
wm = wiremessage.AppendMsgFlags(wm, flags)
wm = wiremessage.AppendMsgSectionType(wm, wiremessage.SingleDocument)
wm = bsoncore.AppendDocument(wm, response)
return bsoncore.UpdateLength(wm, idx, int32(len(wm)))
}

func assertExhaustAllowedSet(t *testing.T, wm []byte, expected bool) {
t.Helper()
_, _, _, _, wm, ok := wiremessage.ReadHeader(wm)
if !ok {
t.Fatal("could not read wm header")
}
flags, wm, ok := wiremessage.ReadMsgFlags(wm)
if !ok {
t.Fatal("could not read wm flags")
}

actual := flags&wiremessage.ExhaustAllowed > 0
assert.Equal(t, expected, actual, "expected exhaustAllowed set %v, got %v", expected, actual)
}

type mockDeployment struct {
Expand Down Expand Up @@ -554,20 +642,25 @@ type mockConnection struct {
pReadDst []byte

// returns
rWriteErr error
rReadWM []byte
rReadErr error
rDesc description.Server
rCloseErr error
rID string
rAddr address.Address
rWriteErr error
rReadWM []byte
rReadErr error
rDesc description.Server
rCloseErr error
rID string
rAddr address.Address
rCanStream bool
rStreaming bool
}

func (m *mockConnection) Description() description.Server { return m.rDesc }
func (m *mockConnection) Close() error { return m.rCloseErr }
func (m *mockConnection) ID() string { return m.rID }
func (m *mockConnection) Address() address.Address { return m.rAddr }
func (m *mockConnection) Stale() bool { return false }
func (m *mockConnection) SupportsStreaming() bool { return m.rCanStream }
func (m *mockConnection) CurrentlyStreaming() bool { return m.rStreaming }
func (m *mockConnection) SetStreaming(streaming bool) { m.rStreaming = streaming }

func (m *mockConnection) WriteWireMessage(_ context.Context, wm []byte) error {
m.pWriteWM = wm
Expand Down
12 changes: 12 additions & 0 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type connection struct {
config *connectionConfig
cancelConnectContext context.CancelFunc
connectContextMade chan struct{}
canStream bool
currentlyStreaming bool

// pool related fields
pool *pool
Expand Down Expand Up @@ -339,6 +341,7 @@ func (c *connection) bumpIdleDeadline() {
type initConnection struct{ *connection }

var _ driver.Connection = initConnection{}
var _ driver.StreamerConnection = initConnection{}

func (c initConnection) Description() description.Server {
if c.connection == nil {
Expand All @@ -362,6 +365,15 @@ func (c initConnection) WriteWireMessage(ctx context.Context, wm []byte) error {
func (c initConnection) ReadWireMessage(ctx context.Context, dst []byte) ([]byte, error) {
return c.readWireMessage(ctx, dst)
}
func (c initConnection) SetStreaming(streaming bool) {
c.currentlyStreaming = streaming
}
func (c initConnection) CurrentlyStreaming() bool {
return c.currentlyStreaming
}
func (c initConnection) SupportsStreaming() bool {
return c.canStream
}

// Connection implements the driver.Connection interface to allow reading and writing wire
// messages and the driver.Expirable interface to allow expiring.
Expand Down

0 comments on commit ae39d96

Please sign in to comment.