diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 530e078ee58..2e674cea5b7 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -112,7 +112,7 @@ func New( bs := blackhole.NewDMLSink() s.rowSink = bs s.category = CategoryBlackhole - case sink.PulsarScheme: + case sink.PulsarScheme, sink.PulsarSSLScheme: mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh, manager.NewPulsarTopicManager, pulsarConfig.NewCreatorFactory, dmlproducer.NewPulsarDMLProducer) diff --git a/errors.toml b/errors.toml index 7179ba0949a..700d45917b5 100755 --- a/errors.toml +++ b/errors.toml @@ -758,7 +758,7 @@ invalid pulsar client ID '%s' ["CDC:ErrPulsarInvalidConfig"] error = ''' -pulsar config invalid +pulsar config invalid %s ''' ["CDC:ErrPulsarInvalidPartitionNum"] diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 2d01a13fb02..7d3348e6cd0 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -522,8 +522,8 @@ func (o *OAuth2) validate() (err error) { // PulsarConfig pulsar sink configuration type PulsarConfig struct { - TLSKeyFilePath *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"` - TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"` + TLSKeyFilePath *string `toml:"tls-key-file-path" json:"tls-key-file-path,omitempty"` + TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-certificate-file,omitempty"` TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"` // PulsarProducerCacheSize is the size of the cache of pulsar producers @@ -603,11 +603,7 @@ func (c *PulsarConfig) validate() (err error) { if err = c.OAuth2.validate(); err != nil { return err } - if c.TLSTrustCertsFilePath == nil { - return fmt.Errorf("oauth2 is not empty but tls-trust-certs-file-path is empty") - } } - return nil } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index caf5fc7f864..a7255cabd8f 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -246,7 +246,7 @@ var ( errors.RFCCodeText("CDC:ErrPulsarInvalidVersion"), ) ErrPulsarInvalidConfig = errors.Normalize( - "pulsar config invalid", + "pulsar config invalid %s", errors.RFCCodeText("CDC:ErrPulsarInvalidConfig"), ) ErrPulsarCreateTopic = errors.Normalize( diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go index 3f7c1e88e88..7b8f7aeefdd 100644 --- a/pkg/sink/pulsar/factory.go +++ b/pkg/sink/pulsar/factory.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/metrics/mq" "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) @@ -40,22 +41,41 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee MetricsRegisterer: mq.GetMetricRegistry(), Logger: NewPulsarLogger(log.L()), } - var err error - option.Authentication, err = setupAuthentication(config) + var err error + // ismTLSAuthentication is true if it is mTLS authentication + var ismTLSAuthentication bool + ismTLSAuthentication, option.Authentication, err = setupAuthentication(config) if err != nil { log.Error("setup pulsar authentication fail", zap.Error(err)) return nil, err } + // When mTLS authentication is enabled, trust certs file path is required. + if ismTLSAuthentication { + if sinkConfig.PulsarConfig != nil && sinkConfig.PulsarConfig.TLSTrustCertsFilePath != nil { + option.TLSTrustCertsFilePath = *sinkConfig.PulsarConfig.TLSTrustCertsFilePath + } else { + return nil, cerror.ErrPulsarInvalidConfig. + GenWithStackByArgs("pulsar tls trust certs file path is not set when mTLS authentication is enabled") + } + } - // pulsar TLS config + // Check and set pulsar TLS config if sinkConfig.PulsarConfig != nil { sinkPulsar := sinkConfig.PulsarConfig - if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil && - sinkPulsar.TLSTrustCertsFilePath != nil { + // Note(dongmen): If pulsar cluster set `tlsRequireTrustedClientCertOnConnect=false`, + // provide the TLS trust certificate file is enough. + if sinkPulsar.TLSTrustCertsFilePath != nil { + option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath + log.Info("pulsar tls trust certificate file is set, tls encryption enable") + } + // Note(dongmen): If pulsar cluster set `tlsRequireTrustedClientCertOnConnect=true`, + // then the client must set the TLS certificate and key. + // Otherwise, a error like "remote error: tls: certificate required" will be returned. + if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil { option.TLSCertificateFile = *sinkPulsar.TLSCertificateFile option.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath - option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath + log.Info("pulsar tls certificate file and tls key file path is set, tls ") } } @@ -68,15 +88,21 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee } // setupAuthentication sets up authentication for pulsar client -func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, error) { +// returns true if authentication is tls authentication , and the authentication object +func setupAuthentication(config *config.PulsarConfig) (bool, pulsar.Authentication, error) { if config.AuthenticationToken != nil { - return pulsar.NewAuthenticationToken(*config.AuthenticationToken), nil + log.Info("pulsar token authentication is set, use toke authentication") + return false, pulsar.NewAuthenticationToken(*config.AuthenticationToken), nil } if config.TokenFromFile != nil { - return pulsar.NewAuthenticationTokenFromFile(*config.TokenFromFile), nil + log.Info("pulsar token from file authentication is set, use toke authentication") + res := pulsar.NewAuthenticationTokenFromFile(*config.TokenFromFile) + return false, res, nil } if config.BasicUserName != nil && config.BasicPassword != nil { - return pulsar.NewAuthenticationBasic(*config.BasicUserName, *config.BasicPassword) + log.Info("pulsar basic authentication is set, use basic authentication") + res, err := pulsar.NewAuthenticationBasic(*config.BasicUserName, *config.BasicPassword) + return false, res, err } if config.OAuth2 != nil { oauth2 := map[string]string{ @@ -87,13 +113,15 @@ func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, er auth.ConfigParamClientID: config.OAuth2.OAuth2ClientID, auth.ConfigParamType: auth.ConfigParamTypeClientCredentials, } - return pulsar.NewAuthenticationOAuth2(oauth2), nil + log.Info("pulsar oauth2 authentication is set, use oauth2 authentication") + return false, pulsar.NewAuthenticationOAuth2(oauth2), nil } if config.AuthTLSCertificatePath != nil && config.AuthTLSPrivateKeyPath != nil { - return pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil + log.Info("pulsar mTLS authentication is set, use mTLS authentication") + return true, pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil } log.Info("No authentication configured for pulsar client") - return nil, nil + return false, nil, nil } // NewMockCreatorFactory returns a factory implemented based on kafka-go