Skip to content

Commit

Permalink
pulsar (ticdc): add partition key for pulsar meesage (#9677)
Browse files Browse the repository at this point in the history
ref #9413
  • Loading branch information
asddongmen authored Sep 6, 2023
1 parent e1730e5 commit 254cc2b
Show file tree
Hide file tree
Showing 20 changed files with 197 additions and 63 deletions.
2 changes: 1 addition & 1 deletion cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (m *feedStateManager) resetErrRetry() {
// isChangefeedStable check if there are states other than 'normal' in this sliding window.
func (m *feedStateManager) isChangefeedStable() bool {
for _, val := range m.stateHistory {
if val != model.StateNormal {
if val == model.StatePending {
return false
}
}
Expand Down
60 changes: 35 additions & 25 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
partitionDispatchRuleTS
partitionDispatchRuleTable
partitionDispatchRuleIndexValue
partitionDispatchRuleKey
)

func (r *partitionDispatchRule) fromString(rule string) {
Expand All @@ -62,9 +63,7 @@ func (r *partitionDispatchRule) fromString(rule string) {
case "index-value":
*r = partitionDispatchRuleIndexValue
default:
*r = partitionDispatchRuleDefault
log.Warn("the partition dispatch rule is not default/ts/table/index-value," +
" use the default rule instead.")
*r = partitionDispatchRuleKey
}
}

Expand Down Expand Up @@ -104,7 +103,7 @@ func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic, schema string) (*Ev
f = filter.CaseInsensitive(f)
}

d := getPartitionDispatcher(ruleConfig)
d := getPartitionDispatcher(ruleConfig, schema)
t, err := getTopicDispatcher(ruleConfig, defaultTopic,
util.GetOrZero(cfg.Sink.Protocol), schema)
if err != nil {
Expand Down Expand Up @@ -154,7 +153,7 @@ func (s *EventRouter) GetTopicForDDL(ddl *model.DDLEvent) string {
func (s *EventRouter) GetPartitionForRowChange(
row *model.RowChangedEvent,
partitionNum int32,
) int32 {
) (int32, string) {
_, partitionDispatcher := s.matchDispatcher(
row.Table.Schema, row.Table.Table,
)
Expand Down Expand Up @@ -225,7 +224,7 @@ func (s *EventRouter) matchDispatcher(
}

// getPartitionDispatcher returns the partition dispatcher for a specific partition rule.
func getPartitionDispatcher(ruleConfig *config.DispatchRule) partition.Dispatcher {
func getPartitionDispatcher(ruleConfig *config.DispatchRule, schema string) partition.Dispatcher {
var (
d partition.Dispatcher
rule partitionDispatchRule
Expand All @@ -240,6 +239,14 @@ func getPartitionDispatcher(ruleConfig *config.DispatchRule) partition.Dispatche
d = partition.NewTableDispatcher()
case partitionDispatchRuleDefault:
d = partition.NewDefaultDispatcher()
case partitionDispatchRuleKey:
if sink.IsPulsarScheme(schema) {
d = partition.NewKeyDispatcher(ruleConfig.PartitionRule)
} else {
log.Warn("the partition dispatch rule is not default/ts/table/index-value," +
" use the default rule instead.")
d = partition.NewDefaultDispatcher()
}
}

return d
Expand All @@ -256,29 +263,32 @@ func getTopicDispatcher(
// check if this rule is a valid topic expression
topicExpr := topic.Expression(ruleConfig.TopicRule)

// validate the topic expression for pulsar sink
if sink.IsPulsarScheme(schema) {
err := topicExpr.PulsarValidate()
if err != nil {
return nil, err
}
}

// validate the topic expression for kafka sink
var p config.Protocol
var err error
if protocol != "" {
p, err := config.ParseSinkProtocolFromString(protocol)
p, err = config.ParseSinkProtocolFromString(protocol)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

if schema == sink.PulsarScheme {
err = topicExpr.PulsarValidate()
if err != nil {
return nil, err
}
} else {
if p == config.ProtocolAvro {
err = topicExpr.ValidateForAvro()
if err != nil {
return nil, err
}
} else {
err = topicExpr.Validate()
if err != nil {
return nil, err
}
}
}
if p == config.ProtocolAvro {
err = topicExpr.ValidateForAvro()
if err != nil {
return nil, err
}
} else {
err = topicExpr.Validate()
if err != nil {
return nil, err
}
}
return topic.NewDynamicTopicDispatcher(topicExpr), nil
Expand Down
10 changes: 5 additions & 5 deletions cdc/sink/dmlsink/mq/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
}, "test", "kafka")
require.Nil(t, err)

p := d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _ := d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_default1", Table: "table"},
Columns: []*model.Column{
{
Expand All @@ -260,7 +260,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
IndexColumns: [][]int{{0}},
}, 16)
require.Equal(t, int32(14), p)
p = d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_default2", Table: "table"},
Columns: []*model.Column{
{
Expand All @@ -273,12 +273,12 @@ func TestGetPartitionForRowChange(t *testing.T) {
}, 16)
require.Equal(t, int32(0), p)

p = d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_table", Table: "table"},
CommitTs: 1,
}, 16)
require.Equal(t, int32(15), p)
p = d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "test_index_value", Table: "table"},
Columns: []*model.Column{
{
Expand All @@ -293,7 +293,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
},
}, 10)
require.Equal(t, int32(1), p)
p = d.GetPartitionForRowChange(&model.RowChangedEvent{
p, _ = d.GetPartitionForRowChange(&model.RowChangedEvent{
Table: &model.TableName{Schema: "a", Table: "table"},
CommitTs: 1,
}, 2)
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ func NewDefaultDispatcher() *DefaultDispatcher {

// DispatchRowChangedEvent returns the target partition to which
// a row changed event should be dispatched.
func (d *DefaultDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) int32 {
func (d *DefaultDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) {
return d.tbd.DispatchRowChangedEvent(row, partitionNum)
}
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func TestDefaultDispatcher(t *testing.T) {
IndexColumns: [][]int{{0}},
}

targetPartition := NewDefaultDispatcher().DispatchRowChangedEvent(row, 3)
targetPartition, _ := NewDefaultDispatcher().DispatchRowChangedEvent(row, 3)
require.Equal(t, int32(0), targetPartition)
}
5 changes: 3 additions & 2 deletions cdc/sink/dmlsink/mq/dispatcher/partition/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (

// Dispatcher is an abstraction for dispatching rows into different partitions
type Dispatcher interface {
// DispatchRowChangedEvent returns an index of partitions according to RowChangedEvent.
// DispatchRowChangedEvent returns an index of partitions or a partition key
// according to RowChangedEvent.
// Concurrency Note: This method is thread-safe.
DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) int32
DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string)
}
6 changes: 4 additions & 2 deletions cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package partition

import (
"strconv"
"sync"

"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -35,7 +36,7 @@ func NewIndexValueDispatcher() *IndexValueDispatcher {

// DispatchRowChangedEvent returns the target partition to which
// a row changed event should be dispatched.
func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) int32 {
func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) {
r.lock.Lock()
defer r.lock.Unlock()
r.hasher.Reset()
Expand All @@ -56,5 +57,6 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven
r.hasher.Write([]byte(col.Name), []byte(model.ColumnValueString(col.Value)))
}
}
return int32(r.hasher.Sum32() % uint32(partitionNum))
sum32 := r.hasher.Sum32()
return int32(sum32 % uint32(partitionNum)), strconv.FormatInt(int64(sum32), 10)
}
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/index_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestIndexValueDispatcher(t *testing.T) {
}
p := NewIndexValueDispatcher()
for _, tc := range testCases {
require.Equal(t, tc.expectPartition, p.DispatchRowChangedEvent(tc.row, 16))
index, _ := p.DispatchRowChangedEvent(tc.row, 16)
require.Equal(t, tc.expectPartition, index)
}
}
37 changes: 37 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/partition/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package partition

import (
"github.com/pingcap/tiflow/cdc/model"
)

// KeyDispatcher is a partition dispatcher which dispatches events
// using the provided partition key.
type KeyDispatcher struct {
partitionKey string
}

// NewKeyDispatcher creates a TableDispatcher.
func NewKeyDispatcher(partitionKey string) *KeyDispatcher {
return &KeyDispatcher{
partitionKey: partitionKey,
}
}

// DispatchRowChangedEvent returns the target partition to which
// a row changed event should be dispatched.
func (t *KeyDispatcher) DispatchRowChangedEvent(*model.RowChangedEvent, int32) (int32, string) {
return 0, t.partitionKey
}
51 changes: 51 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/partition/key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package partition

import (
"testing"

"github.com/pingcap/tiflow/cdc/model"
)

func TestKeyDispatcher_DispatchRowChangedEvent(t *testing.T) {
tests := []struct {
name string
partitionKey string
rowChangedEvt *model.RowChangedEvent
wantPartition int32
wantKey string
}{
{
name: "dispatch to partition 0 with partition key 'foo'",
partitionKey: "foo",
rowChangedEvt: &model.RowChangedEvent{},
wantPartition: 0,
wantKey: "foo",
},
// Add more test cases here
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := NewKeyDispatcher(tt.partitionKey)
gotPartition, gotKey := d.DispatchRowChangedEvent(tt.rowChangedEvt, 0)
if gotPartition != tt.wantPartition {
t.Errorf("DispatchRowChangedEvent() gotPartition = %v, want %v", gotPartition, tt.wantPartition)
}
if gotKey != tt.wantKey {
t.Errorf("DispatchRowChangedEvent() gotKey = %v, want %v", gotKey, tt.wantKey)
}
})
}
}
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/mq/dispatcher/partition/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func NewTableDispatcher() *TableDispatcher {

// DispatchRowChangedEvent returns the target partition to which
// a row changed event should be dispatched.
func (t *TableDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) int32 {
func (t *TableDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) {
t.lock.Lock()
defer t.lock.Unlock()
t.hasher.Reset()
// distribute partition by table
t.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table))
return int32(t.hasher.Sum32() % uint32(partitionNum))
return int32(t.hasher.Sum32() % uint32(partitionNum)), row.Table.String()
}
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func TestTableDispatcher(t *testing.T) {
}
p := NewTableDispatcher()
for _, tc := range testCases {
require.Equal(t, tc.expectPartition, p.DispatchRowChangedEvent(tc.row, 16))
index, _ := p.DispatchRowChangedEvent(tc.row, 16)
require.Equal(t, tc.expectPartition, index)
}
}
10 changes: 7 additions & 3 deletions cdc/sink/dmlsink/mq/dispatcher/partition/ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

package partition

import "github.com/pingcap/tiflow/cdc/model"
import (
"fmt"

"github.com/pingcap/tiflow/cdc/model"
)

// TsDispatcher is a partition dispatcher which dispatch events based on ts.
type TsDispatcher struct{}
Expand All @@ -25,6 +29,6 @@ func NewTsDispatcher() *TsDispatcher {

// DispatchRowChangedEvent returns the target partition to which
// a row changed event should be dispatched.
func (t *TsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) int32 {
return int32(row.CommitTs % uint64(partitionNum))
func (t *TsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string) {
return int32(row.CommitTs % uint64(partitionNum)), fmt.Sprintf("%d", row.CommitTs)
}
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func TestTsDispatcher(t *testing.T) {
}
p := &TsDispatcher{}
for _, tc := range testCases {
require.Equal(t, tc.expectPartition, p.DispatchRowChangedEvent(tc.row, 16))
index, _ := p.DispatchRowChangedEvent(tc.row, 16)
require.Equal(t, tc.expectPartition, index)
}
}
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
if err != nil {
return errors.Trace(err)
}
partition := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
index, key := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
// This never be blocked because this is an unbounded channel.
s.alive.worker.msgChan.In() <- mqEvent{
key: TopicPartitionKey{
Topic: topic, Partition: partition,
Topic: topic, Partition: index, PartitionKey: key,
},
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: row,
Expand Down
Loading

0 comments on commit 254cc2b

Please sign in to comment.