Skip to content

Commit

Permalink
kafka(ticdc): event router allow hard code topics and set the schema …
Browse files Browse the repository at this point in the history
…optional in the topic expression (#9755)

close #9763
  • Loading branch information
3AceShowHand authored Sep 22, 2023
1 parent f5ecaf9 commit ef7a972
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 28 deletions.
34 changes: 19 additions & 15 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,25 +207,29 @@ func getTopicDispatcher(
return topic.NewStaticTopicDispatcher(defaultTopic), nil
}

if topic.IsHardCode(rule) {
return topic.NewStaticTopicDispatcher(rule), nil
}

// check if this rule is a valid topic expression
topicExpr := topic.Expression(rule)

var err error
// validate the topic expression for pulsar sink
if sink.IsPulsarScheme(schema) {
err = topicExpr.PulsarValidate()
} else {
// validate the topic expression for kafka sink
switch protocol {
case config.ProtocolAvro:
err = topicExpr.ValidateForAvro()
default:
err = topicExpr.Validate()
}
}
err := validateTopicExpression(topicExpr, schema, protocol)
if err != nil {
return nil, err
}

return topic.NewDynamicTopicDispatcher(topicExpr), nil
}

// validateTopicExpression checks that the topic expression is legal for the
// target sink. Pulsar topics have their own naming rules; for kafka sinks the
// avro protocol only permits the stricter {schema}_{table} style expression.
// It returns nil if the expression is valid.
func validateTopicExpression(expr topic.Expression, scheme string, protocol config.Protocol) error {
	// Pulsar has a dedicated validation path independent of the protocol.
	if sink.IsPulsarScheme(scheme) {
		return expr.PulsarValidate()
	}

	// Avro requires the stricter {schema}_{table} topic pattern.
	if protocol == config.ProtocolAvro {
		return expr.ValidateForAvro()
	}

	return expr.Validate()
}
12 changes: 12 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func newReplicaConfig4DispatcherTest() *config.ReplicaConfig {
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
// rule-6: hard code the topic
{
Matcher: []string{"hard_code_schema.*"},
PartitionRule: "default",
TopicRule: "hard_code_topic",
},
},
},
}
Expand All @@ -75,6 +81,7 @@ func TestEventRouter(t *testing.T) {
d, err := NewEventRouter(replicaConfig, config.ProtocolCanalJSON, "test", sink.KafkaScheme)
require.NoError(t, err)
require.Equal(t, "test", d.GetDefaultTopic())

topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)
Expand Down Expand Up @@ -120,6 +127,11 @@ func TestEventRouter(t *testing.T) {
topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "table2")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher)

// match rule-6
topicDispatcher, partitionDispatcher = d.matchDispatcher("hard_code_schema", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)
}

func TestGetActiveTopics(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@ type Dispatcher interface {
Substitute(schema, table string) string
}

// StaticTopicDispatcher is a topic dispatcher which dispatches rows and ddls to the default topic.
// StaticTopicDispatcher is a topic dispatcher which dispatches rows and DDL to a fixed, specific topic.
type StaticTopicDispatcher struct {
defaultTopic string
topic string
}

// NewStaticTopicDispatcher returns a StaticTopicDispatcher.
func NewStaticTopicDispatcher(defaultTopic string) *StaticTopicDispatcher {
return &StaticTopicDispatcher{
defaultTopic: defaultTopic,
topic: defaultTopic,
}
}

// Substitute converts schema/table name in a topic expression to kafka topic name.
func (s *StaticTopicDispatcher) Substitute(schema, table string) string {
return s.defaultTopic
return s.topic
}

func (s *StaticTopicDispatcher) String() string {
return s.defaultTopic
return s.topic
}

// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and ddls
// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and DDLs
// dynamically to the target topics.
type DynamicTopicDispatcher struct {
expression Expression
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestDynamicTopicDispatcherForSchema(t *testing.T) {

topicExpr := Expression("hello_{schema}_world")
err := topicExpr.Validate()
require.Nil(t, err)
require.NoError(t, err)

testCase := []struct {
schema string
table string
Expand Down Expand Up @@ -68,7 +69,8 @@ func TestDynamicTopicDispatcherForTable(t *testing.T) {

topicExpr := Expression("{schema}_{table}")
err := topicExpr.Validate()
require.Nil(t, err)
require.NoError(t, err)

testCases := []struct {
schema string
table string
Expand Down
17 changes: 13 additions & 4 deletions cdc/sink/dmlsink/mq/dispatcher/topic/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
)

var (
// hardCodeTopicNameRe is used to match a topic name which is hard-coded in the config
hardCodeTopicNameRe = regexp.MustCompile(`^([A-Za-z0-9\._\-]+)$`)

// topicNameRE is used to match a valid topic expression
topicNameRE = regexp.MustCompile(
`^[A-Za-z0-9\._\-]*\{schema\}([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
`^[A-Za-z0-9\._\-]*(\{schema\})?([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
)
// kafkaForbidRE is used to reject the characters which are forbidden in kafka topic name
kafkaForbidRE = regexp.MustCompile(`[^a-zA-Z0-9\._\-]`)
Expand Down Expand Up @@ -58,13 +61,14 @@ const kafkaTopicNameMaxLength = 249
type Expression string

// Validate checks whether a kafka topic name is valid or not.
// It returns an error if the expression is not a valid topic expression.
func (e Expression) Validate() error {
// validate the topic expression
if ok := topicNameRE.MatchString(string(e)); !ok {
return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
if ok := topicNameRE.MatchString(string(e)); ok {
return nil
}

return nil
return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
}

// ValidateForAvro checks whether topic pattern is {schema}_{table}, the only allowed
Expand Down Expand Up @@ -130,3 +134,8 @@ func (e Expression) PulsarValidate() error {

return nil
}

// IsHardCode reports whether the topic rule is a hard-coded topic name:
// it consists only of characters legal in a kafka topic name and contains
// no {schema}/{table} placeholders.
func IsHardCode(topicName string) bool {
	matched := hardCodeTopicNameRe.MatchString(topicName)
	return matched
}
24 changes: 24 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,30 @@ func TestSubstituteTopicExpression(t *testing.T) {
}
}

// TestSchemaOptional verifies that the {schema} placeholder is optional:
// an expression containing only {table} validates and substitutes correctly.
func TestSchemaOptional(t *testing.T) {
	topicExpr := Expression("prefix_{table}")
	err := topicExpr.Validate()
	require.NoError(t, err)

	topicName := topicExpr.Substitute("test", "table1")
	// NOTE: require.Equal takes the expected value first, then the actual.
	require.Equal(t, "prefix_table1", topicName)
}

// TestTableOptional verifies that the {table} placeholder is optional:
// an expression containing only {schema} validates and substitutes correctly.
func TestTableOptional(t *testing.T) {
	topicExpr := Expression("prefix_{schema}")
	err := topicExpr.Validate()
	require.NoError(t, err)

	topicName := topicExpr.Substitute("test", "abc")
	// NOTE: require.Equal takes the expected value first, then the actual.
	require.Equal(t, "prefix_test", topicName)
}

// cmd: go test -run='^$' -bench '^(BenchmarkSubstitute)$' github.com/pingcap/tiflow/cdc/sink/dispatcher/topic
// goos: linux
// goarch: amd64
Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/multi_topics/conf/changefeed.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[sink]
dispatchers = [
{ matcher = ['workload.*'], topic = "workload"},
{ matcher = ['test.*'], topic = "{schema}_{table}" },
]
48 changes: 47 additions & 1 deletion tests/integration_tests/multi_topics/data/step1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,50 @@ create table table3
);

insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (1, 2, 3, 4, 5);
values (1, 2, 3, 4, 5);

-- Create a second schema so the changefeed's `workload.*` matcher
-- (which routes to the hard-coded "workload" topic) has tables to match.
create database workload;
use workload;

-- Three structurally identical tables; each receives one row so every
-- table emits at least one DML event to the "workload" topic.
create table table1
(
id int auto_increment,
c_tinyint tinyint null,
c_smallint smallint null,
c_mediumint mediumint null,
c_int int null,
c_bigint bigint null,
constraint pk
primary key (id)
);
insert into table1(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (1, 2, 3, 4, 5);

create table table2
(
id int auto_increment,
c_tinyint tinyint null,
c_smallint smallint null,
c_mediumint mediumint null,
c_int int null,
c_bigint bigint null,
constraint pk
primary key (id)
);
insert into table2(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (1, 2, 3, 4, 5);

create table table3
(
id int auto_increment,
c_tinyint tinyint null,
c_smallint smallint null,
c_mediumint mediumint null,
c_int int null,
c_bigint bigint null,
constraint pk
primary key (id)
);

insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (1, 2, 3, 4, 5);
2 changes: 2 additions & 0 deletions tests/integration_tests/multi_topics/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ function run() {
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/test_table${i}?protocol=canal-json&version=${KAFKA_VERSION}&enable-tidb-extension=true" "" ${i}
done

run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/workload?protocol=canal-json&enable-tidb-extension=true" ""

# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
for i in $(seq 1 3); do
check_table_exists test.table${i} ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
Expand Down

0 comments on commit ef7a972

Please sign in to comment.