Skip to content

Commit

Permalink
pulsar (ticdc): fix tls encryption and oauth2.0 (#10611) (#10614)
Browse files Browse the repository at this point in the history
close #10602
  • Loading branch information
ti-chi-bot authored Feb 19, 2024
1 parent 2d9a053 commit b7e10a3
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 102 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func New(
bs := blackhole.NewDMLSink()
s.rowSink = bs
s.category = CategoryBlackhole
case sink.PulsarScheme:
case sink.PulsarScheme, sink.PulsarSSLScheme:
mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh,
manager.NewPulsarTopicManager,
pulsarConfig.NewCreatorFactory, dmlproducer.NewPulsarDMLProducer)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pulsar-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func NewPulsarConsumer(option *ConsumerOption) (pulsar.Consumer, pulsar.Client)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: pulsarURL,
Logger: tpulsar.NewPulsarLogger(),
Logger: tpulsar.NewPulsarLogger(log.L()),
})
if err != nil {
log.Fatal("can't create pulsar client: %v", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ invalid pulsar client ID '%s'

["CDC:ErrPulsarInvalidConfig"]
error = '''
pulsar config invalid
pulsar config invalid %s
'''

["CDC:ErrPulsarInvalidPartitionNum"]
Expand Down
8 changes: 2 additions & 6 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,8 @@ func (o *OAuth2) validate() (err error) {

// PulsarConfig pulsar sink configuration
type PulsarConfig struct {
TLSKeyFilePath *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"`
TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"`
TLSKeyFilePath *string `toml:"tls-key-file-path" json:"tls-key-file-path,omitempty"`
TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-certificate-file,omitempty"`
TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"`

// PulsarProducerCacheSize is the size of the cache of pulsar producers
Expand Down Expand Up @@ -551,11 +551,7 @@ func (c *PulsarConfig) validate() (err error) {
if err = c.OAuth2.validate(); err != nil {
return err
}
if c.TLSTrustCertsFilePath == nil {
return fmt.Errorf("oauth2 is not empty but tls-trust-certs-file-path is empty")
}
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ var (
errors.RFCCodeText("CDC:ErrPulsarInvalidVersion"),
)
ErrPulsarInvalidConfig = errors.Normalize(
"pulsar config invalid",
"pulsar config invalid %s",
errors.RFCCodeText("CDC:ErrPulsarInvalidConfig"),
)
ErrPulsarCreateTopic = errors.Normalize(
Expand Down
56 changes: 42 additions & 14 deletions pkg/sink/pulsar/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics/mq"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

Expand All @@ -38,24 +39,43 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee
OperationTimeout: config.OperationTimeout.Duration(),
// add pulsar default metrics
MetricsRegisterer: mq.GetMetricRegistry(),
Logger: NewPulsarLogger(),
Logger: NewPulsarLogger(log.L()),
}
var err error

option.Authentication, err = setupAuthentication(config)
var err error
// ismTLSAuthentication is true if it is mTLS authentication
var ismTLSAuthentication bool
ismTLSAuthentication, option.Authentication, err = setupAuthentication(config)
if err != nil {
log.Error("setup pulsar authentication fail", zap.Error(err))
return nil, err
}
// When mTLS authentication is enabled, trust certs file path is required.
if ismTLSAuthentication {
if sinkConfig.PulsarConfig != nil && sinkConfig.PulsarConfig.TLSTrustCertsFilePath != nil {
option.TLSTrustCertsFilePath = *sinkConfig.PulsarConfig.TLSTrustCertsFilePath
} else {
return nil, cerror.ErrPulsarInvalidConfig.
GenWithStackByArgs("pulsar tls trust certs file path is not set when mTLS authentication is enabled")
}
}

// pulsar TLS config
// Check and set pulsar TLS config
if sinkConfig.PulsarConfig != nil {
sinkPulsar := sinkConfig.PulsarConfig
if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil &&
sinkPulsar.TLSTrustCertsFilePath != nil {
// Note(dongmen): If pulsar cluster set `tlsRequireTrustedClientCertOnConnect=false`,
// provide the TLS trust certificate file is enough.
if sinkPulsar.TLSTrustCertsFilePath != nil {
option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath
log.Info("pulsar tls trust certificate file is set, tls encryption enable")
}
// Note(dongmen): If pulsar cluster set `tlsRequireTrustedClientCertOnConnect=true`,
// then the client must set the TLS certificate and key.
// Otherwise, a error like "remote error: tls: certificate required" will be returned.
if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil {
option.TLSCertificateFile = *sinkPulsar.TLSCertificateFile
option.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath
option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath
log.Info("pulsar tls certificate file and tls key file path is set, tls ")
}
}

Expand All @@ -68,15 +88,21 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee
}

// setupAuthentication sets up authentication for pulsar client
func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, error) {
// returns true if authentication is tls authentication , and the authentication object
func setupAuthentication(config *config.PulsarConfig) (bool, pulsar.Authentication, error) {
if config.AuthenticationToken != nil {
return pulsar.NewAuthenticationToken(*config.AuthenticationToken), nil
log.Info("pulsar token authentication is set, use toke authentication")
return false, pulsar.NewAuthenticationToken(*config.AuthenticationToken), nil
}
if config.TokenFromFile != nil {
return pulsar.NewAuthenticationTokenFromFile(*config.TokenFromFile), nil
log.Info("pulsar token from file authentication is set, use toke authentication")
res := pulsar.NewAuthenticationTokenFromFile(*config.TokenFromFile)
return false, res, nil
}
if config.BasicUserName != nil && config.BasicPassword != nil {
return pulsar.NewAuthenticationBasic(*config.BasicUserName, *config.BasicPassword)
log.Info("pulsar basic authentication is set, use basic authentication")
res, err := pulsar.NewAuthenticationBasic(*config.BasicUserName, *config.BasicPassword)
return false, res, err
}
if config.OAuth2 != nil {
oauth2 := map[string]string{
Expand All @@ -87,13 +113,15 @@ func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, er
auth.ConfigParamClientID: config.OAuth2.OAuth2ClientID,
auth.ConfigParamType: auth.ConfigParamTypeClientCredentials,
}
return pulsar.NewAuthenticationOAuth2(oauth2), nil
log.Info("pulsar oauth2 authentication is set, use oauth2 authentication")
return false, pulsar.NewAuthenticationOAuth2(oauth2), nil
}
if config.AuthTLSCertificatePath != nil && config.AuthTLSPrivateKeyPath != nil {
return pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil
log.Info("pulsar mTLS authentication is set, use mTLS authentication")
return true, pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil
}
log.Info("No authentication configured for pulsar client")
return nil, nil
return false, nil, nil
}

// NewMockCreatorFactory returns a factory implemented based on kafka-go
Expand Down
88 changes: 10 additions & 78 deletions pkg/sink/pulsar/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package pulsar

import (
"github.com/apache/pulsar-client-go/pulsar/log"
plog "github.com/pingcap/log"
"go.uber.org/zap"
)

Expand All @@ -25,36 +24,27 @@ type Logger struct {
}

// SubLogger sub
func (p *Logger) SubLogger(fields log.Fields) log.Logger {
subLogger := p.zapLogger
for k, v := range fields {
subLogger = subLogger.With(zap.Any(k, v))
func (p *Logger) SubLogger(pulsarFields log.Fields) log.Logger {
zapFields := make([]zap.Field, 0, len(pulsarFields))
for k, v := range pulsarFields {
zapFields = append(zapFields, zap.Any(k, v))
}
return &Logger{subLogger}
return &Logger{p.zapLogger.With(zapFields...)}
}

// WithFields with fields
func (p *Logger) WithFields(fields log.Fields) log.Entry {
return &LoggerEntry{
fields: fields,
logger: p.zapLogger,
}
return p.SubLogger(fields)
}

// WithField with field
func (p *Logger) WithField(name string, value interface{}) log.Entry {
return &LoggerEntry{
fields: log.Fields{name: value},
logger: p.zapLogger,
}
return &Logger{p.zapLogger.With(zap.Any(name, value))}
}

// WithError error
func (p *Logger) WithError(err error) log.Entry {
return &LoggerEntry{
fields: log.Fields{"error": err},
logger: p.zapLogger,
}
return &Logger{p.zapLogger.With(zap.Error(err))}
}

// Debug debug
Expand Down Expand Up @@ -98,66 +88,8 @@ func (p *Logger) Errorf(format string, args ...interface{}) {
}

// NewPulsarLogger new pulsar logger
func NewPulsarLogger() *Logger {
func NewPulsarLogger(base *zap.Logger) *Logger {
return &Logger{
zapLogger: plog.L(),
zapLogger: base.WithOptions(zap.AddCallerSkip(1)),
}
}

// LoggerEntry pulsar logger entry
type LoggerEntry struct {
fields log.Fields
logger *zap.Logger
}

// WithFields with fields
func (p *LoggerEntry) WithFields(fields log.Fields) log.Entry {
p.fields = fields
return p
}

// WithField with field
func (p *LoggerEntry) WithField(name string, value interface{}) log.Entry {
p.fields[name] = value
return p
}

// Debug debug
func (p *LoggerEntry) Debug(args ...interface{}) {
p.logger.Sugar().Debug(args...)
}

// Info info
func (p *LoggerEntry) Info(args ...interface{}) {
p.logger.Sugar().Info(args...)
}

// Warn warn
func (p *LoggerEntry) Warn(args ...interface{}) {
p.logger.Sugar().Warn(args...)
}

// Error error
func (p *LoggerEntry) Error(args ...interface{}) {
p.logger.Sugar().Error(args...)
}

// Debugf debugf
func (p *LoggerEntry) Debugf(format string, args ...interface{}) {
p.logger.Sugar().Debugf(format, args...)
}

// Infof infof
func (p *LoggerEntry) Infof(format string, args ...interface{}) {
p.logger.Sugar().Infof(format, args...)
}

// Warnf warnf
func (p *LoggerEntry) Warnf(format string, args ...interface{}) {
p.logger.Sugar().Warnf(format, args...)
}

// Errorf errorf
func (p *LoggerEntry) Errorf(format string, args ...interface{}) {
p.logger.Sugar().Errorf(format, args...)
}
92 changes: 92 additions & 0 deletions pkg/sink/pulsar/logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pulsar

import (
"errors"
"runtime"
"testing"

"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)

// TestPulsarLog ensures Pulsar's use of our logger behaves as expected of a typical zap logger:
// 1. all fields injected via .With*() are present in the final log
// 2. the [file:line] points to the actual call site rather than `logger.go`.
func TestPulsarLog(t *testing.T) {
core, observedLogs := observer.New(zapcore.DebugLevel)
logger := NewPulsarLogger(zap.New(core).WithOptions(zap.AddCaller()))

pc, file, line, ok := runtime.Caller(0)
assert.True(t, ok)
functionName := runtime.FuncForPC(pc).Name()

logger.Infof("1 + 2 = %d", 1+2)
logger.WithError(errors.ErrUnsupported).Warn("3", "+", "4", "=", 3+4)
logger.WithFields(log.Fields{"x": 1234, "y": 9.75}).Info("connected")

allEntries := observedLogs.AllUntimed()
// we can't reliably test the `pc` address of the caller information. remove them.
for i := range allEntries {
allEntries[i].Caller.PC = 0
}

assert.Len(t, allEntries, 3)
assert.Equal(t, allEntries[0:2], []observer.LoggedEntry{
{
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
Message: "1 + 2 = 3",
Caller: zapcore.EntryCaller{
Defined: true,
File: file,
Line: line + 4,
Function: functionName,
},
},
Context: []zap.Field{},
},
{
Entry: zapcore.Entry{
Level: zapcore.WarnLevel,
Message: "3+4=7",
Caller: zapcore.EntryCaller{
Defined: true,
File: file,
Line: line + 5,
Function: functionName,
},
},
Context: []zap.Field{zap.Error(errors.ErrUnsupported)},
},
})
assert.Equal(t, allEntries[2].Entry, zapcore.Entry{
Level: zapcore.InfoLevel,
Message: "connected",
Caller: zapcore.EntryCaller{
Defined: true,
File: file,
Line: line + 6,
Function: functionName,
},
})
assert.ElementsMatch(t, allEntries[2].Context, []zap.Field{
zap.Int("x", 1234),
zap.Float64("y", 9.75),
})
}

0 comments on commit b7e10a3

Please sign in to comment.