Skip to content

Commit

Permalink
sink (pulsar): add Expression Pulsar Validate for topic name and unit…
Browse files Browse the repository at this point in the history
… test . (#9401)

ref #9413
  • Loading branch information
yumchina authored Jul 27, 2023
1 parent 57d2ccc commit 1dc0555
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 0 deletions.
36 changes: 36 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/topic/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package topic

import (
"regexp"
"strings"

"github.com/pingcap/tiflow/pkg/errors"
)
Expand All @@ -35,6 +36,15 @@ var (
avroTopicNameRE = regexp.MustCompile(
`^[A-Za-z0-9\._\-]*\{schema\}[A-Za-z0-9\._\-]*\{table\}[A-Za-z0-9\._\-]*$`,
)
// pulsarTopicNameRE matches the two accepted shapes of a pulsar topic expression:
//   - a full name: `persistent://` or `non-persistent://` followed by exactly
//     three '/'-separated segments, each consisting of zero or more characters
//     from [A-Za-z0-9{}._-];
//   - a simple name built from [A-Za-z0-9._-] that contains `{schema}` followed
//     (possibly after other allowed characters) by `{table}`.
// Both alternatives are anchored to the whole string.
pulsarTopicNameRE = regexp.MustCompile(
`(^((persistent|non-persistent)://)[A-Za-z0-9{}._\-]*/[A-Za-z0-9{}._\-]*/[A-Za-z0-9{}._\-]*$)|` +
`(^[A-Za-z0-9._-]*\{schema}[A-Za-z0-9._-]*\{table}[A-Za-z0-9._-]*)$`,
)
// pulsarTopicNameREFull detects expressions that look like a pulsar full topic
// name, i.e. that contain `persistent://` or `non-persistent://`.
// The pattern is not anchored, so the scheme marker is matched anywhere in the
// string, not only at the beginning.
pulsarTopicNameREFull = regexp.MustCompile(
`(?:persistent|non-persistent)://.*`,
)
)

// The max length of kafka topic name is 249.
Expand Down Expand Up @@ -94,3 +104,29 @@ func (e Expression) Substitute(schema, table string) string {
return topicName
}
}

// PulsarValidate checks whether the expression is a valid pulsar topic
// expression.
//
// The rules applied are:
//   - an empty expression is rejected;
//   - an expression that does not look like a full topic name (it does not
//     match pulsarTopicNameREFull) is treated as a simple name, whose only
//     requirement is that it must not contain '/';
//   - an expression that looks like a full topic name must match
//     pulsarTopicNameRE.
//
// It returns nil when the expression is accepted, otherwise an
// ErrPulsarInvalidTopicExpression error describing the expected format.
func (e Expression) PulsarValidate() error {
	topicName := string(e)

	if topicName == "" {
		return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs(
			"topic name is empty")
	}

	// Not a full name: it can only be a simple name.
	if !pulsarTopicNameREFull.MatchString(topicName) {
		if strings.Contains(topicName, "/") {
			// NOTE(review): the message says a simple name must contain
			// '{schema}', but this branch only enforces the absence of '/'.
			return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs(
				"it should be in the format of a <topic> and topic name must contain '{schema}' " +
					"and simple topic name must not contain '/'")
		}
		return nil
	}

	// Looks like a full name: enforce the complete format.
	if !pulsarTopicNameRE.MatchString(topicName) {
		return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs(
			"it should be in the format of <tenant>/<namespace>/<topic> or <topic> " +
				"and topic name must contain '{schema}'")
	}

	return nil
}
195 changes: 195 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/topic/expression_pulsar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package topic

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

// TestPulsarValidate checks Expression.PulsarValidate against a table of
// invalid and valid pulsar topic expressions.
//
// A case with a non-empty wantErr must be rejected with an error containing
// that text. A case with an empty wantErr must be accepted, and substituting
// the schema and table names into it must produce expectedTopic.
func TestPulsarValidate(t *testing.T) {
	t.Parallel()
	schema := "schema"
	table := "table"
	testTopicCases := []struct {
		name          string // test case name
		expression    string // topic expression under test
		wantErr       string // expected error substring; empty means the expression is valid
		expectedTopic string // expected result of Substitute for valid expressions
	}{
		// invalid cases
		{
			name:       "like a full topic ,no {schema}",
			expression: "persistent://",
			wantErr:    "invalid topic expression",
		},
		{
			name:       "like a full topic",
			expression: "persistent://{schema}",
			wantErr:    "invalid topic expression",
		},
		{
			name:       "like a full topic ,no {schema}",
			expression: "persistent://public",
			wantErr:    "invalid topic expression",
		},
		{
			name:       "like a full topic ,no {schema}",
			expression: "persistent://public_test-table",
			wantErr:    "invalid topic expression",
		},
		{
			name:       "like a full topic ,need '/' after 'test-table'",
			expression: "persistent://public/_test-table",
			wantErr:    "invalid topic expression",
		},
		// Not covered: if {schema} resolves to a namespace that does not exist
		// on the pulsar server, the pulsar client will get an error, but cdc
		// can not check it here, e.g.
		//   expression: "persistent://public/{schema}/_test-table"
		{
			name:       "like a full topic",
			expression: "persistent_public/test__{table}",
			wantErr:    "invalid topic expression",
		},
		{
			name:       "like a full topic",
			expression: "persistent://{schema}_{table}",
			wantErr:    "invalid topic expression",
		},
		{
			name:       "like a full topic, but more '/' ",
			expression: "persistent://{schema}/{table}/test/name",
			wantErr:    "invalid topic expression",
		},
		{
			name:       "like a full topic, but more '/' ",
			expression: "persistent://test/{table}/test/name/admin",
			wantErr:    "invalid topic expression",
		},
		{
			name:       "like a full topic, but less '/' ",
			expression: "non-persistent://public/test_{schema}_{table}",
			wantErr:    "invalid topic expression",
		},
		{
			name:       "like a full topic, but less '/' ",
			expression: "non-persistent://public/test {table}_123456aaaa",
			wantErr:    "invalid topic expression",
		},
		// valid cases
		{
			name:          "simple topic ,no {schema}",
			expression:    "public", // no {schema}
			expectedTopic: "public",
		},
		{
			name:          "simple topic ,no {schema}",
			expression:    "_xyz",
			expectedTopic: "_xyz",
		},
		{
			name:          "simple topic ,no {schema}",
			expression:    "123456",
			expectedTopic: "123456",
		},
		{
			name:          "simple topic ,no {schema}",
			expression:    "ABCD",
			expectedTopic: "ABCD",
		},
		{
			name:          "like a full topic ,no {schema}",
			expression:    "persistent:public_test-table",
			expectedTopic: "persistent:public_test-table",
		},
		{
			name:          "simple topic",
			expression:    "{schema}",
			expectedTopic: "schema",
		},
		{
			name:          "simple topic",
			expression:    "AZ_{schema}",
			expectedTopic: "AZ_schema",
		},
		{
			name:          "simple topic",
			expression:    "{table}_{schema}",
			expectedTopic: "table_schema",
		},
		{
			name:          "simple topic",
			expression:    "123_{schema}_non-persistenttest__{table})",
			expectedTopic: "123_schema_non-persistenttest__table)",
		},
		{
			name:          "simple topic",
			expression:    "persistent_public_test_{schema}_{table}",
			expectedTopic: "persistent_public_test_schema_table",
		},
		{
			name:          "simple topic",
			expression:    "persistent{schema}_{table}",
			expectedTopic: "persistentschema_table",
		},
		{
			name:          "full topic",
			expression:    "persistent://public/default/{schema}_{table}",
			expectedTopic: "persistent://public/default/schema_table",
		},
		{
			name:          "full topic",
			expression:    "persistent://public/default/2342-{schema}_abc234",
			expectedTopic: "persistent://public/default/2342-schema_abc234",
		},
		{
			name:          "full topic",
			expression:    "persistent://{schema}/{schema}/2342-{schema}_abc234",
			expectedTopic: "persistent://schema/schema/2342-schema_abc234",
		},
		{
			name:          "full topic",
			expression:    "persistent://{schema}/dev/2342-{schema}_abc234",
			expectedTopic: "persistent://schema/dev/2342-schema_abc234",
		},
		{
			name:          "full topic",
			expression:    "non-persistent://public/default/test_{schema}_{table}_back_up",
			expectedTopic: "non-persistent://public/default/test_schema_table_back_up",
		},
		{
			// No "://" scheme marker, so this is validated as a simple topic.
			name:          "simple topic",
			expression:    "123_{schema}_non-persistenttest__{table}",
			expectedTopic: "123_schema_non-persistenttest__table",
		},
	}

	for i, tc := range testTopicCases {
		msg := fmt.Sprintf("case %d: %s (expression %q)", i, tc.name, tc.expression)
		t.Log(msg)

		topicExpr := Expression(tc.expression)
		err := topicExpr.PulsarValidate()

		// Branch on the expectation rather than on the outcome, so that an
		// unexpected error on a valid expression is reported as a failure.
		if tc.wantErr != "" {
			require.Error(t, err, msg)
			require.Contains(t, err.Error(), tc.wantErr, msg)
			continue
		}

		require.NoError(t, err, msg)
		topicName := topicExpr.Substitute(schema, table)
		require.Equal(t, tc.expectedTopic, topicName, msg)
	}
}

0 comments on commit 1dc0555

Please sign in to comment.