Skip to content

Commit

Permalink
pulsar (ticdc): pulsar use ticdc log (#9674)
Browse files Browse the repository at this point in the history
ref #9413
  • Loading branch information
asddongmen authored Sep 5, 2023
1 parent 2342b4e commit e1730e5
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 9 deletions.
18 changes: 9 additions & 9 deletions pkg/sink/pulsar/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package pulsar

import (
"fmt"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/apache/pulsar-client-go/pulsar/auth"
"github.com/pingcap/log"
Expand All @@ -30,7 +28,7 @@ type FactoryCreator func(config *config.PulsarConfig, changefeedID model.ChangeF

// NewCreatorFactory returns a factory implemented based on kafka-go
func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFeedID, sinkConfig *config.SinkConfig) (pulsar.Client, error) {
co := pulsar.ClientOptions{
option := pulsar.ClientOptions{
URL: config.GetBrokerURL(),
CustomMetricsLabels: map[string]string{
"changefeed": changefeedID.ID,
Expand All @@ -40,10 +38,11 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee
OperationTimeout: config.OperationTimeout.Duration(),
// add pulsar default metrics
MetricsRegisterer: mq.GetMetricRegistry(),
Logger: NewPulsarLogger(),
}
var err error

co.Authentication, err = setupAuthentication(config)
option.Authentication, err = setupAuthentication(config)
if err != nil {
log.Error("setup pulsar authentication fail", zap.Error(err))
return nil, err
Expand All @@ -54,13 +53,13 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee
sinkPulsar := sinkConfig.PulsarConfig
if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil &&
sinkPulsar.TLSTrustCertsFilePath != nil {
co.TLSCertificateFile = *sinkPulsar.TLSCertificateFile
co.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath
co.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath
option.TLSCertificateFile = *sinkPulsar.TLSCertificateFile
option.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath
option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath
}
}

pulsarClient, err := pulsar.NewClient(co)
pulsarClient, err := pulsar.NewClient(option)
if err != nil {
log.Error("cannot connect to pulsar", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -93,7 +92,8 @@ func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, er
if config.AuthTLSCertificatePath != nil && config.AuthTLSPrivateKeyPath != nil {
return pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil
}
return nil, fmt.Errorf("no authentication method found")
log.Info("No authentication configured for pulsar client")
return nil, nil
}

// NewMockCreatorFactory returns a factory implemented based on kafka-go
Expand Down
163 changes: 163 additions & 0 deletions pkg/sink/pulsar/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pulsar

import (
"github.com/apache/pulsar-client-go/pulsar/log"
plog "github.com/pingcap/log"
"go.uber.org/zap"
)

// Logger wrapper cdc logger to adapt pulsar logger
type Logger struct {
zapLogger *zap.Logger
}

// SubLogger sub
func (p *Logger) SubLogger(fields log.Fields) log.Logger {
subLogger := p.zapLogger
for k, v := range fields {
subLogger = subLogger.With(zap.Any(k, v))
}
return &Logger{subLogger}
}

// WithFields with fields
func (p *Logger) WithFields(fields log.Fields) log.Entry {
return &LoggerEntry{
fields: fields,
logger: p.zapLogger,
}
}

// WithField with field
func (p *Logger) WithField(name string, value interface{}) log.Entry {
return &LoggerEntry{
fields: log.Fields{name: value},
logger: p.zapLogger,
}
}

// WithError error
func (p *Logger) WithError(err error) log.Entry {
return &LoggerEntry{
fields: log.Fields{"error": err},
logger: p.zapLogger,
}
}

// Debug debug
func (p *Logger) Debug(args ...interface{}) {
p.zapLogger.Sugar().Debug(args...)
}

// Info info
func (p *Logger) Info(args ...interface{}) {
p.zapLogger.Sugar().Info(args...)
}

// Warn warn
func (p *Logger) Warn(args ...interface{}) {
p.zapLogger.Sugar().Warn(args...)
}

// Error error
func (p *Logger) Error(args ...interface{}) {
p.zapLogger.Sugar().Error(args...)
}

// Debugf debugf
func (p *Logger) Debugf(format string, args ...interface{}) {
p.zapLogger.Sugar().Debugf(format, args...)
}

// Infof infof
func (p *Logger) Infof(format string, args ...interface{}) {
p.zapLogger.Sugar().Infof(format, args...)
}

// Warnf warnf
func (p *Logger) Warnf(format string, args ...interface{}) {
p.zapLogger.Sugar().Warnf(format, args...)
}

// Errorf errorf
func (p *Logger) Errorf(format string, args ...interface{}) {
p.zapLogger.Sugar().Errorf(format, args...)
}

// NewPulsarLogger new pulsar logger
func NewPulsarLogger() *Logger {
return &Logger{
zapLogger: plog.L(),
}
}

// LoggerEntry pulsar logger entry
type LoggerEntry struct {
fields log.Fields
logger *zap.Logger
}

// WithFields with fields
func (p *LoggerEntry) WithFields(fields log.Fields) log.Entry {
p.fields = fields
return p
}

// WithField with field
func (p *LoggerEntry) WithField(name string, value interface{}) log.Entry {
p.fields[name] = value
return p
}

// Debug debug
func (p *LoggerEntry) Debug(args ...interface{}) {
p.logger.Sugar().Debug(args...)
}

// Info info
func (p *LoggerEntry) Info(args ...interface{}) {
p.logger.Sugar().Info(args...)
}

// Warn warn
func (p *LoggerEntry) Warn(args ...interface{}) {
p.logger.Sugar().Warn(args...)
}

// Error error
func (p *LoggerEntry) Error(args ...interface{}) {
p.logger.Sugar().Error(args...)
}

// Debugf debugf
func (p *LoggerEntry) Debugf(format string, args ...interface{}) {
p.logger.Sugar().Debugf(format, args...)
}

// Infof infof
func (p *LoggerEntry) Infof(format string, args ...interface{}) {
p.logger.Sugar().Infof(format, args...)
}

// Warnf warnf
func (p *LoggerEntry) Warnf(format string, args ...interface{}) {
p.logger.Sugar().Warnf(format, args...)
}

// Errorf errorf
func (p *LoggerEntry) Errorf(format string, args ...interface{}) {
p.logger.Sugar().Errorf(format, args...)
}

0 comments on commit e1730e5

Please sign in to comment.