Skip to content

Commit

Permalink
config(ticdc): Fix old value configuration check for maxwell protocol (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 24, 2021
1 parent 07a7e0e commit 1eca2a5
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
20 changes: 20 additions & 0 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,26 @@ func (p *Protocol) FromString(protocol string) {
}
}

// String converts the Protocol enum type string to string.
func (p Protocol) String() string {
switch p {
case ProtocolDefault:
return "default"
case ProtocolCanal:
return "canal"
case ProtocolAvro:
return "avro"
case ProtocolMaxwell:
return "maxwell"
case ProtocolCanalJSON:
return "canal-json"
case ProtocolCraft:
return "craft"
default:
panic("unreachable")
}
}

type EncoderBuilder interface {
Build(ctx context.Context) (EventBatchEncoder, error)
}
Expand Down
8 changes: 5 additions & 3 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package sink

import (
"context"
"fmt"
"net/url"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -68,9 +69,10 @@ func newMqSink(
) (*mqSink, error) {
var protocol codec.Protocol
protocol.FromString(config.Sink.Protocol)
if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON) && !config.EnableOldValue {
log.Error("Old value is not enabled when using Canal protocol. Please update changefeed config")
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled"))
if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !config.EnableOldValue {
log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+
"Please update changefeed config", protocol.String()))
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol.String())))
}

encoderBuilder, err := codec.NewEventBatchEncoderBuilder(protocol, credential, opts)
Expand Down
8 changes: 6 additions & 2 deletions pkg/cmd/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
// forceEnableOldValueProtocols specifies which protocols need to be forced to enable old value.
var forceEnableOldValueProtocols = []string{
"canal",
"canal-json",
"maxwell",
}

Expand Down Expand Up @@ -205,9 +206,12 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co
}

protocol := sinkURIParsed.Query().Get("protocol")
if protocol != "" {
cfg.Sink.Protocol = protocol
}
for _, fp := range forceEnableOldValueProtocols {
if protocol == fp {
log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol))
if cfg.Sink.Protocol == fp {
log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol))
cfg.EnableOldValue = true
break
}
Expand Down

0 comments on commit 1eca2a5

Please sign in to comment.