diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 530e078ee58..2e674cea5b7 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -112,7 +112,7 @@ func New( bs := blackhole.NewDMLSink() s.rowSink = bs s.category = CategoryBlackhole - case sink.PulsarScheme: + case sink.PulsarScheme, sink.PulsarSSLScheme: mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh, manager.NewPulsarTopicManager, pulsarConfig.NewCreatorFactory, dmlproducer.NewPulsarDMLProducer) diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go index 475507d301f..86eeca89ce0 100644 --- a/cmd/pulsar-consumer/main.go +++ b/cmd/pulsar-consumer/main.go @@ -244,7 +244,7 @@ func NewPulsarConsumer(option *ConsumerOption) (pulsar.Consumer, pulsar.Client) client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: pulsarURL, - Logger: tpulsar.NewPulsarLogger(), + Logger: tpulsar.NewPulsarLogger(log.L()), }) if err != nil { log.Fatal("can't create pulsar client: %v", zap.Error(err)) diff --git a/errors.toml b/errors.toml index 1abb08b2b16..9f454de2dfc 100755 --- a/errors.toml +++ b/errors.toml @@ -753,7 +753,7 @@ invalid pulsar client ID '%s' ["CDC:ErrPulsarInvalidConfig"] error = ''' -pulsar config invalid +pulsar config invalid %s ''' ["CDC:ErrPulsarInvalidPartitionNum"] diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 80f4d87731a..9aa6ba195e4 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -470,8 +470,8 @@ func (o *OAuth2) validate() (err error) { // PulsarConfig pulsar sink configuration type PulsarConfig struct { - TLSKeyFilePath *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"` - TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"` + TLSKeyFilePath *string `toml:"tls-key-file-path" json:"tls-key-file-path,omitempty"` + TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-certificate-file,omitempty"` TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"` // PulsarProducerCacheSize is the size of the cache of pulsar producers @@ -551,11 +551,7 @@ func (c *PulsarConfig) validate() (err error) { if err = c.OAuth2.validate(); err != nil { return err } - if c.TLSTrustCertsFilePath == nil { - return fmt.Errorf("oauth2 is not empty but tls-trust-certs-file-path is empty") - } } - return nil } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index c8882a76651..69bb2bcc90d 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -246,7 +246,7 @@ var ( errors.RFCCodeText("CDC:ErrPulsarInvalidVersion"), ) ErrPulsarInvalidConfig = errors.Normalize( - "pulsar config invalid", + "pulsar config invalid %s", errors.RFCCodeText("CDC:ErrPulsarInvalidConfig"), ) ErrPulsarCreateTopic = errors.Normalize( diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go index 979964203aa..7b8f7aeefdd 100644 --- a/pkg/sink/pulsar/factory.go +++ b/pkg/sink/pulsar/factory.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/metrics/mq" "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) @@ -38,24 +39,43 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee OperationTimeout: config.OperationTimeout.Duration(), // add pulsar default metrics MetricsRegisterer: mq.GetMetricRegistry(), - Logger: NewPulsarLogger(), + Logger: NewPulsarLogger(log.L()), } - var err error - option.Authentication, err = setupAuthentication(config) + var err error + // ismTLSAuthentication is true if it is mTLS authentication + var ismTLSAuthentication bool + ismTLSAuthentication, option.Authentication, err = setupAuthentication(config) if err != nil { log.Error("setup pulsar authentication fail", zap.Error(err)) return nil, err } + // When mTLS authentication is enabled, trust certs file path is required. + if ismTLSAuthentication { + if sinkConfig.PulsarConfig != nil && sinkConfig.PulsarConfig.TLSTrustCertsFilePath != nil { + option.TLSTrustCertsFilePath = *sinkConfig.PulsarConfig.TLSTrustCertsFilePath + } else { + return nil, cerror.ErrPulsarInvalidConfig. + GenWithStackByArgs("pulsar tls trust certs file path is not set when mTLS authentication is enabled") + } + } - // pulsar TLS config + // Check and set pulsar TLS config if sinkConfig.PulsarConfig != nil { sinkPulsar := sinkConfig.PulsarConfig - if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil && - sinkPulsar.TLSTrustCertsFilePath != nil { + // Note(dongmen): If pulsar cluster set `tlsRequireTrustedClientCertOnConnect=false`, + // provide the TLS trust certificate file is enough. + if sinkPulsar.TLSTrustCertsFilePath != nil { + option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath + log.Info("pulsar tls trust certificate file is set, tls encryption enable") + } + // Note(dongmen): If pulsar cluster set `tlsRequireTrustedClientCertOnConnect=true`, + // then the client must set the TLS certificate and key. + // Otherwise, a error like "remote error: tls: certificate required" will be returned. + if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil { option.TLSCertificateFile = *sinkPulsar.TLSCertificateFile option.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath - option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath + log.Info("pulsar tls certificate file and tls key file path is set, tls ") } } @@ -68,15 +88,21 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee } // setupAuthentication sets up authentication for pulsar client -func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, error) { +// returns true if authentication is tls authentication , and the authentication object +func setupAuthentication(config *config.PulsarConfig) (bool, pulsar.Authentication, error) { if config.AuthenticationToken != nil { - return pulsar.NewAuthenticationToken(*config.AuthenticationToken), nil + log.Info("pulsar token authentication is set, use toke authentication") + return false, pulsar.NewAuthenticationToken(*config.AuthenticationToken), nil } if config.TokenFromFile != nil { - return pulsar.NewAuthenticationTokenFromFile(*config.TokenFromFile), nil + log.Info("pulsar token from file authentication is set, use toke authentication") + res := pulsar.NewAuthenticationTokenFromFile(*config.TokenFromFile) + return false, res, nil } if config.BasicUserName != nil && config.BasicPassword != nil { - return pulsar.NewAuthenticationBasic(*config.BasicUserName, *config.BasicPassword) + log.Info("pulsar basic authentication is set, use basic authentication") + res, err := pulsar.NewAuthenticationBasic(*config.BasicUserName, *config.BasicPassword) + return false, res, err } if config.OAuth2 != nil { oauth2 := map[string]string{ @@ -87,13 +113,15 @@ func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, er auth.ConfigParamClientID: config.OAuth2.OAuth2ClientID, auth.ConfigParamType: auth.ConfigParamTypeClientCredentials, } - return pulsar.NewAuthenticationOAuth2(oauth2), nil + log.Info("pulsar oauth2 authentication is set, use oauth2 authentication") + return false, pulsar.NewAuthenticationOAuth2(oauth2), nil } if config.AuthTLSCertificatePath != nil && config.AuthTLSPrivateKeyPath != nil { - return pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil + log.Info("pulsar mTLS authentication is set, use mTLS authentication") + return true, pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil } log.Info("No authentication configured for pulsar client") - return nil, nil + return false, nil, nil } // NewMockCreatorFactory returns a factory implemented based on kafka-go diff --git a/pkg/sink/pulsar/logger.go b/pkg/sink/pulsar/logger.go index 835f0830199..2b07ece570c 100644 --- a/pkg/sink/pulsar/logger.go +++ b/pkg/sink/pulsar/logger.go @@ -15,7 +15,6 @@ package pulsar import ( "github.com/apache/pulsar-client-go/pulsar/log" - plog "github.com/pingcap/log" "go.uber.org/zap" ) @@ -25,36 +24,27 @@ type Logger struct { } // SubLogger sub -func (p *Logger) SubLogger(fields log.Fields) log.Logger { - subLogger := p.zapLogger - for k, v := range fields { - subLogger = subLogger.With(zap.Any(k, v)) +func (p *Logger) SubLogger(pulsarFields log.Fields) log.Logger { + zapFields := make([]zap.Field, 0, len(pulsarFields)) + for k, v := range pulsarFields { + zapFields = append(zapFields, zap.Any(k, v)) } - return &Logger{subLogger} + return &Logger{p.zapLogger.With(zapFields...)} } // WithFields with fields func (p *Logger) WithFields(fields log.Fields) log.Entry { - return &LoggerEntry{ - fields: fields, - logger: p.zapLogger, - } + return p.SubLogger(fields) } // WithField with field func (p *Logger) WithField(name string, value interface{}) log.Entry { - return &LoggerEntry{ - fields: log.Fields{name: value}, - logger: p.zapLogger, - } + return &Logger{p.zapLogger.With(zap.Any(name, value))} } // WithError error func (p *Logger) WithError(err error) log.Entry { - return &LoggerEntry{ - fields: log.Fields{"error": err}, - logger: p.zapLogger, - } + return &Logger{p.zapLogger.With(zap.Error(err))} } // Debug debug @@ -98,66 +88,8 @@ func (p *Logger) Errorf(format string, args ...interface{}) { } // NewPulsarLogger new pulsar logger -func NewPulsarLogger() *Logger { +func NewPulsarLogger(base *zap.Logger) *Logger { return &Logger{ - zapLogger: plog.L(), + zapLogger: base.WithOptions(zap.AddCallerSkip(1)), } } - -// LoggerEntry pulsar logger entry -type LoggerEntry struct { - fields log.Fields - logger *zap.Logger -} - -// WithFields with fields -func (p *LoggerEntry) WithFields(fields log.Fields) log.Entry { - p.fields = fields - return p -} - -// WithField with field -func (p *LoggerEntry) WithField(name string, value interface{}) log.Entry { - p.fields[name] = value - return p -} - -// Debug debug -func (p *LoggerEntry) Debug(args ...interface{}) { - p.logger.Sugar().Debug(args...) -} - -// Info info -func (p *LoggerEntry) Info(args ...interface{}) { - p.logger.Sugar().Info(args...) -} - -// Warn warn -func (p *LoggerEntry) Warn(args ...interface{}) { - p.logger.Sugar().Warn(args...) -} - -// Error error -func (p *LoggerEntry) Error(args ...interface{}) { - p.logger.Sugar().Error(args...) -} - -// Debugf debugf -func (p *LoggerEntry) Debugf(format string, args ...interface{}) { - p.logger.Sugar().Debugf(format, args...) -} - -// Infof infof -func (p *LoggerEntry) Infof(format string, args ...interface{}) { - p.logger.Sugar().Infof(format, args...) -} - -// Warnf warnf -func (p *LoggerEntry) Warnf(format string, args ...interface{}) { - p.logger.Sugar().Warnf(format, args...) -} - -// Errorf errorf -func (p *LoggerEntry) Errorf(format string, args ...interface{}) { - p.logger.Sugar().Errorf(format, args...) -} diff --git a/pkg/sink/pulsar/logger_test.go b/pkg/sink/pulsar/logger_test.go new file mode 100644 index 00000000000..952b6f4e3d5 --- /dev/null +++ b/pkg/sink/pulsar/logger_test.go @@ -0,0 +1,92 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pulsar + +import ( + "errors" + "runtime" + "testing" + + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" +) + +// TestPulsarLog ensures Pulsar's use of our logger behaves as expected of a typical zap logger: +// 1. all fields injected via .With*() are present in the final log +// 2. the [file:line] points to the actual call site rather than `logger.go`. +func TestPulsarLog(t *testing.T) { + core, observedLogs := observer.New(zapcore.DebugLevel) + logger := NewPulsarLogger(zap.New(core).WithOptions(zap.AddCaller())) + + pc, file, line, ok := runtime.Caller(0) + assert.True(t, ok) + functionName := runtime.FuncForPC(pc).Name() + + logger.Infof("1 + 2 = %d", 1+2) + logger.WithError(errors.ErrUnsupported).Warn("3", "+", "4", "=", 3+4) + logger.WithFields(log.Fields{"x": 1234, "y": 9.75}).Info("connected") + + allEntries := observedLogs.AllUntimed() + // we can't reliably test the `pc` address of the caller information. remove them. + for i := range allEntries { + allEntries[i].Caller.PC = 0 + } + + assert.Len(t, allEntries, 3) + assert.Equal(t, allEntries[0:2], []observer.LoggedEntry{ + { + Entry: zapcore.Entry{ + Level: zapcore.InfoLevel, + Message: "1 + 2 = 3", + Caller: zapcore.EntryCaller{ + Defined: true, + File: file, + Line: line + 4, + Function: functionName, + }, + }, + Context: []zap.Field{}, + }, + { + Entry: zapcore.Entry{ + Level: zapcore.WarnLevel, + Message: "3+4=7", + Caller: zapcore.EntryCaller{ + Defined: true, + File: file, + Line: line + 5, + Function: functionName, + }, + }, + Context: []zap.Field{zap.Error(errors.ErrUnsupported)}, + }, + }) + assert.Equal(t, allEntries[2].Entry, zapcore.Entry{ + Level: zapcore.InfoLevel, + Message: "connected", + Caller: zapcore.EntryCaller{ + Defined: true, + File: file, + Line: line + 6, + Function: functionName, + }, + }) + assert.ElementsMatch(t, allEntries[2].Context, []zap.Field{ + zap.Int("x", 1234), + zap.Float64("y", 9.75), + }) +}