Skip to content

Commit

Permalink
pulsar (ticdc): fix tls encryption and oauth2.0 (#10611)
Browse files Browse the repository at this point in the history
close #10602
  • Loading branch information
asddongmen authored Feb 19, 2024
1 parent 3d185a4 commit 2eadc08
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func New(
bs := blackhole.NewDMLSink()
s.rowSink = bs
s.category = CategoryBlackhole
case sink.PulsarScheme:
case sink.PulsarScheme, sink.PulsarSSLScheme:
mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh,
manager.NewPulsarTopicManager,
pulsarConfig.NewCreatorFactory, dmlproducer.NewPulsarDMLProducer)
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ invalid pulsar client ID '%s'

["CDC:ErrPulsarInvalidConfig"]
error = '''
pulsar config invalid
pulsar config invalid %s
'''

["CDC:ErrPulsarInvalidPartitionNum"]
Expand Down
8 changes: 2 additions & 6 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,8 @@ func (o *OAuth2) validate() (err error) {

// PulsarConfig pulsar sink configuration
type PulsarConfig struct {
TLSKeyFilePath *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"`
TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"`
TLSKeyFilePath *string `toml:"tls-key-file-path" json:"tls-key-file-path,omitempty"`
TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-certificate-file,omitempty"`
TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"`

// PulsarProducerCacheSize is the size of the cache of pulsar producers
Expand Down Expand Up @@ -603,11 +603,7 @@ func (c *PulsarConfig) validate() (err error) {
if err = c.OAuth2.validate(); err != nil {
return err
}
if c.TLSTrustCertsFilePath == nil {
return fmt.Errorf("oauth2 is not empty but tls-trust-certs-file-path is empty")
}
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ var (
errors.RFCCodeText("CDC:ErrPulsarInvalidVersion"),
)
ErrPulsarInvalidConfig = errors.Normalize(
"pulsar config invalid",
"pulsar config invalid %s",
errors.RFCCodeText("CDC:ErrPulsarInvalidConfig"),
)
ErrPulsarCreateTopic = errors.Normalize(
Expand Down
54 changes: 41 additions & 13 deletions pkg/sink/pulsar/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics/mq"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

Expand All @@ -40,22 +41,41 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee
MetricsRegisterer: mq.GetMetricRegistry(),
Logger: NewPulsarLogger(log.L()),
}
var err error

option.Authentication, err = setupAuthentication(config)
var err error
// ismTLSAuthentication is true if it is mTLS authentication
var ismTLSAuthentication bool
ismTLSAuthentication, option.Authentication, err = setupAuthentication(config)
if err != nil {
log.Error("setup pulsar authentication fail", zap.Error(err))
return nil, err
}
// When mTLS authentication is enabled, trust certs file path is required.
if ismTLSAuthentication {
if sinkConfig.PulsarConfig != nil && sinkConfig.PulsarConfig.TLSTrustCertsFilePath != nil {
option.TLSTrustCertsFilePath = *sinkConfig.PulsarConfig.TLSTrustCertsFilePath
} else {
return nil, cerror.ErrPulsarInvalidConfig.
GenWithStackByArgs("pulsar tls trust certs file path is not set when mTLS authentication is enabled")
}
}

// pulsar TLS config
// Check and set pulsar TLS config
if sinkConfig.PulsarConfig != nil {
sinkPulsar := sinkConfig.PulsarConfig
if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil &&
sinkPulsar.TLSTrustCertsFilePath != nil {
// Note(dongmen): If pulsar cluster set `tlsRequireTrustedClientCertOnConnect=false`,
// provide the TLS trust certificate file is enough.
if sinkPulsar.TLSTrustCertsFilePath != nil {
option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath
log.Info("pulsar tls trust certificate file is set, tls encryption enable")
}
// Note(dongmen): If pulsar cluster set `tlsRequireTrustedClientCertOnConnect=true`,
// then the client must set the TLS certificate and key.
// Otherwise, a error like "remote error: tls: certificate required" will be returned.
if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil {
option.TLSCertificateFile = *sinkPulsar.TLSCertificateFile
option.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath
option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath
log.Info("pulsar tls certificate file and tls key file path is set, tls ")
}
}

Expand All @@ -68,15 +88,21 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee
}

// setupAuthentication sets up authentication for pulsar client
func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, error) {
// returns true if authentication is tls authentication , and the authentication object
func setupAuthentication(config *config.PulsarConfig) (bool, pulsar.Authentication, error) {
if config.AuthenticationToken != nil {
return pulsar.NewAuthenticationToken(*config.AuthenticationToken), nil
log.Info("pulsar token authentication is set, use toke authentication")
return false, pulsar.NewAuthenticationToken(*config.AuthenticationToken), nil
}
if config.TokenFromFile != nil {
return pulsar.NewAuthenticationTokenFromFile(*config.TokenFromFile), nil
log.Info("pulsar token from file authentication is set, use toke authentication")
res := pulsar.NewAuthenticationTokenFromFile(*config.TokenFromFile)
return false, res, nil
}
if config.BasicUserName != nil && config.BasicPassword != nil {
return pulsar.NewAuthenticationBasic(*config.BasicUserName, *config.BasicPassword)
log.Info("pulsar basic authentication is set, use basic authentication")
res, err := pulsar.NewAuthenticationBasic(*config.BasicUserName, *config.BasicPassword)
return false, res, err
}
if config.OAuth2 != nil {
oauth2 := map[string]string{
Expand All @@ -87,13 +113,15 @@ func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, er
auth.ConfigParamClientID: config.OAuth2.OAuth2ClientID,
auth.ConfigParamType: auth.ConfigParamTypeClientCredentials,
}
return pulsar.NewAuthenticationOAuth2(oauth2), nil
log.Info("pulsar oauth2 authentication is set, use oauth2 authentication")
return false, pulsar.NewAuthenticationOAuth2(oauth2), nil
}
if config.AuthTLSCertificatePath != nil && config.AuthTLSPrivateKeyPath != nil {
return pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil
log.Info("pulsar mTLS authentication is set, use mTLS authentication")
return true, pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil
}
log.Info("No authentication configured for pulsar client")
return nil, nil
return false, nil, nil
}

// NewMockCreatorFactory returns a factory implemented based on kafka-go
Expand Down

0 comments on commit 2eadc08

Please sign in to comment.