Skip to content

Commit

Permalink
Create topology.pool type
Browse files Browse the repository at this point in the history
GODRIVER-929

Change-Id: Ia01fd8c0586273d6af751b92932457bd64f1ae03
  • Loading branch information
skriptble committed Apr 25, 2019
1 parent 3e5f96b commit 9bb1f8b
Show file tree
Hide file tree
Showing 7 changed files with 812 additions and 19 deletions.
1 change: 1 addition & 0 deletions .errcheck-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(*go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology.Subscription).Unsubscribe
(*go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology.Server).Close
(*go.mongodb.org/mongo-driver/x/network/connection.pool).closeConnection
(*go.mongodb.org/mongo-driver/x/mongo/driverlegacy/topology.pool).close
(go.mongodb.org/mongo-driver/x/network/wiremessage.ReadWriteCloser).Close
(*go.mongodb.org/mongo-driver/mongo.Cursor).Close
(*go.mongodb.org/mongo-driver/mongo.ChangeStream).Close
Expand Down
14 changes: 14 additions & 0 deletions x/mongo/driverlegacy/topology/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,17 @@ closing it.

The connection implementations in this package are conduits for wire messages but they have no
ability to encode, decode, or validate wire messages. That must be handled by consumers.

## Pool
The `pool` type implements a connection pool. It handles caching idle connections and dialing
new ones, but it does not track a maximum number of connections. That is the responsibility of a
wrapping type, like `Server`.

The `pool` type has no concept of closing, instead it has concepts of connecting and disconnecting.
This allows a `Topology` to be disconnected,but keeping the memory around to be reconnected later.
There is a `close` method, but this is used to close a connection.

There are three methods related to getting and putting connections: `get`, `close`, and `put`. The
`get` method will either retrieve a connection from the cache or it will dial a new `connection`.
The `close` method will close the underlying socket of a `connection`. The `put` method will put a
connection into the pool, placing it in the cahce if there is space, otherwise it will close it.
16 changes: 10 additions & 6 deletions x/mongo/driverlegacy/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ import (
"go.mongodb.org/mongo-driver/x/network/wiremessage"
)

// ErrConnectionClosed is returned when attempting to call a method on a Connection that has already
// been closed.
var ErrConnectionClosed = errors.New("the Connection is closed")

var globalConnectionID uint64

func nextConnectionID() uint64 { return atomic.AddUint64(&globalConnectionID, 1) }
Expand All @@ -45,6 +41,11 @@ type connection struct {
readTimeout time.Duration
writeTimeout time.Duration
desc description.Server

// pool related fields
pool *pool
poolID uint64
generation uint64
}

// newConnection handles the creation of a connection. It will dial, configure TLS, and perform
Expand Down Expand Up @@ -290,8 +291,11 @@ func (c *Connection) Close() error {
if c.connection == nil {
return nil
}
// TODO(GODRIVER-929): Return c.connection to the pool.
// TODO(GODRIVER-929): Release an entry in the semaphore.
// TODO(GODRIVER-932): Release an entry in the semaphore.
err := c.pool.put(c.connection)
if err != nil {
return err
}
c.connection = nil
return nil
}
Expand Down
76 changes: 76 additions & 0 deletions x/mongo/driverlegacy/topology/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"errors"
"net"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -444,3 +445,78 @@ func (tnc *testNetConn) SetWriteDeadline(t time.Time) error {
}
return tnc.nc.SetWriteDeadline(t)
}

// bootstrapConnection creates a listener that will listen for a single connection
// on the return address. The user provided run function will be called with the accepted
// connection. The user is responsible for closing the connection.
func bootstrapConnections(t *testing.T, num int, run func(net.Conn)) net.Addr {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Errorf("Could not set up a listener: %v", err)
t.FailNow()
}
go func() {
for i := 0; i < num; i++ {
c, err := l.Accept()
if err != nil {
t.Errorf("Could not accept a connection: %v", err)
}
go run(c)
}
_ = l.Close()
}()
return l.Addr()
}

type netconn struct {
net.Conn
closed chan struct{}
d *dialer
}

func (nc *netconn) Close() error {
nc.closed <- struct{}{}
nc.d.connclosed(nc)
return nc.Conn.Close()
}

type dialer struct {
Dialer
opened map[*netconn]struct{}
closed map[*netconn]struct{}
sync.Mutex
}

func newdialer(d Dialer) *dialer {
return &dialer{Dialer: d, opened: make(map[*netconn]struct{}), closed: make(map[*netconn]struct{})}
}

func (d *dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
d.Lock()
defer d.Unlock()
c, err := d.Dialer.DialContext(ctx, network, address)
if err != nil {
return nil, err
}
nc := &netconn{Conn: c, closed: make(chan struct{}, 1), d: d}
d.opened[nc] = struct{}{}
return nc, nil
}

func (d *dialer) connclosed(nc *netconn) {
d.Lock()
defer d.Unlock()
d.closed[nc] = struct{}{}
}

func (d *dialer) lenopened() int {
d.Lock()
defer d.Unlock()
return len(d.opened)
}

func (d *dialer) lenclosed() int {
d.Lock()
defer d.Unlock()
return len(d.closed)
}
195 changes: 195 additions & 0 deletions x/mongo/driverlegacy/topology/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package topology

import (
"context"
"sync"
"sync/atomic"
"time"

"go.mongodb.org/mongo-driver/x/network/address"
)

// ErrPoolConnected is returned from an attempt to connect an already connected pool
var ErrPoolConnected = PoolError("pool is connected")

// ErrPoolDisconnected is returned from an attempt to disconnect an already disconnected
// or disconnecting pool.
var ErrPoolDisconnected = PoolError("pool is disconnected or disconnecting")

// ErrConnectionClosed is returned from an attempt to use an already closed connection.
var ErrConnectionClosed = ConnectionError{ConnectionID: "<closed>", message: "connection is closed"}

// ErrWrongPool is return when a connection is returned to a pool it doesn't belong to.
var ErrWrongPool = PoolError("connection does not belong to this pool")

// PoolError is an error returned from a Pool method.
type PoolError string

func (pe PoolError) Error() string { return string(pe) }

type pool struct {
address address.Address
opts []ConnectionOption
conns chan *connection
generation uint64

connected int32 // Must be accessed using the sync/atomic package.
nextid uint64
opened map[uint64]*connection // opened holds all of the currently open connections.

sync.Mutex
}

// newPool creates a new pool that will hold size number of idle connections. It will use the
// provided options when creating connections.
func newPool(addr address.Address, size uint64, opts ...ConnectionOption) *pool {
return &pool{
address: addr,
conns: make(chan *connection, size),
generation: 0,
connected: disconnected,
opened: make(map[uint64]*connection),
opts: opts,
}
}

// drain lazily drains the pool by increasing the generation ID.
func (p *pool) drain() { atomic.AddUint64(&p.generation, 1) }
func (p *pool) expired(generation uint64) bool { return generation < atomic.LoadUint64(&p.generation) }

// connect puts the pool into the connected state, allowing it to be used.
func (p *pool) connect() error {
if !atomic.CompareAndSwapInt32(&p.connected, disconnected, connected) {
return ErrPoolConnected
}
atomic.AddUint64(&p.generation, 1)
return nil
}

func (p *pool) disconnect(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&p.connected, connected, disconnecting) {
return ErrPoolDisconnected
}

// We first clear out the idle connections, then we wait until the context's deadline is hit or
// it's cancelled, after which we aggressively close the remaining open connections.
for {
select {
case pc := <-p.conns:
_ = p.close(pc) // We don't care about errors while closing the connection.
continue
default:
}
break
}
if dl, ok := ctx.Deadline(); ok {
// If we have a deadline then we interpret it as a request to gracefully shutdown. We wait
// until either all the connections have landed back in the pool (and have been closed) or
// until the timer is done.
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
timer := time.NewTimer(time.Now().Sub(dl))
defer timer.Stop()
for {
select {
case <-timer.C:
case <-ticker.C: // Can we repalce this with an actual signal channel? We will know when p.inflight hits zero from the close method.
p.Lock()
if len(p.opened) > 0 {
p.Unlock()
continue
}
p.Unlock()
}
break
}
}

// We copy the remaining connections into a slice, then iterate it to close them. This allows us
// to use a single function to actually clean up and close connections at the expense of a
// double itertion in the worse case.
p.Lock()
toClose := make([]*connection, 0, len(p.opened))
for _, pc := range p.opened {
toClose = append(toClose, pc)
}
p.Unlock()
for _, pc := range toClose {
_ = p.close(pc) // We don't care about errors while closing the connection.
}
atomic.StoreInt32(&p.connected, disconnected)
return nil
}

func (p *pool) get(ctx context.Context) (*connection, error) {
if atomic.LoadInt32(&p.connected) != connected {
return nil, ErrPoolDisconnected
}
select {
case c := <-p.conns:
if c.expired() {
go p.close(c)
return p.get(ctx)
}

return c, nil
case <-ctx.Done():
return nil, ctx.Err()
default:
c, err := newConnection(ctx, p.address, p.opts...)
if err != nil {
return nil, err
}

c.pool = p
c.poolID = atomic.AddUint64(&p.nextid, 1)
c.generation = p.generation

if atomic.LoadInt32(&p.connected) != connected {
_ = p.close(c) // The pool is disconnected or disconnecting, ignore the error from closing the connection.
return nil, ErrPoolDisconnected
}
p.Lock()
p.opened[c.poolID] = c
p.Unlock()
return c, nil
}
}

// close closes a connection, not the pool itself. This method will actually close the connection,
// making it unusable, to instead return the connection to the pool, use put.
func (p *pool) close(c *connection) error {
if c.pool != p {
return ErrWrongPool
}
p.Lock()
delete(p.opened, c.poolID)
p.Unlock()
if c.nc == nil {
return nil // We're closing an already closed connection.
}
err := c.nc.Close()
c.nc = nil
if err != nil {
return ConnectionError{ConnectionID: c.id, Wrapped: err, message: "failed to close net.Conn"}
}
return nil
}

// put returns a connection to this pool. If the pool is connected, the connection is not
// expired, and there is space in the cache, the connection is returned to the cache.
func (p *pool) put(c *connection) error {
if c.pool != p {
return ErrWrongPool
}
if atomic.LoadInt32(&p.connected) != connected || c.expired() {
return p.close(c)
}

select {
case p.conns <- c:
return nil
default:
return p.close(c)
}
}
Loading

0 comments on commit 9bb1f8b

Please sign in to comment.