Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redo(ticdc): fix resolved moves too fast when part of tables are not maintained redo writer #5587

Merged
merged 7 commits into from
May 27, 2022
7 changes: 5 additions & 2 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,11 @@ func (m *ManagerImpl) updateTableResolvedTs(ctx context.Context) error {
return err
}
minResolvedTs := uint64(math.MaxUint64)
for tableID, rts := range rtsMap {
m.rtsMap[tableID] = rts
for tableID := range m.rtsMap {
if rts, ok := rtsMap[tableID]; ok {
Copy link
Contributor

@nongfushanquan nongfushanquan May 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why part of tables are not maintained in redo log writer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no data of this table, and flush resolved ts of this table has not been executed, the redo log writer doesn't know this table.
The redo log writer only records the information of the table that it has met.

m.rtsMap[tableID] = rts
}
rts := m.rtsMap[tableID]
if rts < minResolvedTs {
minResolvedTs = rts
}
Expand Down
75 changes: 75 additions & 0 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package redo

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -173,6 +174,80 @@ func TestLogManagerInProcessor(t *testing.T) {
require.Nil(t, err)
}

// TestUpdateResolvedTsWithDelayedTable tests redo manager doesn't move resolved
// ts forward if one or more tables resolved ts are not returned from underlying
// writer, this secenario happens when there is no data or resolved ts of this
// table sent to redo log writer yet.
func TestUpdateResolvedTsWithDelayedTable(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
cfg := &config.ConsistentConfig{
Level: string(ConsistentLevelEventual),
Storage: "blackhole://",
}
errCh := make(chan error, 1)
opts := &ManagerOptions{
EnableBgRunner: true,
ErrCh: errCh,
}
logMgr, err := NewManager(ctx, cfg, opts)
require.Nil(t, err)

var (
table53 = int64(53)
table55 = int64(55)
table57 = int64(57)

startTs = uint64(100)
table53Ts = uint64(125)
table55Ts = uint64(120)
table57Ts = uint64(110)
)
tables := []model.TableID{table53, table55, table57}
for _, tableID := range tables {
logMgr.AddTable(tableID, startTs)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
logMgr.bgWriteLog(ctx, errCh)
}()

// table 53 has new data, resolved-ts moves forward to 125
rows := []*model.RowChangedEvent{
{CommitTs: table53Ts, Table: &model.TableName{TableID: table53}},
{CommitTs: table53Ts, Table: &model.TableName{TableID: table53}},
}
err = logMgr.EmitRowChangedEvents(ctx, table53, rows...)
require.Nil(t, err)
require.Eventually(t, func() bool {
tsMap, err := logMgr.writer.GetCurrentResolvedTs(ctx, []int64{table53})
require.Nil(t, err)
ts, ok := tsMap[table53]
return ok && ts == table53Ts
}, time.Second, time.Millisecond*10)

// table 55 has no data, but receives resolved-ts event and moves forward to 120
err = logMgr.FlushLog(ctx, table55, table55Ts)
require.Nil(t, err)

// get min resolved ts should take each table into consideration
err = logMgr.updateTableResolvedTs(ctx)
require.Nil(t, err)
require.Equal(t, startTs, logMgr.GetMinResolvedTs())

// table 57 moves forward, update table resolved ts and check again
logMgr.FlushLog(ctx, table57, table57Ts)
err = logMgr.updateTableResolvedTs(ctx)
require.Nil(t, err)
require.Equal(t, table57Ts, logMgr.GetMinResolvedTs())

cancel()
wg.Wait()
}

// TestLogManagerInOwner tests how redo log manager is used in owner,
// where the redo log manager needs to handle DDL event only.
func TestLogManagerInOwner(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion cdc/redo/writer/blackhole_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ func (bs *blackHoleWriter) GetCurrentResolvedTs(_ context.Context, tableIDs []in
defer bs.tableRtsMu.RUnlock()
rtsMap := make(map[int64]uint64, len(bs.tableRtsMap))
for _, tableID := range tableIDs {
rtsMap[tableID] = bs.tableRtsMap[tableID]
if rts, ok := bs.tableRtsMap[tableID]; ok {
rtsMap[tableID] = rts
}
}
return rtsMap, nil
}
Expand Down