Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tx throttler: healthcheck all cells if --tx-throttler-healthcheck-cells is undefined #12477

Merged
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1fecb5d
WIP
timvaillancourt Feb 24, 2023
6beae70
vttablet tx throttler: healthcheck all cells if cells unspecified
timvaillancourt Feb 24, 2023
c8ad379
Fix test typo
timvaillancourt Feb 24, 2023
6ed1bac
Simplify test
timvaillancourt Feb 24, 2023
4c04f24
remove unnecessary 'empty healthCheckCells given' error
timvaillancourt Feb 25, 2023
08c1a1d
last tweak to simplify test
timvaillancourt Feb 25, 2023
0dd51b8
Update go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
timvaillancourt Mar 15, 2023
25bf05b
Merge branch 'main' into tx-throtter-default-all-cells
timvaillancourt Mar 15, 2023
62a9a26
Merge branch 'main' into tx-throtter-default-all-cells
timvaillancourt May 17, 2023
71ea246
Merge branch 'main' into tx-throtter-default-all-cells
timvaillancourt Jun 18, 2023
ed439e4
Move .GetKnownCells() call to .Open()
timvaillancourt Jun 19, 2023
d5c87cc
Update test
timvaillancourt Jun 19, 2023
84e0a4a
Improve err message
timvaillancourt Jun 19, 2023
9887e9f
Add code comment to explain 2nd cell check
timvaillancourt Jun 20, 2023
90d15b0
Add func fetchKnownCells()
timvaillancourt Jun 24, 2023
abc694d
Add test for fetchKnownCells
timvaillancourt Jun 24, 2023
1cf084d
Restart health stream if topo cells change
timvaillancourt Jun 24, 2023
29c496a
Update cell list when changed
timvaillancourt Jun 25, 2023
a0698dd
log.Fatalf -> Errorf
timvaillancourt Jun 25, 2023
8b287b8
Improve test
timvaillancourt Jun 25, 2023
0e8ae6f
use defer to stop ticker
timvaillancourt Jun 25, 2023
ab35c8b
PR suggestion
timvaillancourt Jun 27, 2023
2f5c44d
Add cellsFromTopo bool to txThrottlerState
timvaillancourt Jun 27, 2023
e7fa178
pass context.Context to .fetchKnownCells(...)
timvaillancourt Jun 28, 2023
d88c453
merge main
timvaillancourt Jul 25, 2023
e25e5ef
PR suggestion
timvaillancourt Jul 25, 2023
42bba79
fix defer-cancel() in loop
timvaillancourt Jul 25, 2023
e616a92
Past context to updateHealthCheckCells
timvaillancourt Jul 25, 2023
2a879ad
PR feedback
timvaillancourt Jul 27, 2023
4d8bff0
Dont update cells in config
timvaillancourt Jul 27, 2023
e7ac2c6
Add "the" to cli flag help + force ci to run again
timvaillancourt Jul 27, 2023
b70a1fa
Missing ts.config -> ts.config.healthCheckCells
timvaillancourt Jul 29, 2023
616f4a6
Fix unit test failure
timvaillancourt Jul 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Restart health stream if topo cells change
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Jun 24, 2023
commit 1cf084dbceb9be43e36ff56afb17f5030b7babee
89 changes: 67 additions & 22 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package txthrottler
import (
"context"
"math/rand"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -191,6 +192,7 @@ type txThrottlerState struct {
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
healthCheckChan chan *discovery.TabletHealth
topologyWatchers []TopologyWatcherInterface
}

Expand Down Expand Up @@ -287,12 +289,14 @@ func (t *txThrottler) Throttle(priority int) (result bool) {

func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
// get cells from topo if none defined in tabletenv config
var cellsFromTopo bool
if len(config.healthCheckCells) == 0 {
var err error
if config.healthCheckCells, err = fetchKnownCells(topoServer); err != nil {
log.Errorf("txThrottler: failed to open throttler: %+v", err)
return nil, err
}
cellsFromTopo = true
}

maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}
Expand All @@ -315,40 +319,81 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
config: config,
throttler: t,
}
state.createTxThrottlerHealthCheck(topoServer, config, target.Cell)
state.initHealthCheckStream(topoServer, target)
hcStreamProcessor := state.healthCheckStreamProcessorFactory(topoServer, target, cellsFromTopo)
ctx, cancel := context.WithCancel(context.Background())
state.stopHealthCheck = cancel
go hcStreamProcessor(ctx)

return state, nil
}

state.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
state.topologyWatchers = append(
state.topologyWatchers,
func (ts *txThrottlerState) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) {
ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.config.healthCheckCells)
ts.healthCheckChan = ts.healthCheck.Subscribe()
ts.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(ts.config.healthCheckCells))
for _, cell := range ts.config.healthCheckCells {
ts.topologyWatchers = append(
ts.topologyWatchers,
topologyWatcherFactory(
topoServer,
state.healthCheck,
ts.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
discovery.DefaultTopoReadConcurrency),
)
}
return state, nil
}

func (ts *txThrottlerState) createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, cell string) {
ctx, cancel := context.WithCancel(context.Background())
ts.stopHealthCheck = cancel
ts.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells)
ch := ts.healthCheck.Subscribe()
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case th := <-ch:
ts.StatsUpdate(th)
func (ts *txThrottlerState) closeHealthCheckStream() {
if ts.healthCheck == nil {
return
}
for _, topoWatcher := range ts.topologyWatchers {
topoWatcher.Stop()
}
ts.stopHealthCheck()
ts.healthCheck.Close()
}

func (ts *txThrottlerState) healthCheckStreamProcessorFactory(topoServer *topo.Server, target *querypb.Target, cellsFromTopo bool) func(ctx context.Context) {
if cellsFromTopo {
return func(ctx context.Context) {
for {
select {
case <-time.After(time.Minute):
knownCells, err := fetchKnownCells(topoServer)
if err != nil {
log.Fatalf("txThrottler: failed to fetch cells from topo: %+v", err)
continue
}
if !reflect.DeepEqual(knownCells, ts.config.healthCheckCells) {
log.Info("txThrottler: restarting healthcheck stream due to update to topology cells")
ts.closeHealthCheckStream()
ts.initHealthCheckStream(topoServer, target)
}
case <-ctx.Done():
return
case th := <-ts.healthCheckChan:
ts.StatsUpdate(th)
}
}
}
}(ctx)
} else {
return func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case th := <-ts.healthCheckChan:
ts.StatsUpdate(th)
}
}
}
}
}

func (ts *txThrottlerState) throttle() bool {
Expand Down