Skip to content

Commit

Permalink
router: fix connection score becomes wrong when a connection closes d…
Browse files Browse the repository at this point in the history
…uring redirecting (#451)
  • Loading branch information
djshow832 authored Jan 16, 2024
1 parent 3c493b2 commit 475507c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
4 changes: 3 additions & 1 deletion pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func (b *backendWrapper) String() string {
// connWrapper wraps RedirectableConn.
type connWrapper struct {
RedirectableConn
phase connPhase
// Reference to the target backend if it's redirecting, otherwise nil.
redirectingBackend *backendWrapper
// Last redirect start time of this connection.
lastRedirect time.Time
phase connPhase
}
17 changes: 13 additions & 4 deletions pkg/manager/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (router *ScoreBasedRouter) RedirectConnections() error {
connWrapper.phase = phaseRedirectNotify
// we dont care the results
_ = connWrapper.Redirect(backend)
connWrapper.redirectingBackend = backend
}
}
}
Expand Down Expand Up @@ -263,18 +264,25 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec
router.adjustBackendList(toBe, true)
connWrapper.phase = phaseRedirectFail
}
connWrapper.redirectingBackend = nil
addMigrateMetrics(from, to, succeed, connWrapper.lastRedirect)
}

// OnConnClosed implements ConnEventReceiver.OnConnClosed interface.
func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) error {
router.Lock()
defer router.Unlock()
// OnConnClosed is always called after processing ongoing redirect events,
// so the addr passed in is the right backend.
be := router.ensureBackend(addr, true)
be.Value.connScore--
router.removeConn(be, router.getConnWrapper(conn))
connWrapper := router.getConnWrapper(conn)
redirectingBackend := connWrapper.Value.redirectingBackend
// If this connection is redirecting, decrease the score of the target backend.
if redirectingBackend != nil {
redirectingBackend.connScore--
connWrapper.Value.redirectingBackend = nil
} else {
be.Value.connScore--
}
router.removeConn(be, connWrapper)
return nil
}

Expand Down Expand Up @@ -375,6 +383,7 @@ func (router *ScoreBasedRouter) rebalance(maxNum int) {
conn.phase = phaseRedirectNotify
conn.lastRedirect = curTime
conn.Redirect(idlestBackend)
conn.redirectingBackend = idlestBackend
}
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/manager/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,3 +900,23 @@ func TestBackendHealthy(t *testing.T) {
require.False(t, conn.to.Healthy())
tester.redirectFinish(1, false)
}

func TestCloseRedirectingConns(t *testing.T) {
// Make the connection redirect.
tester := newRouterTester(t)
tester.addBackends(1)
tester.addConnections(1)
require.Equal(t, 1, tester.getBackendByIndex(0).connScore)
tester.killBackends(1)
tester.addBackends(1)
tester.rebalance(1)
require.Equal(t, 0, tester.getBackendByIndex(0).connScore)
require.Equal(t, 1, tester.getBackendByIndex(1).connScore)
// Close the connection.
tester.updateBackendStatusByAddr(tester.getBackendByIndex(0).Addr(), StatusHealthy)
tester.closeConnections(1, true)
require.Equal(t, 0, tester.getBackendByIndex(0).connScore)
require.Equal(t, 0, tester.getBackendByIndex(1).connScore)
require.Equal(t, 0, tester.getBackendByIndex(0).connList.Len())
require.Equal(t, 0, tester.getBackendByIndex(1).connList.Len())
}

0 comments on commit 475507c

Please sign in to comment.