Skip to content

Commit

Permalink
log_backup: Fix owner transfer panic (#47537) (#47557)
Browse files Browse the repository at this point in the history
close #47533
  • Loading branch information
ti-chi-bot authored Nov 24, 2023
1 parent 5a6b36c commit 40512ea
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 10 deletions.
5 changes: 4 additions & 1 deletion br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func (c *CheckpointAdvancer) stopSubscriber() {
c.subscriber = nil
}

func (c *CheckpointAdvancer) spawnSubscriptionHandler(ctx context.Context) {
func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()
c.subscriber = NewSubscriber(c.env, c.env, WithMasterContext(ctx))
Expand All @@ -466,9 +466,12 @@ func (c *CheckpointAdvancer) spawnSubscriptionHandler(ctx context.Context) {
}

func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()
if c.subscriber == nil {
return nil
}
failpoint.Inject("get_subscriber", nil)
if err := c.subscriber.UpdateStoreTopology(ctx); err != nil {
log.Warn("[log backup advancer] Error when updating store topology.", logutil.ShortError(err))
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/streamhelper/advancer_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ func (c *CheckpointAdvancer) OnStart(ctx context.Context) {
// OnBecomeOwner implements daemon.Interface. If the tidb-server become owner, this function will be called.
func (c *CheckpointAdvancer) OnBecomeOwner(ctx context.Context) {
metrics.AdvancerOwner.Set(1.0)
c.spawnSubscriptionHandler(ctx)
c.SpawnSubscriptionHandler(ctx)
go func() {
<-ctx.Done()
c.onStop()
c.OnStop()
}()
}

Expand All @@ -46,7 +46,7 @@ func (c *CheckpointAdvancer) Name() string {
return "LogBackup::Advancer"
}

func (c *CheckpointAdvancer) onStop() {
func (c *CheckpointAdvancer) OnStop() {
metrics.AdvancerOwner.Set(0.0)
c.stopSubscriber()
}
Expand Down
35 changes: 35 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,38 @@ func TestResolveLock(t *testing.T) {
require.Len(t, r.FailureSubRanges, 0)
require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint)
}

func TestOwnerDropped(t *testing.T) {
ctx := context.Background()
c := createFakeCluster(t, 4, false)
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
installSubscribeSupport(c)
env := &testEnv{testCtx: t, fakeCluster: c}
fp := "github.com/pingcap/tidb/br/pkg/streamhelper/get_subscriber"
defer func() {
if t.Failed() {
fmt.Println(c)
}
}()

adv := streamhelper.NewCheckpointAdvancer(env)
adv.OnStart(ctx)
adv.SpawnSubscriptionHandler(ctx)
require.NoError(t, adv.OnTick(ctx))
failpoint.Enable(fp, "pause")
ch := make(chan struct{})
go func() {
defer close(ch)
require.NoError(t, adv.OnTick(ctx))
}()
adv.OnStop()
failpoint.Disable(fp)

cp := c.advanceCheckpoints()
c.flushAll()
<-ch
adv.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
// Advancer will manually poll the checkpoint...
require.Equal(t, vsf.MinValue(), cp)
})
}
17 changes: 11 additions & 6 deletions br/pkg/streamhelper/flush_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,10 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e
cancel()
return errors.Annotate(err, "failed to subscribe events")
}
lcx := logutil.ContextWithField(cx, zap.Uint64("store-id", s.storeID),
zap.String("category", "log backup flush subscriber"))
s.cancel = cancel
s.background = spawnJoinable(func() { s.listenOver(cli) })
s.background = spawnJoinable(func() { s.listenOver(lcx, cli) })
return nil
}

Expand All @@ -242,15 +244,16 @@ func (s *subscription) close() {
// because it is a ever-sharing channel.
}

func (s *subscription) listenOver(cli eventStream) {
func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
storeID := s.storeID
log.Info("[log backup flush subscriber] Listen starting.", zap.Uint64("store", storeID))
logutil.CL(ctx).Info("Listen starting.", zap.Uint64("store", storeID))
for {
// Shall we use RecvMsg for better performance?
// Note that the spans.Full requires the input slice be immutable.
msg, err := cli.Recv()
if err != nil {
log.Info("[log backup flush subscriber] Listen stopped.", zap.Uint64("store", storeID), logutil.ShortError(err))
logutil.CL(ctx).Info("Listen stopped.",
zap.Uint64("store", storeID), logutil.ShortError(err))
if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled {
return
}
Expand All @@ -261,12 +264,14 @@ func (s *subscription) listenOver(cli eventStream) {
for _, m := range msg.Events {
start, err := decodeKey(m.StartKey)
if err != nil {
log.Warn("start key not encoded, skipping", logutil.Key("event", m.StartKey), logutil.ShortError(err))
logutil.CL(ctx).Warn("start key not encoded, skipping",
logutil.Key("event", m.StartKey), logutil.ShortError(err))
continue
}
end, err := decodeKey(m.EndKey)
if err != nil {
log.Warn("end key not encoded, skipping", logutil.Key("event", m.EndKey), logutil.ShortError(err))
logutil.CL(ctx).Warn("end key not encoded, skipping",
logutil.Key("event", m.EndKey), logutil.ShortError(err))
continue
}
s.output <- spans.Valued{
Expand Down

0 comments on commit 40512ea

Please sign in to comment.