Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv(ticdc): fix data loss when upstream txn conflicts during scan (#5477) #5583

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
if err != nil {
return nil, err
}
if len(raw.OldValue) == 0 && len(raw.Value) == 0 {
log.Warn("empty value and old value", zap.Any("row", raw))
}
baseInfo := baseKVEntry{
StartTs: raw.StartTs,
CRTs: raw.CRTs,
Expand Down
9 changes: 7 additions & 2 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,7 @@ func (s *eventFeedSession) singleEventFeed(
metricPullEventInitializedCounter.Inc()
initialized = true
s.regionRouter.Release(storeAddr)
cachedEvents := matcher.matchCachedRow()
cachedEvents := matcher.matchCachedRow(initialized)
for _, cachedEvent := range cachedEvents {
revent, err = assembleRowEvent(regionID, cachedEvent, s.enableOldValue)
if err != nil {
Expand All @@ -1504,6 +1504,7 @@ func (s *eventFeedSession) singleEventFeed(
return
}
}
matcher.matchCachedRollbackRow(initialized)
case cdcpb.Event_COMMITTED:
metricPullEventCommittedCounter.Inc()
revent, err = assembleRowEvent(regionID, entry, s.enableOldValue)
Expand Down Expand Up @@ -1541,7 +1542,7 @@ func (s *eventFeedSession) singleEventFeed(
err = errUnreachable
return
}
ok := matcher.matchRow(entry)
ok := matcher.matchRow(entry, initialized)
if !ok {
if !initialized {
matcher.cacheCommitRow(entry)
Expand All @@ -1565,6 +1566,10 @@ func (s *eventFeedSession) singleEventFeed(
}
case cdcpb.Event_ROLLBACK:
metricPullEventRollbackCounter.Inc()
if !initialized {
matcher.cacheRollbackRow(entry)
continue
}
matcher.rollbackRow(entry)
}
}
Expand Down
33 changes: 30 additions & 3 deletions cdc/kv/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type matcher struct {
// TODO : clear the single prewrite
unmatchedValue map[matchKey]*cdcpb.Event_Row
cachedCommit []*cdcpb.Event_Row
cachedRollback []*cdcpb.Event_Row
}

type matchKey struct {
Expand Down Expand Up @@ -58,8 +59,15 @@ func (m *matcher) putPrewriteRow(row *cdcpb.Event_Row) {

// matchRow matches the commit event with the cached prewrite event
// the Value and OldValue will be assigned if a matched prewrite event exists.
func (m *matcher) matchRow(row *cdcpb.Event_Row) bool {
func (m *matcher) matchRow(row *cdcpb.Event_Row, initialized bool) bool {
if value, exist := m.unmatchedValue[newMatchKey(row)]; exist {
// TiKV may send a fake prewrite event with empty value caused by txn heartbeat.
//
// We need to skip match if the region is not initialized,
// as prewrite events may be sent out of order.
if !initialized && len(value.GetValue()) == 0 {
return false
}
row.Value = value.GetValue()
row.OldValue = value.GetOldValue()
delete(m.unmatchedValue, newMatchKey(row))
Expand All @@ -72,13 +80,16 @@ func (m *matcher) cacheCommitRow(row *cdcpb.Event_Row) {
m.cachedCommit = append(m.cachedCommit, row)
}

func (m *matcher) matchCachedRow() []*cdcpb.Event_Row {
func (m *matcher) matchCachedRow(initialized bool) []*cdcpb.Event_Row {
if !initialized {
log.Panic("must be initialized before match cahced rows")
}
cachedCommit := m.cachedCommit
m.cachedCommit = nil
top := 0
for i := 0; i < len(cachedCommit); i++ {
cacheEntry := cachedCommit[i]
ok := m.matchRow(cacheEntry)
ok := m.matchRow(cacheEntry, true)
if !ok {
// when cdc receives a commit log without a corresponding
// prewrite log before initialized, a committed log with
Expand All @@ -97,3 +108,19 @@ func (m *matcher) matchCachedRow() []*cdcpb.Event_Row {
func (m *matcher) rollbackRow(row *cdcpb.Event_Row) {
delete(m.unmatchedValue, newMatchKey(row))
}

func (m *matcher) cacheRollbackRow(row *cdcpb.Event_Row) {
m.cachedRollback = append(m.cachedRollback, row)
}

func (m *matcher) matchCachedRollbackRow(initialized bool) {
if !initialized {
log.Panic("must be initialized before match cahced rollback rows")
}
rollback := m.cachedRollback
m.cachedRollback = nil
for i := 0; i < len(rollback); i++ {
cacheEntry := rollback[i]
m.rollbackRow(cacheEntry)
}
}
135 changes: 129 additions & 6 deletions cdc/kv/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package kv

import (
"testing"

"github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type matcherSuite struct{}
Expand Down Expand Up @@ -47,7 +50,7 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
StartTs: 1,
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
ok := matcher.matchRow(commitRow1, true)
c.Assert(ok, check.IsFalse)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
StartTs: 1,
Expand All @@ -60,7 +63,7 @@ func (s *matcherSuite) TestMatchRow(c *check.C) {
CommitTs: 3,
Key: []byte("k1"),
}
ok = matcher.matchRow(commitRow2)
ok = matcher.matchRow(commitRow2, true)
c.Assert(ok, check.IsTrue)
c.Assert(commitRow2, check.DeepEquals, &cdcpb.Event_Row{
StartTs: 2,
Expand Down Expand Up @@ -92,7 +95,7 @@ func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
CommitTs: 2,
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1)
ok := matcher.matchRow(commitRow1, true)
c.Assert(commitRow1, check.DeepEquals, &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Expand All @@ -106,7 +109,7 @@ func (s *matcherSuite) TestMatchFakePrewrite(c *check.C) {
func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
defer testleak.AfterTest(c)()
matcher := newMatcher()
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
c.Assert(len(matcher.matchCachedRow(true)), check.Equals, 0)
matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Expand All @@ -122,7 +125,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
CommitTs: 5,
Key: []byte("k3"),
})
c.Assert(len(matcher.matchCachedRow()), check.Equals, 0)
c.Assert(len(matcher.matchCachedRow(true)), check.Equals, 0)

matcher.cacheCommitRow(&cdcpb.Event_Row{
StartTs: 1,
Expand Down Expand Up @@ -159,7 +162,7 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
OldValue: []byte("ov3"),
})

c.Assert(matcher.matchCachedRow(), check.DeepEquals, []*cdcpb.Event_Row{{
c.Assert(matcher.matchCachedRow(true), check.DeepEquals, []*cdcpb.Event_Row{{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Expand All @@ -173,3 +176,123 @@ func (s *matcherSuite) TestMatchMatchCachedRow(c *check.C) {
OldValue: []byte("ov2"),
}})
}

func TestMatchRowUninitialized(t *testing.T) {
t.Parallel()
matcher := newMatcher()

// fake prewrite before init.
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
OldValue: []byte("v4"),
})
commitRow1 := &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
}
ok := matcher.matchRow(commitRow1, false)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
}, commitRow1)
require.False(t, ok)
matcher.cacheCommitRow(commitRow1)

// actual prewrite before init.
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
Value: []byte("v3"),
OldValue: []byte("v4"),
})

// normal prewrite and commit before init.
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 2,
Key: []byte("k2"),
Value: []byte("v3"),
OldValue: []byte("v4"),
})
commitRow2 := &cdcpb.Event_Row{
StartTs: 2,
CommitTs: 3,
Key: []byte("k2"),
}
ok = matcher.matchRow(commitRow2, false)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 2,
CommitTs: 3,
Key: []byte("k2"),
Value: []byte("v3"),
OldValue: []byte("v4"),
}, commitRow2)
require.True(t, ok)

// match cached row after init.
rows := matcher.matchCachedRow(true)
require.Len(t, rows, 1)
require.Equal(t, &cdcpb.Event_Row{
StartTs: 1,
CommitTs: 2,
Key: []byte("k1"),
Value: []byte("v3"),
OldValue: []byte("v4"),
}, rows[0])
}

func TestMatchMatchCachedRollbackRow(t *testing.T) {
t.Parallel()
matcher := newMatcher()
matcher.matchCachedRollbackRow(true)
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
})
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 3,
Key: []byte("k2"),
})
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 4,
Key: []byte("k3"),
})
matcher.matchCachedRollbackRow(true)

matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
})
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 3,
Key: []byte("k2"),
})
matcher.cacheRollbackRow(&cdcpb.Event_Row{
StartTs: 4,
Key: []byte("k3"),
})

matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 1,
Key: []byte("k1"),
Value: []byte("v1"),
OldValue: []byte("ov1"),
})
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 3,
Key: []byte("k2"),
Value: []byte("v2"),
OldValue: []byte("ov2"),
})
matcher.putPrewriteRow(&cdcpb.Event_Row{
StartTs: 4,
Key: []byte("k3"),
Value: []byte("v3"),
OldValue: []byte("ov3"),
})

matcher.matchCachedRollbackRow(true)
require.Empty(t, matcher.unmatchedValue)
}
9 changes: 7 additions & 2 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func (w *regionWorker) handleEventEntry(

state.initialized = true
w.session.regionRouter.Release(state.sri.rpcCtx.Addr)
cachedEvents := state.matcher.matchCachedRow()
cachedEvents := state.matcher.matchCachedRow(state.initialized)
for _, cachedEvent := range cachedEvents {
revent, err := assembleRowEvent(regionID, cachedEvent, w.enableOldValue)
if err != nil {
Expand All @@ -664,6 +664,7 @@ func (w *regionWorker) handleEventEntry(
return errors.Trace(ctx.Err())
}
}
state.matcher.matchCachedRollbackRow(state.initialized)
case cdcpb.Event_COMMITTED:
w.metrics.metricPullEventCommittedCounter.Inc()
revent, err := assembleRowEvent(regionID, entry, w.enableOldValue)
Expand Down Expand Up @@ -698,7 +699,7 @@ func (w *regionWorker) handleEventEntry(
zap.Uint64("regionID", regionID))
return errUnreachable
}
ok := state.matcher.matchRow(entry)
ok := state.matcher.matchRow(entry, state.initialized)
if !ok {
if !state.initialized {
state.matcher.cacheCommitRow(entry)
Expand All @@ -720,6 +721,10 @@ func (w *regionWorker) handleEventEntry(
}
case cdcpb.Event_ROLLBACK:
w.metrics.metricPullEventRollbackCounter.Inc()
if !state.initialized {
state.matcher.cacheRollbackRow(entry)
continue
}
state.matcher.rollbackRow(entry)
}
}
Expand Down
Loading