Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink: fix atomicity of transactions is broken (#1371) #1375

Merged
merged 2 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 57 additions & 24 deletions cdc/sink/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,73 @@ import (
"go.uber.org/zap"
)

type txnsWithTheSameCommitTs struct {
txns map[model.Ts]*model.SingleTableTxn
commitTs model.Ts
}

func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) {
if row.CommitTs != t.commitTs {
log.Panic("unexpected row change event",
zap.Uint64("commitTs of txn", t.commitTs),
zap.Any("row", row))
}
if t.txns == nil {
t.txns = make(map[model.Ts]*model.SingleTableTxn)
}
txn, exist := t.txns[row.StartTs]
if !exist {
txn = &model.SingleTableTxn{
StartTs: row.StartTs,
CommitTs: row.CommitTs,
Table: row.Table,
}
t.txns[row.StartTs] = txn
}
txn.Append(row)
}

// UnresolvedTxnCache caches unresolved txns
type UnresolvedTxnCache struct {
unresolvedTxnsMu sync.Mutex
unresolvedTxns map[model.TableID][]*model.SingleTableTxn
unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs
checkpointTs uint64
}

// NewUnresolvedTxnCache returns a new UnresolvedTxnCache
func NewUnresolvedTxnCache() *UnresolvedTxnCache {
return &UnresolvedTxnCache{
unresolvedTxns: make(map[model.TableID][]*model.SingleTableTxn),
unresolvedTxns: make(map[model.TableID][]*txnsWithTheSameCommitTs),
}
}

// Append adds unresolved rows to cache
// the rows inputed into this function will go through the following handling logic
// 1. group by tableID from one input stream
// 2. for each tableID stream, the callers of this function should **make sure** that the CommitTs of rows is **strictly increasing**
// 3. group by CommitTs, according to CommitTs cut the rows into many group of rows in the same CommitTs
// 4. group by StartTs, cause the StartTs is the unique identifier of the transaction, according to StartTs cut the rows into many txns
func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowChangedEvent) int {
c.unresolvedTxnsMu.Lock()
defer c.unresolvedTxnsMu.Unlock()
appendRows := 0
for _, row := range rows {
if filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
if filter != nil && filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs))
continue
}
txns := c.unresolvedTxns[row.Table.TableID]
if len(txns) == 0 || txns[len(txns)-1].StartTs != row.StartTs {
if len(txns) == 0 || txns[len(txns)-1].commitTs != row.CommitTs {
// fail-fast check
if len(txns) != 0 && txns[len(txns)-1].CommitTs > row.CommitTs {
if len(txns) != 0 && txns[len(txns)-1].commitTs > row.CommitTs {
log.Panic("the commitTs of the emit row is less than the received row",
zap.Stringer("table", row.Table),
zap.Uint64("emit row startTs", row.StartTs),
zap.Uint64("emit row commitTs", row.CommitTs),
zap.Uint64("last received row startTs", txns[len(txns)-1].StartTs),
zap.Uint64("last received row commitTs", txns[len(txns)-1].CommitTs))
zap.Uint64("last received row commitTs", txns[len(txns)-1].commitTs))
}
txns = append(txns, &model.SingleTableTxn{
StartTs: row.StartTs,
CommitTs: row.CommitTs,
Table: row.Table,
txns = append(txns, &txnsWithTheSameCommitTs{
commitTs: row.CommitTs,
})
c.unresolvedTxns[row.Table.TableID] = txns
}
Expand All @@ -73,6 +101,7 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha
}

// Resolved returns resolved txns according to resolvedTs
// The returned map contains many txns grouped by tableID. for each table, the each commitTs of txn in txns slice is strictly increasing
func (c *UnresolvedTxnCache) Resolved(resolvedTs uint64) map[model.TableID][]*model.SingleTableTxn {
if resolvedTs <= atomic.LoadUint64(&c.checkpointTs) {
return nil
Expand All @@ -88,40 +117,44 @@ func (c *UnresolvedTxnCache) Resolved(resolvedTs uint64) map[model.TableID][]*mo
return resolvedTxnsMap
}

// Unresolved returns unresolved txns
func (c *UnresolvedTxnCache) Unresolved() map[model.TableID][]*model.SingleTableTxn {
return c.unresolvedTxns
}

// UpdateCheckpoint updates the checkpoint ts
func (c *UnresolvedTxnCache) UpdateCheckpoint(checkpointTs uint64) {
atomic.StoreUint64(&c.checkpointTs, checkpointTs)
}

func splitResolvedTxn(
resolvedTs uint64, unresolvedTxns map[model.TableID][]*model.SingleTableTxn,
resolvedTs uint64, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (minTs uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns))
minTs = resolvedTs
for tableID, txns := range unresolvedTxns {
i := sort.Search(len(txns), func(i int) bool {
return txns[i].CommitTs > resolvedTs
return txns[i].commitTs > resolvedTs
})
if i == 0 {
continue
}
var resolvedTxns []*model.SingleTableTxn
var resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs
if i == len(txns) {
resolvedTxns = txns
resolvedTxnsWithTheSameCommitTs = txns
delete(unresolvedTxns, tableID)
} else {
resolvedTxns = txns[:i]
resolvedTxnsWithTheSameCommitTs = txns[:i]
unresolvedTxns[tableID] = txns[i:]
}
var txnsLength int
for _, txns := range resolvedTxnsWithTheSameCommitTs {
txnsLength += len(txns.txns)
}
resolvedTxns := make([]*model.SingleTableTxn, 0, txnsLength)
for _, txns := range resolvedTxnsWithTheSameCommitTs {
for _, txn := range txns.txns {
resolvedTxns = append(resolvedTxns, txn)
}
}
resolvedRowsMap[tableID] = resolvedTxns

if len(resolvedTxns) > 0 && resolvedTxns[0].CommitTs < minTs {
minTs = resolvedTxns[0].CommitTs
if len(resolvedTxnsWithTheSameCommitTs) > 0 && resolvedTxnsWithTheSameCommitTs[0].commitTs < minTs {
minTs = resolvedTxnsWithTheSameCommitTs[0].commitTs
}
}
return
Expand Down
167 changes: 115 additions & 52 deletions cdc/sink/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package common

import (
"sort"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand All @@ -29,70 +31,131 @@ var _ = check.Suite(&SinkCommonSuite{})

func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
defer testleak.AfterTest(c)()
testCases := []struct {
unresolvedTxns map[model.TableID][]*model.SingleTableTxn
resolvedTs uint64
expectedResolvedTxns map[model.TableID][]*model.SingleTableTxn
expectedUnresolvedTxns map[model.TableID][]*model.SingleTableTxn
expectedMinTs uint64
}{{
unresolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}},
2: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}},
testCases := [][]struct {
input []*model.RowChangedEvent
resolvedTs model.Ts
expected map[model.TableID][]*model.SingleTableTxn
}{{{ // Testing basic transaction collocation, no txns with the same committs
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}},
{StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}},
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}},
},
resolvedTs: 5,
expectedResolvedTxns: map[model.TableID][]*model.SingleTableTxn{},
expectedUnresolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}},
2: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}},
resolvedTs: 6,
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
}}},
2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 6, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}},
}}},
},
expectedMinTs: 5,
}, {
unresolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}},
2: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}},
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}},
},
resolvedTs: 23,
expectedResolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}},
2: {{CommitTs: 23}},
resolvedTs: 13,
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
}}, {Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 11, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}},
}}},
2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 12, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}},
}}},
3: {{Table: &model.TableName{TableID: 3}, StartTs: 1, CommitTs: 7, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}},
}}, {Table: &model.TableName{TableID: 3}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}},
}}},
},
expectedUnresolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 33}, {CommitTs: 34}},
2: {{CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}},
},
expectedMinTs: 11,
}}, {{ // Testing the short circuit path
input: []*model.RowChangedEvent{},
resolvedTs: 6,
expected: nil,
}, {
unresolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}},
2: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}},
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 13, Table: &model.TableName{TableID: 2}},
},
resolvedTs: 30,
expectedResolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}},
2: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}},
resolvedTs: 6,
expected: map[model.TableID][]*model.SingleTableTxn{},
}}, {{ // Testing the txns with the same commitTs
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}},
{StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}},
{StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}},
{StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}},
},
expectedUnresolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 33}, {CommitTs: 34}},
resolvedTs: 6,
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
}}},
2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 6, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}},
{StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}},
}}, {Table: &model.TableName{TableID: 2}, StartTs: 2, CommitTs: 6, Rows: []*model.RowChangedEvent{
{StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}},
}}},
},
expectedMinTs: 11,
}, {
unresolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}},
2: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}},
input: []*model.RowChangedEvent{
{StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}},
{StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}},
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}},
},
resolvedTs: 40,
expectedResolvedTxns: map[model.TableID][]*model.SingleTableTxn{
1: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}},
2: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}},
resolvedTs: 13,
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
}}, {Table: &model.TableName{TableID: 1}, StartTs: 2, CommitTs: 8, Rows: []*model.RowChangedEvent{
{StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}},
}}, {Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 9, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}},
}}},
2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 7, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}},
{StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}},
}}, {Table: &model.TableName{TableID: 2}, StartTs: 2, CommitTs: 7, Rows: []*model.RowChangedEvent{
{StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}},
}}},
},
expectedUnresolvedTxns: map[model.TableID][]*model.SingleTableTxn{},
expectedMinTs: 11,
}}
}}}
for _, tc := range testCases {
minTs, resolvedTxns := splitResolvedTxn(tc.resolvedTs, tc.unresolvedTxns)
c.Assert(minTs, check.Equals, tc.expectedMinTs)
c.Assert(resolvedTxns, check.DeepEquals, tc.expectedResolvedTxns)
c.Assert(tc.unresolvedTxns, check.DeepEquals, tc.expectedUnresolvedTxns)
cache := NewUnresolvedTxnCache()
for _, t := range tc {
cache.Append(nil, t.input...)
resolved := cache.Resolved(t.resolvedTs)
for tableID, txns := range resolved {
sort.Slice(txns, func(i, j int) bool {
if txns[i].CommitTs != txns[j].CommitTs {
return txns[i].CommitTs < txns[j].CommitTs
}
return txns[i].StartTs < txns[j].StartTs
})
resolved[tableID] = txns
}
c.Assert(resolved, check.DeepEquals, t.expected,
check.Commentf("%s", cmp.Diff(resolved, t.expected)))
}
}
}
Loading