From 1dc0555833ca7461f3e17b34729e28a3dfd13e37 Mon Sep 17 00:00:00 2001 From: yumchina <53415352+yumchina@users.noreply.github.com> Date: Thu, 27 Jul 2023 15:31:35 +0800 Subject: [PATCH] sink (pulsar): add Expression Pulsar Validate for topic name and unit test . (#9401) ref pingcap/tiflow#9413 --- .../dmlsink/mq/dispatcher/topic/expression.go | 36 ++++ .../topic/expression_pulsar_test.go | 195 ++++++++++++++++++ 2 files changed, 231 insertions(+) create mode 100644 cdc/sink/dmlsink/mq/dispatcher/topic/expression_pulsar_test.go diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go b/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go index a6dde858a8d..00f965f68de 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go @@ -15,6 +15,7 @@ package topic import ( "regexp" + "strings" "github.com/pingcap/tiflow/pkg/errors" ) @@ -35,6 +36,15 @@ var ( avroTopicNameRE = regexp.MustCompile( `^[A-Za-z0-9\._\-]*\{schema\}[A-Za-z0-9\._\-]*\{table\}[A-Za-z0-9\._\-]*$`, ) + // pulsarTopicNameRE is used to match pulsar topic + pulsarTopicNameRE = regexp.MustCompile( + `(^((persistent|non-persistent)://)[A-Za-z0-9{}._\-]*/[A-Za-z0-9{}._\-]*/[A-Za-z0-9{}._\-]*$)|` + + `(^[A-Za-z0-9._-]*\{schema}[A-Za-z0-9._-]*\{table}[A-Za-z0-9._-]*)$`, + ) + // pulsarTopicNameREFull is used to match pulsar full topic name + pulsarTopicNameREFull = regexp.MustCompile( + `(?:persistent|non-persistent)://.*`, + ) ) // The max length of kafka topic name is 249. @@ -94,3 +104,29 @@ func (e Expression) Substitute(schema, table string) string { return topicName } } + +// PulsarValidate checks whether a pulsar topic name is valid or not. 
+func (e Expression) PulsarValidate() error {
+	// validate the raw string form of the topic expression
+	topicName := string(e)
+
+	if len(topicName) == 0 {
+		return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs(
+			"topic name is empty")
+	}
+
+	// without a (non-)persistent:// scheme it is a simple name, which must not contain '/'
+	if !pulsarTopicNameREFull.MatchString(topicName) {
+		if strings.Contains(topicName, "/") {
+			return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs(
+				"it should be in the format of a <topic> and topic name must contain '{schema}' " +
+					"and simple topic name must not contain '/'")
+		}
+	} else if !pulsarTopicNameRE.MatchString(topicName) {
+		return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs(
+			"it should be in the format of <tenant>/<namespace>/<topic> or <topic> " +
+				"and topic name must contain '{schema}'")
+	}
+
+	return nil
+}
diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/expression_pulsar_test.go b/cdc/sink/dmlsink/mq/dispatcher/topic/expression_pulsar_test.go
new file mode 100644
index 00000000000..2ebaf2ce468
--- /dev/null
+++ b/cdc/sink/dmlsink/mq/dispatcher/topic/expression_pulsar_test.go
@@ -0,0 +1,195 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package topic
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestPulsarValidate runs PulsarValidate over invalid and valid topic expressions
+// and, for the valid ones, also checks the topic name produced by Substitute.
+func TestPulsarValidate(t *testing.T) {
+	t.Parallel()
+	schema := "schema"
+	table := "table"
+	testTopicCases := []struct {
+		name          string // test case name
+		expression    string
+		wantErr       string
+		expectedTopic string
+	}{
+		// invalid cases
+		{
+			name:       "like a full topic ,no {schema}",
+			expression: "persistent://",
+			wantErr:    "invalid topic expression",
+		},
+		{
+			name:       "like a full topic",
+			expression: "persistent://{schema}",
+			wantErr:    "invalid topic expression",
+		},
+		{
+			name:       "like a full topic ,no {schema}",
+			expression: "persistent://public",
+			wantErr:    "invalid topic expression",
+		},
+		{
+			name:       "like a full topic ,no {schema}",
+			expression: "persistent://public_test-table",
+			wantErr:    "invalid topic expression",
+		},
+		{
+			name:       "like a full topic ,need '/' after 'test-table'",
+			expression: "persistent://public/_test-table",
+			wantErr:    "invalid topic expression",
+		},
+		// "persistent://public/{schema}/_test-table" is intentionally not listed here:
+		// when {schema} expands to a namespace that does not exist on the pulsar
+		// server, the pulsar client reports an error at runtime, but cdc cannot
+		// detect that while validating the expression.
+		{
+			name:       "like a full topic",
+			expression: "persistent_public/test__{table}",
+			wantErr:    "invalid topic expression",
+		},
+		{
+			name:       "like a full topic",
+			expression: "persistent://{schema}_{table}",
+			wantErr:    "invalid topic expression",
+		},
+		{
+			name:       "like a full topic, but more '/' ",
+			expression: "persistent://{schema}/{table}/test/name",
+			wantErr:    "invalid topic expression",
+		},
+		{
+			name:       "like a full topic, but more '/' ",
+			expression: "persistent://test/{table}/test/name/admin",
+			wantErr:    "invalid topic expression",
+		},
+		{
+			name:       "like a full topic, but less '/' ",
+			expression: "non-persistent://public/test_{schema}_{table}",
+			wantErr:    "invalid topic expression",
+		},
+		{
+			name:       "like a full topic, but less '/' ",
+			expression: "non-persistent://public/test {table}_123456aaaa",
+			wantErr:    "invalid topic expression",
+		},
+		// valid cases
+		{
+			name:          "simple topic ,no {schema}",
+			expression:    "public", // no {schema}
+			expectedTopic: "public",
+		},
+		{
+			name:          "simple topic ,no {schema}",
+			expression:    "_xyz",
+			expectedTopic: "_xyz",
+		},
+		{
+			name:          "simple topic ,no {schema}",
+			expression:    "123456",
+			expectedTopic: "123456",
+		},
+		{
+			name:          "simple topic ,no {schema}",
+			expression:    "ABCD",
+			expectedTopic: "ABCD",
+		},
+		{
+			name:          "like a full topic ,no {schema}",
+			expression:    "persistent:public_test-table",
+			expectedTopic: "persistent:public_test-table",
+		},
+		{
+			name:          "simple topic",
+			expression:    "{schema}",
+			expectedTopic: "schema",
+		},
+		{
+			name:          "simple topic",
+			expression:    "AZ_{schema}",
+			expectedTopic: "AZ_schema",
+		},
+		{
+			name:          "simple topic",
+			expression:    "{table}_{schema}",
+			expectedTopic: "table_schema",
+		},
+		{
+			name:          "simple topic",
+			expression:    "123_{schema}_non-persistenttest__{table})",
+			expectedTopic: "123_schema_non-persistenttest__table)",
+		},
+		{
+			name:          "simple topic",
+			expression:    "persistent_public_test_{schema}_{table}",
+			expectedTopic: "persistent_public_test_schema_table",
+		},
+		{
+			name:          "simple topic",
+			expression:    "persistent{schema}_{table}",
+			expectedTopic: "persistentschema_table",
+		},
+		{
+			name:          "full topic",
+			expression:    "persistent://public/default/{schema}_{table}",
+			expectedTopic: "persistent://public/default/schema_table",
+		},
+		{
+			name:          "full topic",
+			expression:    "persistent://public/default/2342-{schema}_abc234",
+			expectedTopic: "persistent://public/default/2342-schema_abc234",
+		},
+		{
+			name:          "full topic",
+			expression:    "persistent://{schema}/{schema}/2342-{schema}_abc234",
+			expectedTopic: "persistent://schema/schema/2342-schema_abc234",
+		},
+		{
+			name:          "full topic",
+			expression:    "persistent://{schema}/dev/2342-{schema}_abc234",
+			expectedTopic: "persistent://schema/dev/2342-schema_abc234",
+		},
+		{
+			name:          "full topic",
+			expression:    "non-persistent://public/default/test_{schema}_{table}_back_up",
+			expectedTopic: "non-persistent://public/default/test_schema_table_back_up",
+		},
+		{
+			name:          "simple topic",
+			expression:    "123_{schema}_non-persistenttest__{table}",
+			expectedTopic: "123_schema_non-persistenttest__table",
+		},
+	}
+
+	for i, tc := range testTopicCases {
+		topicExpr := Expression(tc.expression)
+		err := topicExpr.PulsarValidate()
+		t.Logf("case %d: %s", i, tc.name)
+		if tc.wantErr != "" {
+			require.ErrorContains(t, err, tc.wantErr, fmt.Sprintf("case:%s", tc.name))
+			continue
+		}
+		require.NoError(t, err, fmt.Sprintf("case:%s", tc.name))
+		topicName := topicExpr.Substitute(schema, table)
+		require.Equal(t, tc.expectedTopic, topicName, fmt.Sprintf("case:%s", tc.name))
+	}
+}