From ef7a972df83a1310502e9194f516746a0c1f6718 Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Fri, 22 Sep 2023 10:55:44 +0800
Subject: [PATCH] kafka(ticdc): event router allows hard-coded topics and
 makes the schema optional in the topic expression (#9755)

close pingcap/tiflow#9763
---
 .../dmlsink/mq/dispatcher/event_router.go     | 34 +++++++------
 .../mq/dispatcher/event_router_test.go        | 12 +++++
 .../dmlsink/mq/dispatcher/topic/dispatcher.go | 12 ++---
 .../mq/dispatcher/topic/dispatcher_test.go    |  6 ++-
 .../dmlsink/mq/dispatcher/topic/expression.go | 17 +++++--
 .../mq/dispatcher/topic/expression_test.go    | 24 ++++++++++
 .../multi_topics/conf/changefeed.toml         |  1 +
 .../multi_topics/data/step1.sql               | 48 ++++++++++++++++++-
 tests/integration_tests/multi_topics/run.sh   |  2 +
 9 files changed, 128 insertions(+), 28 deletions(-)

diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router.go b/cdc/sink/dmlsink/mq/dispatcher/event_router.go
index 37731327c54..478054557af 100644
--- a/cdc/sink/dmlsink/mq/dispatcher/event_router.go
+++ b/cdc/sink/dmlsink/mq/dispatcher/event_router.go
@@ -207,25 +207,29 @@ func getTopicDispatcher(
 		return topic.NewStaticTopicDispatcher(defaultTopic), nil
 	}
 
+	if topic.IsHardCode(rule) {
+		return topic.NewStaticTopicDispatcher(rule), nil
+	}
+
 	// check if this rule is a valid topic expression
 	topicExpr := topic.Expression(rule)
-
-	var err error
-	// validate the topic expression for pulsar sink
-	if sink.IsPulsarScheme(schema) {
-		err = topicExpr.PulsarValidate()
-	} else {
-		// validate the topic expression for kafka sink
-		switch protocol {
-		case config.ProtocolAvro:
-			err = topicExpr.ValidateForAvro()
-		default:
-			err = topicExpr.Validate()
-		}
-	}
+	err := validateTopicExpression(topicExpr, schema, protocol)
 	if err != nil {
 		return nil, err
 	}
-
 	return topic.NewDynamicTopicDispatcher(topicExpr), nil
 }
+
+func validateTopicExpression(expr topic.Expression, scheme string, protocol config.Protocol) error {
+	if sink.IsPulsarScheme(scheme) {
+		return expr.PulsarValidate()
+	}
+
+	switch protocol {
+	case config.ProtocolAvro:
+		return expr.ValidateForAvro()
+	default:
+	}
+
+	return expr.Validate()
+}
diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go b/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go
index 5227f8d47b4..02fb84b913f 100644
--- a/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go
+++ b/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go
@@ -63,6 +63,12 @@ func newReplicaConfig4DispatcherTest() *config.ReplicaConfig {
 					PartitionRule: "ts",
 					TopicRule:     "{schema}_{table}",
 				},
+				// rule-6: hard-code the topic
+				{
+					Matcher:       []string{"hard_code_schema.*"},
+					PartitionRule: "default",
+					TopicRule:     "hard_code_topic",
+				},
 			},
 		},
 	}
@@ -75,6 +81,7 @@ func TestEventRouter(t *testing.T) {
 	d, err := NewEventRouter(replicaConfig, config.ProtocolCanalJSON, "test", sink.KafkaScheme)
 	require.NoError(t, err)
 	require.Equal(t, "test", d.GetDefaultTopic())
+
 	topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test")
 	require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
 	require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)
@@ -120,6 +127,11 @@ func TestEventRouter(t *testing.T) {
 	topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "table2")
 	require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
 	require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher)
+
+	// match rule-6
+	topicDispatcher, partitionDispatcher = d.matchDispatcher("hard_code_schema", "test")
"test") + require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) } func TestGetActiveTopics(t *testing.T) { diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go b/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go index 90a543b2b12..4053222d517 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go @@ -23,28 +23,28 @@ type Dispatcher interface { Substitute(schema, table string) string } -// StaticTopicDispatcher is a topic dispatcher which dispatches rows and ddls to the default topic. +// StaticTopicDispatcher is a topic dispatcher which dispatches rows and DDL to the specific topic. type StaticTopicDispatcher struct { - defaultTopic string + topic string } // NewStaticTopicDispatcher returns a StaticTopicDispatcher. func NewStaticTopicDispatcher(defaultTopic string) *StaticTopicDispatcher { return &StaticTopicDispatcher{ - defaultTopic: defaultTopic, + topic: defaultTopic, } } // Substitute converts schema/table name in a topic expression to kafka topic name. func (s *StaticTopicDispatcher) Substitute(schema, table string) string { - return s.defaultTopic + return s.topic } func (s *StaticTopicDispatcher) String() string { - return s.defaultTopic + return s.topic } -// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and ddls +// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and DDLs // dynamically to the target topics. type DynamicTopicDispatcher struct { expression Expression diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go b/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go index 73974317f73..c387ac0e85a 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go @@ -29,7 +29,8 @@ func TestDynamicTopicDispatcherForSchema(t *testing.T) { topicExpr := Expression("hello_{schema}_world") err := topicExpr.Validate() - require.Nil(t, err) + require.NoError(t, err) + testCase := []struct { schema string table string @@ -68,7 +69,8 @@ func TestDynamicTopicDispatcherForTable(t *testing.T) { topicExpr := Expression("{schema}_{table}") err := topicExpr.Validate() - require.Nil(t, err) + require.NoError(t, err) + testCases := []struct { schema string table string diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go b/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go index 00f965f68de..61610f6f89d 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go @@ -21,9 +21,12 @@ import ( ) var ( + // hardCodeTopicNameRe is used to match a topic name which is hard code in the config + hardCodeTopicNameRe = regexp.MustCompile(`^([A-Za-z0-9\._\-]+)$`) + // topicNameRE is used to match a valid topic expression topicNameRE = regexp.MustCompile( - `^[A-Za-z0-9\._\-]*\{schema\}([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`, + `^[A-Za-z0-9\._\-]*(\{schema\})?([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`, ) // kafkaForbidRE is used to reject the characters which are forbidden in kafka topic name kafkaForbidRE = regexp.MustCompile(`[^a-zA-Z0-9\._\-]`) @@ -58,13 +61,14 @@ const kafkaTopicNameMaxLength = 249 type Expression string // Validate checks whether a kafka topic name is valid or not. +// return true if the expression is hard coded. 
 func (e Expression) Validate() error {
 	// validate the topic expression
-	if ok := topicNameRE.MatchString(string(e)); !ok {
-		return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
+	if ok := topicNameRE.MatchString(string(e)); ok {
+		return nil
 	}
 
-	return nil
+	return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
 }
 
 // ValidateForAvro checks whether topic pattern is {schema}_{table}, the only allowed
@@ -130,3 +134,8 @@ func (e Expression) PulsarValidate() error {
 
 	return nil
 }
+
+// IsHardCode checks whether a topic name is hard-coded or not.
+func IsHardCode(topicName string) bool {
+	return hardCodeTopicNameRe.MatchString(topicName)
+}
diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go b/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go
index c02bdc7ffdf..070ae718d49 100644
--- a/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go
+++ b/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go
@@ -227,6 +227,30 @@ func TestSubstituteTopicExpression(t *testing.T) {
 	}
 }
 
+func TestSchemaOptional(t *testing.T) {
+	expression := "prefix_{table}"
+	topicExpr := Expression(expression)
+	err := topicExpr.Validate()
+	require.NoError(t, err)
+
+	schemaName := "test"
+	tableName := "table1"
+	topicName := topicExpr.Substitute(schemaName, tableName)
+	require.Equal(t, "prefix_table1", topicName)
+}
+
+func TestTableOptional(t *testing.T) {
+	expression := "prefix_{schema}"
+	topicExpr := Expression(expression)
+	err := topicExpr.Validate()
+	require.NoError(t, err)
+
+	schemaName := "test"
+	tableName := "abc"
+	topicName := topicExpr.Substitute(schemaName, tableName)
+	require.Equal(t, "prefix_test", topicName)
+}
+
 // cmd: go test -run='^$' -bench '^(BenchmarkSubstitute)$' github.com/pingcap/tiflow/cdc/sink/dispatcher/topic
 // goos: linux
 // goarch: amd64
diff --git a/tests/integration_tests/multi_topics/conf/changefeed.toml b/tests/integration_tests/multi_topics/conf/changefeed.toml
index 0f72c584c5f..2d5a3c83c8c 100644
--- a/tests/integration_tests/multi_topics/conf/changefeed.toml
+++ b/tests/integration_tests/multi_topics/conf/changefeed.toml
@@ -1,4 +1,5 @@
 [sink]
 dispatchers = [
+    { matcher = ['workload.*'], topic = "workload" },
     { matcher = ['test.*'], topic = "{schema}_{table}" },
 ]
diff --git a/tests/integration_tests/multi_topics/data/step1.sql b/tests/integration_tests/multi_topics/data/step1.sql
index fa18f2208ce..a33dbc81ccf 100644
--- a/tests/integration_tests/multi_topics/data/step1.sql
+++ b/tests/integration_tests/multi_topics/data/step1.sql
@@ -41,4 +41,50 @@ create table table3
 );
 
 insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
-values (1, 2, 3, 4, 5);
\ No newline at end of file
+values (1, 2, 3, 4, 5);
+
+create database workload;
+use workload;
+
+create table table1
+(
+    id          int auto_increment,
+    c_tinyint   tinyint   null,
+    c_smallint  smallint  null,
+    c_mediumint mediumint null,
+    c_int       int       null,
+    c_bigint    bigint    null,
+    constraint pk
+        primary key (id)
+);
+insert into table1(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
+values (1, 2, 3, 4, 5);
+
+create table table2
+(
+    id          int auto_increment,
+    c_tinyint   tinyint   null,
+    c_smallint  smallint  null,
+    c_mediumint mediumint null,
+    c_int       int       null,
+    c_bigint    bigint    null,
+    constraint pk
+        primary key (id)
+);
+insert into table2(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
+values (1, 2, 3, 4, 5);
+
+create table table3
+(
+    id          int auto_increment,
+    c_tinyint   tinyint   null,
+    c_smallint  smallint  null,
+    c_mediumint mediumint null,
+    c_int       int       null,
+    c_bigint    bigint    null,
+    constraint pk
+        primary key (id)
+);
+
+insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
+values (1, 2, 3, 4, 5);
diff --git a/tests/integration_tests/multi_topics/run.sh b/tests/integration_tests/multi_topics/run.sh
index fa38b2f1037..99f8acf6458 100644
--- a/tests/integration_tests/multi_topics/run.sh
+++ b/tests/integration_tests/multi_topics/run.sh
@@ -36,6 +36,8 @@ function run() {
 		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/test_table${i}?protocol=canal-json&version=${KAFKA_VERSION}&enable-tidb-extension=true" "" ${i}
 	done
 
+	run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/workload?protocol=canal-json&enable-tidb-extension=true" ""
+
 	# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
 	for i in $(seq 1 3); do
 		check_table_exists test.table${i} ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
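
To illustrate the matching rules this patch introduces, here is a minimal standalone sketch (not part of the change; the package layout and the sample rule strings are illustrative). It applies the two regular expressions from the patched expression.go: a rule matching hardCodeTopicNameRe is treated as a hard-coded topic and dispatched statically, while anything else must satisfy topicNameRE, in which the {schema} placeholder is now optional.

package main

import (
	"fmt"
	"regexp"
)

// Both patterns are copied verbatim from the patched expression.go.
var (
	// A hard-coded topic name contains no placeholders at all.
	hardCodeTopicNameRe = regexp.MustCompile(`^([A-Za-z0-9\._\-]+)$`)
	// A topic expression may now use an optional {schema} and an
	// optional {table} placeholder.
	topicNameRE = regexp.MustCompile(
		`^[A-Za-z0-9\._\-]*(\{schema\})?([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
	)
)

func main() {
	// Sample rules (illustrative); the first mirrors the hard_code_topic
	// case added to the tests above.
	for _, rule := range []string{
		"hard_code_topic",  // hard-coded: dispatched statically
		"prefix_{table}",   // valid since this patch: {schema} is optional
		"{schema}_{table}", // valid before and after the patch
		"prefix_{invalid}", // rejected: unknown placeholder
	} {
		fmt.Printf("%-20q hardCode=%-5v validExpr=%v\n",
			rule,
			hardCodeTopicNameRe.MatchString(rule),
			topicNameRE.MatchString(rule))
	}
}

Note that once both placeholders are optional, a plain topic name also satisfies topicNameRE, which is why getTopicDispatcher checks topic.IsHardCode(rule) before falling back to expression validation: a hard-coded rule gets a StaticTopicDispatcher whose Substitute simply returns the stored name.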