Skip to content

Commit

Permalink
Update topology.Server type
Browse files Browse the repository at this point in the history
GODRIVER-932

Change-Id: I1234c66952cde375a272052f87f9753ab14e7706
  • Loading branch information
skriptble committed May 1, 2019
1 parent 3148566 commit 8748f69
Show file tree
Hide file tree
Showing 78 changed files with 2,548 additions and 518 deletions.
2 changes: 2 additions & 0 deletions .errcheck-excludes
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
(go.mongodb.org/mongo-driver/x/mongo/driver.Connection).Close
(*go.mongodb.org/mongo-driver/x/network/connection.connection).Close
(go.mongodb.org/mongo-driver/x/network/connection.Connection).Close
(*go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology.connection).close
(*go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology.Subscription).Unsubscribe
(*go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology.Server).Close
(*go.mongodb.org/mongo-driver/x/network/connection.pool).closeConnection
Expand Down
15 changes: 7 additions & 8 deletions internal/testutil/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/connection"
"go.mongodb.org/mongo-driver/x/network/connstring"
"go.mongodb.org/mongo-driver/x/network/description"
)
Expand Down Expand Up @@ -86,10 +85,10 @@ func MonitoredTopology(t *testing.T, dbName string, monitor *event.CommandMonito
topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
return append(
opts,
topology.WithConnectionOptions(func(opts ...connection.Option) []connection.Option {
topology.WithConnectionOptions(func(opts ...topology.ConnectionOption) []topology.ConnectionOption {
return append(
opts,
connection.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
topology.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
return monitor
}),
)
Expand All @@ -106,7 +105,7 @@ func MonitoredTopology(t *testing.T, dbName string, monitor *event.CommandMonito
s, err := monitoredTopology.SelectServer(context.Background(), description.WriteSelector())
require.NoError(t, err)

c, err := s.Connection(context.Background())
c, err := s.ConnectionLegacy(context.Background())
require.NoError(t, err)

_, err = (&command.Write{
Expand All @@ -128,10 +127,10 @@ func GlobalMonitoredTopology(t *testing.T, monitor *event.CommandMonitor) *topol
topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
return append(
opts,
topology.WithConnectionOptions(func(opts ...connection.Option) []connection.Option {
topology.WithConnectionOptions(func(opts ...topology.ConnectionOption) []topology.ConnectionOption {
return append(
opts,
connection.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
topology.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
return monitor
}),
)
Expand All @@ -150,7 +149,7 @@ func GlobalMonitoredTopology(t *testing.T, monitor *event.CommandMonitor) *topol
s, err := monitoredTopology.SelectServer(context.Background(), description.WriteSelector())
require.NoError(t, err)

c, err := s.Connection(context.Background())
c, err := s.ConnectionLegacy(context.Background())
require.NoError(t, err)

_, err = (&command.Write{
Expand Down Expand Up @@ -182,7 +181,7 @@ func Topology(t *testing.T) *topology.Topology {
s, err := liveTopology.SelectServer(context.Background(), description.WriteSelector())
require.NoError(t, err)

c, err := s.Connection(context.Background())
c, err := s.ConnectionLegacy(context.Background())
require.NoError(t, err)

_, err = (&command.Write{
Expand Down
6 changes: 3 additions & 3 deletions internal/testutil/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func EnableMaxTimeFailPoint(t *testing.T, s *topology.Server) error {
{"mode", bsonx.String("alwaysOn")},
},
}
conn, err := s.Connection(context.Background())
conn, err := s.ConnectionLegacy(context.Background())
require.NoError(t, err)
defer testhelpers.RequireNoErrorOnClose(t, conn)
_, err = cmd.RoundTrip(context.Background(), s.SelectedDescription(), conn)
Expand All @@ -135,7 +135,7 @@ func DisableMaxTimeFailPoint(t *testing.T, s *topology.Server) {
{"mode", bsonx.String("off")},
},
}
conn, err := s.Connection(context.Background())
conn, err := s.ConnectionLegacy(context.Background())
require.NoError(t, err)
defer testhelpers.RequireNoErrorOnClose(t, conn)
_, err = cmd.RoundTrip(context.Background(), s.SelectedDescription(), conn)
Expand All @@ -144,7 +144,7 @@ func DisableMaxTimeFailPoint(t *testing.T, s *topology.Server) {

// RunCommand runs an arbitrary command on a given database of target server
func RunCommand(t *testing.T, s *topology.Server, db string, b bsonx.Doc) (bson.Raw, error) {
conn, err := s.Connection(context.Background())
conn, err := s.ConnectionLegacy(context.Background())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (cs *ChangeStream) runCommand(ctx context.Context, replaceOptions bool) err
}

desc := ss.Description()
conn, err := ss.Connection(ctx)
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return replaceErrors(err)
}
Expand Down
39 changes: 20 additions & 19 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package mongo

import (
"context"
"crypto/tls"
"strings"
"time"

Expand All @@ -18,13 +19,13 @@ import (
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy"
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy/auth"
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy/session"
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology"
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/connection"
"go.mongodb.org/mongo-driver/x/network/connstring"
"go.mongodb.org/mongo-driver/x/network/description"
)
Expand Down Expand Up @@ -195,7 +196,7 @@ func (c *Client) configure(opts *options.ClientOptions) error {
return err
}

var connOpts []connection.Option
var connOpts []topology.ConnectionOption
var serverOpts []topology.ServerOption
var topologyOpts []topology.Option

Expand All @@ -211,15 +212,15 @@ func (c *Client) configure(opts *options.ClientOptions) error {
if len(opts.Compressors) > 0 {
comps = opts.Compressors

connOpts = append(connOpts, connection.WithCompressors(
connOpts = append(connOpts, topology.WithCompressors(
func(compressors []string) []string {
return append(compressors, comps...)
},
))

for _, comp := range comps {
if comp == "zlib" {
connOpts = append(connOpts, connection.WithZlibLevel(func(level *int) *int {
connOpts = append(connOpts, topology.WithZlibLevel(func(level *int) *int {
return opts.ZlibLevel
}))
}
Expand All @@ -230,8 +231,8 @@ func (c *Client) configure(opts *options.ClientOptions) error {
))
}
// Handshaker
var handshaker = func(connection.Handshaker) connection.Handshaker {
return &command.Handshake{Client: command.ClientDoc(appName), Compressors: comps}
var handshaker = func(driver.Handshaker) driver.Handshaker {
return driver.IsMaster().AppName(appName).Compressors(comps)
}
// Auth & Database & Password & Username
if opts.Auth != nil {
Expand Down Expand Up @@ -274,24 +275,24 @@ func (c *Client) configure(opts *options.ClientOptions) error {
}
}

handshaker = func(connection.Handshaker) connection.Handshaker {
handshaker = func(driver.Handshaker) driver.Handshaker {
return auth.Handshaker(nil, handshakeOpts)
}
}
connOpts = append(connOpts, connection.WithHandshaker(handshaker))
connOpts = append(connOpts, topology.WithHandshaker(handshaker))
// ConnectTimeout
if opts.ConnectTimeout != nil {
serverOpts = append(serverOpts, topology.WithHeartbeatTimeout(
func(time.Duration) time.Duration { return *opts.ConnectTimeout },
))
connOpts = append(connOpts, connection.WithConnectTimeout(
connOpts = append(connOpts, topology.WithConnectTimeout(
func(time.Duration) time.Duration { return *opts.ConnectTimeout },
))
}
// Dialer
if opts.Dialer != nil {
connOpts = append(connOpts, connection.WithDialer(
func(connection.Dialer) connection.Dialer { return opts.Dialer },
connOpts = append(connOpts, topology.WithDialer(
func(topology.Dialer) topology.Dialer { return opts.Dialer },
))
}
// Direct
Expand Down Expand Up @@ -320,7 +321,7 @@ func (c *Client) configure(opts *options.ClientOptions) error {
}
// MaxConIdleTime
if opts.MaxConnIdleTime != nil {
connOpts = append(connOpts, connection.WithIdleTimeout(
connOpts = append(connOpts, topology.WithIdleTimeout(
func(time.Duration) time.Duration { return *opts.MaxConnIdleTime },
))
}
Expand All @@ -334,7 +335,7 @@ func (c *Client) configure(opts *options.ClientOptions) error {
}
// Monitor
if opts.Monitor != nil {
connOpts = append(connOpts, connection.WithMonitor(
connOpts = append(connOpts, topology.WithMonitor(
func(*event.CommandMonitor) *event.CommandMonitor { return opts.Monitor },
))
}
Expand Down Expand Up @@ -373,15 +374,15 @@ func (c *Client) configure(opts *options.ClientOptions) error {
if opts.SocketTimeout != nil {
connOpts = append(
connOpts,
connection.WithReadTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }),
connection.WithWriteTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }),
topology.WithReadTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }),
topology.WithWriteTimeout(func(time.Duration) time.Duration { return *opts.SocketTimeout }),
)
}
// TLSConfig
if opts.TLSConfig != nil {
connOpts = append(connOpts, connection.WithTLSConfig(
func(*connection.TLSConfig) *connection.TLSConfig {
return &connection.TLSConfig{Config: opts.TLSConfig}
connOpts = append(connOpts, topology.WithTLSConfig(
func(*tls.Config) *tls.Config {
return opts.TLSConfig
},
))
}
Expand All @@ -396,7 +397,7 @@ func (c *Client) configure(opts *options.ClientOptions) error {
serverOpts = append(
serverOpts,
topology.WithClock(func(*session.ClusterClock) *session.ClusterClock { return c.clock }),
topology.WithConnectionOptions(func(...connection.Option) []connection.Option { return connOpts }),
topology.WithConnectionOptions(func(...topology.ConnectionOption) []topology.ConnectionOption { return connOpts }),
)
c.topologyOptions = append(topologyOpts, topology.WithServerOptions(
func(...topology.ServerOption) []topology.ServerOption { return serverOpts },
Expand Down
7 changes: 3 additions & 4 deletions mongo/retryable_writes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/internal/testutil"
"go.mongodb.org/mongo-driver/internal/testutil/helpers"
testhelpers "go.mongodb.org/mongo-driver/internal/testutil/helpers"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy/session"
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology"
"go.mongodb.org/mongo-driver/x/network/connection"
"go.mongodb.org/mongo-driver/x/network/connstring"
)

Expand Down Expand Up @@ -361,10 +360,10 @@ func createRetryMonitoredTopology(t *testing.T, clock *session.ClusterClock, mon
topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
return append(
opts,
topology.WithConnectionOptions(func(opts ...connection.Option) []connection.Option {
topology.WithConnectionOptions(func(opts ...topology.ConnectionOption) []topology.ConnectionOption {
return append(
opts,
connection.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
topology.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
return monitor
}),
)
Expand Down
9 changes: 4 additions & 5 deletions mongo/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/internal/testutil"
"go.mongodb.org/mongo-driver/internal/testutil/helpers"
testhelpers "go.mongodb.org/mongo-driver/internal/testutil/helpers"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
Expand All @@ -31,7 +31,6 @@ import (
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy/session"
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/connection"
"go.mongodb.org/mongo-driver/x/network/connstring"
"go.mongodb.org/mongo-driver/x/network/description"
)
Expand Down Expand Up @@ -182,10 +181,10 @@ func createMonitoredTopology(t *testing.T, clock *session.ClusterClock, monitor
topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
return append(
opts,
topology.WithConnectionOptions(func(opts ...connection.Option) []connection.Option {
topology.WithConnectionOptions(func(opts ...topology.ConnectionOption) []topology.ConnectionOption {
return append(
opts,
connection.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
topology.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
return monitor
}),
)
Expand All @@ -211,7 +210,7 @@ func createMonitoredTopology(t *testing.T, clock *session.ClusterClock, monitor
t.Fatal(err)
}

c, err := s.Connection(context.Background())
c, err := s.ConnectionLegacy(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion mongo/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func killSessions(t *testing.T, client *Client) {
DB: "admin",
Command: bsonx.Doc{{"killAllSessions", bsonx.Array(vals)}},
}
conn, err := s.Connection(ctx)
conn, err := s.ConnectionLegacy(ctx)
require.NoError(t, err)
defer testhelpers.RequireNoErrorOnClose(t, conn)
// ignore the error because command kills its own implicit session
Expand Down
66 changes: 66 additions & 0 deletions x/mongo/driver/batches.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package driver

import (
"errors"

"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)

// this is the amount of reserved buffer space in a message that the
// driver reserves for command overhead.
const reservedCommandBufferBytes = 16 * 10 * 10 * 10

// ErrDocumentTooLarge occurs when a document that is larger than the maximum size accepted by a
// server is passed to an insert command.
var ErrDocumentTooLarge = errors.New("an inserted document is too large")

// Batches contains the necessary information to batch split an operation. This is only used for write
// oeprations.
type Batches struct {
Identifier string
Documents []bsoncore.Document
Current []bsoncore.Document
Ordered *bool
}

// Valid returns true if Batches contains both an identifier and the length of Documents is greater
// than zero.
func (b *Batches) Valid() bool { return b != nil && b.Identifier != "" && len(b.Documents) > 0 }

// ClearBatch clears the Current batch. This must be called before AdvanceBatch will advance to the
// next batch.
func (b *Batches) ClearBatch() { b.Current = b.Current[:0] }

// AdvanceBatch splits the next batch using maxCount and targetBbatchSize. This method will do nothing if
// the current batch has not been cleared. We do this so that when this is called during execute we
// can call it without first needing to check if we already have a batch, which makes the code
// simpler and makes retrying easier.
func (b *Batches) AdvanceBatch(maxCount, targetBatchSize int) error {
if len(b.Current) > 0 {
return nil
}
if targetBatchSize > reservedCommandBufferBytes {
targetBatchSize -= reservedCommandBufferBytes
}

if maxCount <= 0 {
maxCount = 1
}

splitAfter := 0
size := 1
for _, doc := range b.Documents {
if len(doc) > targetBatchSize {
return ErrDocumentTooLarge
}
if size+len(doc) > targetBatchSize {
break
}

size += len(doc)
splitAfter++
}

b.Current, b.Documents = b.Documents[:splitAfter], b.Documents[splitAfter:]
return nil
}
Loading

0 comments on commit 8748f69

Please sign in to comment.