Skip to content

Commit

Permalink
sink/codc(ticdc): fix schema-registry options in cli & config file (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyangyu authored May 19, 2022
1 parent 7473aeb commit 5e07c8b
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 14 deletions.
4 changes: 2 additions & 2 deletions cdc/sink/mq/codec/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (c *Config) Apply(sinkURI *url.URL, config *config.ReplicaConfig) error {
c.avroBigintUnsignedHandlingMode = s
}

if config.SchemaRegistry != "" {
c.avroSchemaRegistry = config.SchemaRegistry
if config.Sink != nil && config.Sink.SchemaRegistry != "" {
c.avroSchemaRegistry = config.Sink.SchemaRegistry
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mq/codec/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestConfigApplyValidate(t *testing.T) {
c := NewConfig(p)
require.Equal(t, config.ProtocolCanalJSON, c.protocol)

replicaConfig := &config.ReplicaConfig{}
replicaConfig := config.GetDefaultReplicaConfig()
err = c.Apply(sinkURI, replicaConfig)
require.NoError(t, err)
require.True(t, c.enableTiDBExtension)
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestConfigApplyValidate(t *testing.T) {
err = c.Validate()
require.ErrorContains(t, err, `Avro protocol requires parameter "schema-registry"`)

replicaConfig.SchemaRegistry = "this-is-a-uri"
replicaConfig.Sink.SchemaRegistry = "this-is-a-uri"
err = c.Apply(sinkURI, replicaConfig)
require.NoError(t, err)
require.Equal(t, "this-is-a-uri", c.avroSchemaRegistry)
Expand Down
3 changes: 3 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,9 @@ var doc = `{
},
"protocol": {
"type": "string"
},
"schema-registry": {
"type": "string"
}
}
},
Expand Down
3 changes: 3 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,9 @@
},
"protocol": {
"type": "string"
},
"schema-registry": {
"type": "string"
}
}
},
Expand Down
2 changes: 2 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ definitions:
type: array
protocol:
type: string
schema-registry:
type: string
type: object
model.Capture:
properties:
Expand Down
11 changes: 7 additions & 4 deletions pkg/cmd/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type changefeedCommonOptions struct {
noConfirm bool
targetTs uint64
sinkURI string
schemaRegistry string
configFile string
opts []string
sortEngine string
Expand Down Expand Up @@ -84,6 +85,8 @@ func (o *changefeedCommonOptions) addFlags(cmd *cobra.Command) {
cmd.PersistentFlags().BoolVar(&o.cyclicSyncDDL, "cyclic-sync-ddl", true, "(Experimental) Cyclic replication sync DDL of changefeed")
cmd.PersistentFlags().BoolVar(&o.syncPointEnabled, "sync-point", false, "(Experimental) Set and Record syncpoint in replication(default off)")
cmd.PersistentFlags().DurationVar(&o.syncPointInterval, "sync-interval", 10*time.Minute, "(Experimental) Set the interval for syncpoint in replication(default 10min)")
cmd.PersistentFlags().
StringVar(&o.schemaRegistry, "schema-registry", "", "Avro Schema Registry URI")
_ = cmd.PersistentFlags().MarkHidden("sort-dir")
}

Expand Down Expand Up @@ -113,7 +116,6 @@ type createChangefeedOptions struct {
disableGCSafePointCheck bool
startTs uint64
timezone string
schemaRegistry string

cfg *config.ReplicaConfig
}
Expand All @@ -137,8 +139,6 @@ func (o *createChangefeedOptions) addFlags(cmd *cobra.Command) {
cmd.PersistentFlags().BoolVarP(&o.disableGCSafePointCheck, "disable-gc-check", "", false, "Disable GC safe point check")
cmd.PersistentFlags().Uint64Var(&o.startTs, "start-ts", 0, "Start ts of changefeed")
cmd.PersistentFlags().StringVar(&o.timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)")
cmd.PersistentFlags().
StringVar(&o.schemaRegistry, "schema-registry", "", "Avro Schema Registry URI")
}

// complete adapts from the command line args to the data and client required.
Expand Down Expand Up @@ -232,6 +232,10 @@ func (o *createChangefeedOptions) completeCfg(
}
}

if o.commonChangefeedOptions.schemaRegistry != "" {
cfg.Sink.SchemaRegistry = o.commonChangefeedOptions.schemaRegistry
}

switch o.commonChangefeedOptions.sortEngine {
case model.SortInMemory:
case model.SortInFile:
Expand Down Expand Up @@ -271,7 +275,6 @@ func (o *createChangefeedOptions) completeCfg(
// TODO(neil) enable ID bucket.
}
}
cfg.SchemaRegistry = o.schemaRegistry
// Complete cfg.
o.cfg = cfg

Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/cli/cli_changefeed_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func (o *updateChangefeedOptions) applyChanges(oldInfo *model.ChangeFeedInfo, cm
if err = o.commonChangefeedOptions.strictDecodeConfig("TiCDC changefeed", cfg); err != nil {
log.Error("decode config file error", zap.Error(err))
}
case "schema-registry":
newInfo.Config.Sink.SchemaRegistry = o.commonChangefeedOptions.schemaRegistry
case "opts":
for _, opt := range o.commonChangefeedOptions.opts {
s := strings.SplitN(opt, "=", 2)
Expand Down
21 changes: 20 additions & 1 deletion pkg/cmd/cli/cli_changefeed_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
)

Expand Down Expand Up @@ -68,7 +69,25 @@ func (s *changefeedUpdateSuite) TestApplyChanges(c *check.C) {
c.Assert(newInfo.SortDir, check.Equals, ".")
file, err := os.ReadFile(filename)
c.Assert(err, check.IsNil)
c.Assert(strings.Contains(string(file), "this flag cannot be updated and will be ignored"), check.IsTrue)
c.Assert(
strings.Contains(string(file), "this flag cannot be updated and will be ignored"),
check.IsTrue,
)

// Test schema registry update
oldInfo = &model.ChangeFeedInfo{Config: config.GetDefaultReplicaConfig()}
c.Assert(oldInfo.Config.Sink.SchemaRegistry, check.Equals, "")
c.Assert(
cmd.ParseFlags([]string{"--schema-registry=https://username:password@localhost:8081"}),
check.IsNil,
)
newInfo, err = o.applyChanges(oldInfo, cmd)
c.Assert(err, check.IsNil)
c.Assert(
newInfo.Config.Sink.SchemaRegistry,
check.Equals,
"https://username:password@localhost:8081",
)
}

func initTestLogger(filename string) (func(), error) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ const (
"b"
]
}
]
],
"schema-registry": ""
},
"cyclic-replication": {
"enable": false,
Expand All @@ -181,8 +182,7 @@ const (
"max-log-size": 64,
"flush-interval": 1000,
"storage": ""
},
"schema-registry": ""
}
}`

testCfgTestReplicaConfigMarshal2 = `{
Expand Down
2 changes: 0 additions & 2 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ var defaultReplicaConfig = &ReplicaConfig{
FlushIntervalInMs: 1000,
Storage: "",
},
SchemaRegistry: "",
}

// ReplicaConfig represents some addition replication config for a changefeed
Expand All @@ -62,7 +61,6 @@ type replicaConfig struct {
Sink *SinkConfig `toml:"sink" json:"sink"`
Cyclic *CyclicConfig `toml:"cyclic-replication" json:"cyclic-replication"`
Consistent *ConsistentConfig `toml:"consistent" json:"consistent"`
SchemaRegistry string `toml:"schema-registry" json:"schema-registry"`
}

// Marshal returns the json marshal format of a ReplicationConfig
Expand Down
1 change: 1 addition & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type SinkConfig struct {
DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers"`
Protocol string `toml:"protocol" json:"protocol"`
ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors"`
SchemaRegistry string `toml:"schema-registry" json:"schema-registry"`
}

// DispatchRule represents partition rule for a table
Expand Down

0 comments on commit 5e07c8b

Please sign in to comment.