Skip to content

Commit

Permalink
socket: separate read/write network timeouts
Browse files Browse the repository at this point in the history
Splits DialInfo.Timeout (which defaults to 60s when using mgo.Dial()) into
ReadTimeout and WriteTimeout to address #160. Both ReadTimeout and WriteTimeout
default to DialInfo.Timeout when unset, to preserve existing behaviour.
  • Loading branch information
domodwyer committed May 9, 2018
1 parent 45151e7 commit 775e6d7
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 70 deletions.
17 changes: 12 additions & 5 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,17 @@ type isMasterResult struct {
}

func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error {
// I'm not really sure why the timeout was hard coded to these values (I
// assume because everything is passed as a func arg, and thus this info is
// unavailable here), but leaving them as is for backwards compatibility.
config := &DialInfo{
Timeout: 10 * time.Second,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

// Monotonic lets it talk to a slave and still hold the socket.
session := newSession(Monotonic, cluster, 10*time.Second)
session := newSession(Monotonic, cluster, config)
session.setSocket(socket)

var cmd = bson.D{{Name: "isMaster", Value: 1}}
Expand Down Expand Up @@ -624,9 +633,7 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout
// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration,
) (s *mongoSocket, err error) {
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(mode Mode, slaveOk bool, syncTimeout time.Duration, serverTags []bson.D, info *DialInfo) (s *mongoSocket, err error) {
var started time.Time
var syncCount uint
for {
Expand Down Expand Up @@ -670,7 +677,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
continue
}

s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout)
s, abended, err := server.AcquireSocketWithBlocking(info)
if err == errPoolTimeout {
// No need to remove servers from the topology if acquiring a socket fails for this reason.
return nil, err
Expand Down
44 changes: 22 additions & 22 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,29 +124,31 @@ var errServerClosed = errors.New("server was closed")
// use in this server is greater than the provided limit, errPoolLimit is
// returned.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond)
info := &DialInfo{
PoolLimit: poolLimit,
ReadTimeout: timeout,
WriteTimeout: timeout,
Timeout: timeout,
}
return server.acquireSocketInternal(info, false)
}

// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_
// return errPoolLimit. Instead, it will block waiting for a socket to become available. If poolTimeout
// should elapse before a socket is available, it will return errPoolTimeout.
func (server *mongoServer) AcquireSocketWithBlocking(
poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration,
) (socket *mongoSocket, abended bool, err error) {
return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout)
func (server *mongoServer) AcquireSocketWithBlocking(info *DialInfo) (socket *mongoSocket, abended bool, err error) {
return server.acquireSocketInternal(info, true)
}

func (server *mongoServer) acquireSocketInternal(
poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration,
) (socket *mongoSocket, abended bool, err error) {
func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock bool) (socket *mongoSocket, abended bool, err error) {
for {
server.Lock()
abended = server.abended
if server.closed {
server.Unlock()
return nil, abended, errServerClosed
}
if poolLimit > 0 {
if info.poolLimit() > 0 {
if shouldBlock {
// Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout
// with a wait + broadcast. The broadcast will cause the loop here to re-check the timeout,
Expand All @@ -158,11 +160,11 @@ func (server *mongoServer) acquireSocketInternal(
// https://github.com/golang/go/issues/16620, since the lock needs to be held in _this_ goroutine.
waitDone := make(chan struct{})
timeoutHit := false
if poolTimeout > 0 {
if info.PoolTimeout > 0 {
go func() {
select {
case <-waitDone:
case <-time.After(poolTimeout):
case <-time.After(info.PoolTimeout):
// timeoutHit is part of the wait condition, so needs to be changed under mutex.
server.Lock()
defer server.Unlock()
Expand All @@ -172,7 +174,7 @@ func (server *mongoServer) acquireSocketInternal(
}()
}
timeSpentWaiting := time.Duration(0)
for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
for len(server.liveSockets)-len(server.unusedSockets) >= info.poolLimit() && !timeoutHit {
// We only count time spent in Wait(), and not time evaluating the entire loop,
// so that in the happy non-blocking path where the condition above evaluates true
// first time, we record a nice round zero wait time.
Expand All @@ -191,7 +193,7 @@ func (server *mongoServer) acquireSocketInternal(
// Record that we fetched a connection off of a socket list and how long we spent waiting
stats.noticeSocketAcquisition(timeSpentWaiting)
} else {
if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
if len(server.liveSockets)-len(server.unusedSockets) >= info.poolLimit() {
server.Unlock()
return nil, false, errPoolLimit
}
Expand All @@ -202,15 +204,15 @@ func (server *mongoServer) acquireSocketInternal(
socket = server.unusedSockets[n-1]
server.unusedSockets[n-1] = nil // Help GC.
server.unusedSockets = server.unusedSockets[:n-1]
info := server.info
serverInfo := server.info
server.Unlock()
err = socket.InitialAcquire(info, timeout)
err = socket.InitialAcquire(serverInfo, info)
if err != nil {
continue
}
} else {
server.Unlock()
socket, err = server.Connect(timeout)
socket, err = server.Connect(info)
if err == nil {
server.Lock()
// We've waited for the Connect, see if we got
Expand All @@ -231,20 +233,18 @@ func (server *mongoServer) acquireSocketInternal(

// Connect establishes a new connection to the server. This should
// generally be done through server.AcquireSocket().
func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error) {
func (server *mongoServer) Connect(info *DialInfo) (*mongoSocket, error) {
server.RLock()
master := server.info.Master
dial := server.dial
server.RUnlock()

logf("Establishing new connection to %s (timeout=%s)...", server.Addr, timeout)
logf("Establishing new connection to %s (timeout=%s)...", server.Addr, info.Timeout)
var conn net.Conn
var err error
switch {
case !dial.isSet():
// Cannot do this because it lacks timeout support. :-(
//conn, err = net.DialTCP("tcp", nil, server.tcpaddr)
conn, err = net.DialTimeout("tcp", server.ResolvedAddr, timeout)
conn, err = net.DialTimeout("tcp", server.ResolvedAddr, info.Timeout)
if tcpconn, ok := conn.(*net.TCPConn); ok {
tcpconn.SetKeepAlive(true)
} else if err == nil {
Expand All @@ -264,7 +264,7 @@ func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error)
logf("Connection to %s established.", server.Addr)

stats.conn(+1, master)
return newSocket(server, conn, timeout), nil
return newSocket(server, conn, info), nil
}

// Close forces closing all sockets that are alive, whether
Expand Down
Loading

0 comments on commit 775e6d7

Please sign in to comment.