diff --git a/Makefile b/Makefile index 47a0cb22870..967806d1633 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,12 @@ CURDIR := $(shell pwd) path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH))) export PATH := $(CURDIR)/bin:$(CURDIR)/tools/bin:$(path_to_add):$(PATH) +# DBUS_SESSION_BUS_ADDRESS pulsar client use dbus to detect the connection status, +# but it will not exit when the connection is closed. +# I try to use leak_helper to detect goroutine leak,but it does not work. +# https://github.com/benthosdev/benthos/issues/1184 suggest to use environment variable to disable dbus. +export DBUS_SESSION_BUS_ADDRESS := /dev/null + SHELL := /usr/bin/env bash TEST_DIR := /tmp/tidb_cdc_test diff --git a/cdc/sink/ddlsink/factory/factory.go b/cdc/sink/ddlsink/factory/factory.go index b000b5daac3..1a8107c1bf1 100644 --- a/cdc/sink/ddlsink/factory/factory.go +++ b/cdc/sink/ddlsink/factory/factory.go @@ -25,11 +25,13 @@ import ( "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq" "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" "github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/kafka" kafkav2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" "github.com/pingcap/tiflow/pkg/util" ) @@ -59,6 +61,9 @@ func New( return mysql.NewDDLSink(ctx, changefeedID, sinkURI, cfg) case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: return cloudstorage.NewDDLSink(ctx, changefeedID, sinkURI, cfg) + case sink.PulsarScheme, sink.PulsarSSLScheme: + return mq.NewPulsarDDLSink(ctx, changefeedID, sinkURI, cfg, manager.NewPulsarTopicManager, + pulsarConfig.NewCreatorFactory, ddlproducer.NewPulsarProducer) default: return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme) diff --git a/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go index 8b377f55d50..ff35c4cf759 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go @@ -16,9 +16,12 @@ package ddlproducer import ( "context" + "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/kafka" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" ) // DDLProducer is the interface for DDL message producer. @@ -38,3 +41,7 @@ type DDLProducer interface { // Factory is a function to create a producer. type Factory func(ctx context.Context, changefeedID model.ChangeFeedID, syncProducer kafka.SyncProducer) DDLProducer + +// PulsarFactory is a function to create a pulsar producer. +type PulsarFactory func(ctx context.Context, changefeedID model.ChangeFeedID, + pConfig *pulsarConfig.Config, client pulsar.Client, sinkConfig *config.SinkConfig) (DDLProducer, error) diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go new file mode 100644 index 00000000000..498a63039db --- /dev/null +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go @@ -0,0 +1,113 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddlproducer + +import ( + "context" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" +) + +// Assert DDLEventSink implementation +var _ DDLProducer = (*PulsarMockProducers)(nil) + +// PulsarMockProducers is a mock pulsar producer +type PulsarMockProducers struct { + events map[string][]*pulsar.ProducerMessage +} + +// SyncBroadcastMessage pulsar consume all partitions +func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string, + totalPartitionsNum int32, message *common.Message, +) error { + // call SyncSendMessage + + log.Info("pulsarProducers SyncBroadcastMessage in") + return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) +} + +// SyncSendMessage sends a message +// partitionNum is not used,pulsar consume all partitions +func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string, + partitionNum int32, message *common.Message, +) error { + data := &pulsar.ProducerMessage{ + Payload: message.Value, + Key: message.GetPartitionKey(), + } + p.events[topic] = append(p.events[topic], data) + + return nil +} + +// NewMockPulsarProducer creates a pulsar producer +func NewMockPulsarProducer( + ctx context.Context, + changefeedID model.ChangeFeedID, + pConfig *pulsarConfig.Config, + client pulsar.Client, +) (*PulsarMockProducers, error) { + return &PulsarMockProducers{ + events: map[string][]*pulsar.ProducerMessage{}, + }, nil +} + +// NewMockPulsarProducerDDL creates a pulsar producer for DDLProducer +func NewMockPulsarProducerDDL( + ctx context.Context, + changefeedID model.ChangeFeedID, + pConfig *pulsarConfig.Config, + client pulsar.Client, + sinkConfig *config.SinkConfig, +) (DDLProducer, error) { + return NewMockPulsarProducer(ctx, changefeedID, pConfig, client) +} + +// GetProducerByTopic returns a producer by topic name +func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { + return producer, nil +} + +// Close close all producers +func (p *PulsarMockProducers) Close() { + p.events = make(map[string][]*pulsar.ProducerMessage) +} + +// Flush waits for all the messages in the async producer to be sent to Pulsar. +// Notice: this method is not thread-safe. +// Do not try to call AsyncSendMessage and Flush functions in different threads, +// otherwise Flush will not work as expected. It may never finish or flush the wrong message. +// Because inflight will be modified by mistake. +func (p *PulsarMockProducers) Flush(ctx context.Context) error { + return nil +} + +// GetAllEvents returns the events received by the mock producer. +func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage { + var events []*pulsar.ProducerMessage + for _, v := range p.events { + events = append(events, v...) + } + return events +} + +// GetEvents returns the event filtered by the key. +func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage { + return p.events[topic] +} diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go new file mode 100644 index 00000000000..7d5f1b1ddfa --- /dev/null +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go @@ -0,0 +1,240 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddlproducer + +import ( + "context" + "encoding/json" + "sync" + + "github.com/apache/pulsar-client-go/pulsar" + lru "github.com/hashicorp/golang-lru" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "go.uber.org/zap" +) + +const ( + // DefaultPulsarProducerCacheSize is the default size of the cache for producers + // 10240 producers maybe cost 1.1G memory + DefaultPulsarProducerCacheSize = 10240 +) + +// Assert DDLEventSink implementation +var _ DDLProducer = (*pulsarProducers)(nil) + +// pulsarProducers is a producer for pulsar +type pulsarProducers struct { + client pulsar.Client + pConfig *pulsarConfig.Config + defaultTopicName string + // support multiple topics + producers *lru.Cache + producersMutex sync.RWMutex + id model.ChangeFeedID +} + +// SyncBroadcastMessage pulsar consume all partitions +func (p *pulsarProducers) SyncBroadcastMessage(ctx context.Context, topic string, + totalPartitionsNum int32, message *common.Message, +) error { + // call SyncSendMessage + // pulsar consumer all partitions + return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) +} + +// SyncSendMessage sends a message +// partitionNum is not used,pulsar consume all partitions +func (p *pulsarProducers) SyncSendMessage(ctx context.Context, topic string, + partitionNum int32, message *common.Message, +) error { + p.wrapperSchemaAndTopic(message) + + producer, err := p.GetProducerByTopic(topic) + if err != nil { + log.Error("ddl SyncSendMessage GetProducerByTopic fail", zap.Error(err)) + return err + } + + data := &pulsar.ProducerMessage{ + Payload: message.Value, + Key: message.GetPartitionKey(), + } + mID, err := producer.Send(ctx, data) + if err != nil { + log.Error("ddl producer send fail", zap.Error(err)) + return err + } + + log.Debug("pulsarProducers SyncSendMessage success", + zap.Any("mID", mID), zap.String("topic", topic)) + + return nil +} + +// NewPulsarProducer creates a pulsar producer +func NewPulsarProducer( + ctx context.Context, + changefeedID model.ChangeFeedID, + pConfig *pulsarConfig.Config, + client pulsar.Client, + sinkConfig *config.SinkConfig, +) (DDLProducer, error) { + log.Info("Starting pulsar DDL producer ...", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID)) + + topicName := pConfig.GetDefaultTopicName() + + defaultProducer, err := newProducer(pConfig, client, topicName) + if err != nil { + return nil, err + } + + producerCacheSize := DefaultPulsarProducerCacheSize + if sinkConfig.PulsarConfig != nil && sinkConfig.PulsarConfig.PulsarProducerCacheSize != nil { + producerCacheSize = int(*sinkConfig.PulsarConfig.PulsarProducerCacheSize) + } + + producers, err := lru.NewWithEvict(producerCacheSize, func(key interface{}, value interface{}) { + // remove producer + pulsarProducer, ok := value.(pulsar.Producer) + if ok && pulsarProducer != nil { + pulsarProducer.Close() + } + }) + if err != nil { + return nil, err + } + + producers.Add(topicName, defaultProducer) + return &pulsarProducers{ + client: client, + pConfig: pConfig, + producers: producers, + defaultTopicName: topicName, + id: changefeedID, + }, nil +} + +// newProducer creates a pulsar producer +// One topic is used by one producer +func newProducer( + pConfig *pulsarConfig.Config, + client pulsar.Client, + topicName string, +) (pulsar.Producer, error) { + po := pulsar.ProducerOptions{ + Topic: topicName, + } + if pConfig.BatchingMaxMessages > 0 { + po.BatchingMaxMessages = pConfig.BatchingMaxMessages + } + if pConfig.BatchingMaxPublishDelay > 0 { + po.BatchingMaxPublishDelay = pConfig.BatchingMaxPublishDelay + } + if pConfig.CompressionType > 0 { + po.CompressionType = pConfig.CompressionType + po.CompressionLevel = pulsar.Default + } + if pConfig.SendTimeout > 0 { + po.SendTimeout = pConfig.SendTimeout + } + + producer, err := client.CreateProducer(po) + if err != nil { + return nil, err + } + + log.Info("create pulsar producer success", + zap.String("topic:", topicName)) + + return producer, nil +} + +func (p *pulsarProducers) getProducer(topic string) (pulsar.Producer, bool) { + target, ok := p.producers.Get(topic) + if ok { + producer, ok := target.(pulsar.Producer) + if ok { + return producer, true + } + } + return nil, false +} + +// GetProducerByTopic get producer by topicName +func (p *pulsarProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { + getProducer, ok := p.getProducer(topicName) + if ok && getProducer != nil { + return getProducer, nil + } + + if !ok { // create a new producer for the topicName + producer, err = newProducer(p.pConfig, p.client, topicName) + if err != nil { + return nil, err + } + p.producers.Add(topicName, producer) + } + + return producer, nil +} + +// Close close all producers +func (p *pulsarProducers) Close() { + keys := p.producers.Keys() + p.producersMutex.Lock() + defer p.producersMutex.Unlock() + for _, topic := range keys { + p.producers.Remove(topic) // callback func will be called + } +} + +// wrapperSchemaAndTopic wrapper schema and topic +func (p *pulsarProducers) wrapperSchemaAndTopic(m *common.Message) { + if m.Schema == nil { + if m.Protocol == config.ProtocolMaxwell { + mx := &maxwellMessage{} + err := json.Unmarshal(m.Value, mx) + if err != nil { + log.Error("unmarshal maxwell message failed", zap.Error(err)) + return + } + if len(mx.Database) > 0 { + m.Schema = &mx.Database + } + if len(mx.Table) > 0 { + m.Table = &mx.Table + } + } + if m.Protocol == config.ProtocolCanal { // canal protocol set multi schemas in one topic + m.Schema = str2Pointer("multi_schema") + } + } +} + +// maxwellMessage is the message format of maxwell +type maxwellMessage struct { + Database string `json:"database"` + Table string `json:"table"` +} + +// str2Pointer returns the pointer of the string. +func str2Pointer(str string) *string { + return &str +} diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go new file mode 100644 index 00000000000..698c5b004af --- /dev/null +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go @@ -0,0 +1,123 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddlproducer + +import ( + "context" + "testing" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/leakutil" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "github.com/stretchr/testify/require" +) + +// TestPulsarSyncSendMessage is a integration test for pulsar producer +func TestPulsarSyncSendMessage(t *testing.T) { + leakutil.VerifyNone(t) + + type args struct { + ctx context.Context + topic string + partition int32 + message *common.Message + changefeedID model.ChangeFeedID + pulsarConfig *pulsarConfig.Config + errCh chan error + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "test SyncSendMessage", + args: args{ + ctx: context.Background(), + topic: "test", + partition: 1, + changefeedID: model.ChangeFeedID{ID: "test", Namespace: "test_namespace"}, + message: &common.Message{ + Value: []byte("this value for test input data"), + PartitionKey: str2Pointer("test_key"), + }, + errCh: make(chan error), + }, + }, + } + for _, tt := range tests { + p, err := NewMockPulsarProducer(tt.args.ctx, tt.args.changefeedID, + tt.args.pulsarConfig, nil) + + require.NoError(t, err) + + err = p.SyncSendMessage(tt.args.ctx, tt.args.topic, + tt.args.partition, tt.args.message) + require.NoError(t, err) + require.Len(t, p.GetEvents(tt.args.topic), 1) + + p.Close() + + } +} + +// TestPulsarSyncBroadcastMessage is a integration test for pulsar producer +func TestPulsarSyncBroadcastMessage(t *testing.T) { + // leakutil.VerifyNone(t) + + type args struct { + ctx context.Context + topic string + partition int32 + message *common.Message + changefeedID model.ChangeFeedID + pulsarConfig *pulsarConfig.Config + errCh chan error + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "test SyncBroadcastMessage", + args: args{ + ctx: context.Background(), + topic: "test", + partition: 1, + changefeedID: model.ChangeFeedID{ID: "test", Namespace: "test_namespace"}, + message: &common.Message{ + Value: []byte("this value for test input data"), + PartitionKey: str2Pointer("test_key"), + }, + errCh: make(chan error), + }, + }, + } + for _, tt := range tests { + p, err := NewMockPulsarProducer(tt.args.ctx, tt.args.changefeedID, + tt.args.pulsarConfig, nil) + + require.NoError(t, err) + + err = p.SyncSendMessage(tt.args.ctx, tt.args.topic, + tt.args.partition, tt.args.message) + require.NoError(t, err) + require.Len(t, p.GetEvents(tt.args.topic), 1) + + p.Close() + + } +} diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go new file mode 100644 index 00000000000..5007706c148 --- /dev/null +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -0,0 +1,109 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mq + +import ( + "context" + "net/url" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" + "github.com/pingcap/tiflow/cdc/sink/util" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/builder" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + tiflowutil "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" +) + +// NewPulsarDDLSink will verify the config and create a Pulsar DDL Sink. +func NewPulsarDDLSink( + ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, + pulsarTopicManagerCreator manager.PulsarTopicManager, + clientCreator pulsarConfig.FactoryCreator, + producerCreator ddlproducer.PulsarFactory, +) (_ *DDLSink, err error) { + log.Info("Starting pulsar DDL producer ...", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID)) + + defaultTopic, err := util.GetTopic(sinkURI) + if err != nil { + return nil, errors.Trace(err) + } + + protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol)) + if err != nil { + return nil, errors.Trace(err) + } + + pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI) + if err != nil { + return nil, errors.Trace(err) + } + + log.Info("Try to create a DDL sink producer", zap.Any("pulsarConfig", pConfig)) + + // NewEventRouter + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic) + if err != nil { + return nil, errors.Trace(err) + } + + encoderConfig, err := util.GetEncoderConfig(sinkURI, protocol, replicaConfig, pConfig.MaxMessageBytes) + if err != nil { + return nil, errors.Trace(err) + } + + encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, changefeedID, encoderConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + start := time.Now() + client, err := clientCreator(pConfig, changefeedID, replicaConfig.Sink) + if err != nil { + log.Error("DDL sink producer client create fail", zap.Error(err)) + return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) + } + + p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink) + log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) + } + + topicManager, err := pulsarTopicManagerCreator(pConfig, client) + if err != nil { + return nil, errors.Trace(err) + } + + s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol) + + return s, nil +} + +// str2Pointer returns the pointer of the string. +func str2Pointer(str string) *string { + return &str +} diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go new file mode 100644 index 00000000000..515bfcd0163 --- /dev/null +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go @@ -0,0 +1,160 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mq + +import ( + "context" + "net/url" + "testing" + + mm "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" + "github.com/pingcap/tiflow/pkg/config" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "github.com/stretchr/testify/require" +) + +const ( + // MockPulsarTopic is the mock topic for pulsar + MockPulsarTopic = "pulsar_test" +) + +// newPulsarConfig set config +func newPulsarConfig(t *testing.T) (*pulsarConfig.Config, *url.URL) { + sinkURL := "pulsar://127.0.0.1:6650/persistent://public/default/test?" + + "protocol=canal-json&pulsar-version=v2.10.0&enable-tidb-extension=true&" + + "authentication-token=eyJhbcGcixxxxxxxxxxxxxx" + + sinkURI, err := url.Parse(sinkURL) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI)) + require.NoError(t, err) + c, err := pulsarConfig.NewPulsarConfig(sinkURI) + require.NoError(t, err) + return c, sinkURI +} + +// TestNewPulsarDDLSink tests the NewPulsarDDLSink +func TestNewPulsarDDLSink(t *testing.T) { + t.Parallel() + + cfg, sinkURI := newPulsarConfig(t) + changefeedID := model.DefaultChangeFeedID("test") + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink = &config.SinkConfig{ + Protocol: str2Pointer(cfg.Protocol.String()), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = context.WithValue(ctx, "testing.T", t) + ddlSink, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig, + manager.NewMockPulsarTopicManager, pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL) + + require.NoError(t, err) + require.NotNil(t, ddlSink) + + checkpointTs := uint64(417318403368288260) + tables := []*model.TableInfo{ + { + TableName: model.TableName{ + Schema: "cdc", + Table: "person", + }, + }, + { + TableName: model.TableName{ + Schema: "cdc", + Table: "person1", + }, + }, + { + TableName: model.TableName{ + Schema: "cdc", + Table: "person2", + }, + }, + } + + err = ddlSink.WriteCheckpointTs(ctx, checkpointTs, tables) + require.NoError(t, err) + + events := ddlSink.producer.(*ddlproducer.PulsarMockProducers).GetAllEvents() + require.Len(t, events, 1, "All topics and partitions should be broadcast") +} + +// TestPulsarDDLSinkNewSuccess tests the NewPulsarDDLSink write a event to pulsar +func TestPulsarDDLSinkNewSuccess(t *testing.T) { + t.Parallel() + + cfg, sinkURI := newPulsarConfig(t) + changefeedID := model.DefaultChangeFeedID("test") + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink = &config.SinkConfig{ + Protocol: str2Pointer(cfg.Protocol.String()), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = context.WithValue(ctx, "testing.T", t) + s, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig, manager.NewMockPulsarTopicManager, + pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL) + require.NoError(t, err) + require.NotNil(t, s) +} + +func TestPulsarWriteDDLEventToZeroPartition(t *testing.T) { + t.Parallel() + + cfg, sinkURI := newPulsarConfig(t) + changefeedID := model.DefaultChangeFeedID("test") + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink = &config.SinkConfig{ + Protocol: str2Pointer(cfg.Protocol.String()), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = context.WithValue(ctx, "testing.T", t) + ddlSink, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig, + manager.NewMockPulsarTopicManager, pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL) + + require.NoError(t, err) + require.NotNil(t, ddlSink) + + ddl := &model.DDLEvent{ + CommitTs: 417318403368288260, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "cdc", Table: "person", + }, + }, + Query: "create table person(id int, name varchar(32), primary key(id))", + Type: mm.ActionCreateTable, + } + err = ddlSink.WriteDDLEvent(ctx, ddl) + require.NoError(t, err) + + err = ddlSink.WriteDDLEvent(ctx, ddl) + require.NoError(t, err) + + require.Len(t, ddlSink.producer.(*ddlproducer.PulsarMockProducers).GetAllEvents(), + 2, "Write DDL 2 Events") +} diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go new file mode 100644 index 00000000000..ddc5542d614 --- /dev/null +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go @@ -0,0 +1,70 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "context" + "sync" + + "github.com/apache/pulsar-client-go/pulsar" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" +) + +// PulsarTopicManager is a manager for pulsar topics. +type PulsarTopicManager func( + cfg *pulsarConfig.Config, + client pulsar.Client, +) (TopicManager, error) + +// pulsarTopicManager is a manager for pulsar topics. +type pulsarTopicManager struct { + client pulsar.Client + partitions sync.Map // key : topic, value : partition-name + cfg *pulsarConfig.Config +} + +// NewPulsarTopicManager creates a new topic manager. +func NewPulsarTopicManager( + cfg *pulsarConfig.Config, + client pulsar.Client, +) (TopicManager, error) { + mgr := &pulsarTopicManager{ + client: client, + cfg: cfg, + partitions: sync.Map{}, + } + + return mgr, nil +} + +// GetPartitionNum spend more time,but no use. +// Neither synchronous nor asynchronous sending of pulsar will use PartitionNum +// but this method is used in mq_ddl_sink.go, so an empty implementation is required +func (m *pulsarTopicManager) GetPartitionNum(ctx context.Context, topic string) (int32, error) { + return 0, nil +} + +// CreateTopicAndWaitUntilVisible no need to create first +func (m *pulsarTopicManager) CreateTopicAndWaitUntilVisible(ctx context.Context, topicName string) (int32, error) { + return 0, nil +} + +// Close +func (m *pulsarTopicManager) Close() { +} + +// str2Pointer returns the pointer of the string. +func str2Pointer(str string) *string { + return &str +} diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go new file mode 100644 index 00000000000..3d556193fdb --- /dev/null +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go @@ -0,0 +1,40 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" +) + +// pulsarTopicManager is a manager for pulsar topics. +type pulsarTopicManagerMock struct { + *pulsarTopicManager +} + +// NewMockPulsarTopicManager creates a new topic manager. +func NewMockPulsarTopicManager( + cfg *pulsarConfig.Config, + client pulsar.Client, +) (TopicManager, error) { + mgr := &pulsarTopicManagerMock{} + return mgr, nil +} + +// GetPartitionNum spend more time,but no use. +// mock 3 partitions +func (m *pulsarTopicManagerMock) GetPartitionNum(ctx context.Context, topic string) (int32, error) { + return 3, nil +} diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager_test.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager_test.go new file mode 100644 index 00000000000..b9364a4c10e --- /dev/null +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "context" + "net/url" + "testing" + + "github.com/pingcap/tiflow/pkg/config" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "github.com/stretchr/testify/require" +) + +func newPulsarConfig(t *testing.T) (*pulsarConfig.Config, *url.URL) { + sinkURL := "pulsar://127.0.0.1:6650/persistent://public/default/test?" + + "protocol=canal-json&pulsar-version=v2.10.0&enable-tidb-extension=true&" + + "authentication-token=eyJhbGciOiJSUzIxxxxxxxxxxxxxxxx" + + sinkURI, err := url.Parse(sinkURL) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI)) + require.NoError(t, err) + c, err := pulsarConfig.NewPulsarConfig(sinkURI) + require.NoError(t, err) + return c, sinkURI +} + +func TestGetPartitionNumMock(t *testing.T) { + t.Parallel() + + cfg, _ := newPulsarConfig(t) + + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink = &config.SinkConfig{ + Protocol: str2Pointer(cfg.Protocol.String()), + } + + ctx := context.Background() + + ctx = context.WithValue(ctx, "testing.T", t) + pm, err := NewMockPulsarTopicManager(cfg, nil) + require.NoError(t, err) + require.NotNil(t, pm) + + pn, err := pm.GetPartitionNum(ctx, "test") + require.NoError(t, err) + require.Equal(t, int32(3), pn) +} diff --git a/docs/design/2023-07-04-ticdc-pulsar-sink.md b/docs/design/2023-07-04-ticdc-pulsar-sink.md index 4d963ad81f6..e24544e4c29 100644 --- a/docs/design/2023-07-04-ticdc-pulsar-sink.md +++ b/docs/design/2023-07-04-ticdc-pulsar-sink.md @@ -39,18 +39,15 @@ ## Introduction - -This document provides a complete design on implementing pulsar sink for TiCDC. +This document provides a complete design on implementing pulsar sink for TiCDC. The pulsar sink is used to distribute the DML change records, and DDL events generated by TiCDC. - ## Motivation or Background Incorporating Pulsar into Ticdc is for the purpose of expanding the downstream MQ distribution channels. Users want to output TiDB events to Pulsar, because they can reuse machines from Pulsar with others, the pulsar easily expanded horizontally etc. - ## Detailed Design #### Protocol-support @@ -58,13 +55,14 @@ the pulsar easily expanded horizontally etc. In order to maintain the consistency of the middleware of the MQ class, we give priority support some of the protocols supported by Kafka: -__CanalJSON__ +**CanalJSON** -__Canal__ +**Canal** -__Maxwell__ +**Maxwell** CanalJSON protocol sample: + ``` for more information, please refer to: https://docs.pingcap.com/tidb/dev/ticdc-canal-json @@ -94,17 +92,19 @@ for more information, please refer to: https://docs.pingcap.com/tidb/dev/ticdc-c - Ensure that there are no incomplete inner-table transactions in Pulsar. - Ensure that every event must be sent to Pulsar at least once. - #### Pulsar Client + ##### Information -https://github.com/apache/pulsar-client-go Version: v0.10.0 -Requirement Golang 1.18+ +https://github.com/apache/pulsar-client-go Version: v0.10.0 +Requirement Golang 1.18+ ##### Different from Kafka + The difference between pulsar and kafka is that the producer in the client of pulsar must be bound to a topic, but kafka does not. ##### Pulsar Client Config + ```api type ClientOptions struct { // Configure the service URL for the Pulsar service. @@ -136,36 +136,37 @@ type ClientOptions struct { **Main Note:** -- URL: like pulsar://127.0.0.1:6650 +- URL: like pulsar://127.0.0.1:6650 - Authentication: We only support token/token-from-file/account-with-password. - MetricsRegisterer: We initialize pulsar MetricsRegisterer with `prometheus.NewRegistry()` from tiflow project `cdc/server/metrics.go` - #### Pulsar Producer + ```go type ProducerOptions struct { // Topic specifies the topic this producer will be publishing on. // This argument is required when constructing the producer. Topic string - + // Properties specifies a set of application defined properties for the producer. // This properties will be visible in the topic stats Properties map[string]string - + //……… others } ``` -- Payload: is carrying real binary data + +- Payload: is carrying real binary data - Value: Value and payload is mutually exclusive, Value for schema message. - Key: The optional key associated with the message (particularly useful for things like topic compaction) **We must cache all producers to the client for different topics Every changefeed of pulsar client have a producer map. Type as `map[string]pulsar.Producer`, the key is topic name, value is producer of pulsar client.** - ##### Producer Message: + ```go type ProducerMessage struct { // Payload for the message @@ -181,7 +182,7 @@ OrderingKey string } ``` -- Payload: is carrying real binary data +- Payload: is carrying real binary data - Value: Value and payload is mutually exclusive, Value for schema message. - Key: The optional key associated with the message (particularly useful for things like topic compaction) - OrderingKey: OrderingKey sets the ordering key of the message.Same as Key, so we do not use it. @@ -195,7 +196,7 @@ OrderingKey string #### Pulsar Route Rule - We support route events to different partitions by changefeed config dispatchers, -refer to `Pulsar Topic Rule` + refer to `Pulsar Topic Rule` - You can set the message-key to any characters. We do not set any characters default, the event will be sent to the partition by hash algorithm. #### Pulsar Topic Rule @@ -203,8 +204,8 @@ refer to `Pulsar Topic Rule` ```yaml dispatchers = [ {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1",partition="table" }, - {matcher = ['test6.*'],topic = "Topic expression 2",partition="ts" } -] + {matcher = ['test6.*'],topic = "Topic expression 2",partition="ts" } +] The topic expression syntax is legal if it meets the following conditions: 1.{schema} and {table} respectively identify the database name and table name that need to be matched, and are required fields. Pulsar support "(persistent|non-persistent)://tenant/namespace/topic" as topic name。 @@ -218,35 +219,43 @@ The topic expression syntax is legal if it meets the following conditions: ``` - #### Produce DDL Event We implement the DDLProducer interface ##### SyncSendMessage Method + It will find a producer by topic name. Send the event to pulsar. Report some metrics . `partitionNum` is not used, because the pulsar server supports set partition num only. + ##### SyncBroadcastMessage Method + It do nothing + ##### Close Method -Close every producers +Close every producers ##### Produce DML Event + We implement the DMLProducer interface + ##### AsyncSendMessage Method + It will find a producer by topic name. Set a callback function to the pulsar producer client. Send the event to pulsar. Report some metrics. `partitionNum` is not used, because the pulsar server supports set partition num only. + ##### Close Method + Close every producers #### Pulsar Metrics - + Pulsar client support metric of `prometheus.Registry` Following are pulsar client metrics @@ -281,17 +290,19 @@ pulsar_client_rpc_count ``` #### User Interface + **Sink-URI** When creating a changefeed, the user can specify the sink-uri like this: cdc cli changefeed create --sink-uri="${scheme}://${address}/${topic-name}?protocol=${protocol}&pulsar-version=${pulsar-version}&authentication-token=${authentication-token} Example: + ``` cdc cli changefeed create --server=http://127.0.0.1:8300 --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/test?protocol=canal-json&pulsar-version=v2.10.0&authentication-token=eyJhbGciOiJSUzIxxxxxxxxxxxxxxxxx" ``` - + ## Test Design Pulsar sink is a new feature, For tests, we focus on the functional tests, scenario tests and benchmark. diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 7c9c1779ef6..8c62f04d3ec 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1620,6 +1620,24 @@ var doc = `{ } } }, + "config.PulsarConfig": { + "type": "object", + "properties": { + "pulsar-producer-cache-size": { + "description": "PulsarProducerCacheSize is the size of the cache of pulsar producers", + "type": "integer" + }, + "tls-certificate-path": { + "type": "string" + }, + "tls-private-key-path": { + "type": "string" + }, + "tls-trust-certs-file-path": { + "type": "string" + } + } + }, "config.SinkConfig": { "type": "object", "properties": { @@ -1682,6 +1700,9 @@ var doc = `{ "description": "Protocol is NOT available when the downstream is DB.", "type": "string" }, + "pulsar-config": { + "$ref": "#/definitions/config.PulsarConfig" + }, "safe-mode": { "description": "SafeMode is only available when the downstream is DB.", "type": "boolean" diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 2b7ab30dd28..b020fec566b 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1601,6 +1601,24 @@ } } }, + "config.PulsarConfig": { + "type": "object", + "properties": { + "pulsar-producer-cache-size": { + "description": "PulsarProducerCacheSize is the size of the cache of pulsar producers", + "type": "integer" + }, + "tls-certificate-path": { + "type": "string" + }, + "tls-private-key-path": { + "type": "string" + }, + "tls-trust-certs-file-path": { + "type": "string" + } + } + }, "config.SinkConfig": { "type": "object", "properties": { @@ -1663,6 +1681,9 @@ "description": "Protocol is NOT available when the downstream is DB.", "type": "string" }, + "pulsar-config": { + "$ref": "#/definitions/config.PulsarConfig" + }, "safe-mode": { "description": "SafeMode is only available when the downstream is DB.", "type": "boolean" diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 8b9b2402147..be2553eac38 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -186,6 +186,18 @@ definitions: write-timeout: type: string type: object + config.PulsarConfig: + properties: + pulsar-producer-cache-size: + description: PulsarProducerCacheSize is the size of the cache of pulsar producers + type: integer + tls-certificate-path: + type: string + tls-private-key-path: + type: string + tls-trust-certs-file-path: + type: string + type: object config.SinkConfig: properties: cloud-storage-config: @@ -236,6 +248,8 @@ definitions: protocol: description: Protocol is NOT available when the downstream is DB. type: string + pulsar-config: + $ref: '#/definitions/config.PulsarConfig' safe-mode: description: SafeMode is only available when the downstream is DB. type: boolean diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 1578e365da9..5fcb266c541 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -155,6 +155,7 @@ type SinkConfig struct { // SafeMode is only available when the downstream is DB. SafeMode *bool `toml:"safe-mode" json:"safe-mode,omitempty"` KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"` + PulsarConfig *PulsarConfig `toml:"pulsar-config" json:"pulsar-config,omitempty"` MySQLConfig *MySQLConfig `toml:"mysql-config" json:"mysql-config,omitempty"` CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"` } @@ -326,6 +327,16 @@ type KafkaConfig struct { LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` } +// PulsarConfig pulsar sink configuration +type PulsarConfig struct { + TLSKeyFilePath *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"` + TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"` + TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"` + + // PulsarProducerCacheSize is the size of the cache of pulsar producers + PulsarProducerCacheSize *int32 `toml:"pulsar-producer-cache-size" json:"pulsar-producer-cache-size,omitempty"` +} + // MySQLConfig represents a MySQL sink configuration type MySQLConfig struct { WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` diff --git a/pkg/sink/codec/common/message.go b/pkg/sink/codec/common/message.go index 87f356a7d4c..ee6597f0b75 100644 --- a/pkg/sink/codec/common/message.go +++ b/pkg/sink/codec/common/message.go @@ -51,6 +51,9 @@ type Message struct { ClaimCheckFileName string Event *model.RowChangedEvent + + // PartitionKey for pulsar, route messages to one or different partitions + PartitionKey *string } // Length returns the expected size of the Kafka message @@ -80,6 +83,30 @@ func (m *Message) IncRowsCount() { m.rowsCount++ } +// GetSchema returns schema string +func (m *Message) GetSchema() string { + if m.Schema == nil { + return "" + } + return *m.Schema +} + +// GetTable returns the Table string +func (m *Message) GetTable() string { + if m.Table == nil { + return "" + } + return *m.Table +} + +// GetPartitionKey returns the GetPartitionKey +func (m *Message) GetPartitionKey() string { + if m.PartitionKey == nil { + return "" + } + return *m.PartitionKey +} + // NewDDLMsg creates a DDL message. func NewDDLMsg(proto config.Protocol, key, value []byte, event *model.DDLEvent) *Message { return NewMsg( diff --git a/pkg/sink/pulsar/config.go b/pkg/sink/pulsar/config.go index 0ee24a23609..e19cf6011d5 100644 --- a/pkg/sink/pulsar/config.go +++ b/pkg/sink/pulsar/config.go @@ -21,6 +21,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar" + "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/pingcap/log" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -72,12 +73,6 @@ const ( // Protocol The message protocol type input to pulsar, pulsar currently supports canal-json, canal, maxwell Protocol = "protocol" - // TLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key - TLSCertificatePath = "tls-certificate-path" - - // TLSPrivateKeyPath TLS private key - TLSPrivateKeyPath = "tls-private-key-path" - // OAuth2IssuerURL the URL of the authorization server. OAuth2IssuerURL = "oauth2-issuer-url" // OAuth2Audience the URL of the resource server. @@ -86,6 +81,14 @@ const ( OAuth2PrivateKey = "oauth2-private-key" // OAuth2ClientID the client ID of the application. OAuth2ClientID = "oauth2-client-id" + // OAuth2Scope scope + OAuth2Scope = "auth2-scope" + + // AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key + AuthTLSCertificatePath = "auth-tls-certificate-path" + + // AuthTLSPrivateKeyPath auth TLS private key + AuthTLSPrivateKeyPath = "auth-tls-private-key-path" ) // sink config default Value @@ -157,10 +160,10 @@ type Config struct { // BasicPassword with account BasicPassword string - // TLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key - TLSCertificatePath string - // TLSPrivateKeyPath private key - TLSPrivateKeyPath string + // AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key + AuthTLSCertificatePath string + // AuthTLSPrivateKeyPath private key + AuthTLSPrivateKeyPath string // Oauth2 include oauth2-issuer-url oauth2-audience oauth2-private-key oauth2-client-id // and 'type' always is 'client_credentials' @@ -202,28 +205,30 @@ func (c *Config) checkSinkURI(sinkURI *url.URL) error { func (c *Config) applyOAuth(params url.Values) { // Go client use Oauth2 authentication // https://pulsar.apache.org/docs/2.10.x/security-oauth2/#authentication-types + // pulsar client now support type as client_credentials only s := params.Get(OAuth2IssuerURL) if len(s) > 0 { - c.OAuth2["issuerUrl"] = s + c.OAuth2[auth.ConfigParamIssuerURL] = s } - s = params.Get(OAuth2Audience) if len(s) > 0 { - c.OAuth2["audience"] = s + c.OAuth2[auth.ConfigParamAudience] = s + } + s = params.Get(OAuth2Scope) + if len(s) > 0 { + c.OAuth2[auth.ConfigParamScope] = s } - s = params.Get(OAuth2PrivateKey) if len(s) > 0 { - c.OAuth2["privateKey"] = s + c.OAuth2[auth.ConfigParamKeyFile] = s } - s = params.Get(OAuth2ClientID) if len(s) > 0 { - c.OAuth2["clientId"] = s + c.OAuth2[auth.ConfigParamClientID] = s } if len(c.OAuth2) >= 4 { - c.OAuth2["type"] = "client_credentials" + c.OAuth2[auth.ConfigParamType] = auth.ConfigParamTypeClientCredentials } else { c.OAuth2 = make(map[string]string) } @@ -335,14 +340,14 @@ func (c *Config) Apply(sinkURI *url.URL) error { c.BasicPassword = s } - s = params.Get(TLSCertificatePath) + s = params.Get(AuthTLSCertificatePath) if len(s) > 0 { - c.TLSCertificatePath = s + c.AuthTLSCertificatePath = s } - s = params.Get(TLSPrivateKeyPath) + s = params.Get(AuthTLSPrivateKeyPath) if len(s) > 0 { - c.TLSPrivateKeyPath = s + c.AuthTLSPrivateKeyPath = s } c.applyOAuth(params) diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go new file mode 100644 index 00000000000..cfefc6ec1ba --- /dev/null +++ b/pkg/sink/pulsar/factory.go @@ -0,0 +1,89 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pulsar + +import ( + "fmt" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "go.uber.org/zap" +) + +// FactoryCreator defines the type of factory creator. +type FactoryCreator func(config *Config, changefeedID model.ChangeFeedID, sinkConfig *config.SinkConfig) (pulsar.Client, error) + +// NewCreatorFactory returns a factory implemented based on kafka-go +func NewCreatorFactory(config *Config, changefeedID model.ChangeFeedID, sinkConfig *config.SinkConfig) (pulsar.Client, error) { + co := pulsar.ClientOptions{ + URL: config.URL, + CustomMetricsLabels: map[string]string{ + "changefeed": changefeedID.ID, + "namespace": changefeedID.Namespace, + }, + ConnectionTimeout: config.ConnectionTimeout, + OperationTimeout: config.OperationTimeout, + } + var err error + + co.Authentication, err = setupAuthentication(config) + if err != nil { + log.Error("setup pulsar authentication fail", zap.Error(err)) + return nil, err + } + + // pulsar TLS config + if sinkConfig.PulsarConfig != nil { + sinkPulsar := sinkConfig.PulsarConfig + if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil && + sinkPulsar.TLSTrustCertsFilePath != nil { + co.TLSCertificateFile = *sinkPulsar.TLSCertificateFile + co.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath + co.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath + } + } + + pulsarClient, err := pulsar.NewClient(co) + if err != nil { + log.Error("cannot connect to pulsar", zap.Error(err)) + return nil, err + } + return pulsarClient, nil +} + +// setupAuthentication sets up authentication for pulsar client +func setupAuthentication(config *Config) (pulsar.Authentication, error) { + if len(config.AuthenticationToken) > 0 { + return pulsar.NewAuthenticationToken(config.AuthenticationToken), nil + } else if len(config.TokenFromFile) > 0 { + return pulsar.NewAuthenticationTokenFromFile(config.TokenFromFile), nil + } else if len(config.BasicUserName) > 0 && len(config.BasicPassword) > 0 { + return pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) + } else if len(config.OAuth2) >= 5 { + return pulsar.NewAuthenticationOAuth2(config.OAuth2), nil + } else if len(config.AuthTLSCertificatePath) > 0 && len(config.AuthTLSPrivateKeyPath) > 0 { + return pulsar.NewAuthenticationTLS(config.AuthTLSCertificatePath, config.AuthTLSPrivateKeyPath), nil + } + return nil, fmt.Errorf("no authentication method found") +} + +// NewMockCreatorFactory returns a factory implemented based on kafka-go +func NewMockCreatorFactory(config *Config, changefeedID model.ChangeFeedID, + sinkConfig *config.SinkConfig, +) (pulsar.Client, error) { + log.Info("mock pulsar client factory created", zap.Any("changfeedID", changefeedID)) + return nil, nil +} diff --git a/pkg/sink/sink_type.go b/pkg/sink/sink_type.go index 18b7e18cf8e..d257e87521e 100644 --- a/pkg/sink/sink_type.go +++ b/pkg/sink/sink_type.go @@ -65,6 +65,8 @@ const ( CloudStorageNoopScheme = "noop" // PulsarScheme indicates the scheme is pulsar PulsarScheme = "pulsar" + // PulsarSSLScheme indicates the scheme is pulsar+ssl + PulsarSSLScheme = "pulsar+ssl" ) // IsMQScheme returns true if the scheme belong to mq scheme.