Skip to content

Commit

Permalink
codec(ticdc): open protocol decoder support handle only message (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Aug 29, 2023
1 parent 1a883bd commit a2aee0c
Show file tree
Hide file tree
Showing 11 changed files with 405 additions and 227 deletions.
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram

switch c.codecConfig.Protocol {
case config.ProtocolOpen, config.ProtocolDefault:
decoder, err = open.NewBatchDecoder(ctx, c.codecConfig)
decoder, err = open.NewBatchDecoder(ctx, c.codecConfig, c.upstreamTiDB)
case config.ProtocolCanalJSON:
decoder, err = canal.NewBatchDecoder(ctx, c.codecConfig, c.upstreamTiDB)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/builder/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func BenchmarkJsonDecoding(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, message := range codecJSONEncodedRowChanges {
codecConfig := common.NewConfig(config.ProtocolOpen)
decoder, err := open.NewBatchDecoder(context.Background(), codecConfig)
decoder, err := open.NewBatchDecoder(context.Background(), codecConfig, nil)
require.NoError(b, err)
if err := decoder.AddKeyValue(message.Key, message.Value); err != nil {
panic(err)
Expand Down
Loading

0 comments on commit a2aee0c

Please sign in to comment.